Improve Online Search Speed and Context (#670)

### Major
- Read web pages in parallel to improve chat response time
- Read web pages directly when the Olostep proxy is not set up
- Include search results and web page content in the online context for chat responses

### Minor
- Simplify, modularize and add type hints to online search functions
Commit 3abe7ccb26 by Debanjum, committed via GitHub on 2024-03-11 22:16:30 +05:30
8 changed files with 120 additions and 90 deletions
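
In outline, the commit replaces the serial, Olostep-only scrape loop with an async fan-out: one read task per distinct organic-result link, gathered concurrently, with a direct `aiohttp` fetch as the fallback when no `OLOSTEP_API_KEY` is configured. A condensed sketch of that pattern, simplified from the `online_search` diff below (the reader body here is a stand-in, not the actual implementation):

```python
import asyncio


async def read_and_extract(subquery: str, url: str) -> tuple[str, str]:
    # Stand-in for read_webpage_and_extract_content(): fetch the page directly, or via
    # Olostep when OLOSTEP_API_KEY is set, then extract the info relevant to the subquery.
    await asyncio.sleep(0.1)  # simulate network I/O
    return subquery, f"extracted content for {url}"


async def read_pages_in_parallel(webpage_links: dict[str, str]) -> list[tuple[str, str]]:
    # Fan out one task per link and gather them concurrently instead of reading serially
    tasks = [read_and_extract(subquery, url) for subquery, url in webpage_links.items()]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    links = {"what caused the great chicago fire": "https://en.wikipedia.org/wiki/Great_Chicago_Fire"}
    print(asyncio.run(read_pages_in_parallel(links)))
```

The real `read_webpage_and_extract_content()` in the diff additionally runs `extract_relevant_info()` on the fetched markdown before attaching it to the search results.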


```diff
@@ -36,7 +36,7 @@ classifiers = [
     "Topic :: Text Processing :: Linguistic",
 ]
 dependencies = [
-    "bs4 >= 0.0.1",
+    "beautifulsoup4 ~= 4.12.3",
     "dateparser >= 1.1.1",
     "defusedxml == 0.7.1",
     "fastapi >= 0.104.1",
@@ -58,7 +58,6 @@ dependencies = [
     "langchain <= 0.2.0",
     "langchain-openai >= 0.0.5",
     "requests >= 2.26.0",
-    "bs4 >= 0.0.1",
    "anyio == 3.7.1",
     "pymupdf >= 1.23.5",
     "django == 4.2.10",
@@ -76,6 +75,7 @@ dependencies = [
     "openai-whisper >= 20231117",
     "django-phonenumber-field == 7.3.0",
     "phonenumbers == 8.13.27",
+    "markdownify ~= 0.11.6",
 ]
 dynamic = ["version"]
```
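
The dependency change above (the `bs4` shim swapped for a pinned `beautifulsoup4`, plus `markdownify`) backs the new direct page reader further down: BeautifulSoup strips the markup, markdownify normalizes the remaining text. A tiny illustration of that combination, mirroring `read_webpage()` below (the HTML snippet is made up):

```python
from bs4 import BeautifulSoup  # provided by the beautifulsoup4 package
from markdownify import markdownify

html = "<html><body><h1>Great Chicago Fire</h1><p>The fire began on <b>October 8, 1871</b>.</p></body></html>"

# Keep only the visible body text, then convert it to markdown
body = BeautifulSoup(html, "html.parser").body.get_text(separator="\n", strip=True)
print(markdownify(body))
```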


```diff
@@ -247,7 +247,7 @@ def llm_thread(g, messages: List[ChatMessage], model: Any):
 def send_message_to_model_offline(
     message, loaded_model=None, model="mistral-7b-instruct-v0.1.Q4_0.gguf", streaming=False, system_message=""
-):
+) -> str:
     try:
         from gpt4all import GPT4All
     except ModuleNotFoundError as e:
```


```diff
@@ -146,12 +146,9 @@ def converse(
         return iter([prompts.no_online_results_found.format()])

     if ConversationCommand.Online in conversation_commands:
-        simplified_online_results = online_results.copy()
-        for result in online_results:
-            if online_results[result].get("extracted_content"):
-                simplified_online_results[result] = online_results[result]["extracted_content"]
-
-        conversation_primer = f"{prompts.online_search_conversation.format(online_results=str(simplified_online_results))}\n{conversation_primer}"
+        conversation_primer = (
+            f"{prompts.online_search_conversation.format(online_results=str(online_results))}\n{conversation_primer}"
+        )

     if not is_none_or_empty(compiled_references):
         conversation_primer = f"{prompts.notes_conversation.format(query=user_query, references=compiled_references)}\n{conversation_primer}"
```


```diff
@@ -43,7 +43,7 @@ class StreamingChatCallbackHandler(StreamingStdOutCallbackHandler):
     before_sleep=before_sleep_log(logger, logging.DEBUG),
     reraise=True,
 )
-def completion_with_backoff(**kwargs):
+def completion_with_backoff(**kwargs) -> str:
     messages = kwargs.pop("messages")
     if not "openai_api_key" in kwargs:
         kwargs["openai_api_key"] = os.getenv("OPENAI_API_KEY")
```


```diff
@@ -1,116 +1,131 @@
+import asyncio
 import json
 import logging
 import os
-from typing import Dict, List, Union
+from typing import Dict, Tuple, Union

+import aiohttp
 import requests
+from bs4 import BeautifulSoup
+from markdownify import markdownify

 from khoj.routers.helpers import extract_relevant_info, generate_online_subqueries
-from khoj.utils.helpers import is_none_or_empty
+from khoj.utils.helpers import is_none_or_empty, timer
 from khoj.utils.rawconfig import LocationData

 logger = logging.getLogger(__name__)

 SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY")
-OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY")
 SERPER_DEV_URL = "https://google.serper.dev/search"
+OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY")
 OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI"
 OLOSTEP_QUERY_PARAMS = {
     "timeout": 35,  # seconds
     "waitBeforeScraping": 1,  # seconds
-    "saveHtml": False,
-    "saveMarkdown": True,
+    "saveHtml": "False",
+    "saveMarkdown": "True",
     "removeCSSselectors": "default",
     "htmlTransformer": "none",
-    "removeImages": True,
-    "fastLane": True,
+    "removeImages": "True",
+    "fastLane": "True",
     # Similar to Stripe's API, the expand parameters avoid the need to make a second API call
     # to retrieve the dataset (from the dataset API) if you only need the markdown or html.
-    "expandMarkdown": True,
-    "expandHtml": False,
+    "expandMarkdown": "True",
+    "expandHtml": "False",
 }

+MAX_WEBPAGES_TO_READ = 1
+

-async def search_with_google(query: str, conversation_history: dict, location: LocationData):
-    def _search_with_google(subquery: str):
-        payload = json.dumps(
-            {
-                "q": subquery,
-            }
-        )
-        headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"}
-        response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload)
-        if response.status_code != 200:
-            logger.error(response.text)
-            return {}
-        json_response = response.json()
-        sub_response_dict = {}
-        sub_response_dict["knowledgeGraph"] = json_response.get("knowledgeGraph", {})
-        sub_response_dict["organic"] = json_response.get("organic", [])
-        sub_response_dict["answerBox"] = json_response.get("answerBox", [])
-        sub_response_dict["peopleAlsoAsk"] = json_response.get("peopleAlsoAsk", [])
-        return sub_response_dict
-
+async def search_online(query: str, conversation_history: dict, location: LocationData):
     if SERPER_DEV_API_KEY is None:
         logger.warn("SERPER_DEV_API_KEY is not set")
         return {}

     # Breakdown the query into subqueries to get the correct answer
     subqueries = await generate_online_subqueries(query, conversation_history, location)

     response_dict = {}

     for subquery in subqueries:
         logger.info(f"Searching with Google for '{subquery}'")
-        response_dict[subquery] = _search_with_google(subquery)
+        response_dict[subquery] = search_with_google(subquery)

-    extracted_content: Dict[str, List] = {}
-    if is_none_or_empty(OLOSTEP_API_KEY):
-        logger.warning("OLOSTEP_API_KEY is not set. Skipping web scraping.")
-        return response_dict
-
-    for subquery in response_dict:
-        # If a high quality answer is not found, search the web pages of the first 3 organic results
-        if is_none_or_empty(response_dict[subquery].get("answerBox")):
-            extracted_content[subquery] = []
-            for result in response_dict[subquery].get("organic")[:1]:
-                logger.info(f"Searching web page of '{result['link']}'")
-                try:
-                    extracted_content[subquery].append(search_with_olostep(result["link"]).strip())
-                except Exception as e:
-                    logger.error(f"Error while searching web page of '{result['link']}': {e}", exc_info=True)
-                    continue
-            extracted_relevant_content = await extract_relevant_info(subquery, extracted_content)
-            response_dict[subquery]["extracted_content"] = extracted_relevant_content
+    # Gather distinct web pages from organic search results of each subquery without an instant answer
+    webpage_links = {
+        result["link"]
+        for subquery in response_dict
+        for result in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]
+        if "answerBox" not in response_dict[subquery]
+    }
+
+    # Read, extract relevant info from the retrieved web pages
+    tasks = []
+    for webpage_link in webpage_links:
+        logger.info(f"Reading web page at '{webpage_link}'")
+        task = read_webpage_and_extract_content(subquery, webpage_link)
+        tasks.append(task)
+    results = await asyncio.gather(*tasks)
+
+    # Collect extracted info from the retrieved web pages
+    for subquery, extracted_webpage_content in results:
+        if extracted_webpage_content is not None:
+            response_dict[subquery]["extracted_content"] = extracted_webpage_content

     return response_dict


-def search_with_olostep(web_url: str) -> str:
-    if OLOSTEP_API_KEY is None:
-        raise ValueError("OLOSTEP_API_KEY is not set")
-
+def search_with_google(subquery: str):
+    payload = json.dumps({"q": subquery})
+    headers = {"X-API-KEY": SERPER_DEV_API_KEY, "Content-Type": "application/json"}
+
+    response = requests.request("POST", SERPER_DEV_URL, headers=headers, data=payload)
+
+    if response.status_code != 200:
+        logger.error(response.text)
+        return {}
+
+    json_response = response.json()
+    extraction_fields = ["organic", "answerBox", "peopleAlsoAsk", "knowledgeGraph"]
+    extracted_search_result = {
+        field: json_response[field] for field in extraction_fields if not is_none_or_empty(json_response.get(field))
+    }
+
+    return extracted_search_result
+
+
+async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str]]:
+    try:
+        with timer(f"Reading web page at '{url}' took", logger):
+            content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage(url)
+        with timer(f"Extracting relevant information from web page at '{url}' took", logger):
+            extracted_info = await extract_relevant_info(subquery, content)
+        return subquery, extracted_info
+    except Exception as e:
+        logger.error(f"Failed to read web page at '{url}' with {e}")
+        return subquery, None
+
+
+async def read_webpage(web_url: str) -> str:
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
+    }
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(web_url, headers=headers, timeout=30) as response:
+            response.raise_for_status()
+            html = await response.text()
+            parsed_html = BeautifulSoup(html, "html.parser")
+            body = parsed_html.body.get_text(separator="\n", strip=True)
+            return markdownify(body)
+
+
+async def read_webpage_with_olostep(web_url: str) -> str:
     headers = {"Authorization": f"Bearer {OLOSTEP_API_KEY}"}
     web_scraping_params: Dict[str, Union[str, int, bool]] = OLOSTEP_QUERY_PARAMS.copy()  # type: ignore
     web_scraping_params["url"] = web_url

-    try:
-        response = requests.request("GET", OLOSTEP_API_URL, params=web_scraping_params, headers=headers)
-
-        if response.status_code != 200:
-            logger.error(response, exc_info=True)
-            return None
-    except Exception as e:
-        logger.error(f"Error while searching with Olostep: {e}", exc_info=True)
-        return None
-
-    return response.json()["markdown_content"]
+    async with aiohttp.ClientSession() as session:
+        async with session.get(OLOSTEP_API_URL, params=web_scraping_params, headers=headers) as response:
+            response.raise_for_status()
+            response_json = await response.json()
+            return response_json["markdown_content"]
```
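
A small usage sketch, not part of the commit: the new direct reader can be exercised on its own from any async context and needs neither `OLOSTEP_API_KEY` nor `SERPER_DEV_API_KEY`, while the full `search_online()` flow still requires a Serper key.

```python
import asyncio

from khoj.processor.tools.online_search import read_webpage


async def demo():
    # Fetch and convert a page with the direct aiohttp + BeautifulSoup + markdownify path
    markdown = await read_webpage("https://en.wikipedia.org/wiki/Great_Chicago_Fire")
    print(markdown[:500])


asyncio.run(demo())
```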


```diff
@@ -14,7 +14,7 @@ from khoj.database.adapters import ConversationAdapters, EntryAdapters, aget_use
 from khoj.database.models import KhojUser
 from khoj.processor.conversation.prompts import help_message, no_entries_found
 from khoj.processor.conversation.utils import save_to_conversation_log
-from khoj.processor.tools.online_search import search_with_google
+from khoj.processor.tools.online_search import search_online
 from khoj.routers.api import extract_references_and_questions
 from khoj.routers.helpers import (
     ApiUserRateLimiter,
@@ -289,7 +289,7 @@ async def chat(
     if ConversationCommand.Online in conversation_commands:
         try:
-            online_results = await search_with_google(defiltered_query, meta_log, location)
+            online_results = await search_online(defiltered_query, meta_log, location)
         except ValueError as e:
             return StreamingResponse(
                 iter(["Please set your SERPER_DEV_API_KEY to get started with online searches 🌐"]),
```


```diff
@@ -256,15 +256,17 @@ async def generate_online_subqueries(q: str, conversation_history: dict, locatio
         return [q]


-async def extract_relevant_info(q: str, corpus: dict) -> List[str]:
+async def extract_relevant_info(q: str, corpus: str) -> Union[str, None]:
     """
-    Given a target corpus, extract the most relevant info given a query
+    Extract relevant information for a given query from the target corpus
     """
-    key = list(corpus.keys())[0]
+    if is_none_or_empty(corpus) or is_none_or_empty(q):
+        return None

     extract_relevant_information = prompts.extract_relevant_information.format(
         query=q,
-        corpus=corpus[key],
+        corpus=corpus.strip(),
     )

     response = await send_message_to_model_wrapper(
```


```diff
@@ -7,7 +7,7 @@ import pytest
 from scipy.stats import linregress

 from khoj.processor.embeddings import EmbeddingsModel
-from khoj.processor.tools.online_search import search_with_olostep
+from khoj.processor.tools.online_search import read_webpage, read_webpage_with_olostep
 from khoj.utils import helpers

@@ -84,13 +84,29 @@ def test_encode_docs_memory_leak():
     assert slope < 2, f"Memory leak suspected on {device}. Memory usage increased at ~{slope:.2f} MB per iteration"


-@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set")
-def test_olostep_api():
+@pytest.mark.asyncio
+async def test_reading_webpage():
     # Arrange
     website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"

     # Act
-    response = search_with_olostep(website)
+    response = await read_webpage(website)
+
+    # Assert
+    assert (
+        "An alarm sent from the area near the fire also failed to register at the courthouse where the fire watchmen were"
+        in response
+    )
+
+
+@pytest.mark.skipif(os.getenv("OLOSTEP_API_KEY") is None, reason="OLOSTEP_API_KEY is not set")
+@pytest.mark.asyncio
+async def test_reading_webpage_with_olostep():
+    # Arrange
+    website = "https://en.wikipedia.org/wiki/Great_Chicago_Fire"
+
+    # Act
+    response = await read_webpage_with_olostep(website)

     # Assert
     assert (
```