Merge pull request #256 from khoj-ai/features/improve-telemetry

Add additional request headers to improve telemetry
This commit is contained in:
sabaimran 2023-06-30 20:35:41 -07:00 committed by GitHub
commit a443af3a71
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 9 deletions

View file

@@ -8,8 +8,7 @@ from datetime import datetime
from typing import List, Optional, Union from typing import List, Optional, Union
# External Packages # External Packages
from fastapi import APIRouter from fastapi import APIRouter, HTTPException, Header, Request
from fastapi import HTTPException
from sentence_transformers import util from sentence_transformers import util
# Internal Packages # Internal Packages
@@ -135,12 +134,16 @@ async def set_processor_conversation_config_data(updated_config: ConversationPro
@api.get("/search", response_model=List[SearchResponse]) @api.get("/search", response_model=List[SearchResponse])
async def search( async def search(
q: str, q: str,
request: Request,
n: Optional[int] = 5, n: Optional[int] = 5,
t: Optional[SearchType] = SearchType.All, t: Optional[SearchType] = SearchType.All,
r: Optional[bool] = False, r: Optional[bool] = False,
score_threshold: Optional[Union[float, None]] = None, score_threshold: Optional[Union[float, None]] = None,
dedupe: Optional[bool] = True, dedupe: Optional[bool] = True,
client: Optional[str] = None, client: Optional[str] = None,
user_agent: Optional[str] = Header(None),
referer: Optional[str] = Header(None),
host: Optional[str] = Header(None),
): ):
start_time = time.time() start_time = time.time()
@@ -323,10 +326,19 @@ async def search(
# Cache results # Cache results
state.query_cache[query_cache_key] = results state.query_cache[query_cache_key] = results
user_state = {
"client": request.client.host,
"user_agent": user_agent,
"referer": referer,
"host": host,
}
# Only log telemetry if query is new and not a continuation of previous query # Only log telemetry if query is new and not a continuation of previous query
if state.previous_query is None or state.previous_query not in user_query: if state.previous_query is None or state.previous_query not in user_query:
state.telemetry += [ state.telemetry += [
log_telemetry(telemetry_type="api", api="search", client=client, app_config=state.config.app) log_telemetry(
telemetry_type="api", api="search", client=client, app_config=state.config.app, properties=user_state
)
] ]
state.previous_query = user_query state.previous_query = user_query
@@ -337,7 +349,15 @@ async def search(
@api.get("/update") @api.get("/update")
def update(t: Optional[SearchType] = None, force: Optional[bool] = False, client: Optional[str] = None): def update(
request: Request,
t: Optional[SearchType] = None,
force: Optional[bool] = False,
client: Optional[str] = None,
user_agent: Optional[str] = Header(None),
referer: Optional[str] = Header(None),
host: Optional[str] = Header(None),
):
try: try:
state.search_index_lock.acquire() state.search_index_lock.acquire()
state.model = configure_search(state.model, state.config, regenerate=force or False, t=t) state.model = configure_search(state.model, state.config, regenerate=force or False, t=t)
@@ -356,13 +376,31 @@ def update(t: Optional[SearchType] = None, force: Optional[bool] = False, client
else: else:
logger.info("📬 Processor reconfigured via API") logger.info("📬 Processor reconfigured via API")
state.telemetry += [log_telemetry(telemetry_type="api", api="update", client=client, app_config=state.config.app)] user_state = {
"client": request.client.host,
"user_agent": user_agent,
"referer": referer,
"host": host,
}
state.telemetry += [
log_telemetry(
telemetry_type="api", api="update", client=client, app_config=state.config.app, properties=user_state
)
]
return {"status": "ok", "message": "khoj reloaded"} return {"status": "ok", "message": "khoj reloaded"}
@api.get("/chat") @api.get("/chat")
async def chat(q: Optional[str] = None, client: Optional[str] = None): async def chat(
request: Request,
q: Optional[str] = None,
client: Optional[str] = None,
user_agent: Optional[str] = Header(None),
referer: Optional[str] = Header(None),
host: Optional[str] = Header(None),
):
if ( if (
state.processor_config is None state.processor_config is None
or state.processor_config.conversation is None or state.processor_config.conversation is None
@@ -398,7 +436,9 @@ async def chat(q: Optional[str] = None, client: Optional[str] = None):
with timer("Searching knowledge base took", logger): with timer("Searching knowledge base took", logger):
result_list = [] result_list = []
for query in inferred_queries: for query in inferred_queries:
result_list.extend(await search(query, n=5, r=True, score_threshold=-5.0, dedupe=False)) result_list.extend(
await search(query, request=request, n=5, r=True, score_threshold=-5.0, dedupe=False)
)
compiled_references = [item.additional["compiled"] for item in result_list] compiled_references = [item.additional["compiled"] for item in result_list]
# Switch to general conversation type if no relevant notes found for the given query # Switch to general conversation type if no relevant notes found for the given query
@@ -423,6 +463,17 @@ async def chat(q: Optional[str] = None, client: Optional[str] = None):
conversation_log=meta_log.get("chat", []), conversation_log=meta_log.get("chat", []),
) )
state.telemetry += [log_telemetry(telemetry_type="api", api="chat", client=client, app_config=state.config.app)] user_state = {
"client": request.client.host,
"user_agent": user_agent,
"referer": referer,
"host": host,
}
state.telemetry += [
log_telemetry(
telemetry_type="api", api="chat", client=client, app_config=state.config.app, properties=user_state
)
]
return {"status": status, "response": gpt_response, "context": compiled_references} return {"status": status, "response": gpt_response, "context": compiled_references}

View file

@@ -174,7 +174,9 @@ def get_server_id():
return server_id return server_id
def log_telemetry(telemetry_type: str, api: str = None, client: str = None, app_config: AppConfig = None): def log_telemetry(
telemetry_type: str, api: str = None, client: str = None, app_config: AppConfig = None, properties: dict = None
):
"""Log basic app usage telemetry like client, os, api called""" """Log basic app usage telemetry like client, os, api called"""
# Do not log usage telemetry, if telemetry is disabled via app config # Do not log usage telemetry, if telemetry is disabled via app config
if not app_config or not app_config.should_log_telemetry: if not app_config or not app_config.should_log_telemetry:
@@ -188,6 +190,7 @@ def log_telemetry(telemetry_type: str, api: str = None, client: str = None, app_
"os": platform.system(), "os": platform.system(),
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
} }
request_body.update(properties or {})
if api: if api:
# API endpoint on server called by client # API endpoint on server called by client
request_body["api"] = api request_body["api"] = api