diff --git a/src/khoj/processor/tools/online_search.py b/src/khoj/processor/tools/online_search.py index a37e3c3a..72191077 100644 --- a/src/khoj/processor/tools/online_search.py +++ b/src/khoj/processor/tools/online_search.py @@ -2,6 +2,7 @@ import asyncio import json import logging import os +import urllib.parse from collections import defaultdict from typing import Callable, Dict, List, Optional, Tuple, Union @@ -22,6 +23,10 @@ logger = logging.getLogger(__name__) SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY") SERPER_DEV_URL = "https://google.serper.dev/search" +JINA_READER_API_URL = "https://r.jina.ai/" +JINA_SEARCH_API_URL = "https://s.jina.ai/" +JINA_API_KEY = os.getenv("JINA_API_KEY") + OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" OLOSTEP_QUERY_PARAMS = { @@ -49,9 +54,6 @@ async def search_online( custom_filters: List[str] = [], ): query += " ".join(custom_filters) - if not online_search_enabled(): - logger.warn("SERPER_DEV_API_KEY is not set") - return {} if not is_internet_connected(): logger.warn("Cannot search online as not connected to internet") return {} @@ -67,25 +69,28 @@ async def search_online( await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}") with timer(f"Internet searches for {list(subqueries)} took", logger): - search_tasks = [search_with_google(subquery) for subquery in subqueries] + search_func = search_with_google if SERPER_DEV_API_KEY else search_with_jina + search_tasks = [search_func(subquery) for subquery in subqueries] search_results = await asyncio.gather(*search_tasks) response_dict = {subquery: search_result for subquery, search_result in search_results} - # Gather distinct web pages from organic search results of each subquery without an instant answer - webpage_links = { - organic["link"]: subquery + # Gather distinct web page data from organic results of each subquery without an instant answer. 
+ # Content of web pages is directly available when Jina is used for search. + webpages = { + (organic.get("link"), subquery, organic.get("content")) for subquery in response_dict for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ] if "answerBox" not in response_dict[subquery] } # Read, extract relevant info from the retrieved web pages - if webpage_links: + if webpages: + webpage_links = [link for link, _, _ in webpages] logger.info(f"🌐👀 Reading web pages at: {list(webpage_links)}") if send_status_func: webpage_links_str = "\n- " + "\n- ".join(list(webpage_links)) await send_status_func(f"**📖 Reading web pages**: {webpage_links_str}") - tasks = [read_webpage_and_extract_content(subquery, link) for link, subquery in webpage_links.items()] + tasks = [read_webpage_and_extract_content(subquery, link, content) for link, subquery, content in webpages] results = await asyncio.gather(*tasks) # Collect extracted info from the retrieved web pages @@ -139,10 +144,13 @@ async def read_webpages( return response -async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str], str]: +async def read_webpage_and_extract_content( + subquery: str, url: str, content: str = None +) -> Tuple[str, Union[None, str], str]: try: - with timer(f"Reading web page at '{url}' took", logger): - content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_at_url(url) + if is_none_or_empty(content): + with timer(f"Reading web page at '{url}' took", logger): + content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_with_jina(url) with timer(f"Extracting relevant information from web page at '{url}' took", logger): extracted_info = await extract_relevant_info(subquery, content) return subquery, extracted_info, url @@ -177,5 +185,41 @@ async def read_webpage_with_olostep(web_url: str) -> str: return response_json["markdown_content"] -def online_search_enabled(): - return 
async def read_webpage_with_jina(web_url: str) -> str:
    """Read the content of a web page via the Jina Reader API.

    Args:
        web_url: Address of the web page to read. It is appended unmodified
            to the Jina Reader endpoint.

    Returns:
        The ``data.content`` field of the JSON response from the reader API.

    Raises:
        aiohttp.ClientResponseError: When the reader API answers with an HTTP
            error status (raised by ``response.raise_for_status()``).
        KeyError: When the JSON response has no ``data.content`` field.
    """
    # The base URL constant already ends with "/". Strip it before joining so
    # the request path does not start with a doubled slash.
    reader_base_url = JINA_READER_API_URL.rstrip("/")
    jina_reader_api_url = f"{reader_base_url}/{web_url}"
    headers = {"Accept": "application/json", "X-Timeout": "30"}
    if JINA_API_KEY:
        # The API key is optional. Only send it when one is configured.
        headers["Authorization"] = f"Bearer {JINA_API_KEY}"

    async with aiohttp.ClientSession() as session:
        async with session.get(jina_reader_api_url, headers=headers) as response:
            response.raise_for_status()
            response_json = await response.json()
            return response_json["data"]["content"]


