diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py
index f1ff8e5d..33589eac 100644
--- a/src/khoj/processor/tools/online_search.py
+++ b/src/khoj/processor/tools/online_search.py
@@ -1,12 +1,14 @@
+import asyncio
 import json
 import logging
 import os
-from typing import Dict, List, Union
+from typing import Dict, Union

+import aiohttp
 import requests

 from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries
-from khoj.utils.helpers import is_none_or_empty
+from khoj.utils.helpers import is_none_or_empty, timer
 from khoj.utils.rawconfig import LocationData

 logger = logging.getLogger(__name__)
@@ -21,17 +23,18 @@ OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI"
 OLOSTEP_QUERY_PARAMS = {
     "timeout": 35,  # seconds
     "waitBeforeScraping": 1,  # seconds
-    "saveHtml": False,
-    "saveMarkdown": True,
+    "saveHtml": "False",
+    "saveMarkdown": "True",
     "removeCSSselectors": "default",
     "htmlTransformer": "none",
-    "removeImages": True,
-    "fastLane": True,
+    "removeImages": "True",
+    "fastLane": "True",
     # Similar to Stripe's API, the expand parameters avoid the need to make a second API call
     # to retrieve the dataset (from the dataset API) if you only need the markdown or html.
-    "expandMarkdown": True,
-    "expandHtml": False,
+    "expandMarkdown": "True",
+    "expandHtml": "False",
 }
+MAX_WEBPAGES_TO_READ = 1


 async def search_with_google(query: str, conversation_history: dict, location: LocationData):
@@ -65,52 +68,55 @@ async def search_with_google(query: str, conversation_history: dict, location: L

     # Breakdown the query into subqueries to get the correct answer
     subqueries = await generate_online_subqueries(query, conversation_history, location)
-
     response_dict = {}

     for subquery in subqueries:
         logger.info(f"Searching with Google for '{subquery}'")
         response_dict[subquery] = _search_with_google(subquery)

-    extracted_content: Dict[str, List] = {}
-    if is_none_or_empty(OLOSTEP_API_KEY):
-        logger.warning("OLOSTEP_API_KEY is not set. Skipping web scraping.")
Skipping web scraping.") - return response_dict + # Gather distinct web pages from organic search results of each subquery without an instant answer + webpage_links = { + result["link"] + for subquery in response_dict + for result in response_dict[subquery].get("organic")[:MAX_WEBPAGES_TO_READ] + if is_none_or_empty(response_dict[subquery].get("answerBox")) + } - for subquery in response_dict: - # If a high quality answer is not found, search the web pages of the first 3 organic results - if is_none_or_empty(response_dict[subquery].get("answerBox")): - extracted_content[subquery] = [] - for result in response_dict[subquery].get("organic")[:1]: - logger.info(f"Searching web page of '{result['link']}'") - try: - extracted_content[subquery].append(search_with_olostep(result["link"]).strip()) - except Exception as e: - logger.error(f"Error while searching web page of '{result['link']}': {e}", exc_info=True) - continue - extracted_relevant_content = await extract_relevant_info(subquery, extracted_content) - response_dict[subquery]["extracted_content"] = extracted_relevant_content + # Read, extract relevant info from the retrieved web pages + tasks = [] + for webpage_link in webpage_links: + logger.info(f"Reading web page at '{webpage_link}'") + task = read_webpage_and_extract_content(subquery, webpage_link) + tasks.append(task) + results = await asyncio.gather(*tasks) + + # Collect extracted info from the retrieved web pages + for subquery, extracted_webpage_content in results: + if extracted_webpage_content is not None: + response_dict[subquery]["extracted_content"] = extracted_webpage_content return response_dict -def search_with_olostep(web_url: str) -> str: - if OLOSTEP_API_KEY is None: - raise ValueError("OLOSTEP_API_KEY is not set") +async def read_webpage_and_extract_content(subquery, url): + try: + with timer(f"Reading web page at '{url}' took", logger): + content = await read_webpage_with_olostep(url) + with timer(f"Extracting relevant information from web page at '{url}' took", logger): + extracted_info = await extract_relevant_info(subquery, {subquery: [content.strip()]}) if content else None + return subquery, extracted_info + except Exception as e: + logger.error(f"Failed to read web page at '{url}': {e}", exc_info=True) + return subquery, None + +async def read_webpage_with_olostep(web_url: str) -> str: headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"} - web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore web_scraping_params["url"] = web_url - try: - response = requests.request("GET", OLOSTEP_API_URL, params=web_scraping_params, headers=headers) - - if response.status_code != 200: - logger.error(response, exc_info=True) - return None - except Exception as e: - logger.error(f"Error while searching with Olostep: {e}", exc_info=True) - return None - - return response.json()["markdown_content"] + async with aiohttp.ClientSession() as session: + async with session.get(OLOSTEP_API_URL, params=web_scraping_params, headers=headers) as response: + response.raise_for_status() + response_json = await response.json() + return response_json["markdown_content"] diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 215c1430..e48259ad 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -7,7 +7,7 @@ import pytest from scipy.stats import linregress from khoj.processor.embeddings import EmbeddingsModel -from khoj.processor.tools.online_search import search_with_olostep +from khoj.processor.tools.online_search import 
 from khoj.utils import helpers

@@ -90,7 +91,7 @@ def test_olostep_api():
     website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"

     # Act
-    response = search_with_olostep(website)
+    response = asyncio.run(read_webpage_with_olostep(website))

     # Assert
     assert (
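
Two notes on the changes above.

First, the Olostep query parameters change from booleans to strings (`False` becomes `"False"`), most likely because the request now goes through `aiohttp`: unlike `requests`, `aiohttp` rejects boolean query-parameter values with a `TypeError`, so the flags have to be pre-serialized as strings.

Second, the new control flow in `search_with_google` is a fan-out: one coroutine per distinct webpage link, run concurrently with `asyncio.gather`, with each task tagged by its subquery and degrading to `None` on failure so one bad page cannot abort the batch. Below is a minimal, self-contained sketch of that pattern. It is illustrative only: `fetch_page` is a hypothetical stand-in for `read_webpage_with_olostep`, and the links and subqueries are invented for the example.

```python
import asyncio


async def fetch_page(url: str) -> str:
    # Hypothetical stand-in for the aiohttp round trip in read_webpage_with_olostep.
    await asyncio.sleep(0.1)
    return f"markdown for {url}"


async def read_and_extract(subquery: str, url: str):
    # Tag each result with its subquery and degrade to None on failure,
    # mirroring read_webpage_and_extract_content in the diff.
    try:
        content = await fetch_page(url)
        return subquery, content.strip()
    except Exception:
        return subquery, None


async def main():
    # A link -> subquery mapping keeps links distinct, like the comprehension in the diff.
    webpage_links = {
        "https://example.com/a": "first subquery",
        "https://example.com/b": "second subquery",
    }
    results = await asyncio.gather(
        *(read_and_extract(subquery, link) for link, subquery in webpage_links.items())
    )
    for subquery, content in results:
        print(subquery, "->", content)


asyncio.run(main())
```

Note that `asyncio.gather` propagates the first exception raised by any task unless `return_exceptions=True` is passed; the per-task `try`/`except` in `read_webpage_and_extract_content` is what keeps one failed page from failing the whole batch. The same sync-to-async shift is why the test wraps its call in `asyncio.run`: invoking a coroutine function without awaiting it only returns an unawaited coroutine object.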