Use multi-part form to receive files to index on server

- This uses the existing HTTP multi-part form affordance to process files (a minimal sketch follows below)
  - Better handling of binary file formats, as it removes the need to URL encode/decode file contents
  - Lower memory utilization than streaming JSON, since files are automatically written to disk once in-memory size exceeds preset limits
  - No manual parsing of raw file streams required
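
A minimal sketch of the approach, not the exact endpoint added in this commit: FastAPI parses multipart/form-data into UploadFile objects backed by a spooled temporary file, so large uploads spill to disk rather than being buffered fully in memory.

# Illustrative only: the route name and response shape are assumptions.
from fastapi import FastAPI, UploadFile

app = FastAPI()

@app.post("/upload")
async def upload(files: list[UploadFile]):
    received = {}
    for file in files:
        content = await file.read()  # raw bytes; no URL encoding/decoding needed
        received[file.filename] = len(content)
    return received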
Debanjum Singh Solanky 2023-10-11 17:14:15 -07:00
parent 9ba173bc2d
commit 60e9a61647
3 changed files with 21 additions and 35 deletions


@@ -40,6 +40,7 @@ dependencies = [
"dateparser >= 1.1.1",
"defusedxml == 0.7.1",
"fastapi == 0.77.1",
"python-multipart >= 0.0.5",
"jinja2 == 3.1.2",
"openai >= 0.27.0, < 1.0.0",
"tiktoken >= 0.3.2",

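The python-multipart dependency added above is what FastAPI relies on to parse multipart/form-data request bodies. A hedged sketch of a startup guard, assuming the package exposes the importable module name multipart:

# Illustrative guard; FastAPI itself reports a similar error when routes
# declare UploadFile/Form parameters and python-multipart is not installed.
import importlib.util

if importlib.util.find_spec("multipart") is None:
    raise RuntimeError("python-multipart is required to accept multipart/form-data uploads")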

@@ -1,10 +1,9 @@
# Standard Packages
import logging
import sys
from typing import Optional, Union, Dict
# External Packages
from fastapi import APIRouter, HTTPException, Header, Request, Body, Response
from fastapi import APIRouter, HTTPException, Header, Response, UploadFile
from pydantic import BaseModel
# Internal Packages
@@ -58,7 +57,7 @@ class IndexerInput(BaseModel):
@indexer.post("/batch")
async def index_batch(
request: Request,
files: list[UploadFile],
x_api_key: str = Header(None),
regenerate: bool = False,
search_type: Optional[Union[state.SearchType, str]] = None,
@@ -67,32 +66,14 @@ async def index_batch(
raise HTTPException(status_code=401, detail="Invalid API Key")
state.config_lock.acquire()
try:
logger.info(f"Received batch indexing request")
index_batch_request_acc = b""
async for chunk in request.stream():
index_batch_request_acc += chunk
data_bytes = sys.getsizeof(index_batch_request_acc)
unit = "KB"
data_size = data_bytes / 1024
if data_size > 1000:
unit = "MB"
data_size = data_size / 1024
if data_size > 1000:
unit = "GB"
data_size = data_size / 1024
data_size_metric = f"{data_size:.2f} {unit}"
logger.info(f"Received {data_size_metric} of data")
index_batch_request = IndexBatchRequest.parse_raw(index_batch_request_acc)
logger.info(f"Received {len(index_batch_request.files)} files")
logger.info("📬 Updating content index via API")
org_files: Dict[str, str] = {}
markdown_files: Dict[str, str] = {}
pdf_files: Dict[str, str] = {}
plaintext_files: Dict[str, str] = {}
for file in index_batch_request.files:
file_type = get_file_type(file.path)
for file in files:
file_type = get_file_type(file.content_type)
dict_to_update = None
if file_type == "org":
dict_to_update = org_files
@@ -104,9 +85,9 @@ async def index_batch(
dict_to_update = plaintext_files
if dict_to_update is not None:
dict_to_update[file.path] = file.content
dict_to_update[file.filename] = file.file.read().decode("utf-8")
else:
logger.info(f"Skipping unsupported streamed file: {file.path}")
logger.warning(f"Skipped indexing unsupported file type sent by client: {file.filename}")
indexer_input = IndexerInput(
org=org_files,

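For reference, a sketch of how a client could call the reworked endpoint; the URL, API key, and file names here are hypothetical, but the multipart layout (repeated "files" parts, each with an explicit MIME type) is what list[UploadFile] and the updated get_file_type below expect:

# Hypothetical client call; binary formats travel as raw multipart parts,
# so no URL encoding/decoding is needed.
import requests

multipart_files = [
    ("files", ("notes.org", open("notes.org", "rb"), "text/org")),
    ("files", ("paper.pdf", open("paper.pdf", "rb"), "application/pdf")),
]
response = requests.post(
    "http://localhost:8000/api/v1/index/batch",  # hypothetical host and route prefix
    headers={"x-api-key": "your-api-key"},
    files=multipart_files,
)
response.raise_for_status()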

@@ -66,20 +66,24 @@ def merge_dicts(priority_dict: dict, default_dict: dict):
return merged_dict
def get_file_type(filepath: str) -> str:
"Get file type from file path"
file_type = Path(filepath).suffix[1:]
def get_file_type(file_type: str) -> str:
"Get file type from file mime type"
if file_type in ["md", "markdown"]:
file_type = file_type.split(";")[0].strip() if ";" in file_type else file_type
if file_type in ["text/markdown"]:
return "markdown"
elif file_type in ["org", "orgmode"]:
elif file_type in ["text/org"]:
return "org"
elif file_type in ["txt", "text", "html", "xml", "htm", "rst"]:
return "plaintext"
elif file_type in ["pdf"]:
elif file_type in ["application/pdf"]:
return "pdf"
return file_type
elif file_type in ["image/jpeg"]:
return "jpeg"
elif file_type in ["image/png"]:
return "png"
elif file_type in ["text/plain", "text/html", "application/xml", "text/x-rst"]:
return "plaintext"
else:
return "other"
def load_model(
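
A short sketch of the mapping the rewritten get_file_type performs on client-supplied content types, including trimming parameters such as charset before matching:

# Based on the function above; content types with parameters are trimmed
# at the ";" before matching.
assert get_file_type("text/markdown; charset=UTF-8") == "markdown"
assert get_file_type("text/org") == "org"
assert get_file_type("application/pdf") == "pdf"
assert get_file_type("image/png") == "png"
assert get_file_type("text/html") == "plaintext"
assert get_file_type("application/zip") == "other"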