diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py
index 2b4cac65..ea45846b 100644
--- a/src/khoj/processor/tools/online_search.py
+++ b/src/khoj/processor/tools/online_search.py
@@ -88,33 +88,36 @@ async def search_online(
     search_results = await asyncio.gather(*search_tasks)
     response_dict = {subquery: search_result for subquery, search_result in search_results}
 
-    # Gather distinct web page data from organic results of each subquery without an instant answer.
+    # Gather distinct web pages from organic results for subqueries without an instant answer.
     # Content of web pages is directly available when Jina is used for search.
-    webpages = {
-        (organic.get("link"), subquery, organic.get("content"))
-        for subquery in response_dict
-        for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]
-        if "answerBox" not in response_dict[subquery]
-    }
+    webpages: Dict[str, Dict] = {}
+    for subquery in response_dict:
+        if "answerBox" in response_dict[subquery]:
+            continue
+        for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]:
+            link = organic.get("link")
+            if link in webpages:
+                webpages[link]["queries"].add(subquery)
+            else:
+                webpages[link] = {"queries": {subquery}, "content": organic.get("content")}
 
     # Read, extract relevant info from the retrieved web pages
     if webpages:
-        webpage_links = set([link for link, _, _ in webpages])
-        logger.info(f"Reading web pages at: {list(webpage_links)}")
+        logger.info(f"Reading web pages at: {webpages.keys()}")
         if send_status_func:
-            webpage_links_str = "\n- " + "\n- ".join(list(webpage_links))
+            webpage_links_str = "\n- " + "\n- ".join(webpages.keys())
             async for event in send_status_func(f"**Reading web pages**: {webpage_links_str}"):
                 yield {ChatEvent.STATUS: event}
         tasks = [
-            read_webpage_and_extract_content(subquery, link, content, user=user, agent=agent)
-            for link, subquery, content in webpages
+            read_webpage_and_extract_content(data["queries"], link, data["content"], user=user, agent=agent)
+            for link, data in webpages.items()
         ]
         results = await asyncio.gather(*tasks)
 
         # Collect extracted info from the retrieved web pages
-        for subquery, webpage_extract, url in results:
+        for subqueries, url, webpage_extract in results:
             if webpage_extract is not None:
-                response_dict[subquery]["webpages"] = {"link": url, "snippet": webpage_extract}
+                response_dict[subqueries.pop()]["webpages"] = {"link": url, "snippet": webpage_extract}
 
     yield response_dict
 
@@ -161,26 +164,26 @@ async def read_webpages(
         webpage_links_str = "\n- " + "\n- ".join(list(urls))
         async for event in send_status_func(f"**Reading web pages**: {webpage_links_str}"):
             yield {ChatEvent.STATUS: event}
-    tasks = [read_webpage_and_extract_content(query, url, user=user, agent=agent) for url in urls]
+    tasks = [read_webpage_and_extract_content({query}, url, user=user, agent=agent) for url in urls]
     results = await asyncio.gather(*tasks)
 
     response: Dict[str, Dict] = defaultdict(dict)
     response[query]["webpages"] = [
-        {"query": q, "link": url, "snippet": web_extract} for q, web_extract, url in results if web_extract is not None
+        {"query": qs.pop(), "link": url, "snippet": extract} for qs, url, extract in results if extract is not None
     ]
     yield response
 
 
 async def read_webpage_and_extract_content(
-    subquery: str, url: str, content: str = None, user: KhojUser = None, agent: Agent = None
-) -> Tuple[str, Union[None, str], str]:
+    subqueries: set[str], url: str, content: str = None, user: KhojUser = None, agent: Agent = None
+) -> Tuple[set[str], str, Union[None, str]]:
     extracted_info = None
     try:
         if is_none_or_empty(content):
             with timer(f"Reading web page at '{url}' took", logger):
                 if FIRECRAWL_API_KEY:
                     if FIRECRAWL_TO_EXTRACT:
-                        extracted_info = await read_webpage_and_extract_content_with_firecrawl(url, subquery, agent)
+                        extracted_info = await read_webpage_and_extract_content_with_firecrawl(url, subqueries, agent)
                     else:
                         content = await read_webpage_with_firecrawl(url)
                 elif OLOSTEP_API_KEY:
@@ -189,11 +192,11 @@ async def read_webpage_and_extract_content(
                     content = await read_webpage_with_jina(url)
 
         if is_none_or_empty(extracted_info):
             with timer(f"Extracting relevant information from web page at '{url}' took", logger):
-                extracted_info = await extract_relevant_info(subquery, content, user=user, agent=agent)
+                extracted_info = await extract_relevant_info(subqueries, content, user=user, agent=agent)
     except Exception as e:
         logger.error(f"Failed to read web page at '{url}' with {e}")
-    return subquery, extracted_info, url
+    return subqueries, url, extracted_info
 
 
 async def read_webpage_at_url(web_url: str) -> str:
@@ -247,7 +250,7 @@ async def read_webpage_with_firecrawl(web_url: str) -> str:
         return response_json["data"]["markdown"]
 
 
-async def read_webpage_and_extract_content_with_firecrawl(web_url: str, query: str, agent: Agent = None) -> str:
+async def read_webpage_and_extract_content_with_firecrawl(web_url: str, queries: set[str], agent: Agent = None) -> str:
     firecrawl_api_url = f"{FIRECRAWL_API_URL}/v1/scrape"
     headers = {"Content-Type": "application/json", "Authorization": f"Bearer {FIRECRAWL_API_KEY}"}
     schema = {
@@ -267,7 +270,7 @@ async def read_webpage_and_extract_content_with_firecrawl(web_url: str, query: s
 {prompts.system_prompt_extract_relevant_information}
 {personality_context}
 
-User Query: {query}
+User Query: {", ".join(queries)}
 
 Collate only relevant information from the website to answer the target query and in the provided JSON schema.
 """.strip()
diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py
index a80864ba..4edef61d 100644
--- a/src/khoj/routers/helpers.py
+++ b/src/khoj/routers/helpers.py
@@ -551,12 +551,14 @@ async def schedule_query(
         raise AssertionError(f"Invalid response for scheduling query: {raw_response}")
 
 
-async def extract_relevant_info(q: str, corpus: str, user: KhojUser = None, agent: Agent = None) -> Union[str, None]:
+async def extract_relevant_info(
+    qs: set[str], corpus: str, user: KhojUser = None, agent: Agent = None
+) -> Union[str, None]:
     """
     Extract relevant information for a given query from the target corpus
     """
 
-    if is_none_or_empty(corpus) or is_none_or_empty(q):
+    if is_none_or_empty(corpus) or is_none_or_empty(qs):
         return None
 
     personality_context = (
@@ -564,7 +566,7 @@ async def extract_relevant_info(q: str, corpus: str, user: KhojUser = None, agen
     )
 
     extract_relevant_information = prompts.extract_relevant_information.format(
-        query=q,
+        query=", ".join(qs),
         corpus=corpus.strip(),
         personality_context=personality_context,
     )
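
The heart of this change is the first hunk: the set of `(link, subquery, content)` tuples becomes a dict keyed by link, with each link accumulating the set of subqueries that surfaced it, so a page that appears under several subqueries is fetched and summarized only once. Below is a minimal, self-contained sketch of that pattern; the sample `response_dict` and the `MAX_WEBPAGES_TO_READ` value are hypothetical stand-ins for the Serper-style results the real code consumes, not data from this PR.

```python
from typing import Dict

MAX_WEBPAGES_TO_READ = 1  # illustrative value; the real constant lives in online_search.py

# Hypothetical search results: two subqueries surface the same link,
# and one subquery already has an instant answer ("answerBox").
response_dict = {
    "khoj features": {"organic": [{"link": "https://docs.khoj.dev", "content": None}]},
    "khoj self-hosting": {"organic": [{"link": "https://docs.khoj.dev", "content": None}]},
    "capital of france": {"answerBox": {"answer": "Paris"}, "organic": [{"link": "https://example.com"}]},
}

webpages: Dict[str, Dict] = {}
for subquery in response_dict:
    if "answerBox" in response_dict[subquery]:
        continue  # instant answer available; skip reading web pages for this subquery
    for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]:
        link = organic.get("link")
        if link in webpages:
            webpages[link]["queries"].add(subquery)  # same page, another interested subquery
        else:
            webpages[link] = {"queries": {subquery}, "content": organic.get("content")}

# docs.khoj.dev is read once on behalf of both subqueries; example.com is skipped.
assert webpages["https://docs.khoj.dev"]["queries"] == {"khoj features", "khoj self-hosting"}
assert "https://example.com" not in webpages
```

One consequence worth flagging for review: `set.pop()` removes an arbitrary element, so when a page served multiple subqueries, `response_dict[subqueries.pop()]["webpages"]` in the first hunk attaches the extracted snippet to only one of those subqueries, chosen arbitrarily.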