Aggregate webpage extract queries to run once for each distinct webpage

This should reduce webpage read and response generation time.

Previously, we'd run separate webpage read and extract relevant
content pipes for each distinct (query, url) pair.

Now we aggregate all the queries to extract information from each url
and run the webpage read and extract relevant content pipes once for
each distinct url.

Even though the webpage content extraction pipes were previously run
in parallel, they increased response time by
1. adding more context for the response generation chat actor to
   respond from
2. being more susceptible to the page read and extract latencies of
   the parallel jobs

The aggregated retrieval of context for all queries on a given
webpage may cost some context quality. But it should improve response
time and cost, and reduce the variability in response time, quality
and cost.
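
To illustrate the new shape of the data, here is a minimal, standalone
sketch of the per-url aggregation. The subqueries and urls below are
made-up placeholders; the dict-of-url-to-queries shape mirrors the
change to search_online in the diff below:

    from typing import Dict

    # Hypothetical subquery -> organic result links, just to show the grouping.
    organic_links = {
        "what is khoj": ["https://example.com/a", "https://example.com/b"],
        "khoj self hosting": ["https://example.com/a"],
    }

    # Collect every subquery interested in a url under that url, so each page
    # is read and its relevant content extracted once for all of its queries.
    webpages: Dict[str, Dict] = {}
    for subquery, links in organic_links.items():
        for link in links:
            if link in webpages:
                webpages[link]["queries"].add(subquery)
            else:
                webpages[link] = {"queries": {subquery}, "content": None}

    # One read-and-extract task per distinct url, covering all of its queries.
    for link, data in webpages.items():
        print(link, "->", sorted(data["queries"]))
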
Author: Debanjum Singh Solanky
Date: 2024-10-15 16:07:31 -07:00
Parent: 98f99fa6f8
Commit: e47922e53a
2 changed files with 31 additions and 26 deletions


@@ -88,33 +88,36 @@ async def search_online(
         search_results = await asyncio.gather(*search_tasks)
         response_dict = {subquery: search_result for subquery, search_result in search_results}

-    # Gather distinct web page data from organic results of each subquery without an instant answer.
+    # Gather distinct web pages from organic results for subqueries without an instant answer.
     # Content of web pages is directly available when Jina is used for search.
-    webpages = {
-        (organic.get("link"), subquery, organic.get("content"))
-        for subquery in response_dict
-        for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]
-        if "answerBox" not in response_dict[subquery]
-    }
+    webpages: Dict[str, Dict] = {}
+    for subquery in response_dict:
+        if "answerBox" in response_dict[subquery]:
+            continue
+        for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]:
+            link = organic.get("link")
+            if link in webpages:
+                webpages[link]["queries"].add(subquery)
+            else:
+                webpages[link] = {"queries": {subquery}, "content": organic.get("content")}

     # Read, extract relevant info from the retrieved web pages
     if webpages:
-        webpage_links = set([link for link, _, _ in webpages])
-        logger.info(f"Reading web pages at: {list(webpage_links)}")
+        logger.info(f"Reading web pages at: {webpages.keys()}")
         if send_status_func:
-            webpage_links_str = "\n- " + "\n- ".join(list(webpage_links))
+            webpage_links_str = "\n- " + "\n- ".join(webpages.keys())
             async for event in send_status_func(f"**Reading web pages**: {webpage_links_str}"):
                 yield {ChatEvent.STATUS: event}
         tasks = [
-            read_webpage_and_extract_content(subquery, link, content, user=user, agent=agent)
-            for link, subquery, content in webpages
+            read_webpage_and_extract_content(data["queries"], link, data["content"], user=user, agent=agent)
+            for link, data in webpages.items()
         ]
         results = await asyncio.gather(*tasks)

         # Collect extracted info from the retrieved web pages
-        for subquery, webpage_extract, url in results:
+        for subqueries, url, webpage_extract in results:
             if webpage_extract is not None:
-                response_dict[subquery]["webpages"] = {"link": url, "snippet": webpage_extract}
+                response_dict[subqueries.pop()]["webpages"] = {"link": url, "snippet": webpage_extract}

     yield response_dict
@@ -161,26 +164,26 @@ async def read_webpages(
         webpage_links_str = "\n- " + "\n- ".join(list(urls))
         async for event in send_status_func(f"**Reading web pages**: {webpage_links_str}"):
             yield {ChatEvent.STATUS: event}
-    tasks = [read_webpage_and_extract_content(query, url, user=user, agent=agent) for url in urls]
+    tasks = [read_webpage_and_extract_content({query}, url, user=user, agent=agent) for url in urls]
     results = await asyncio.gather(*tasks)

     response: Dict[str, Dict] = defaultdict(dict)
     response[query]["webpages"] = [
-        {"query": q, "link": url, "snippet": web_extract} for q, web_extract, url in results if web_extract is not None
+        {"query": qs.pop(), "link": url, "snippet": extract} for qs, url, extract in results if extract is not None
     ]
     yield response


 async def read_webpage_and_extract_content(
-    subquery: str, url: str, content: str = None, user: KhojUser = None, agent: Agent = None
-) -> Tuple[str, Union[None, str], str]:
+    subqueries: set[str], url: str, content: str = None, user: KhojUser = None, agent: Agent = None
+) -> Tuple[set[str], str, Union[None, str]]:
     extracted_info = None
     try:
         if is_none_or_empty(content):
             with timer(f"Reading web page at '{url}' took", logger):
                 if FIRECRAWL_API_KEY:
                     if FIRECRAWL_TO_EXTRACT:
-                        extracted_info = await read_webpage_and_extract_content_with_firecrawl(url, subquery, agent)
+                        extracted_info = await read_webpage_and_extract_content_with_firecrawl(url, subqueries, agent)
                     else:
                         content = await read_webpage_with_firecrawl(url)
                 elif OLOSTEP_API_KEY:
@@ -189,11 +192,11 @@ async def read_webpage_and_extract_content(
                     content = await read_webpage_with_jina(url)
         if is_none_or_empty(extracted_info):
             with timer(f"Extracting relevant information from web page at '{url}' took", logger):
-                extracted_info = await extract_relevant_info(subquery, content, user=user, agent=agent)
+                extracted_info = await extract_relevant_info(subqueries, content, user=user, agent=agent)
     except Exception as e:
         logger.error(f"Failed to read web page at '{url}' with {e}")
-    return subquery, extracted_info, url
+    return subqueries, url, extracted_info


 async def read_webpage_at_url(web_url: str) -> str:
@@ -247,7 +250,7 @@ async def read_webpage_with_firecrawl(web_url: str) -> str:
     return response_json["data"]["markdown"]


-async def read_webpage_and_extract_content_with_firecrawl(web_url: str, query: str, agent: Agent = None) -> str:
+async def read_webpage_and_extract_content_with_firecrawl(web_url: str, queries: set[str], agent: Agent = None) -> str:
     firecrawl_api_url = f"{FIRECRAWL_API_URL}/v1/scrape"
     headers = {"Content-Type": "application/json", "Authorization": f"Bearer {FIRECRAWL_API_KEY}"}
     schema = {
@@ -267,7 +270,7 @@ async def read_webpage_and_extract_content_with_firecrawl(web_url: str, query: s
 {prompts.system_prompt_extract_relevant_information}
 {personality_context}

-User Query: {query}
+User Query: {", ".join(queries)}

 Collate only relevant information from the website to answer the target query and in the provided JSON schema.
 """.strip()


@@ -551,12 +551,14 @@ async def schedule_query(
         raise AssertionError(f"Invalid response for scheduling query: {raw_response}")


-async def extract_relevant_info(q: str, corpus: str, user: KhojUser = None, agent: Agent = None) -> Union[str, None]:
+async def extract_relevant_info(
+    qs: set[str], corpus: str, user: KhojUser = None, agent: Agent = None
+) -> Union[str, None]:
     """
     Extract relevant information for a given query from the target corpus
     """

-    if is_none_or_empty(corpus) or is_none_or_empty(q):
+    if is_none_or_empty(corpus) or is_none_or_empty(qs):
         return None

     personality_context = (
@@ -564,7 +566,7 @@ async def extract_relevant_info(q: str, corpus: str, user: KhojUser = None, agen
     )

     extract_relevant_information = prompts.extract_relevant_information.format(
-        query=q,
+        query=", ".join(qs),
         corpus=corpus.strip(),
         personality_context=personality_context,
     )