async def search_with_jina(query: str) -> Tuple[str, Dict[str, List[Dict]]]:
    """Search the web for a query via the Jina Search API.

    Results are reshaped so that each entry has "title", "content", "snippet"
    and "link" keys ("description" and "url" in the API response are renamed
    to "snippet" and "link" for consistency with other search results).

    Args:
        query: The search query. It is URL-quoted before being placed in the
            request path.

    Returns:
        A tuple of the original query and a dict. The dict maps "organic" to
        the list of reshaped results, or is empty when the API responds with
        a non-200 status.
    """
    encoded_query = urllib.parse.quote(query)
    # The base URL constant already ends with "/". Strip it before joining so
    # the request path does not start with a doubled slash.
    search_base_url = JINA_SEARCH_API_URL.rstrip("/")
    jina_search_api_url = f"{search_base_url}/{encoded_query}"
    headers = {"Accept": "application/json"}
    if JINA_API_KEY:
        # The API key is optional. Only send it when one is configured.
        headers["Authorization"] = f"Bearer {JINA_API_KEY}"

    async with aiohttp.ClientSession() as session:
        async with session.get(jina_search_api_url, headers=headers) as response:
            if response.status != 200:
                logger.error(await response.text())
                return query, {}
            response_json = await response.json()

    # Tolerate a missing or null "data" field instead of raising, so a single
    # odd response does not abort the other searches gathered with this one.
    items = response_json.get("data") if isinstance(response_json, dict) else None
    parsed_response = [
        {
            "title": item.get("title"),
            "content": item.get("content"),
            # rename description -> snippet for consistency
            "snippet": item.get("description"),
            # rename url -> link for consistency
            "link": item["url"],
        }
        for item in items or []
        # A result without a URL cannot be read or referenced, so skip it.
        if isinstance(item, dict) and item.get("url")
    ]
    return query, {"organic": parsed_response}
khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, @@ -767,22 +763,16 @@ async def websocket_endpoint( conversation_commands.remove(ConversationCommand.Notes) if ConversationCommand.Online in conversation_commands: - if not online_search_enabled(): - conversation_commands.remove(ConversationCommand.Online) - # If online search is not enabled, try to read webpages directly - if ConversationCommand.Webpage not in conversation_commands: - conversation_commands.append(ConversationCommand.Webpage) - else: - try: - online_results = await search_online( - defiltered_query, meta_log, location, send_status_update, custom_filters - ) - except ValueError as e: - logger.warning(f"Error searching online: {e}. Attempting to respond without online results") - await send_complete_llm_response( - f"Error searching online: {e}. Attempting to respond without online results" - ) - continue + try: + online_results = await search_online( + defiltered_query, meta_log, location, send_status_update, custom_filters + ) + except ValueError as e: + logger.warning(f"Error searching online: {e}. Attempting to respond without online results") + await send_complete_llm_response( + f"Error searching online: {e}. 
Attempting to respond without online results" + ) + continue if ConversationCommand.Webpage in conversation_commands: try: @@ -1067,18 +1057,10 @@ async def chat( conversation_commands.remove(ConversationCommand.Notes) if ConversationCommand.Online in conversation_commands: - if not online_search_enabled(): - conversation_commands.remove(ConversationCommand.Online) - # If online search is not enabled, try to read webpages directly - if ConversationCommand.Webpage not in conversation_commands: - conversation_commands.append(ConversationCommand.Webpage) - else: - try: - online_results = await search_online( - defiltered_query, meta_log, location, custom_filters=_custom_filters - ) - except ValueError as e: - logger.warning(f"Error searching online: {e}. Attempting to respond without online results") + try: + online_results = await search_online(defiltered_query, meta_log, location, custom_filters=_custom_filters) + except ValueError as e: + logger.warning(f"Error searching online: {e}. Attempting to respond without online results") if ConversationCommand.Webpage in conversation_commands: try: