Increase rate limit in the api/content vs deprecated indexer API

This commit is contained in:
Debanjum Singh Solanky 2024-07-30 16:09:26 +05:30
parent f0bb6883f8
commit f58cff5bcc
2 changed files with 2 additions and 351 deletions

View file

@ -90,7 +90,7 @@ async def put_content(
indexed_data_limiter: ApiIndexedDataLimiter = Depends(
ApiIndexedDataLimiter(
incoming_entries_size_limit=10,
subscribed_incoming_entries_size_limit=25,
subscribed_incoming_entries_size_limit=75,
total_entries_size_limit=10,
subscribed_total_entries_size_limit=100,
)
@ -112,7 +112,7 @@ async def patch_content(
indexed_data_limiter: ApiIndexedDataLimiter = Depends(
ApiIndexedDataLimiter(
incoming_entries_size_limit=10,
subscribed_incoming_entries_size_limit=25,
subscribed_incoming_entries_size_limit=75,
total_entries_size_limit=10,
subscribed_total_entries_size_limit=100,
)

View file

@ -1,349 +0,0 @@
import asyncio
import logging
from typing import Dict, Optional, Union
from fastapi import APIRouter, Depends, Header, Request, Response, UploadFile
from pydantic import BaseModel
from starlette.authentication import requires
from khoj.database.models import GithubConfig, KhojUser, NotionConfig
from khoj.processor.content.docx.docx_to_entries import DocxToEntries
from khoj.processor.content.github.github_to_entries import GithubToEntries
from khoj.processor.content.images.image_to_entries import ImageToEntries
from khoj.processor.content.markdown.markdown_to_entries import MarkdownToEntries
from khoj.processor.content.notion.notion_to_entries import NotionToEntries
from khoj.processor.content.org_mode.org_to_entries import OrgToEntries
from khoj.processor.content.pdf.pdf_to_entries import PdfToEntries
from khoj.processor.content.plaintext.plaintext_to_entries import PlaintextToEntries
from khoj.routers.helpers import ApiIndexedDataLimiter, update_telemetry_state
from khoj.search_type import text_search
from khoj.utils import constants, state
from khoj.utils.config import SearchModels
from khoj.utils.helpers import LRU, get_file_type
from khoj.utils.rawconfig import ContentConfig, FullConfig, SearchConfig
from khoj.utils.yaml import save_config_to_file_updated_state
logger = logging.getLogger(__name__)
indexer = APIRouter()
class File(BaseModel):
path: str
content: Union[str, bytes]
class IndexBatchRequest(BaseModel):
files: list[File]
class IndexerInput(BaseModel):
org: Optional[dict[str, str]] = None
markdown: Optional[dict[str, str]] = None
pdf: Optional[dict[str, bytes]] = None
plaintext: Optional[dict[str, str]] = None
image: Optional[dict[str, bytes]] = None
docx: Optional[dict[str, bytes]] = None
@indexer.post("/update")
@requires(["authenticated"])
async def update(
request: Request,
files: list[UploadFile],
force: bool = False,
t: Optional[Union[state.SearchType, str]] = state.SearchType.All,
client: Optional[str] = None,
user_agent: Optional[str] = Header(None),
referer: Optional[str] = Header(None),
host: Optional[str] = Header(None),
indexed_data_limiter: ApiIndexedDataLimiter = Depends(
ApiIndexedDataLimiter(
incoming_entries_size_limit=10,
subscribed_incoming_entries_size_limit=75,
total_entries_size_limit=10,
subscribed_total_entries_size_limit=100,
)
),
):
user = request.user.object
index_files: Dict[str, Dict[str, str]] = {
"org": {},
"markdown": {},
"pdf": {},
"plaintext": {},
"image": {},
"docx": {},
}
try:
logger.info(f"📬 Updating content index via API call by {client} client")
for file in files:
file_content = file.file.read()
file_type, encoding = get_file_type(file.content_type, file_content)
if file_type in index_files:
index_files[file_type][file.filename] = file_content.decode(encoding) if encoding else file_content
else:
logger.warning(f"Skipped indexing unsupported file type sent by {client} client: {file.filename}")
indexer_input = IndexerInput(
org=index_files["org"],
markdown=index_files["markdown"],
pdf=index_files["pdf"],
plaintext=index_files["plaintext"],
image=index_files["image"],
docx=index_files["docx"],
)
if state.config == None:
logger.info("📬 Initializing content index on first run.")
default_full_config = FullConfig(
content_type=None,
search_type=SearchConfig.model_validate(constants.default_config["search-type"]),
processor=None,
)
state.config = default_full_config
default_content_config = ContentConfig(
org=None,
markdown=None,
pdf=None,
docx=None,
image=None,
github=None,
notion=None,
plaintext=None,
)
state.config.content_type = default_content_config
save_config_to_file_updated_state()
configure_search(state.search_models, state.config.search_type)
# Extract required fields from config
loop = asyncio.get_event_loop()
success = await loop.run_in_executor(
None,
configure_content,
indexer_input.model_dump(),
force,
t,
False,
user,
)
if not success:
raise RuntimeError("Failed to update content index")
logger.info(f"Finished processing batch indexing request")
except Exception as e:
logger.error(f"Failed to process batch indexing request: {e}", exc_info=True)
logger.error(
f'🚨 Failed to {"force " if force else ""}update {t} content index triggered via API call by {client} client: {e}',
exc_info=True,
)
return Response(content="Failed", status_code=500)
indexing_metadata = {
"num_org": len(index_files["org"]),
"num_markdown": len(index_files["markdown"]),
"num_pdf": len(index_files["pdf"]),
"num_plaintext": len(index_files["plaintext"]),
"num_image": len(index_files["image"]),
"num_docx": len(index_files["docx"]),
}
update_telemetry_state(
request=request,
telemetry_type="api",
api="index/update",
client=client,
user_agent=user_agent,
referer=referer,
host=host,
metadata=indexing_metadata,
)
logger.info(f"📪 Content index updated via API call by {client} client")
indexed_filenames = ",".join(file for ctype in index_files for file in index_files[ctype]) or ""
return Response(content=indexed_filenames, status_code=200)
def configure_search(search_models: SearchModels, search_config: Optional[SearchConfig]) -> Optional[SearchModels]:
# Run Validation Checks
if search_models is None:
search_models = SearchModels()
return search_models
def configure_content(
files: Optional[dict[str, dict[str, str]]],
regenerate: bool = False,
t: Optional[state.SearchType] = state.SearchType.All,
full_corpus: bool = True,
user: KhojUser = None,
) -> bool:
success = True
if t == None:
t = state.SearchType.All
if t is not None and t in [type.value for type in state.SearchType]:
t = state.SearchType(t)
if t is not None and not t.value in [type.value for type in state.SearchType]:
logger.warning(f"🚨 Invalid search type: {t}")
return False
search_type = t.value if t else None
no_documents = all([not files.get(file_type) for file_type in files])
if files is None:
logger.warning(f"🚨 No files to process for {search_type} search.")
return True
try:
# Initialize Org Notes Search
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Org.value) and files["org"]:
logger.info("🦄 Setting up search for orgmode notes")
# Extract Entries, Generate Notes Embeddings
text_search.setup(
OrgToEntries,
files.get("org"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup org: {e}", exc_info=True)
success = False
try:
# Initialize Markdown Search
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Markdown.value) and files[
"markdown"
]:
logger.info("💎 Setting up search for markdown notes")
# Extract Entries, Generate Markdown Embeddings
text_search.setup(
MarkdownToEntries,
files.get("markdown"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup markdown: {e}", exc_info=True)
success = False
try:
# Initialize PDF Search
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Pdf.value) and files["pdf"]:
logger.info("🖨️ Setting up search for pdf")
# Extract Entries, Generate PDF Embeddings
text_search.setup(
PdfToEntries,
files.get("pdf"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup PDF: {e}", exc_info=True)
success = False
try:
# Initialize Plaintext Search
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Plaintext.value) and files[
"plaintext"
]:
logger.info("📄 Setting up search for plaintext")
# Extract Entries, Generate Plaintext Embeddings
text_search.setup(
PlaintextToEntries,
files.get("plaintext"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup plaintext: {e}", exc_info=True)
success = False
try:
if no_documents:
github_config = GithubConfig.objects.filter(user=user).prefetch_related("githubrepoconfig").first()
if (
search_type == state.SearchType.All.value or search_type == state.SearchType.Github.value
) and github_config is not None:
logger.info("🐙 Setting up search for github")
# Extract Entries, Generate Github Embeddings
text_search.setup(
GithubToEntries,
None,
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
config=github_config,
)
except Exception as e:
logger.error(f"🚨 Failed to setup GitHub: {e}", exc_info=True)
success = False
try:
if no_documents:
# Initialize Notion Search
notion_config = NotionConfig.objects.filter(user=user).first()
if (
search_type == state.SearchType.All.value or search_type == state.SearchType.Notion.value
) and notion_config:
logger.info("🔌 Setting up search for notion")
text_search.setup(
NotionToEntries,
None,
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
config=notion_config,
)
except Exception as e:
logger.error(f"🚨 Failed to setup Notion: {e}", exc_info=True)
success = False
try:
# Initialize Image Search
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Image.value) and files[
"image"
]:
logger.info("🖼️ Setting up search for images")
# Extract Entries, Generate Image Embeddings
text_search.setup(
ImageToEntries,
files.get("image"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup images: {e}", exc_info=True)
success = False
try:
if (search_type == state.SearchType.All.value or search_type == state.SearchType.Docx.value) and files["docx"]:
logger.info("📄 Setting up search for docx")
text_search.setup(
DocxToEntries,
files.get("docx"),
regenerate=regenerate,
full_corpus=full_corpus,
user=user,
)
except Exception as e:
logger.error(f"🚨 Failed to setup docx: {e}", exc_info=True)
success = False
# Invalidate Query Cache
if user:
state.query_cache[user.uuid] = LRU()
return success