Mirror of https://github.com/khoj-ai/khoj.git (synced 2024-11-30 19:03:01 +01:00)
# Fix, Improve Indexing, Deleting Files (#840)

### Fix
- Fix degradation in speed when indexing large files
- Resolve org-mode indexing bug by splitting the current section only once by its heading
- Improve summarization by fixing the formatting of text in indexed files

### Improve
- Improve scaling of the user and admin flows to delete all entries for a user
Commit 08b379c2ab: 9 changed files with 95 additions and 43 deletions
```diff
@@ -112,7 +112,7 @@ ASGI_APPLICATION = "app.asgi.application"
 # Database
 # https://docs.djangoproject.com/en/4.2/ref/settings/#databases
+DATA_UPLOAD_MAX_NUMBER_FIELDS = 20000
 DATABASES = {
     "default": {
         "ENGINE": "django.db.backends.postgresql",
```
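The raised cap presumably supports the new bulk admin flows: Django rejects requests that submit more than `DATA_UPLOAD_MAX_NUMBER_FIELDS` form fields (default 1000), which a "select all and delete" admin action over thousands of entries can exceed. A minimal standalone sketch, not khoj code, using `settings.configure` purely for demonstration:

```python
# Minimal standalone sketch (not khoj code): Django caps the number of GET/POST
# parameters per request at DATA_UPLOAD_MAX_NUMBER_FIELDS (default 1000) and
# raises TooManyFieldsSent beyond it. Raising the cap lets large admin bulk
# actions submit successfully.
from django.conf import settings

settings.configure(DATA_UPLOAD_MAX_NUMBER_FIELDS=20000)
print(settings.DATA_UPLOAD_MAX_NUMBER_FIELDS)  # 20000
```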
```diff
@@ -956,7 +956,7 @@ class FileObjectAdapters:
         return FileObject.objects.create(user=user, file_name=file_name, raw_text=raw_text)

     @staticmethod
-    def get_file_objects_by_name(user: KhojUser, file_name: str):
+    def get_file_object_by_name(user: KhojUser, file_name: str):
         return FileObject.objects.filter(user=user, file_name=file_name).first()

     @staticmethod
```
```diff
@@ -1012,27 +1012,35 @@ class EntryAdapters:
         return deleted_count

     @staticmethod
-    def delete_all_entries_by_type(user: KhojUser, file_type: str = None):
-        if file_type is None:
-            deleted_count, _ = Entry.objects.filter(user=user).delete()
-        else:
-            deleted_count, _ = Entry.objects.filter(user=user, file_type=file_type).delete()
+    def get_entries_by_batch(user: KhojUser, batch_size: int, file_type: str = None, file_source: str = None):
+        queryset = Entry.objects.filter(user=user)
+
+        if file_type is not None:
+            queryset = queryset.filter(file_type=file_type)
+
+        if file_source is not None:
+            queryset = queryset.filter(file_source=file_source)
+
+        while queryset.exists():
+            batch_ids = list(queryset.values_list("id", flat=True)[:batch_size])
+            yield Entry.objects.filter(id__in=batch_ids)
+
+    @staticmethod
+    def delete_all_entries(user: KhojUser, file_type: str = None, file_source: str = None, batch_size=1000):
+        deleted_count = 0
+        for batch in EntryAdapters.get_entries_by_batch(user, batch_size, file_type, file_source):
+            count, _ = batch.delete()
+            deleted_count += count
         return deleted_count

     @staticmethod
-    def delete_all_entries(user: KhojUser, file_source: str = None):
-        if file_source is None:
-            deleted_count, _ = Entry.objects.filter(user=user).delete()
-        else:
-            deleted_count, _ = Entry.objects.filter(user=user, file_source=file_source).delete()
+    async def adelete_all_entries(user: KhojUser, file_type: str = None, file_source: str = None, batch_size=1000):
+        deleted_count = 0
+        async for batch in EntryAdapters.get_entries_by_batch(user, batch_size, file_type, file_source):
+            count, _ = await batch.adelete()
+            deleted_count += count
         return deleted_count

-    @staticmethod
-    async def adelete_all_entries(user: KhojUser, file_source: str = None):
-        if file_source is None:
-            return await Entry.objects.filter(user=user).adelete()
-        return await Entry.objects.filter(user=user, file_source=file_source).adelete()
-
     @staticmethod
     def get_existing_entry_hashes_by_file(user: KhojUser, file_path: str):
         return Entry.objects.filter(user=user, file_path=file_path).values_list("hashed_value", flat=True)
```
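The new generator deletes entries in fixed-size batches instead of issuing one unbounded delete, which keeps query sizes and transactions manageable for users with very large indexes. A hypothetical call site, with names taken from the diff above and the khoj Django setup assumed:

```python
# Hypothetical usage of the batched deletion API from the diff above.
# `user` is assumed to be an existing KhojUser; "computer" and "org" are
# example filter values, not values asserted by this PR.
deleted_count = EntryAdapters.delete_all_entries(user, file_source="computer", batch_size=1000)
print(f"Deleted {deleted_count} entries in batches of 1000")

# The regeneration path now routes through the same helper, filtered by file type:
deleted_count = EntryAdapters.delete_all_entries(user, file_type="org")
```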
```diff
@@ -125,7 +125,10 @@ class EntryAdmin(admin.ModelAdmin):
         "file_path",
     )
     search_fields = ("id", "user__email", "user__username", "file_path")
-    list_filter = ("file_type",)
+    list_filter = (
+        "file_type",
+        "user__email",
+    )
     ordering = ("-created_at",)
```
```diff
@@ -12,7 +12,7 @@
     </h2>
     <div class="section-manage-files">
         <div id="delete-all-files" class="delete-all-files">
-            <button id="delete-all-files" type="submit" title="Remove all computer files from Khoj">🗑️ Delete all</button>
+            <button id="delete-all-files-button" type="submit" title="Remove all computer files from Khoj">🗑️ Delete all</button>
         </div>
         <div class="indexed-files">
         </div>
```
```diff
@@ -115,9 +115,13 @@
         // Get all currently indexed files on page load
         getAllComputerFilenames();

-        let deleteAllComputerFilesButton = document.getElementById("delete-all-files");
+        let deleteAllComputerFilesButton = document.getElementById("delete-all-files-button");
         deleteAllComputerFilesButton.addEventListener("click", function(event) {
             event.preventDefault();
+            originalDeleteAllComputerFilesButtonText = deleteAllComputerFilesButton.textContent;
+            deleteAllComputerFilesButton.textContent = "🗑️ Deleting...";
+            deleteAllComputerFilesButton.disabled = true;

             fetch('/api/config/data/content-source/computer', {
                 method: 'DELETE',
                 headers: {
```
```diff
@@ -125,11 +129,11 @@
                 }
             })
             .then(response => response.json())
-            .then(data => {
-                if (data.status == "ok") {
-                    getAllComputerFilenames();
-                }
-            })
+            .finally(() => {
+                getAllComputerFilenames();
+                deleteAllComputerFilesButton.textContent = originalDeleteAllComputerFilesButtonText;
+                deleteAllComputerFilesButton.disabled = false;
+            });
         });
     </script>
{% endblock %}
```
```diff
@@ -146,7 +146,7 @@ class MarkdownToEntries(TextToEntries):
             else:
                 entry_filename = str(Path(raw_filename))

-            heading = parsed_entry.splitlines()[0] if re.search("^#+\s", parsed_entry) else ""
+            heading = parsed_entry.splitlines()[0] if re.search(r"^#+\s", parsed_entry) else ""
             # Append base filename to compiled entry for context to model
             # Increment heading level for heading entries and make filename as its top level heading
             prefix = f"# {entry_filename}\n#" if heading else f"# {entry_filename}\n"
```
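The only change here is the `r` prefix on the pattern: matching behaviour is unchanged, and the raw string stops Python from treating `\s` as a (deprecated) string escape. A quick standalone check, with sample strings invented for illustration:

```python
# Standalone check: the raw-string pattern still matches markdown headings,
# without triggering Python's "invalid escape sequence '\s'" warning.
import re

assert re.search(r"^#+\s", "# A heading") is not None
assert re.search(r"^#+\s", "Plain text") is None
```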
```diff
@@ -115,14 +115,20 @@ class OrgToEntries(TextToEntries):
             return entries, entry_to_file_map

         # Split this entry tree into sections by the next heading level in it
-        # Increment heading level until able to split entry into sections
+        # Increment heading level until able to split entry into sections or reach max heading level
         # A successful split will result in at least 2 sections
+        max_heading_level = 100
         next_heading_level = len(ancestry)
         sections: List[str] = []
-        while len(sections) < 2:
+        while len(sections) < 2 and next_heading_level < max_heading_level:
             next_heading_level += 1
             sections = re.split(rf"(\n|^)(?=[*]{{{next_heading_level}}} .+\n?)", org_content, flags=re.MULTILINE)

+        # If unable to split entry into sections, log error and skip indexing it
+        if next_heading_level == max_heading_level:
+            logger.error(f"Unable to split current entry chunk: {org_content_with_ancestry[:20]}. Skip indexing it.")
+            return entries, entry_to_file_map
+
         # Recurse down each non-empty section after parsing its body, heading and ancestry
         for section in sections:
             # Skip empty sections
```
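The loop probes deeper heading levels until the regex split yields at least two sections; the new `max_heading_level` guard bounds that search, so content that never splits can no longer spin the indexer forever. A minimal standalone sketch of the split step (the sample org text is invented):

```python
# Minimal standalone sketch of the heading-level split used above.
# The lookahead keeps each org heading at the start of its own section.
import re

org_content = "* Parent\n** Child A\nbody\n** Child B\n"
next_heading_level = 2
sections = re.split(
    rf"(\n|^)(?=[*]{{{next_heading_level}}} .+\n?)", org_content, flags=re.MULTILINE
)
print([s.strip() for s in sections if s.strip()])
# ['* Parent', '** Child A\nbody', '** Child B']
```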
```diff
@@ -135,7 +141,7 @@ class OrgToEntries(TextToEntries):
             # If first non-empty line is a heading with expected heading level
             if re.search(rf"^\*{{{next_heading_level}}}\s", first_non_empty_line):
                 # Extract the section body without the heading
-                current_section_body = "\n".join(section.split(first_non_empty_line)[1:])
+                current_section_body = "\n".join(section.split(first_non_empty_line, 1)[1:])
                 # Parse the section heading into current section ancestry
                 current_section_title = first_non_empty_line[next_heading_level:].strip()
                 current_ancestry[next_heading_level] = current_section_title
```
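This small change is the org-mode fix named in the PR description: without `maxsplit=1`, the heading text is stripped everywhere it re-occurs in the section body, which mangles child headings that share the parent heading's prefix, the same case the new test later in this diff exercises. A standalone illustration with invented headings:

```python
# Standalone illustration of the maxsplit fix. The heading "** 1" also occurs
# inside the child headings "*** 1.1" and "**** 1.1.2", so an unbounded split
# cuts the body apart at every occurrence.
section = "** 1\n*** 1.1\n**** 1.1.2\n"
heading = "** 1"

print(repr("\n".join(section.split(heading)[1:])))
# '\n*\n.1\n**\n.1.2\n'  <- body is mangled

print(repr("\n".join(section.split(heading, 1)[1:])))
# '\n*** 1.1\n**** 1.1.2\n'  <- body is preserved
```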
```diff
@@ -124,7 +124,7 @@ class TextToEntries(ABC):
         deletion_filenames: Set[str] = None,
         user: KhojUser = None,
         regenerate: bool = False,
-        file_to_text_map: dict[str, List[str]] = None,
+        file_to_text_map: dict[str, str] = None,
     ):
         with timer("Constructed current entry hashes in", logger):
             hashes_by_file = dict[str, set[str]]()
```
```diff
@@ -137,7 +137,7 @@ class TextToEntries(ABC):
         if regenerate:
             with timer("Cleared existing dataset for regeneration in", logger):
                 logger.debug(f"Deleting all entries for file type {file_type}")
-                num_deleted_entries = EntryAdapters.delete_all_entries_by_type(user, file_type)
+                num_deleted_entries = EntryAdapters.delete_all_entries(user, file_type=file_type)

         hashes_to_process = set()
         with timer("Identified entries to add to database in", logger):
```
```diff
@@ -192,16 +192,17 @@ class TextToEntries(ABC):
             logger.debug(f"Added {len(added_entries)} {file_type} entries to database")

         if file_to_text_map:
-            # get the list of file_names using added_entries
-            filenames_to_update = [entry.file_path for entry in added_entries]
-            # for each file_name in filenames_to_update, try getting the file object and updating raw_text and if it fails create a new file object
-            for file_name in filenames_to_update:
-                raw_text = " ".join(file_to_text_map[file_name])
-                file_object = FileObjectAdapters.get_file_objects_by_name(user, file_name)
-                if file_object:
-                    FileObjectAdapters.update_raw_text(file_object, raw_text)
-                else:
-                    FileObjectAdapters.create_file_object(user, file_name, raw_text)
+            with timer("Indexed text of modified file in", logger):
+                # get the set of modified files from added_entries
+                modified_files = {entry.file_path for entry in added_entries}
+                # create or update text of each updated file indexed on DB
+                for modified_file in modified_files:
+                    raw_text = file_to_text_map[modified_file]
+                    file_object = FileObjectAdapters.get_file_object_by_name(user, modified_file)
+                    if file_object:
+                        FileObjectAdapters.update_raw_text(file_object, raw_text)
+                    else:
+                        FileObjectAdapters.create_file_object(user, modified_file, raw_text)

         new_dates = []
         with timer("Indexed dates from added entries in", logger):
```
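Together with the `file_to_text_map: dict[str, str]` signature change above, each file's raw text now arrives as a single string, so the old `" ".join(...)` over chunks (which reassembled the stored text with stray spaces and degraded summarization) disappears, and the per-file text is simply created or updated on `FileObject`. A hypothetical sketch of that flow, with the file name and content invented and the khoj adapters assumed:

```python
# Hypothetical sketch of the per-file create-or-update flow from the diff above.
# An existing KhojUser `user` and the khoj Django app are assumed.
file_to_text_map: dict[str, str] = {"notes.org": "* Heading\nSome body text"}  # invented example

for file_name, raw_text in file_to_text_map.items():
    file_object = FileObjectAdapters.get_file_object_by_name(user, file_name)
    if file_object:
        FileObjectAdapters.update_raw_text(file_object, raw_text)
    else:
        FileObjectAdapters.create_file_object(user, file_name, raw_text)
```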
```diff
@@ -183,7 +183,7 @@ async def remove_content_source_data(
         raise ValueError(f"Invalid content source: {content_source}")
     elif content_object != "Computer":
         await content_object.objects.filter(user=user).adelete()
-    await sync_to_async(EntryAdapters.delete_all_entries)(user, content_source)
+    await sync_to_async(EntryAdapters.delete_all_entries)(user, file_source=content_source)

     enabled_content = await sync_to_async(EntryAdapters.get_unique_file_types)(user)
     return {"status": "ok"}
```
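The keyword is needed because the reworked `delete_all_entries(user, file_type=None, file_source=None, batch_size=1000)` takes `file_type` as its second parameter; passing `content_source` positionally would silently filter by file type instead of file source. A short sketch of the distinction, with the value invented:

```python
# Sketch of why the keyword argument matters (value invented for illustration).
EntryAdapters.delete_all_entries(user, file_source="computer")  # filters entries by source
EntryAdapters.delete_all_entries(user, "computer")              # would be treated as file_type="computer"
```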
```diff
@@ -1,5 +1,6 @@
 import os
 import re
+import time

 from khoj.processor.content.org_mode.org_to_entries import OrgToEntries
 from khoj.processor.content.text_to_entries import TextToEntries
```
```diff
@@ -41,6 +42,35 @@ def test_configure_indexing_heading_only_entries(tmp_path):
     assert is_none_or_empty(entries[1])


+def test_extract_entries_when_child_headings_have_same_prefix():
+    """Extract org entries from entries having child headings with same prefix.
+    Prevents regressions like the one fixed in PR #840.
+    """
+    # Arrange
+    tmp_path = "tests/data/org/same_prefix_headings.org"
+    entry: str = """
+** 1
+*** 1.1
+**** 1.1.2
+""".strip()
+    data = {
+        f"{tmp_path}": entry,
+    }
+
+    # Act
+    # Extract Entries from specified Org files
+    start = time.time()
+    entries = OrgToEntries.extract_org_entries(org_files=data, max_tokens=2)
+    end = time.time()
+    indexing_time = end - start
+
+    # Assert
+    explanation_msg = (
+        "It should not take more than 6 seconds to index. Entry extraction may have gone into an infinite loop."
+    )
+    assert indexing_time < 6 * len(entries), explanation_msg
+
+
 def test_entry_split_when_exceeds_max_tokens():
     "Ensure entries with compiled words exceeding max_tokens are split."
     # Arrange
```