mirror of
https://github.com/khoj-ai/khoj.git
synced 2024-11-27 17:35:07 +01:00
28105ee027
- Convert the extract_org_entries function to actually extract org entries.
  Previously it was extracting intermediary org-node objects instead.
  Now it extracts the org-node objects from files and converts them into entries.
- Create a separate, new function, extract_org_nodes, to extract org nodes from files.
- Similarly, create wrapper functions for md, pdf and plaintext to entries.
- Update the org, md, pdf and plaintext to entries tests to use the new, simplified wrapper function to extract org entries.
228 lines
6.9 KiB
Python
228 lines
6.9 KiB
Python
import json
|
|
import os
|
|
|
|
from khoj.processor.content.org_mode.org_to_entries import OrgToEntries
|
|
from khoj.processor.content.text_to_entries import TextToEntries
|
|
from khoj.utils.fs_syncer import get_org_files
|
|
from khoj.utils.helpers import is_none_or_empty
|
|
from khoj.utils.rawconfig import Entry, TextContentConfig
|
|
|
|
|
|
def test_configure_heading_entry_to_jsonl(tmp_path):
    """Ensure entries with empty body are ignored, unless explicitly configured to index heading entries.

    Property drawers are not considered body. Control characters are ignored when evaluating if the body is empty.
    """
    # Arrange
    # A heading followed only by a property drawer and a line of whitespace/control characters
    heading_only_entry = """*** Heading
:PROPERTIES:
:ID: 42-42-42
:END:
\t \r
"""
    org_files = {str(tmp_path): heading_only_entry}

    for index_heading_entries in (True, False):
        # Act
        # Extract entries from the Org content, then serialize them to jsonl
        extracted_entries = OrgToEntries.extract_org_entries(
            org_files=org_files,
            index_heading_entries=index_heading_entries,
        )
        jsonl_string = OrgToEntries.convert_org_entries_to_jsonl(extracted_entries)
        jsonl_data = [json.loads(jsonl_line) for jsonl_line in jsonl_string.splitlines()]

        # Assert
        if not index_heading_entries:
            # Entry with empty body is ignored when heading entries are not indexed
            assert is_none_or_empty(jsonl_data)
        else:
            # Entry with empty body is indexed when heading entries are indexed
            assert len(jsonl_data) == 1
|
|
|
|
|
|
def test_entry_split_when_exceeds_max_words():
    "Ensure entries whose compiled text exceeds the max token limit are split."
    # Arrange
    # The path is only used as a key for the in-memory Org content
    org_file_path = "/tmp/test.org"
    org_content = """*** Heading
\t\r
Body Line
"""
    org_files = {org_file_path: org_content}
    expected_heading = f"* Path: {org_file_path}\n** Heading"

    # Act
    # Extract entries from the Org content
    extracted_entries = OrgToEntries.extract_org_entries(org_files=org_files)

    # Split the extracted entries by max tokens, then serialize them to jsonl
    split_entries = TextToEntries.split_entries_by_max_tokens(extracted_entries, max_tokens=4)
    jsonl_string = OrgToEntries.convert_org_entries_to_jsonl(split_entries)
    jsonl_data = [json.loads(jsonl_line) for jsonl_line in jsonl_string.splitlines()]

    # Assert
    assert len(jsonl_data) == 2
    # Every split entry should start with the entry heading (for search context)
    for split_entry in jsonl_data:
        assert split_entry["compiled"].startswith(expected_heading)
|
|
|
|
|
|
def test_entry_split_drops_large_words():
    "Ensure words longer than the specified max word length are dropped from the compiled entry."
    # Arrange
    entry_text = """*** Heading
\t\r
Body Line 1
"""
    entry = Entry(raw=entry_text, compiled=entry_text)
    # "Heading" is the only word in the entry longer than the max word length of 5 used below
    expected_word_count = len(entry_text.split()) - 1

    # Act
    # Split the entry by max tokens, dropping words longer than the max word length
    processed_entries = TextToEntries.split_entries_by_max_tokens([entry], max_word_length=5)
    processed_entry = processed_entries[0]

    # Assert
    assert len(processed_entry.compiled.split()) == expected_word_count
|
|
|
|
|
|
def test_entry_with_body_to_jsonl(tmp_path):
    "Ensure entries with valid body text are loaded."
    # Arrange
    # A heading with a property drawer, a whitespace-only line and one line of body text
    entry_with_body = """*** Heading
:PROPERTIES:
:ID: 42-42-42
:END:
\t\r
Body Line 1
"""
    org_files = {str(tmp_path): entry_with_body}

    # Act
    # Extract entries from the Org content, then serialize them to jsonl
    extracted_entries = OrgToEntries.extract_org_entries(org_files=org_files)
    jsonl_string = OrgToEntries.convert_org_entries_to_jsonl(extracted_entries)
    jsonl_data = [json.loads(jsonl_line) for jsonl_line in jsonl_string.splitlines()]

    # Assert
    assert len(jsonl_data) == 1
