Update logical splitting of org-mode text into entries

- Major
  - Do not split org file, entry if it fits within the max token limits
    - Recurse down org file entries, one heading level at a time until
      reach leaf node or the current parent tree fits context window
    - Update `process_single_org_file' func logic to do this recursion

  - Convert extracted org nodes with children into entries
    - Previously org node to entry code just had to handle leaf entries
    - Now it recieve list of org node trees
    - Only add ancestor path to root org-node of each tree
    - Indent each entry trees headings by +1 level from base level (=2)

- Minor
  - Stop timing org-node parsing vs org-node to entry conversion
    Just time the wrapping function for org-mode entry extraction
    This standardizes what is being timed across at md, org etc.
  - Move try/catch to `extract_org_nodes' from `parse_single_org_file'
    func to standardize this also across md, org
This commit is contained in:
Debanjum Singh Solanky 2024-02-12 11:27:35 +05:30
parent eaa27ca841
commit 44b3247869
2 changed files with 303 additions and 73 deletions

View file

@ -1,4 +1,5 @@
import logging
import re
from pathlib import Path
from typing import Dict, List, Tuple
@ -30,11 +31,12 @@ class OrgToEntries(TextToEntries):
deletion_file_names = None
# Extract Entries from specified Org files
max_tokens = 256
with timer("Extract entries from specified Org files", logger):
current_entries = self.extract_org_entries(files)
current_entries = self.extract_org_entries(files, max_tokens=max_tokens)
with timer("Split entries by max token size supported by model", logger):
current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=256)
current_entries = self.split_entries_by_max_tokens(current_entries, max_tokens=max_tokens)
# Identify, mark and merge any new entries with previous entries
with timer("Identify new or updated entries", logger):
@ -52,96 +54,173 @@ class OrgToEntries(TextToEntries):
return num_new_embeddings, num_deleted_embeddings
@staticmethod
def extract_org_entries(org_files: dict[str, str], index_heading_entries: bool = False) -> List[Entry]:
def extract_org_entries(
org_files: dict[str, str], index_heading_entries: bool = False, max_tokens=256
) -> List[Entry]:
"Extract entries from specified Org files"
with timer("Parse entries from org files into OrgNode objects", logger):
entry_nodes, file_to_entries = OrgToEntries.extract_org_nodes(org_files)
with timer("Convert OrgNodes into list of entries", logger):
return OrgToEntries.convert_org_nodes_to_entries(entry_nodes, file_to_entries, index_heading_entries)
entries, entry_to_file_map = OrgToEntries.extract_org_nodes(org_files, max_tokens)
return OrgToEntries.convert_org_nodes_to_entries(entries, entry_to_file_map, index_heading_entries)
@staticmethod
def extract_org_nodes(org_files: dict[str, str]) -> Tuple[List[Orgnode], Dict[Orgnode, str]]:
def extract_org_nodes(org_files: dict[str, str], max_tokens) -> Tuple[List[List[Orgnode]], Dict[Orgnode, str]]:
"Extract org nodes from specified org files"
entry_nodes: List[Orgnode] = []
entries: List[List[Orgnode]] = []
entry_to_file_map: List[Tuple[Orgnode, str]] = []
for org_file in org_files:
org_content = org_files[org_file]
entry_nodes, entry_to_file_map = OrgToEntries.process_single_org_file(
org_content, org_file, entry_nodes, entry_to_file_map
)
try:
org_content = org_files[org_file]
entries, entry_to_file_map = OrgToEntries.process_single_org_file(
org_content, org_file, entries, entry_to_file_map, max_tokens
)
except Exception as e:
logger.error(f"Unable to process file: {org_file}. Skipped indexing it.\nError; {e}", exc_info=True)
return entry_nodes, dict(entry_to_file_map)
return entries, dict(entry_to_file_map)
@staticmethod
def process_single_org_file(
org_content: str,
org_file: str,
entries: List[Orgnode],
entries: List[List[Orgnode]],
entry_to_file_map: List[Tuple[Orgnode, str]],
) -> Tuple[List[Orgnode], List[Tuple[Orgnode, str]]]:
# Process single org file. The org parser assumes that the file is a single org file and reads it from a buffer.
# We'll split the raw content of this file by new line to mimic the same behavior.
try:
org_file_entries = orgnode.makelist(org_content, org_file)
entry_to_file_map += zip(org_file_entries, [org_file] * len(org_file_entries))
entries.extend(org_file_entries)
except Exception as e:
logger.error(f"Unable to process file: {org_file}. Skipped indexing it.\nError; {e}", exc_info=True)
max_tokens=256,
ancestry: Dict[int, str] = {},
) -> Tuple[List[List[Orgnode]], List[Tuple[Orgnode, str]]]:
"""Parse org_content from org_file into OrgNode entries
Recurse down org file entries, one heading level at a time,
until reach a leaf entry or the current entry tree fits max_tokens.
Parse recursion terminating entry (trees) into (a list of) OrgNode objects.
"""
# Prepend the org section's heading ancestry
ancestry_string = "\n".join([f"{'*' * key} {ancestry[key]}" for key in sorted(ancestry.keys())])
org_content_with_ancestry = f"{ancestry_string}{org_content}"
# If content is small or content has no children headings, save it as a single entry
# Note: This is the terminating condition for this recursive function
if len(TextToEntries.tokenizer(org_content_with_ancestry)) <= max_tokens or not re.search(
rf"^\*{{{len(ancestry)+1},}}\s", org_content, re.MULTILINE
):
orgnode_content_with_ancestry = orgnode.makelist(org_content_with_ancestry, org_file)
entry_to_file_map += zip(orgnode_content_with_ancestry, [org_file] * len(orgnode_content_with_ancestry))
entries.extend([orgnode_content_with_ancestry])
return entries, entry_to_file_map
# Split this entry tree into sections by the next heading level in it
# Increment heading level until able to split entry into sections
# A successful split will result in at least 2 sections
next_heading_level = len(ancestry)
sections: List[str] = []
while len(sections) < 2:
next_heading_level += 1
sections = re.split(rf"(\n|^)(?=[*]{{{next_heading_level}}} .+\n?)", org_content, re.MULTILINE)
# Recurse down each non-empty section after parsing its body, heading and ancestry
for section in sections:
# Skip empty sections
if section.strip() == "":
continue
# Extract the section body and (when present) the heading
current_ancestry = ancestry.copy()
first_non_empty_line = [line for line in section.split("\n") if line.strip() != ""][0]
# If first non-empty line is a heading with expected heading level
if re.search(rf"^\*{{{next_heading_level}}}\s", first_non_empty_line):
# Extract the section body without the heading
current_section_body = "\n".join(section.split(first_non_empty_line)[1:])
# Parse the section heading into current section ancestry
current_section_title = first_non_empty_line[next_heading_level:].strip()
current_ancestry[next_heading_level] = current_section_title
# Else process the section as just body text
else:
current_section_body = section
# Recurse down children of the current entry
OrgToEntries.process_single_org_file(
current_section_body,
org_file,
entries,
entry_to_file_map,
max_tokens,
current_ancestry,
)
return entries, entry_to_file_map
@staticmethod
def convert_org_nodes_to_entries(
parsed_entries: List[orgnode.Orgnode], entry_to_file_map, index_heading_entries=False
parsed_entries: List[List[Orgnode]],
entry_to_file_map: Dict[Orgnode, str],
index_heading_entries: bool = False,
) -> List[Entry]:
"Convert Org-Mode nodes into list of Entry objects"
"""
Convert OrgNode lists into list of Entry objects
Each list of OrgNodes is a parsed parent org tree or leaf node.
Convert each list of these OrgNodes into a single Entry.
"""
entries: List[Entry] = []
for parsed_entry in parsed_entries:
if not parsed_entry.hasBody and not index_heading_entries:
# Ignore title notes i.e notes with just headings and empty body
continue
for entry_group in parsed_entries:
entry_heading, entry_compiled, entry_raw = "", "", ""
for parsed_entry in entry_group:
if not parsed_entry.hasBody and not index_heading_entries:
# Ignore title notes i.e notes with just headings and empty body
continue
todo_str = f"{parsed_entry.todo} " if parsed_entry.todo else ""
todo_str = f"{parsed_entry.todo} " if parsed_entry.todo else ""
# Prepend ancestor headings, filename as top heading to entry for context
ancestors_trail = " / ".join(parsed_entry.ancestors) or Path(entry_to_file_map[parsed_entry])
if parsed_entry.heading:
heading = f"* Path: {ancestors_trail}\n** {todo_str}{parsed_entry.heading}."
else:
heading = f"* Path: {ancestors_trail}."
# Set base level to current org-node tree's root heading level
if not entry_heading and parsed_entry.level > 0:
base_level = parsed_entry.level
# Indent entry by 1 heading level as ancestry is prepended as top level heading
heading = f"{'*' * (parsed_entry.level-base_level+2)} {todo_str}" if parsed_entry.level > 0 else ""
if parsed_entry.heading:
heading += f"{parsed_entry.heading}."
compiled = heading
if state.verbose > 2:
logger.debug(f"Title: {heading}")
# Prepend ancestor headings, filename as top heading to root parent entry for context
# Children nodes do not need ancestors trail as root parent node will have it
if not entry_heading:
ancestors_trail = " / ".join(parsed_entry.ancestors) or Path(entry_to_file_map[parsed_entry])
heading = f"* Path: {ancestors_trail}\n{heading}" if heading else f"* Path: {ancestors_trail}."
if parsed_entry.tags:
tags_str = " ".join(parsed_entry.tags)
compiled += f"\t {tags_str}."
compiled = heading
if state.verbose > 2:
logger.debug(f"Tags: {tags_str}")
logger.debug(f"Title: {heading}")
if parsed_entry.closed:
compiled += f'\n Closed on {parsed_entry.closed.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Closed: {parsed_entry.closed.strftime("%Y-%m-%d")}')
if parsed_entry.tags:
tags_str = " ".join(parsed_entry.tags)
compiled += f"\t {tags_str}."
if state.verbose > 2:
logger.debug(f"Tags: {tags_str}")
if parsed_entry.scheduled:
compiled += f'\n Scheduled for {parsed_entry.scheduled.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Scheduled: {parsed_entry.scheduled.strftime("%Y-%m-%d")}')
if parsed_entry.closed:
compiled += f'\n Closed on {parsed_entry.closed.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Closed: {parsed_entry.closed.strftime("%Y-%m-%d")}')
if parsed_entry.hasBody:
compiled += f"\n {parsed_entry.body}"
if state.verbose > 2:
logger.debug(f"Body: {parsed_entry.body}")
if parsed_entry.scheduled:
compiled += f'\n Scheduled for {parsed_entry.scheduled.strftime("%Y-%m-%d")}.'
if state.verbose > 2:
logger.debug(f'Scheduled: {parsed_entry.scheduled.strftime("%Y-%m-%d")}')
if compiled:
if parsed_entry.hasBody:
compiled += f"\n {parsed_entry.body}"
if state.verbose > 2:
logger.debug(f"Body: {parsed_entry.body}")
# Add the sub-entry contents to the entry
entry_compiled += f"{compiled}"
entry_raw += f"{parsed_entry}"
if not entry_heading:
entry_heading = heading
if entry_compiled:
entries.append(
Entry(
compiled=compiled,
raw=f"{parsed_entry}",
heading=f"{heading}",
compiled=entry_compiled,
raw=entry_raw,
heading=f"{entry_heading}",
file=f"{entry_to_file_map[parsed_entry]}",
)
)

