Read, extract information from web pages in parallel to lower response time

- Time reading webpage, extract info from webpage steps for perf
  analysis
- Deduplicate webpages to read gathered across separate google
  searches
- Use aiohttp to make API requests non-blocking, pair with asyncio to
  parallelize all the online search webpage read and extract calls
This commit is contained in:
Debanjum Singh Solanky 2024-03-08 16:41:19 +05:30
parent b7fad04870
commit ca2f962e95
2 changed files with 49 additions and 43 deletions

View file

@ -1,12 +1,14 @@
import asyncio
import json import json
import logging import logging
import os import os
from typing import Dict, List, Union from typing import Dict, Union
import aiohttp
import requests import requests
from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries
from khoj.utils.helpers import is_none_or_empty from khoj.utils.helpers import is_none_or_empty, timer
from khoj.utils.rawconfig import LocationData from khoj.utils.rawconfig import LocationData
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,17 +23,18 @@ OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI"
OLOSTEP_QUERY_PARAMS = { OLOSTEP_QUERY_PARAMS = {
"timeout": 35, # seconds "timeout": 35, # seconds
"waitBeforeScraping": 1, # seconds "waitBeforeScraping": 1, # seconds
"saveHtml": False, "saveHtml": "False",
"saveMarkdown": True, "saveMarkdown": "True",
"removeCSSselectors": "default", "removeCSSselectors": "default",
"htmlTransformer": "none", "htmlTransformer": "none",
"removeImages": True, "removeImages": "True",
"fastLane": True, "fastLane": "True",
# Similar to Stripe's API, the expand parameters avoid the need to make a second API call # Similar to Stripe's API, the expand parameters avoid the need to make a second API call
# to retrieve the dataset (from the dataset API) if you only need the markdown or html. # to retrieve the dataset (from the dataset API) if you only need the markdown or html.
"expandMarkdown": True, "expandMarkdown": "True",
"expandHtml": False, "expandHtml": "False",
} }
MAX_WEBPAGES_TO_READ = 1
async def search_with_google(query: str, conversation_history: dict, location: LocationData): async def search_with_google(query: str, conversation_history: dict, location: LocationData):
@ -65,52 +68,55 @@ async def search_with_google(query: str, conversation_history: dict, location: L
# Breakdown the query into subqueries to get the correct answer # Breakdown the query into subqueries to get the correct answer
subqueries = await generate_online_subqueries(query, conversation_history, location) subqueries = await generate_online_subqueries(query, conversation_history, location)
response_dict = {} response_dict = {}
for subquery in subqueries: for subquery in subqueries:
logger.info(f"Searching with Google for '{subquery}'") logger.info(f"Searching with Google for '{subquery}'")
response_dict[subquery] = _search_with_google(subquery) response_dict[subquery] = _search_with_google(subquery)
extracted_content: Dict[str, List] = {} # Gather distinct web pages from organic search results of each subquery without an instant answer
if is_none_or_empty(OLOSTEP_API_KEY): webpage_links = {
logger.warning("OLOSTEP_API_KEY is not set. Skipping web scraping.") result["link"]
return response_dict for subquery in response_dict
for result in response_dict[subquery].get("organic")[:MAX_WEBPAGES_TO_READ]
if is_none_or_empty(response_dict[subquery].get("answerBox"))
}
for subquery in response_dict: # Read, extract relevant info from the retrieved web pages
# If a high quality answer is not found, search the web pages of the first 3 organic results tasks = []
if is_none_or_empty(response_dict[subquery].get("answerBox")): for webpage_link in webpage_links:
extracted_content[subquery] = [] logger.info(f"Reading web page at '{webpage_link}'")
for result in response_dict[subquery].get("organic")[:1]: task = read_webpage_and_extract_content(subquery, webpage_link)
logger.info(f"Searching web page of '{result['link']}'") tasks.append(task)
try: results = await asyncio.gather(*tasks)
extracted_content[subquery].append(search_with_olostep(result["link"]).strip())
except Exception as e: # Collect extracted info from the retrieved web pages
logger.error(f"Error while searching web page of '{result['link']}': {e}", exc_info=True) for subquery, extracted_webpage_content in results:
continue if extracted_webpage_content is not None:
extracted_relevant_content = await extract_relevant_info(subquery, extracted_content) response_dict[subquery]["extracted_content"] = extracted_webpage_content
response_dict[subquery]["extracted_content"] = extracted_relevant_content
return response_dict return response_dict
def search_with_olostep(web_url: str) -> str: async def read_webpage_and_extract_content(subquery, url):
if OLOSTEP_API_KEY is None: try:
raise ValueError("OLOSTEP_API_KEY is not set") with timer(f"Reading web page at '{url}' took", logger):
content = await read_webpage_with_olostep(url)
with timer(f"Extracting relevant information from web page at '{url}' took", logger):
extracted_info = await extract_relevant_info(subquery, {subquery: [content.strip()]}) if content else None
return subquery, extracted_info
except Exception as e:
logger.error(f"Failed to read web page at '{url}': {e}", exc_info=True)
return subquery, None
async def read_webpage_with_olostep(web_url: str) -> str:
headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"} headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"}
web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore
web_scraping_params["url"] = web_url web_scraping_params["url"] = web_url
try: async with aiohttp.ClientSession() as session:
response = requests.request("GET", OLOSTEP_API_URL, params=web_scraping_params, headers=headers) async with session.get(OLOSTEP_API_URL, params=web_scraping_params, headers=headers) as response:
response.raise_for_status()
if response.status_code != 200: response_json = await response.json()
logger.error(response, exc_info=True) return response_json["markdown_content"]
return None
except Exception as e:
logger.error(f"Error while searching with Olostep: {e}", exc_info=True)
return None
return response.json()["markdown_content"]

View file

@ -7,7 +7,7 @@ import pytest
from scipy.stats import linregress from scipy.stats import linregress
from khoj.processor.embeddings import EmbeddingsModel from khoj.processor.embeddings import EmbeddingsModel
from khoj.processor.tools.online_search import search_with_olostep from khoj.processor.tools.online_search import read_webpage_with_olostep
from khoj.utils import helpers from khoj.utils import helpers
@ -90,7 +90,7 @@ def test_olostep_api():
website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire" website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"
# Act # Act
response = search_with_olostep(website) response = read_webpage_with_olostep(website)
# Assert # Assert
assert ( assert (