|
|
|
|
|
|
def test_file_with_entry_after_intro_text_to_jsonl(tmp_path):
    "Ensure intro text before any headings is indexed."
    # Arrange
    # Free text placed before the first heading, followed by one entry with a body
    org_content = """
Intro text

* Entry Heading
entry body
"""
    org_files = {str(tmp_path): org_content}

    # Act
    # Extract entries from the Org content
    extracted_entries = OrgToEntries.extract_org_entries(org_files=org_files)

    # Serialize the extracted entries to jsonl and parse each line back
    jsonl_string = OrgToEntries.convert_org_entries_to_jsonl(extracted_entries)
    jsonl_data = [json.loads(jsonl_line) for jsonl_line in jsonl_string.splitlines()]

    # Assert
    # Two entries are expected from the intro text plus the single heading
    assert len(jsonl_data) == 2
|
|
|
|
|
|
def test_file_with_no_headings_to_jsonl(tmp_path):
    "Ensure files with no heading, only body text are loaded."
    # Arrange
    # Org content made of list items only, without any heading line
    org_content = """
- Bullet point 1
- Bullet point 2
"""
    org_files = {str(tmp_path): org_content}

    # Act
    # Extract entries from the Org content
    extracted_entries = OrgToEntries.extract_org_entries(org_files=org_files)

    # Serialize the extracted entries to jsonl and parse each line back
    jsonl_string = OrgToEntries.convert_org_entries_to_jsonl(extracted_entries)
    jsonl_data = [json.loads(jsonl_line) for jsonl_line in jsonl_string.splitlines()]

    # Assert
    assert len(jsonl_data) == 1
|
|
|
|
|
|
def test_get_org_files(tmp_path):
    "Ensure Org files specified via input-filter, input-files extracted"
    # Arrange
    # Include via input-filter globs
    group1_file1 = create_file(tmp_path, filename="group1-file1.org")
    group1_file2 = create_file(tmp_path, filename="group1-file2.org")
    group2_file1 = create_file(tmp_path, filename="group2-file1.org")
    group2_file2 = create_file(tmp_path, filename="group2-file2.org")
    # Include via input-file field
    orgfile1 = create_file(tmp_path, filename="orgfile1.org")
    # Not included by any filter
    create_file(tmp_path, filename="orgfile2.org")
    create_file(tmp_path, filename="text1.txt")

    # Built directly as a set; no intermediate list is needed
    expected_files = {
        os.path.join(tmp_path, org_file.name)
        for org_file in (group1_file1, group1_file2, group2_file1, group2_file2, orgfile1)
    }

    # Setup input-files, input-filters
    input_files = [tmp_path / "orgfile1.org"]
    input_filter = [tmp_path / "group1*.org", tmp_path / "group2*.org"]

    org_config = TextContentConfig(
        input_files=input_files,
        # Glob patterns are passed as strings; the loop variable avoids shadowing the `filter` builtin
        input_filter=[str(glob_pattern) for glob_pattern in input_filter],
        compressed_jsonl=tmp_path / "test.jsonl",
        embeddings_file=tmp_path / "test_embeddings.jsonl",
    )

    # Act
    extracted_org_files = get_org_files(org_config)

    # Assert
    # Exactly the five files selected by the input-files and input-filter settings are returned
    assert len(extracted_org_files) == 5
    assert set(extracted_org_files.keys()) == expected_files
|
|
|
|
|
|
def test_extract_entries_with_different_level_headings(tmp_path):
    "Extract org entries with different level headings."
    # Arrange
    # A level-1 heading immediately followed by a level-2 heading, neither with body text
    org_content = """
* Heading 1
** Heading 2
"""
    org_files = {str(tmp_path): org_content}
    expected_prefixes = ("* Heading 1", "** Heading 2")

    # Act
    # Extract entries from the Org content, indexing heading-only entries too
    entries = OrgToEntries.extract_org_entries(org_files=org_files, index_heading_entries=True)

    # Assert
    assert len(entries) == 2
    # Each heading should be the start of the raw text of its own entry, in order
    for position, expected_prefix in enumerate(expected_prefixes):
        assert f"{entries[position].raw}".startswith(expected_prefix)
|
|
|
|
|
|
# Helper Functions
|
|
def create_file(tmp_path, entry=None, filename="test.org"):
    "Create `filename` under `tmp_path`, write `entry` into it when given, and return the file path."
    target = tmp_path / filename
    # Touch first so the file exists even when there is no content to write
    target.touch()
    if not entry:
        return target
    target.write_text(entry)
    return target
|