Identify file type by content & allow server to index all text files

- Use Magika's AI model for a small, portable and more accurate file
  type identification system
- Existing file type identification tools like `file' and `magic'
  require system-level packages that may not be installed by default
  on all operating systems (e.g. the `file' command on Windows)
This commit is contained in:
Debanjum Singh Solanky 2024-04-03 10:24:16 +05:30
parent 1c229dad91
commit 89915dcb4c
4 changed files with 35 additions and 13 deletions

View file

@ -46,6 +46,7 @@ dependencies = [
"openai >= 1.0.0",
"tiktoken >= 0.3.2",
"tenacity >= 8.2.2",
"magika ~= 0.5.1",
"pillow ~= 9.5.0",
"pydantic >= 2.0.0",
"pyyaml ~= 6.0",

View file

@ -67,11 +67,10 @@ async def update(
try:
logger.info(f"📬 Updating content index via API call by {client} client")
for file in files:
file_type, encoding = get_file_type(file.content_type)
file_content = file.file.read()
file_type, encoding = get_file_type(file.content_type, file_content)
if file_type in index_files:
index_files[file_type][file.filename] = (
file.file.read().decode("utf-8") if encoding == "utf-8" else file.file.read() # type: ignore
)
index_files[file_type][file.filename] = file_content.decode(encoding) if encoding else file_content
else:
logger.warning(f"Skipped indexing unsupported file type sent by {client} client: {file.filename}")

View file

@ -1,9 +1,11 @@
import glob
import logging
import os
from pathlib import Path
from typing import Optional
from bs4 import BeautifulSoup
from magika import Magika
from khoj.database.models import (
LocalMarkdownConfig,
@ -16,6 +18,7 @@ from khoj.utils.helpers import get_absolute_path, is_none_or_empty
from khoj.utils.rawconfig import TextContentConfig
logger = logging.getLogger(__name__)
magika = Magika()
def collect_files(search_type: Optional[SearchType] = SearchType.All, user=None) -> dict:
@ -47,6 +50,11 @@ def construct_config_from_db(db_config) -> TextContentConfig:
def get_plaintext_files(config: TextContentConfig) -> dict[str, str]:
def is_plaintextfile(file: str):
    """Check if file is a plaintext file.

    The file content is inspected with Magika first. If Magika cannot read or
    identify the content, the decision falls back to the file name suffix.

    Args:
        file: Path of the file to check.

    Returns:
        True if the file is considered plaintext, False otherwise.
    """
    # Identify file type from its content
    try:
        mime_type = magika.identify_path(Path(file)).output.mime_type
    except Exception:
        # Best effort: an unreadable or missing file should not abort the scan.
        # Treat it as unidentifiable so the suffix check below decides.
        mime_type = "application/unknown"
    if mime_type not in ("inode/x-empty", "application/unknown"):
        return mime_type.startswith("text/")
    # Use file extension to decide plaintext if file content is not identifiable
    return file.endswith(("txt", "md", "markdown", "org", "mbox", "rst", "html", "htm", "xml"))
def extract_html_content(html_content: str):
@ -65,7 +73,7 @@ def get_plaintext_files(config: TextContentConfig) -> dict[str, str]:
logger.debug("At least one of input-files or input-file-filter is required to be specified")
return {}
"Get all files to process"
# Get all plain text files to process
absolute_plaintext_files, filtered_plaintext_files = set(), set()
if input_files:
absolute_plaintext_files = {get_absolute_path(jsonl_file) for jsonl_file in input_files}
@ -209,7 +217,7 @@ def get_pdf_files(config: TextContentConfig):
logger.debug("At least one of pdf-files or pdf-file-filter is required to be specified")
return {}
"Get PDF files to process"
# Get PDF files to process
absolute_pdf_files, filtered_pdf_files = set(), set()
if pdf_files:
absolute_pdf_files = {get_absolute_path(pdf_file) for pdf_file in pdf_files}

View file

@ -19,6 +19,7 @@ from urllib.parse import urlparse
import torch
from asgiref.sync import sync_to_async
from magika import Magika
from khoj.utils import constants
@ -29,6 +30,10 @@ if TYPE_CHECKING:
from khoj.utils.rawconfig import AppConfig
# Initialize Magika for file type identification
magika = Magika()
class AsyncIteratorWrapper:
def __init__(self, obj):
self._it = iter(obj)
@ -88,22 +93,31 @@ def merge_dicts(priority_dict: dict, default_dict: dict):
return merged_dict
def get_file_type(file_type: str, file_content: bytes) -> tuple[str, str]:
    """Get file type from the declared mime type and the actual file content.

    Args:
        file_type: Content-Type value sent with the file, optionally carrying
            parameters, e.g. "text/markdown; charset=utf-8". A missing (None)
            or empty value is tolerated.
        file_content: Raw bytes of the file, used to infer its real type.

    Returns:
        A (file type, encoding) pair. File type is one of "markdown", "org",
        "pdf", "jpeg", "png", "plaintext" or "other". Encoding is the
        lower-cased charset parameter, or None when no charset was declared.
    """
    # Split the declared content type into the mime type and its parameters
    mime_type, *params = (file_type or "").split(";")
    file_type = mime_type.strip().lower()

    # Only the charset parameter names an encoding. Other parameters
    # (e.g. boundary, format) must not be mistaken for one.
    encoding = None
    for param in params:
        key, _, value = param.partition("=")
        value = value.strip().strip("\"'")
        if key.strip().lower() == "charset" and value:
            encoding = value.lower()
            break

    # Infer content type from reading file content
    try:
        content_type = magika.identify_bytes(file_content).output.mime_type
    except Exception:
        # Fallback to using just file type if content type cannot be inferred
        content_type = file_type

    # Trust the declared type only when the inferred content type agrees with it
    is_text = content_type.startswith("text/")
    if file_type == "text/markdown" and is_text:
        return "markdown", encoding
    elif file_type == "text/org" and is_text:
        return "org", encoding
    elif file_type == "application/pdf" and content_type == "application/pdf":
        return "pdf", encoding
    elif file_type == "image/jpeg" and content_type == "image/jpeg":
        return "jpeg", encoding
    elif file_type == "image/png" and content_type == "image/png":
        return "png", encoding
    elif is_text:
        # Any other text content is indexed as plaintext, whatever type was declared
        return "plaintext", encoding
    else:
        return "other", encoding