From ca2f962e951a31c319d710ca2472c221975c81f8 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Fri, 8 Mar 2024 16:41:19 +0530 Subject: [PATCH 1/4] Read, extract information from web pages in parallel to lower response time - Time reading webpage, extract info from webpage steps for perf analysis - Deduplicate webpages to read gathered across separate google searches - Use aiohttp to make API requests non-blocking, pair with asyncio to parallelize all the online search webpage read and extract calls --- src/khoj/processor/tools/online_search.py | 88 ++++++++++++----------- tests/test_helpers.py | 4 +- 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index f1ff8e5d..33589eac 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -1,12 +1,14 @@ +import asyncio import json import logging import os -from typing import Dict, List, Union +from typing import Dict, Union +import aiohttp import requests from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries -from khoj.utils.helpers import is_none_or_empty +from khoj.utils.helpers import is_none_or_empty, timer from khoj.utils.rawconfig import LocationData logger = logging.getLogger(__name__) @@ -21,17 +23,18 @@ OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" OLOSTEP_QUERY_PARAMS = { "timeout": 35, # seconds "waitBeforeScraping": 1, # seconds - "saveHtml": False, - "saveMarkdown": True, + "saveHtml": "False", + "saveMarkdown": "True", "removeCSSselectors": "default", "htmlTransformer": "none", - "removeImages": True, - "fastLane": True, + "removeImages": "True", + "fastLane": "True", # Similar to Stripe's API, the expand parameters avoid the need to make a second API call # to retrieve the dataset (from the dataset API) if you only need the markdown or html. - "expandMarkdown": True, - "expandHtml": False, + "expandMarkdown": "True", + "expandHtml": "False", } +MAX_WEBPAGES_TO_READ = 1 async def search_with_google(query: str, conversation_history: dict, location: LocationData): @@ -65,52 +68,55 @@ async def search_with_google(query: str, conversation_history: dict, location: L # Breakdown the query into subqueries to get the correct answer subqueries = await generate_online_subqueries(query, conversation_history, location) - response_dict = {} for subquery in subqueries: logger.info(f"Searching with Google for '{subquery}'") response_dict[subquery] = _search_with_google(subquery) - extracted_content: Dict[str, List] = {} - if is_none_or_empty(OLOSTEP_API_KEY): - logger.warning("OLOSTEP_API_KEY is not set. 
Skipping web scraping.") - return response_dict + # Gather distinct web pages from organic search results of each subquery without an instant answer + webpage_links = { + result["link"] + for subquery in response_dict + for result in response_dict[subquery].get("organic")[:MAX_WEBPAGES_TO_READ] + if is_none_or_empty(response_dict[subquery].get("answerBox")) + } - for subquery in response_dict: - # If a high quality answer is not found, search the web pages of the first 3 organic results - if is_none_or_empty(response_dict[subquery].get("answerBox")): - extracted_content[subquery] = [] - for result in response_dict[subquery].get("organic")[:1]: - logger.info(f"Searching web page of '{result['link']}'") - try: - extracted_content[subquery].append(search_with_olostep(result["link"]).strip()) - except Exception as e: - logger.error(f"Error while searching web page of '{result['link']}': {e}", exc_info=True) - continue - extracted_relevant_content = await extract_relevant_info(subquery, extracted_content) - response_dict[subquery]["extracted_content"] = extracted_relevant_content + # Read, extract relevant info from the retrieved web pages + tasks = [] + for webpage_link in webpage_links: + logger.info(f"Reading web page at '{webpage_link}'") + task = read_webpage_and_extract_content(subquery, webpage_link) + tasks.append(task) + results = await asyncio.gather(*tasks) + + # Collect extracted info from the retrieved web pages + for subquery, extracted_webpage_content in results: + if extracted_webpage_content is not None: + response_dict[subquery]["extracted_content"] = extracted_webpage_content return response_dict -def search_with_olostep(web_url: str) -> str: - if OLOSTEP_API_KEY is None: - raise ValueError("OLOSTEP_API_KEY is not set") +async def read_webpage_and_extract_content(subquery, url): + try: + with timer(f"Reading web page at '{url}' took", logger): + content = await read_webpage_with_olostep(url) + with timer(f"Extracting relevant information from web page at '{url}' took", logger): + extracted_info = await extract_relevant_info(subquery, {subquery: [content.strip()]}) if content else None + return subquery, extracted_info + except Exception as e: + logger.error(f"Failed to read web page at '{url}': {e}", exc_info=True) + return subquery, None + +async def read_webpage_with_olostep(web_url: str) -> str: headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"} - web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore web_scraping_params["url"] = web_url - try: - response = requests.request("GET", OLOSTEP_API_URL, params=web_scraping_params, headers=headers) - - if response.status_code != 200: - logger.error(response, exc_info=True) - return None - except Exception as e: - logger.error(f"Error while searching with Olostep: {e}", exc_info=True) - return None - - return response.json()["markdown_content"] + async with aiohttp.ClientSession() as session: + async with session.get(OLOSTEP_API_URL, params=web_scraping_params, headers=headers) as response: + response.raise_for_status() + response_json = await response.json() + return response_json["markdown_content"] diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 215c1430..e48259ad 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -7,7 +7,7 @@ import pytest from scipy.stats import linregress from khoj.processor.embeddings import EmbeddingsModel -from khoj.processor.tools.online_search import search_with_olostep +from khoj.processor.tools.online_search import 
read_webpage_with_olostep from khoj.utils import helpers @@ -90,7 +90,7 @@ def test_olostep_api(): website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire" # Act - response = search_with_olostep(website) + response = read_webpage_with_olostep(website) # Assert assert ( From 88f096977b22143a7ca02e4c707e5897c58d5c52 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sun, 10 Mar 2024 00:08:48 +0530 Subject: [PATCH 2/4] Read webpages directly when Olostep proxy not setup This is useful for self-hosted, individual user, low traffic setups where a proxy service is not required --- pyproject.toml | 4 ++-- src/khoj/processor/tools/online_search.py | 18 ++++++++++++++++- tests/test_helpers.py | 24 +++++++++++++++++++---- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 17003c6c..63e254c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ classifiers = [ "Topic :: Text Processing :: Linguistic", ] dependencies = [ - "bs4 >= 0.0.1", + "beautifulsoup4 ~= 4.12.3", "dateparser >= 1.1.1", "defusedxml == 0.7.1", "fastapi >= 0.104.1", @@ -58,7 +58,6 @@ dependencies = [ "langchain <= 0.2.0", "langchain-openai >= 0.0.5", "requests >= 2.26.0", - "bs4 >= 0.0.1", "anyio == 3.7.1", "pymupdf >= 1.23.5", "django == 4.2.10", @@ -76,6 +75,7 @@ dependencies = [ "openai-whisper >= 20231117", "django-phonenumber-field == 7.3.0", "phonenumbers == 8.13.27", + "markdownify ~= 0.11.6", ] dynamic = ["version"] diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index 33589eac..f0436e2b 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -6,6 +6,8 @@ from typing import Dict, Union import aiohttp import requests +from bs4 import BeautifulSoup +from markdownify import markdownify from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries from khoj.utils.helpers import is_none_or_empty, timer @@ -101,7 +103,7 @@ async def search_with_google(query: str, conversation_history: dict, location: L async def read_webpage_and_extract_content(subquery, url): try: with timer(f"Reading web page at '{url}' took", logger): - content = await read_webpage_with_olostep(url) + content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage(url) with timer(f"Extracting relevant information from web page at '{url}' took", logger): extracted_info = await extract_relevant_info(subquery, {subquery: [content.strip()]}) if content else None return subquery, extracted_info @@ -110,6 +112,20 @@ async def read_webpage_and_extract_content(subquery, url): return subquery, None +async def read_webpage(web_url: str) -> str: + headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36", + } + + async with aiohttp.ClientSession() as session: + async with session.get(web_url, headers=headers, timeout=30) as response: + response.raise_for_status() + html = await response.text() + parsed_html = BeautifulSoup(html, "html.parser") + body = parsed_html.body.get_text(separator="\n", strip=True) + return markdownify(body) + + async def read_webpage_with_olostep(web_url: str) -> str: headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"} web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy() # type: ignore diff --git a/tests/test_helpers.py b/tests/test_helpers.py index e48259ad..086e4895 100644 --- 
a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -7,7 +7,7 @@ import pytest
 from scipy.stats import linregress
 from khoj.processor.embeddings import EmbeddingsModel
