Initial (hacky) solution to support search for Panchayat db
parent 81d975affa
commit 84e3211a09
8 changed files with 648 additions and 2 deletions
@@ -95,7 +95,7 @@
function populate_type_dropdown() {
    // Populate type dropdown field with enabled search types only
    var possible_search_types = ["org", "markdown", "ledger", "music", "image"];
    var possible_search_types = ["org", "markdown", "ledger", "music", "image", "yaml"];
    fetch("/config/data")
        .then(response => response.json())
        .then(data => {
18 src/main.py
@@ -8,6 +8,7 @@ from functools import lru_cache
# External Packages
import uvicorn
import torch
import importlib
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles

@@ -18,6 +19,7 @@ from src.search_type import image_search, text_search
from src.processor.org_mode.org_to_jsonl import org_to_jsonl
from src.processor.ledger.beancount_to_jsonl import beancount_to_jsonl
from src.processor.markdown.markdown_to_jsonl import markdown_to_jsonl
from src.processor.yaml.yaml_to_jsonl import yaml_to_jsonl
from src.utils.helpers import get_absolute_path, get_from_dict
from src.utils.cli import cli
from src.utils.config import SearchType, SearchModels, ProcessorConfigModel, ConversationProcessorConfigModel
@@ -109,6 +111,17 @@ def search(q: str, n: Optional[int] = 5, t: Optional[SearchType] = None, r: Opti
        results = text_search.collate_results(hits, entries, results_count)
        collate_end = time.time()

    if (t == SearchType.Yaml or t == None) and model.yaml_search:
        # query yaml files
        query_start = time.time()
        hits, entries = text_search.query(user_query, model.yaml_search, rank_results=r, device=device, filters=[ExplicitFilter(), DateFilter()], verbose=verbose)
        query_end = time.time()

        # collate and return results
        collate_start = time.time()
        results = text_search.collate_results(hits, entries, results_count)
        collate_end = time.time()

    if (t == SearchType.Ledger or t == None) and model.ledger_search:
        # query transactions
        query_start = time.time()
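For orientation, here is a minimal sketch (an editor's illustration, not part of the commit) of how the new yaml search type could be exercised once the server is running. The host, port, and query string are assumptions; the q/n/t/r parameters mirror the search() signature in the hunk above.

# Editor's sketch: querying the new yaml search type over HTTP.
# Assumes the khoj server is running locally on port 8000 (an assumption).
import requests

response = requests.get(
    "http://localhost:8000/search",
    params={"q": "posts about the village well", "n": 5, "t": "yaml", "r": False},  # mirrors search(q, n, t, r)
)
for result in response.json():
    print(result)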
@@ -219,6 +232,11 @@ def initialize_search(config: FullConfig, regenerate: bool, t: SearchType = None
        # Extract Entries, Generate Markdown Embeddings
        model.markdown_search = text_search.setup(markdown_to_jsonl, config.content_type.markdown, search_config=config.search_type.asymmetric, regenerate=regenerate, device=device, verbose=verbose)

    # Initialize Yaml Search
    if (t == SearchType.Yaml or t == None) and config.content_type.yaml:
        # Extract Entries, Generate Yaml Embeddings
        model.yaml_search = text_search.setup(yaml_to_jsonl, config.content_type.yaml, search_config=config.search_type.asymmetric, regenerate=regenerate, device=device, verbose=verbose)

    # Initialize Ledger Search
    if (t == SearchType.Ledger or t == None) and config.content_type.ledger:
        # Extract Entries, Generate Ledger Embeddings
2 src/processor/util.py (new file)
@@ -0,0 +1,2 @@
def get_relevant_files(absolute_files=None, file_filter=None, verbose=0):
    print("hi")
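The helper above is only a stub in this commit. A possible shape for it, modeled on the get_yaml_files() helper added further down in this diff (purely an editor's sketch, not part of the change):

# Editor's sketch of what get_relevant_files might grow into,
# mirroring get_yaml_files() from src/processor/yaml/yaml_to_jsonl.py below.
import glob

from src.utils.helpers import get_absolute_path

def get_relevant_files(absolute_files=None, file_filter=None, verbose=0):
    "Combine explicitly listed files with files matched by a glob filter"
    absolute = {get_absolute_path(f) for f in absolute_files} if absolute_files else set()
    filtered = set(glob.glob(get_absolute_path(file_filter))) if file_filter else set()
    all_files = absolute | filtered
    if verbose > 0:
        print(f"Processing files: {all_files}")
    return all_files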
0 src/processor/yaml/__init__.py (new file)
487 src/processor/yaml/vdb.py (new file)
@@ -0,0 +1,487 @@
"""
Virtual Database that acts as an abstraction to the actual database.
VDB is the python representation of the on disk database.
VDB exposes methods to read/edit the database.
VDB can be serialized/deserialized to on disk db.
"""

import datetime
import re
from enum import Enum, auto
from typing import List, Optional, Tuple

import yaml

# from libgravatar import Gravatar # type: ignore


class Visibility(Enum):
    """
    Enum to represent visibility levels for a post
    """
    Aham = auto()  # only visible to the author
    Gram = auto()  # visible to all logged in users
    Lok = auto()   # visible to everyone without log in


class VDB:
    """
    Python abstraction of panchayat DB
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, outfile: str = None):
        self.users = UserList()
        self.posts = PostTree()
        self.outfile = outfile

    def commit(self):
        """
        serialize the virtual database to disk overwriting existing file
        """
        if not self.outfile:
            raise RuntimeError("Outfile is empty")

        with open(self.outfile, 'w') as outfile:
            yaml.dump(self, outfile)

        # git commit


class User:
    """
    Class to represent a user on panchayat
    """

    # pylint: disable=too-few-public-methods
    def __init__(self,
                 username: str,
                 password: str,
                 token: str = None,
                 email: str = None,
                 email_updates: bool = False):
        # pylint: disable=too-many-arguments
        self.username = username  # primary key
        self.password = password  # hash
        self.token = token
        self.email = email
        self.email_updates = email_updates

    def __str__(self) -> str:
        return self.username

    def gravatar_url(self) -> str:
        """
        Return gravatar image url for the user.
        If the user has an email, the email is used to generate the image.
        Else the username is used to generate the image.
        """
        key = self.email if self.email else self.username
        return key
        # libgrav = Gravatar(key)
        # return libgrav.get_image(size=200, default="identicon", use_ssl=True)


class UserList(list):
    """
    List of users
    """
    def find(self, username: str) -> Optional[User]:
        """
        Find user by username
        """
        user = [user for user in self if user.username == username]
        if not user:
            return None
        if len(user) != 1:
            raise RuntimeError("More than one user found for username")
        return user[0]


class Post:  # pylint: disable=too-many-instance-attributes
    """
    Class to represent a post on panchayat.
    Inherited by LinkPost and TextPost
    """
    def __init__(
            self,
            author: User,
            title: str,
            body: str,
            visibility: Visibility = Visibility.Gram,
            upvotes=None,
            downvotes=None,
            created=None,
            parent: "Post" = None,
            post_id: int = None,
    ):
        # pylint: disable=too-many-arguments
        self.post_id = post_id  # need id for permalink
        self.author = author
        self.created = created \
            if created is not None else datetime.datetime.now()
        self.title = title
        self.body = body
        self.upvotes = upvotes if upvotes else set()
        self.downvotes = downvotes if downvotes else set()
        self.children: List[Post] = []
        self.parent = parent
        self.depth: int = parent.depth + 1 if parent else 0

        if (self.parent and self.parent.visibility == visibility.Aham
                and self.parent.author != self.author):
            raise RuntimeError("Cannot reply to someone else's aham post")
        self.visibility = visibility  # set visibility using setter

    @property
    def target_visibility(self):
        """
        Getter method for target visibility
        """
        return self._visibility

    @property
    def visibility(self):
        """
        Getter method for visibility

        Visibility can be lower than target if some ancestor has lower visibility.
        When the ancestor reaches the requested target visibility,
        self will automatically reach target visibility as well.
        """
        if self.parent and self.parent.visibility.value < self._visibility.value:
            return self.parent.visibility
        return self._visibility

    @visibility.setter
    def visibility(self, other: Visibility):
        """
        Setter method for visibility
        If self is being made aham then parent and all descendants must be by same author
        While setting visibility, all descendants are capped to self visibility level

        This setter sets the _visibility property. This sets the target visibility.
        But the actual visibility can stay lower if some ancestor has lower visibility.
        """
        if other == Visibility.Aham:
            if any([
                    descendant.author != self.author
                    for descendant in self.descendants
            ]):
                raise RuntimeError(
                    "Cannot make post Aham if there are children owned by others"
                )

        self._visibility = other

    def visibility_detail_string(self):
        """
        The detailed string for visibility
        "(Visibility.name requested)" if some descendant has a higher target visibility
        "(Visibility.name pending)" if some ancestor is preventing this post from target visibility
        """
        ret = ''
        if self.target_visibility != self.visibility:
            ret += f'({self.target_visibility.name} pending)'
        if self.children:
            max_visibility_request = max([
                descendant.target_visibility for descendant in self.descendants
            ],
                                         key=lambda x: x.value)
            if max_visibility_request.value > self.target_visibility.value:
                ret += f'({max_visibility_request.name} requested)'
        return ret

    def is_visible_to(self, user: User = None) -> bool:
        """
        Returns True if self is visible to user, else False
        """
        if self.visibility == Visibility.Lok:
            return True
        if self.visibility == Visibility.Gram and user:
            return True
        if self.visibility == Visibility.Aham and self.author == user:
            return True
        return False

    @property
    def descendants(self) -> List["Post"]:
        """
        Return all my descendants with inorder traversal
        Does not include self
        """
        my_descendants = []  # list(self.children)
        for child in sorted(self.children, key=lambda post: post.created):
            my_descendants.append(child)
            my_descendants.extend(child.descendants)
        return my_descendants

    @property
    def family(self) -> List["Post"]:
        """
        Return list of posts in family
        Two posts belong to the same family if they share the same TLP
        """
        return self.tlp.descendants_and_i

    @property
    def descendants_and_i(self) -> List["Post"]:
        """
        Return all my descendants with inorder traversal
        Includes self
        """
        return [self] + self.descendants

    @property
    def ancestors(self) -> List["Post"]:
        """
        Return all my ancestors oldest first
        Does not include self
        """
        if self.parent:
            return self.parent.ancestors + [self.parent]
        return []

    @property
    def ancestry(self) -> List["Post"]:
        """
        Return all my ancestors including self
        """
        if self.parent:
            return self.parent.ancestry + [self]
        return [self]

    @property
    def tlp(self) -> "Post":
        """
        Return my top level post
        """
        if self.is_tlp():
            return self
        return self.parent.tlp  # type: ignore

    def is_tlp(self) -> bool:
        """
        Return True if I am a top level post
        """
        return self.depth == 0

    def is_leaf(self) -> bool:
        """
        Return True if I am a leaf post
        """
        return not self.children

    @property
    def vote_count(self) -> int:
        """
        Return the effective vote count of this post. upvotes - downvotes
        """
        return len(self.upvotes) - len(self.downvotes)

    def upvote_string(self) -> str:
        """
        Return string of all users who have upvoted this post
        """
        return ', '.join([user.username for user in self.upvotes])

    def downvote_string(self) -> str:
        """
        Return string of all users who have downvoted this post
        """
        return ', '.join([user.username for user in self.downvotes])

    def __str__(self) -> str:
        if self.title:
            return self.title
        return self.body

    def nullvote(self, user: User):
        """
        Remove user's vote from this post
        """
        self.upvotes.discard(user)
        self.downvotes.discard(user)

    def upvote(self, user: User):
        """
        Upvote this post. Upvote is done by voiding the previous vote and creating a new one.
        """
        self.nullvote(user)
        self.upvotes.add(user)

    def downvote(self, user: User):
        """
        Downvote this post. Downvote is done by voiding the previous vote and creating a new one.
        """
        self.nullvote(user)
        self.downvotes.add(user)

    def delete(self):
        """
        Delete this post. Does not remove the post from the db,
        but only overwrites title and body with 'DELETED'.
        This is done to not break other posts that have a reference to the deleted one.
        """
        self.title = "DELETED"
        self.body = "DELETED"

    def family_last_modified(self) -> datetime.datetime:
        """
        Return when the post family was last modified
        Max of created for all posts in family
        """
        return max([post.created for post in self.family])


class LinkPost(Post):
    """
    Class to represent a link post on panchayat
    """
    def is_url(self) -> bool:  # pylint: disable=missing-function-docstring, no-self-use
        return True


class TextPost(Post):
    """
    Class to represent a text post on panchayat
    """
    def is_url(self) -> bool:  # pylint: disable=missing-function-docstring, no-self-use
        return False

    @property
    def html_body(self) -> str:
        """
        Return html string with all urls in body converted to hrefs
        Regex taken from https://urlregex.com/
        An exclusion for trailing period and parenthesis was appended to remove false positives
        """
        # pylint: disable=line-too-long
        url_regex = re.compile(
            r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+#]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+[^\. \)])'''
        )
        return url_regex.sub(r'<a href="\1" target="_blank">\1</a>', self.body)


class PostTree:
    """
    Class to represent a tree of posts
    """
    def __init__(self):
        self.tlps = []

    def zig_zag(self) -> List[Post]:
        """
        Return all posts in zig zag order.
        TLPs are in reverse chronological order.
        Comments are ordered chronologically.
        """
        all_posts = []
        reverse_chrono_tlps = sorted(self.tlps,
                                     key=lambda post: post.created,
                                     reverse=True)
        for tlp in reverse_chrono_tlps:
            all_posts.append(tlp)
            all_posts.extend(tlp.descendants)
        return all_posts

    def compressed_reverse_chrono_ancestry(
            self, requesting_user: User) -> List[Tuple[Post, bool, bool]]:
        """
        Returns a list of all posts with their ancestors.
        The post is attached to two boolean fields wrapped inside a tuple
        for use by the jinja template.
        First boolean indicates whether this post must be highlighted.
        Second boolean indicates whether a new TLP boundary has been reached.
        Ancestry is not repeated when the subsequent post shares ancestors.
        This query is used in the activity view.
        """
        ret: List[Tuple[Post, bool, bool]] = []
        prev_ancestors: List[Post] = []
        prev_tlp: Optional[Post] = None
        for post in self.reverse_chrono():
            if not post.is_visible_to(requesting_user):
                continue
            if prev_tlp and post.tlp is not prev_tlp:
                # make tlp_switch true for the previous post
                ret[-1] = (ret[-1][0], ret[-1][1], True)
            ret.extend([(ancestor, False, False) for ancestor in post.ancestors
                        if ancestor not in prev_ancestors])
            # add current post with highlight true
            ret.append((post, True, False))
            prev_ancestors = post.ancestry
            prev_tlp = post.tlp
        return ret

    def all(self) -> List[Post]:
        """
        Return list of all posts in any order.
        Currently zig_zag order.
        """
        return self.zig_zag()

    def reverse_chrono(self) -> List[Post]:
        """
        Return all posts in reverse chronological order
        """
        return sorted(self.all(), key=lambda post: post.created, reverse=True)

    def find(self, post_id: int) -> Optional[Post]:
        """
        Find a post by post id
        """
        post = [post for post in self.all() if post.post_id == post_id]
        if not post:
            return None
        if len(post) != 1:
            raise RuntimeError(
                "There should only have been one post with a given id")
        return post[0]

    def insert(self, post: Post):
        """
        Insert a post into the posttree.
        If the post does not have an id already, assign the smallest available one.
        If the post has a parent, add the post as a child of the parent.
        Else add the post as a TLP.
        """
        if post.post_id is None:
            post.post_id = max(  # type: ignore
                [post.post_id for post in self.all()],
                default=0) + 1  # type: ignore

        if self.find(post.post_id) is not None:
            raise RuntimeError("Posttree already contains post with id")

        if post.parent is None:
            self.tlps.append(post)
        else:
            post.parent.children.append(post)

    def tlp_count(self, user: User) -> int:
        """
        Return #TLPs by the user
        """
        return len([post for post in self.tlps if post.author == user])

    def comment_count(self, user: User) -> int:
        """
        Return #comments by user
        """
        return len([
            post for post in self.all()
            if post.depth != 0 and post.author == user
        ])

    def upvote_count(self, user: User) -> int:
        """
        Return #upvotes by user
        """
        return len([post for post in self.all() if user in post.upvotes])

    def downvote_count(self, user: User) -> int:
        """
        Return #downvotes by user
        """
        return len([post for post in self.all() if user in post.downvotes])
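To make the visibility rules described in the getter/setter docstrings above concrete, here is a small usage sketch (an editor's illustration with made-up users and posts, not part of the commit). It shows that a reply's effective visibility stays capped by its parent until the parent catches up, and exercises the post tree and voting helpers.

# Editor's sketch exercising the VDB classes defined above.
from src.processor.yaml.vdb import VDB, User, TextPost, Visibility

db = VDB(outfile="panchayat.yaml")      # outfile only matters if commit() is called
alice = User(username="alice", password="<password-hash>")
bob = User(username="bob", password="<password-hash>")
db.users.extend([alice, bob])

tlp = TextPost(author=alice, title="Village well repair", body="Thoughts?", visibility=Visibility.Gram)
db.posts.insert(tlp)

reply = TextPost(author=bob, title=None, body="I can help", parent=tlp, visibility=Visibility.Lok)
db.posts.insert(reply)

# The reply *requests* Lok, but its effective visibility is capped by the Gram parent.
assert reply.target_visibility == Visibility.Lok
assert reply.visibility == Visibility.Gram

tlp.visibility = Visibility.Lok              # once the parent is raised...
assert reply.visibility == Visibility.Lok    # ...the reply reaches its target automatically

reply.upvote(alice)
assert reply.vote_count == 1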
137 src/processor/yaml/yaml_to_jsonl.py (new file)
@@ -0,0 +1,137 @@
#!/usr/bin/env python3

# Standard Packages
import json
import argparse
import pathlib
import glob
import re
import yaml

# Internal Packages
from src.processor.yaml import vdb
from src.utils.helpers import get_absolute_path, is_none_or_empty
from src.utils.constants import empty_escape_sequences
from src.utils.jsonl import dump_jsonl, compress_jsonl_data


def panchayat_constructor(loader, node):
    fields = loader.construct_mapping(node)
    return vdb.VDB(**fields)


# Define Functions
def yaml_to_jsonl(yaml_files, yaml_file_filter, output_file, verbose=0):

    # yaml.add_constructor("!python.vdb.VDB", panchayat_constructor)
    # Input Validation
    if is_none_or_empty(yaml_files) and is_none_or_empty(yaml_file_filter):
        print("At least one of yaml-files or yaml-file-filter is required to be specified")
        exit(1)

    # Get Yaml Files to Process
    yaml_files = get_yaml_files(yaml_files, yaml_file_filter, verbose)

    # Extract Entries from specified Yaml files
    entries = extract_yaml_entries(yaml_files)

    # Process Each Entry from All Notes Files
    jsonl_data = convert_yaml_entries_to_jsonl(entries, verbose=verbose)

    # Compress JSONL formatted Data
    if output_file.suffix == ".gz":
        compress_jsonl_data(jsonl_data, output_file, verbose=verbose)
    elif output_file.suffix == ".jsonl":
        dump_jsonl(jsonl_data, output_file, verbose=verbose)

    return entries


def get_yaml_files(yaml_files=None, yaml_file_filter=None, verbose=0):
    "Get Yaml files to process"
    absolute_yaml_files, filtered_yaml_files = set(), set()
    if yaml_files:
        absolute_yaml_files = {get_absolute_path(yaml_file) for yaml_file in yaml_files}
    if yaml_file_filter:
        filtered_yaml_files = set(glob.glob(get_absolute_path(yaml_file_filter)))

    all_yaml_files = absolute_yaml_files | filtered_yaml_files

    files_with_non_yaml_extensions = {
        yaml_file
        for yaml_file
        in all_yaml_files
        if not yaml_file.endswith(".yaml") and not yaml_file.endswith(".yml")
    }

    if any(files_with_non_yaml_extensions):
        print(f"[Warning] There may be non yaml files in the input set: {files_with_non_yaml_extensions}")

    if verbose > 0:
        print(f'Processing files: {all_yaml_files}')

    return all_yaml_files


def extract_yaml_entries(yaml_files):
    "Extract entries by post from specified Yaml files"

    entries = []
    for yaml_file in yaml_files:
        with open(yaml_file) as f:

            # try:
            raw_data = yaml.load(f, Loader=yaml.UnsafeLoader)

            # print(raw_data)
            # print(raw_data.posts.zig_zag())

            seen_ids = set()

            for post in raw_data.posts.zig_zag():
                all_subposts = post.descendants_and_i
                for subpost in all_subposts:
                    if subpost.post_id not in seen_ids:
                        seen_ids.add(subpost.post_id)
                        entry = {
                            "author": subpost.author.username,
                            "title": subpost.title,
                            "body": subpost.body}
                        entries.append(entry)

            # except yaml.YAMLError as exception:
            #     print(f"Exception encountered while parsing {yaml_file}: {exception}")

            # markdown_content = f.read()
            # entries.extend([f'#{entry.strip(empty_escape_sequences)}'
            #                 for entry
            #                 in re.split(markdown_heading_regex, markdown_content, flags=re.MULTILINE)])

    return entries


def convert_yaml_entries_to_jsonl(entries, verbose=0):
    "Convert each Yaml entry to JSON and collate as JSONL"
    jsonl = ''
    for entry in entries:
        entry_dict = {'compiled': entry, 'raw': entry}
        # Convert Dictionary to JSON and Append to JSONL string
        jsonl += f'{json.dumps(entry_dict, ensure_ascii=False)}\n'

    if verbose > 0:
        print(f"Converted {len(entries)} entries to jsonl format")

    return jsonl


if __name__ == '__main__':
    # Setup Argument Parser
    parser = argparse.ArgumentParser(description="Map Yaml entries into (compressed) JSONL format")
    parser.add_argument('--output-file', '-o', type=pathlib.Path, required=True, help="Output file for (compressed) JSONL formatted notes. Expected file extensions: jsonl or jsonl.gz")
    parser.add_argument('--input-files', '-i', nargs='*', help="List of yaml files to process")
    parser.add_argument('--input-filter', type=str, default=None, help="Regex filter for yaml files to process")
    parser.add_argument('--verbose', '-v', action='count', default=0, help="Show verbose conversion logs, Default: 0")
    args = parser.parse_args()

    # Map notes in Yaml files to (compressed) JSONL formatted file
    yaml_to_jsonl(args.input_files, args.input_filter, args.output_file, args.verbose)
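For reference, a hedged example of running this converter on its own from Python (an editor's sketch, not part of the commit); the input path is a placeholder, and the same conversion is also exposed as a script through the argparse block at the bottom of the file.

# Editor's sketch: converting a panchayat yaml dump to compressed JSONL directly from Python.
import pathlib

from src.processor.yaml.yaml_to_jsonl import yaml_to_jsonl

entries = yaml_to_jsonl(
    yaml_files=None,                                  # or an explicit list of .yaml files
    yaml_file_filter="~/panchayat/data/*.yaml",       # glob pattern, resolved via get_absolute_path
    output_file=pathlib.Path("panchayat.jsonl.gz"),   # .gz -> compress_jsonl_data, .jsonl -> dump_jsonl
    verbose=1,
)
print(f"Indexed {len(entries)} posts")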
@@ -6,7 +6,7 @@ import pathlib
import yaml

# Internal Packages
from src.utils.helpers import is_none_or_empty, get_absolute_path, resolve_absolute_path, merge_dicts
from src.utils.helpers import get_absolute_path, resolve_absolute_path
from src.utils.rawconfig import FullConfig

def cli(args=None):
@@ -12,6 +13,7 @@ class SearchType(str, Enum):
    Ledger = "ledger"
    Music = "music"
    Markdown = "markdown"
    Yaml = "yaml"
    Image = "image"
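Because SearchType subclasses str, the new member maps directly onto the t query parameter used by the /search endpoint. A quick illustration (editor's note, not part of the commit):

# Editor's note: SearchType is a str-backed Enum, so "yaml" round-trips cleanly.
from src.utils.config import SearchType

assert SearchType("yaml") is SearchType.Yaml   # lookup by value, as FastAPI does for the t parameter
assert SearchType.Yaml == "yaml"               # plain string comparison also works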
@@ -41,6 +42,7 @@ class SearchModels():
    ledger_search: TextSearchModel = None
    music_search: TextSearchModel = None
    markdown_search: TextSearchModel = None
    yaml_search: TextSearchModel = None
    image_search: ImageSearchModel = None