View file

@ -1,5 +1,5 @@
import json
import os
import re
from khoj.processor.content.org_mode.org_to_entries import OrgToEntries
from khoj.processor.content.text_to_entries import TextToEntries
@ -8,7 +8,7 @@ from khoj.utils.helpers import is_none_or_empty
from khoj.utils.rawconfig import Entry, TextContentConfig
def test_configure_heading_entry_to_jsonl(tmp_path):
def test_configure_indexing_heading_only_entries(tmp_path):
"""Ensure entries with empty body are ignored, unless explicitly configured to index heading entries.
Property drawers not considered Body. Ignore control characters for evaluating if Body empty."""
# Arrange
@ -26,7 +26,9 @@ def test_configure_heading_entry_to_jsonl(tmp_path):
for index_heading_entries in [True, False]:
# Act
# Extract entries into jsonl from specified Org files
entries = OrgToEntries.extract_org_entries(org_files=data, index_heading_entries=index_heading_entries)
entries = OrgToEntries.extract_org_entries(
org_files=data, index_heading_entries=index_heading_entries, max_tokens=3
)
# Assert
if index_heading_entries:
@ -77,10 +79,153 @@ def test_entry_split_drops_large_words():
processed_entry = TextToEntries.split_entries_by_max_tokens([entry], max_word_length=5)[0]
# Assert
# "Heading" dropped from compiled version because its over the set max word limit
# (Only) "Heading" dropped from compiled version because its over the set max word limit
assert "Heading" not in processed_entry.compiled
assert len(processed_entry.compiled.split()) == len(entry_text.split()) - 1
def test_parse_org_file_into_single_entry_if_small(tmp_path):
"Parse org file into single entry if it fits within the token limits."
# Arrange
original_entry = f"""
* Heading 1
body line 1
** Subheading 1.1
body line 1.1
"""
data = {
f"{tmp_path}": original_entry,
}
expected_entry = f"""
* Heading 1
body line 1
** Subheading 1.1
body line 1.1
""".lstrip()
# Act
# Extract Entries from specified Org files
extracted_entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=12)
for entry in extracted_entries:
entry.raw = clean(entry.raw)
# Assert
assert len(extracted_entries) == 1
assert entry.raw == expected_entry
def test_parse_org_entry_with_children_as_single_entry_if_small(tmp_path):
"Parse org entry with child headings as single entry only if it fits within the tokens limits."
# Arrange
entry = f"""
* Heading 1
body line 1
** Subheading 1.1
body line 1.1
* Heading 2
body line 2
** Subheading 2.1
longer body line 2.1
"""
data = {
f"{tmp_path}": entry,
}
first_expected_entry = f"""
* Path: {tmp_path}
** Heading 1.
body line 1
*** Subheading 1.1.
body line 1.1
""".lstrip()
second_expected_entry = f"""
* Path: {tmp_path}
** Heading 2.
body line 2
""".lstrip()
third_expected_entry = f"""
* Path: {tmp_path} / Heading 2
** Subheading 2.1.
longer body line 2.1
""".lstrip()
# Act
# Extract Entries from specified Org files
extracted_entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=12)
# Assert
assert len(extracted_entries) == 3
assert extracted_entries[0].compiled == first_expected_entry, "First entry includes children headings"
assert extracted_entries[1].compiled == second_expected_entry, "Second entry does not include children headings"
assert extracted_entries[2].compiled == third_expected_entry, "Third entry is second entries child heading"
def test_separate_sibling_org_entries_if_all_cannot_fit_in_token_limit(tmp_path):
"Parse org sibling entries as separate entries only if it fits within the tokens limits."
# Arrange
entry = f"""
* Heading 1
body line 1
** Subheading 1.1
body line 1.1
* Heading 2
body line 2
** Subheading 2.1
body line 2.1
* Heading 3
body line 3
** Subheading 3.1
body line 3.1
"""
data = {
f"{tmp_path}": entry,
}
first_expected_entry = f"""
* Path: {tmp_path}
** Heading 1.
body line 1
*** Subheading 1.1.
body line 1.1
""".lstrip()
second_expected_entry = f"""
* Path: {tmp_path}
** Heading 2.
body line 2
*** Subheading 2.1.
body line 2.1
""".lstrip()
third_expected_entry = f"""
* Path: {tmp_path}
** Heading 3.
body line 3
*** Subheading 3.1.
body line 3.1
""".lstrip()
# Act
# Extract Entries from specified Org files
# Max tokens = 30 is in the middle of 2 entry (24 tokens) and 3 entry (36 tokens) tokens boundary
# Where each sibling entry contains 12 tokens per sibling entry * 3 entries = 36 tokens
extracted_entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=30)
# Assert
assert len(extracted_entries) == 3
assert extracted_entries[0].compiled == first_expected_entry, "First entry includes children headings"
assert extracted_entries[1].compiled == second_expected_entry, "Second entry includes children headings"
assert extracted_entries[2].compiled == third_expected_entry, "Third entry includes children headings"
def test_entry_with_body_to_entry(tmp_path):
"Ensure entries with valid body text are loaded."
# Arrange
@ -118,13 +263,13 @@ Intro text
# Act
# Extract Entries from specified Org files
entries = OrgToEntries.extract_org_entries(org_files=data)
entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=3)
# Assert
assert len(entries) == 2
def test_file_with_no_headings_to_jsonl(tmp_path):
def test_file_with_no_headings_to_entry(tmp_path):
"Ensure files with no heading, only body text are loaded."
# Arrange
entry = f"""
@ -137,7 +282,7 @@ def test_file_with_no_headings_to_jsonl(tmp_path):
# Act
# Extract Entries from specified Org files
entries = OrgToEntries.extract_org_entries(org_files=data)
entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=3)
# Assert
assert len(entries) == 1
@ -197,13 +342,14 @@ def test_extract_entries_with_different_level_headings(tmp_path):
# Act
# Extract Entries from specified Org files
entries = OrgToEntries.extract_org_entries(org_files=data, index_heading_entries=True)
entries = OrgToEntries.extract_org_entries(org_files=data, index_heading_entries=True, max_tokens=3)
for entry in entries:
entry.raw = clean(f"{entry.raw}")
# Assert
assert len(entries) == 3
assert f"{entries[0].raw}".startswith("* Heading 1")
assert f"{entries[1].raw}".startswith("** Sub-Heading 1.1")
assert f"{entries[2].raw}".startswith("* Heading 2")
assert len(entries) == 2
assert entries[0].raw == "* Heading 1\n** Sub-Heading 1.1\n", "Ensure entry includes heading ancestory"
assert entries[1].raw == "* Heading 2\n"
# Helper Functions
@ -213,3 +359,8 @@ def create_file(tmp_path, entry=None, filename="test.org"):
if entry:
org_file.write_text(entry)
return org_file
def clean(entry):
"Remove properties from entry for easier comparison."
return re.sub(r"\n:PROPERTIES:(.*?):END:", "", entry, flags=re.DOTALL)