khoj/tests/test_markdown_to_entries.py
2024-04-04 02:41:55 +05:30

247 lines
7.3 KiB
Python

import os
from pathlib import Path
from khoj.processor.content.markdown.markdown_to_entries import MarkdownToEntries
from khoj.utils.fs_syncer import get_markdown_files
from khoj.utils.rawconfig import TextContentConfig
def test_extract_markdown_with_no_headings(tmp_path):
"Convert markdown file with no heading to entry format."
# Arrange
entry = f"""
- Bullet point 1
- Bullet point 2
"""
data = {
f"{tmp_path}": entry,
}
expected_heading = f"# {tmp_path}"
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 1
# Ensure raw entry with no headings do not get heading prefix prepended
assert not entries[0].raw.startswith("#")
# Ensure compiled entry has filename prepended as top level heading
assert entries[0].compiled.startswith(expected_heading)
# Ensure compiled entry also includes the file name
assert str(tmp_path) in entries[0].compiled
def test_extract_single_markdown_entry(tmp_path):
"Convert markdown from single file to entry format."
# Arrange
entry = f"""### Heading
\t\r
Body Line 1
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 1
def test_extract_multiple_markdown_entries(tmp_path):
"Convert multiple markdown from single file to entry format."
# Arrange
entry = f"""
### Heading 1
\t\r
Heading 1 Body Line 1
### Heading 2
\t\r
Heading 2 Body Line 2
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 2
# Ensure entry compiled strings include the markdown files they originate from
assert all([tmp_path.stem in entry.compiled for entry in entries])
def test_extract_entries_with_different_level_headings(tmp_path):
"Extract markdown entries with different level headings."
# Arrange
entry = f"""
# Heading 1
## Sub-Heading 1.1
# Heading 2
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 2
assert entries[0].raw == "# Heading 1\n## Sub-Heading 1.1", "Ensure entry includes heading ancestory"
assert entries[1].raw == "# Heading 2\n"
def test_extract_entries_with_non_incremental_heading_levels(tmp_path):
"Extract markdown entries when deeper child level before shallower child level."
# Arrange
entry = f"""
# Heading 1
#### Sub-Heading 1.1
## Sub-Heading 1.2
# Heading 2
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 3
assert entries[0].raw == "# Heading 1\n#### Sub-Heading 1.1", "Ensure entry includes heading ancestory"
assert entries[1].raw == "# Heading 1\n## Sub-Heading 1.2", "Ensure entry includes heading ancestory"
assert entries[2].raw == "# Heading 2\n"
def test_extract_entries_with_text_before_headings(tmp_path):
"Extract markdown entries with some text before any headings."
# Arrange
entry = f"""
Text before headings
# Heading 1
body line 1
## Heading 2
body line 2
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=3)
# Assert
assert len(entries) == 3
assert entries[0].raw == "\nText before headings"
assert entries[1].raw == "# Heading 1\nbody line 1"
assert entries[2].raw == "# Heading 1\n## Heading 2\nbody line 2\n", "Ensure raw entry includes heading ancestory"
def test_parse_markdown_file_into_single_entry_if_small(tmp_path):
"Parse markdown file into single entry if it fits within the token limits."
# Arrange
entry = f"""
# Heading 1
body line 1
## Subheading 1.1
body line 1.1
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=12)
# Assert
assert len(entries) == 1
assert entries[0].raw == entry
def test_parse_markdown_entry_with_children_as_single_entry_if_small(tmp_path):
"Parse markdown entry with child headings as single entry if it fits within the tokens limits."
# Arrange
entry = f"""
# Heading 1
body line 1
## Subheading 1.1
body line 1.1
# Heading 2
body line 2
## Subheading 2.1
longer body line 2.1
"""
data = {
f"{tmp_path}": entry,
}
# Act
# Extract Entries from specified Markdown files
entries = MarkdownToEntries.extract_markdown_entries(markdown_files=data, max_tokens=12)
# Assert
assert len(entries) == 3
assert (
entries[0].raw == "# Heading 1\nbody line 1\n## Subheading 1.1\nbody line 1.1"
), "First entry includes children headings"
assert entries[1].raw == "# Heading 2\nbody line 2", "Second entry does not include children headings"
assert (
entries[2].raw == "# Heading 2\n## Subheading 2.1\nlonger body line 2.1\n"
), "Third entry is second entries child heading"
def test_get_markdown_files(tmp_path):
"Ensure Markdown files specified via input-filter, input-files extracted"
# Arrange
# Include via input-filter globs
group1_file1 = create_file(tmp_path, filename="group1-file1.md")
group1_file2 = create_file(tmp_path, filename="group1-file2.md")
group2_file1 = create_file(tmp_path, filename="group2-file1.markdown")
group2_file2 = create_file(tmp_path, filename="group2-file2.markdown")
# Include via input-file field
file1 = create_file(tmp_path, filename="notes.md")
# Not included by any filter
create_file(tmp_path, filename="not-included-markdown.md")
create_file(tmp_path, filename="not-included-text.txt")
expected_files = set(
[os.path.join(tmp_path, file.name) for file in [group1_file1, group1_file2, group2_file1, group2_file2, file1]]
)
# Setup input-files, input-filters
input_files = [tmp_path / "notes.md"]
input_filter = [tmp_path / "group1*.md", tmp_path / "group2*.markdown"]
markdown_config = TextContentConfig(
input_files=input_files,
input_filter=[str(filter) for filter in input_filter],
compressed_jsonl=tmp_path / "test.jsonl",
embeddings_file=tmp_path / "test_embeddings.jsonl",
)
# Act
extracted_org_files = get_markdown_files(markdown_config)
# Assert
assert len(extracted_org_files) == 5
assert set(extracted_org_files.keys()) == expected_files
# Helper Functions
def create_file(tmp_path: Path, entry=None, filename="test.md"):
markdown_file = tmp_path / filename
markdown_file.touch()
if entry:
markdown_file.write_text(entry)
return markdown_file