-from khoj.processor.tools.online_search import read_webpage_with_olostep
+from khoj.processor.tools.online_search import read_webpage, read_webpage_with_olostep
 from khoj.utils import helpers
@@ -84,13 +84,29 @@ def test_encode_docs_memory_leak():
     assert slope < 2, f"Memory leak suspected on {device}. Memory usage increased at ~{slope:.2f} MB per iteration"
 
 
-@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set")
-def test_olostep_api():
+@pytest.mark.asyncio
+async def test_reading_webpage():
     # Arrange
     website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"
 
     # Act
-    response = read_webpage_with_olostep(website)
+    response = await read_webpage(website)
+
+    # Assert
+    assert (
+        "An alarm sent from the area near the fire also failed to register at the courthouse where the fire watchmen were"
+        in response
+    )
+
+
+@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set")
+@pytest.mark.asyncio
+async def test_reading_webpage_with_olostep():
+    # Arrange
+    website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"
+
+    # Act
+    response = await read_webpage_with_olostep(website)
 
     # Assert
     assert (

From d136a6be44edbb8903846ea97e8e797dca227c41 Mon Sep 17 00:00:00 2001
From: Debanjum Singh Solanky
Date: Sun, 10 Mar 2024 02:09:11 +0530
Subject: [PATCH 3/4] Simplify, modularize and add type hints to online search functions

- Simplify the content arg to the `extract_relevant_info' function. Validate and clean the content arg inside `extract_relevant_info'
- Extract the `search_with_google' function out of its parent function
- Rename the parent function from `search_with_google' to the more appropriate `search_online'
- Simplify the `search_with_google' function using a list comprehension.
Drop empty search result fields from chat model context for response to reduce cost and response latency - No need to show stacktrace when unable to read webpage, basic error is enough - Add type hints to online search functions to catch issues with mypy --- .../conversation/offline/chat_model.py | 2 +- .../processor/conversation/openai/utils.py | 2 +- src/khoj/processor/tools/online_search.py | 63 +++++++++---------- src/khoj/routers/api_chat.py | 4 +- src/khoj/routers/helpers.py | 10 +-- 5 files changed, 38 insertions(+), 43 deletions(-) diff --git a/src/khoj/processor/conversation/offline/chat_model.py b/src/khoj/processor/conversation/offline/chat_model.py index 799d6d31..437bdd3d 100644 --- a/src/khoj/processor/conversation/offline/chat_model.py +++ b/src/khoj/processor/conversation/offline/chat_model.py @@ -247,7 +247,7 @@ def llm_thread(g, messages: List[ChatMessage], model: Any): def send_message_to_model_offline( message, loaded_model=None, model="mistral-7b-instruct-v0.1.Q4_0.gguf", streaming=False, system_message="" -): +) -> str: try: from gpt4all import GPT4All except ModuleNotFoundError as e: diff --git a/src/khoj/processor/conversation/openai/utils.py b/src/khoj/processor/conversation/openai/utils.py index 00ad74ce..c7c38d46 100644 --- a/src/khoj/processor/conversation/openai/utils.py +++ b/src/khoj/processor/conversation/openai/utils.py @@ -43,7 +43,7 @@ class StreamingChatCallbackHandler(StreamingStdOutCallbackHandler): before_sleep=before_sleep_log(logger, logging.DEBUG), reraise=True, ) -def completion_with_backoff(**kwargs): +def completion_with_backoff(**kwargs) -> str: messages = kwargs.pop("messages") if not "openai_api_key" in kwargs: kwargs["openai_api_key"] = os.getenv("OPENAI_API_KEY") diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index f0436e2b..597f394e 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -2,7 +2,7 @@ import asyncio import json import logging import os -from typing import Dict, Union +from typing import Dict, Tuple, Union import aiohttp import requests @@ -16,12 +16,10 @@ from khoj.utils.rawconfig import LocationData logger = logging.getLogger(__name__) SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY") -OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") - SERPER_DEV_URL = "https://google.serper.dev/search" +OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" - OLOSTEP_QUERY_PARAMS = { "timeout": 35, # seconds "waitBeforeScraping": 1, # seconds @@ -39,31 +37,7 @@ OLOSTEP_QUERY_PARAMS = { MAX_WEBPAGES_TO_READ = 1 -async def search_with_google(query: str, conversation_history: dict, location: LocationData): - def _search_with_google(subquery: str): - payload = json.dumps( - { - "q": subquery, - } - ) - - headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"} - - response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload) - - if response.status_code != 200: - logger.error(response.text) - return {} - - json_response = response.json() - sub_response_dict = {} - sub_response_dict["knowledgeGraph"] = json_response.get("knowledgeGraph", {}) - sub_response_dict["organic"] = json_response.get("organic", []) - sub_response_dict["answerBox"] = json_response.get("answerBox", []) - sub_response_dict["peopleAlsoAsk"] = json_response.get("peopleAlsoAsk", []) - - return sub_response_dict - +async def search_online(query: str, conversation_history: 
dict, location: LocationData): if SERPER_DEV_API_KEY is None: logger.warn("SERPER_DEV_API_KEY is not set") return {} @@ -74,14 +48,14 @@ async def search_with_google(query: str, conversation_history: dict, location: L for subquery in subqueries: logger.info(f"Searching with Google for '{subquery}'") - response_dict[subquery] = _search_with_google(subquery) + response_dict[subquery] = search_with_google(subquery) # Gather distinct web pages from organic search results of each subquery without an instant answer webpage_links = { result["link"] for subquery in response_dict - for result in response_dict[subquery].get("organic")[:MAX_WEBPAGES_TO_READ] - if is_none_or_empty(response_dict[subquery].get("answerBox")) + for result in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ] + if "answerBox" not in response_dict[subquery] } # Read, extract relevant info from the retrieved web pages @@ -100,15 +74,34 @@ async def search_with_google(query: str, conversation_history: dict, location: L return response_dict -async def read_webpage_and_extract_content(subquery, url): +def search_with_google(subquery: str): + payload = json.dumps({"q": subquery}) + headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"} + + response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload) + + if response.status_code != 200: + logger.error(response.text) + return {} + + json_response = response.json() + extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"] + extracted_search_result = { + field: json_response[field] for field in extraction_fields if not is_none_or_empty(json_response.get(field)) + } + + return extracted_search_result + + +async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str]]: try: with timer(f"Reading web page at '{url}' took", logger): content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage(url) with timer(f"Extracting relevant information from web page at '{url}' took", logger): - extracted_info = await extract_relevant_info(subquery, {subquery: [content.strip()]}) if content else None + extracted_info = await extract_relevant_info(subquery, content) return subquery, extracted_info except Exception as e: - logger.error(f"Failed to read web page at '{url}': {e}", exc_info=True) + logger.error(f"Failed to read web page at '{url}' with {e}") return subquery, None diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 3c170f6f..fa13c12d 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -14,7 +14,7 @@ from khoj.database.adapters import ConversationAdapters, EntryAdapters, aget_use from khoj.database.models import KhojUser from khoj.processor.conversation.prompts import help_message, no_entries_found from khoj.processor.conversation.utils import save_to_conversation_log -from khoj.processor.tools.online_search import search_with_google +from khoj.processor.tools.online_search import search_online from khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, @@ -284,7 +284,7 @@ async def chat( if ConversationCommand.Online in conversation_commands: try: - online_results = await search_with_google(defiltered_query, meta_log, location) + online_results = await search_online(defiltered_query, meta_log, location) except ValueError as e: return StreamingResponse( iter(["Please set your SERPER_DEV_API_KEY to get started with online searches 
🌐"]), diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index f8c355fa..85cf9a55 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -256,15 +256,17 @@ async def generate_online_subqueries(q: str, conversation_history: dict, locatio return [q] -async def extract_relevant_info(q: str, corpus: dict) -> List[str]: +async def extract_relevant_info(q: str, corpus: str) -> Union[str, None]: """ - Given a target corpus, extract the most relevant info given a query + Extract relevant information for a given query from the target corpus """ - key = list(corpus.keys())[0] + if is_none_or_empty(corpus) or is_none_or_empty(q): + return None + extract_relevant_information = prompts.extract_relevant_information.format( query=q, - corpus=corpus[key], + corpus=corpus.strip(), ) response = await send_message_to_model_wrapper( From dc86e44a0748b0b17f3507601da4728de3708487 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sun, 10 Mar 2024 02:34:46 +0530 Subject: [PATCH 4/4] Include search results & webpage content in online context for chat response Previously if a web page was read for a sub-query, only the extracted web page content was provided as context for the given sub-query. But the google results themselves have relevant snippets. So include them --- src/khoj/processor/conversation/openai/gpt.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/khoj/processor/conversation/openai/gpt.py b/src/khoj/processor/conversation/openai/gpt.py index 3ed1ff90..90b636f3 100644 --- a/src/khoj/processor/conversation/openai/gpt.py +++ b/src/khoj/processor/conversation/openai/gpt.py @@ -146,12 +146,9 @@ def converse( return iter([prompts.no_online_results_found.format()]) if ConversationCommand.Online in conversation_commands: - simplified_online_results = online_results.copy() - for result in online_results: - if online_results[result].get("extracted_content"): - simplified_online_results[result] = online_results[result]["extracted_content"] - - conversation_primer = f"{prompts.online_search_conversation.format(online_results=str(simplified_online_results))}\n{conversation_primer}" + conversation_primer = ( + f"{prompts.online_search_conversation.format(online_results=str(online_results))}\n{conversation_primer}" + ) if not is_none_or_empty(compiled_references): conversation_primer = f"{prompts.notes_conversation.format(query=user_query, references=compiled_references)}\n{conversation_primer}"