Use single func to handle indexing from scratch and incrementally

The previous regenerate mechanism did not deduplicate entries with the same key,
so entries looked different between a regenerate and an incremental update.
Having a single func, mark_entries_for_update, handle both scenarios
avoids this divergence.

Update all text_to_jsonl methods to use the above method when
generating the index from scratch.
Debanjum Singh Solanky 2023-07-15 20:03:19 -07:00
parent 1673bb5558
commit 9bcca43299
8 changed files with 24 additions and 42 deletions
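
For context, a minimal sketch of how one marking function can serve both the
from-scratch and incremental paths. This is illustrative only: khoj's real
TextToJsonl.mark_entries_for_update hashes entries and differs in details,
and the Entry stand-in below is an assumption.

    # Minimal sketch only; not khoj's actual implementation
    from dataclasses import dataclass

    @dataclass
    class Entry:  # stand-in for khoj.utils.rawconfig.Entry
        compiled: str
        raw: str = ""

    def mark_entries_for_update(current_entries, previous_entries, key="compiled", logger=None):
        # Deduplicate current entries by key, keeping the first occurrence.
        # This runs on both paths, so a from-scratch index (previous_entries
        # == []) is deduplicated exactly like an incremental update
        current_by_key = {}
        for entry in current_entries:
            current_by_key.setdefault(getattr(entry, key), entry)

        # Entries seen before keep their previous id so their embeddings can
        # be reused; id -1 marks new or updated entries to (re-)embed
        previous_ids = {getattr(e, key): i for i, e in enumerate(previous_entries)}
        return [(previous_ids.get(k, -1), e) for k, e in current_by_key.items()]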


@@ -15,7 +15,6 @@ from khoj.processor.org_mode.org_to_jsonl import OrgToJsonl
 from khoj.processor.text_to_jsonl import TextToJsonl
 from khoj.utils.jsonl import dump_jsonl, compress_jsonl_data
 from khoj.utils.rawconfig import Entry
-from khoj.utils import state

 logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ class GithubToJsonl(TextToJsonl):
         else:
             return

-    def process(self, previous_entries=None):
+    def process(self, previous_entries=[]):
         current_entries = []
         for repo in self.config.repos:
             current_entries += self.process_repo(repo)


@@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)

 class JsonlToJsonl(TextToJsonl):
     # Define Functions
-    def process(self, previous_entries=None):
+    def process(self, previous_entries=[]):
         # Extract required fields from config
         input_jsonl_files, input_jsonl_filter, output_file = (
             self.config.input_files,
@@ -38,15 +38,9 @@ class JsonlToJsonl(TextToJsonl):

         # Identify, mark and merge any new entries with previous entries
         with timer("Identify new or updated entries", logger):
-            if not previous_entries:
-                entries_with_ids = list(enumerate(current_entries))
-            else:
-                entries_with_ids = TextToJsonl.mark_entries_for_update(
-                    current_entries,
-                    previous_entries,
-                    key="compiled",
-                    logger=logger,
-                )
+            entries_with_ids = TextToJsonl.mark_entries_for_update(
+                current_entries, previous_entries, key="compiled", logger=logger
+            )

         with timer("Write entries to JSONL file", logger):
             # Process Each Entry from All Notes Files
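
This collapsed branch is where the two paths used to diverge:
list(enumerate(current_entries)) kept duplicate entries that the update path
would have collapsed. A hypothetical check against the sketch above:

    # Hypothetical check, using the Entry/mark_entries_for_update sketch above
    a = Entry(compiled="same text")
    b = Entry(compiled="same text")  # duplicate key

    # Old regenerate path: duplicates kept, assigned ids 0 and 1
    assert list(enumerate([a, b])) == [(0, a), (1, b)]

    # Unified path: duplicates collapse to one entry, marked -1 (new)
    assert mark_entries_for_update([a, b], previous_entries=[]) == [(-1, a)]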


@@ -23,7 +23,7 @@ class MarkdownToJsonl(TextToJsonl):
         self.config = config

     # Define Functions
-    def process(self, previous_entries=None):
+    def process(self, previous_entries=[]):
         # Extract required fields from config
         markdown_files, markdown_file_filter, output_file = (
             self.config.input_files,
@@ -51,12 +51,9 @@ class MarkdownToJsonl(TextToJsonl):

         # Identify, mark and merge any new entries with previous entries
         with timer("Identify new or updated entries", logger):
-            if not previous_entries:
-                entries_with_ids = list(enumerate(current_entries))
-            else:
-                entries_with_ids = TextToJsonl.mark_entries_for_update(
-                    current_entries, previous_entries, key="compiled", logger=logger
-                )
+            entries_with_ids = TextToJsonl.mark_entries_for_update(
+                current_entries, previous_entries, key="compiled", logger=logger
+            )

         with timer("Write markdown entries to JSONL file", logger):
             # Process Each Entry from All Notes Files


@@ -80,7 +80,7 @@ class NotionToJsonl(TextToJsonl):
         self.body_params = {"page_size": 100}

-    def process(self, previous_entries=None):
+    def process(self, previous_entries=[]):
         current_entries = []

         # Get all pages
@@ -240,12 +240,9 @@ class NotionToJsonl(TextToJsonl):
     def update_entries_with_ids(self, current_entries, previous_entries):
         # Identify, mark and merge any new entries with previous entries
         with timer("Identify new or updated entries", logger):
-            if not previous_entries:
-                entries_with_ids = list(enumerate(current_entries))
-            else:
-                entries_with_ids = TextToJsonl.mark_entries_for_update(
-                    current_entries, previous_entries, key="compiled", logger=logger
-                )
+            entries_with_ids = TextToJsonl.mark_entries_for_update(
+                current_entries, previous_entries, key="compiled", logger=logger
+            )

         with timer("Write Notion entries to JSONL file", logger):
             # Process Each Entry from all Notion entries


@@ -22,7 +22,7 @@ class OrgToJsonl(TextToJsonl):
         self.config = config

     # Define Functions
-    def process(self, previous_entries: List[Entry] = None):
+    def process(self, previous_entries: List[Entry] = []):
         # Extract required fields from config
         org_files, org_file_filter, output_file = (
             self.config.input_files,
@@ -51,9 +51,7 @@ class OrgToJsonl(TextToJsonl):
             current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256)

         # Identify, mark and merge any new entries with previous entries
-        if not previous_entries:
-            entries_with_ids = list(enumerate(current_entries))
-        else:
-            with timer("Identify new or updated entries", logger):
-                entries_with_ids = TextToJsonl.mark_entries_for_update(
-                    current_entries, previous_entries, key="compiled", logger=logger
-                )
+        with timer("Identify new or updated entries", logger):
+            entries_with_ids = TextToJsonl.mark_entries_for_update(
+                current_entries, previous_entries, key="compiled", logger=logger
+            )


@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)

 class PdfToJsonl(TextToJsonl):
     # Define Functions
-    def process(self, previous_entries=None):
+    def process(self, previous_entries=[]):
         # Extract required fields from config
         pdf_files, pdf_file_filter, output_file = (
             self.config.input_files,
@@ -45,12 +45,9 @@ class PdfToJsonl(TextToJsonl):

         # Identify, mark and merge any new entries with previous entries
         with timer("Identify new or updated entries", logger):
-            if not previous_entries:
-                entries_with_ids = list(enumerate(current_entries))
-            else:
-                entries_with_ids = TextToJsonl.mark_entries_for_update(
-                    current_entries, previous_entries, key="compiled", logger=logger
-                )
+            entries_with_ids = TextToJsonl.mark_entries_for_update(
+                current_entries, previous_entries, key="compiled", logger=logger
+            )

         with timer("Write PDF entries to JSONL file", logger):
             # Process Each Entry from All Notes Files


@@ -17,7 +17,7 @@ class TextToJsonl(ABC):
         self.config = config

     @abstractmethod
-    def process(self, previous_entries: List[Entry] = None) -> List[Tuple[int, Entry]]:
+    def process(self, previous_entries: List[Entry] = []) -> List[Tuple[int, Entry]]:
         ...

     @staticmethod


@@ -176,10 +176,10 @@ def setup(
 ) -> TextContent:
     # Map notes in text files to (compressed) JSONL formatted file
     config.compressed_jsonl = resolve_absolute_path(config.compressed_jsonl)
-    previous_entries = (
-        extract_entries(config.compressed_jsonl) if config.compressed_jsonl.exists() and not regenerate else None
-    )
-    entries_with_indices = text_to_jsonl(config).process(previous_entries or [])
+    previous_entries = []
+    if config.compressed_jsonl.exists() and not regenerate:
+        previous_entries = extract_entries(config.compressed_jsonl)
+    entries_with_indices = text_to_jsonl(config).process(previous_entries)

     # Extract Updated Entries
     entries = extract_entries(config.compressed_jsonl)
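
Both indexing modes now flow through the same process(previous_entries)
call; only the contents of previous_entries differ. A hypothetical pair of
setup() invocations, with the OrgToJsonl/org_config argument names assumed:

    # Hypothetical call sites; exact setup() arguments are assumptions
    # Full regeneration: previous_entries stays [], so every current entry
    # is marked id -1 and gets (re-)embedded
    content = setup(OrgToJsonl, org_config, regenerate=True)

    # Incremental update: previous entries load from the compressed JSONL,
    # so unchanged entries keep their ids and skip re-embedding
    content = setup(OrgToJsonl, org_config, regenerate=False)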