Default to Jina search, reader API when no Serper.dev, Olostep API keys

Jina AI provides a search and webpage reader API that doesn't require
an API key. This provides a good default to enable online search for
self-hosted readers requiring no additional setup.

Jina search API also returns webpage contents with the results, so
just use those directly when Jina Search API used instead of
trying to read webpages separately. The extract relvant content from
webpage step using a chat model is still used from the
`read_webpage_and_extract_content' func in this case.

Parse search results from Jina search API into same format as
Serper.dev for accurate rendering of online references by clients
This commit is contained in:
Debanjum Singh Solanky 2024-06-22 10:42:45 +05:30
parent ff44734774
commit a038e4911b
2 changed files with 73 additions and 47 deletions

View file

@ -2,6 +2,7 @@ import asyncio
import json import json
import logging import logging
import os import os
import urllib.parse
from collections import defaultdict from collections import defaultdict
from typing import Callable, Dict, List, Optional, Tuple, Union from typing import Callable, Dict, List, Optional, Tuple, Union
@ -22,6 +23,10 @@ logger = logging.getLogger(__name__)
SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY") SERPER_DEV_API_KEY = os.getenv("SERPER_DEV_API_KEY")
SERPER_DEV_URL = "https://google.serper.dev/search" SERPER_DEV_URL = "https://google.serper.dev/search"
JINA_READER_API_URL = "https://r.jina.ai/"
JINA_SEARCH_API_URL = "https://s.jina.ai/"
JINA_API_KEY = os.getenv("JINA_API_KEY")
OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY") OLOSTEP_API_KEY = os.getenv("OLOSTEP_API_KEY")
OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI" OLOSTEP_API_URL = "https://agent.olostep.com/olostep-p2p-incomingAPI"
OLOSTEP_QUERY_PARAMS = { OLOSTEP_QUERY_PARAMS = {
@ -49,9 +54,6 @@ async def search_online(
custom_filters: List[str] = [], custom_filters: List[str] = [],
): ):
query += " ".join(custom_filters) query += " ".join(custom_filters)
if not online_search_enabled():
logger.warn("SERPER_DEV_API_KEY is not set")
return {}
if not is_internet_connected(): if not is_internet_connected():
logger.warn("Cannot search online as not connected to internet") logger.warn("Cannot search online as not connected to internet")
return {} return {}
@ -67,25 +69,28 @@ async def search_online(
await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}") await send_status_func(f"**🌐 Searching the Internet for**: {subqueries_str}")
with timer(f"Internet searches for {list(subqueries)} took", logger): with timer(f"Internet searches for {list(subqueries)} took", logger):
search_tasks = [search_with_google(subquery) for subquery in subqueries] search_func = search_with_google if SERPER_DEV_API_KEY else search_with_jina
search_tasks = [search_func(subquery) for subquery in subqueries]
search_results = await asyncio.gather(*search_tasks) search_results = await asyncio.gather(*search_tasks)
response_dict = {subquery: search_result for subquery, search_result in search_results} response_dict = {subquery: search_result for subquery, search_result in search_results}
# Gather distinct web pages from organic search results of each subquery without an instant answer # Gather distinct web page data from organic results of each subquery without an instant answer.
webpage_links = { # Content of web pages is directly available when Jina is used for search.
organic["link"]: subquery webpages = {
(organic.get("link"), subquery, organic.get("content"))
for subquery in response_dict for subquery in response_dict
for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ] for organic in response_dict[subquery].get("organic", [])[:MAX_WEBPAGES_TO_READ]
if "answerBox" not in response_dict[subquery] if "answerBox" not in response_dict[subquery]
} }
# Read, extract relevant info from the retrieved web pages # Read, extract relevant info from the retrieved web pages
if webpage_links: if webpages:
webpage_links = [link for link, _, _ in webpages]
logger.info(f"🌐👀 Reading web pages at: {list(webpage_links)}") logger.info(f"🌐👀 Reading web pages at: {list(webpage_links)}")
if send_status_func: if send_status_func:
webpage_links_str = "\n- " + "\n- ".join(list(webpage_links)) webpage_links_str = "\n- " + "\n- ".join(list(webpage_links))
await send_status_func(f"**📖 Reading web pages**: {webpage_links_str}") await send_status_func(f"**📖 Reading web pages**: {webpage_links_str}")
tasks = [read_webpage_and_extract_content(subquery, link) for link, subquery in webpage_links.items()] tasks = [read_webpage_and_extract_content(subquery, link, content) for link, subquery, content in webpages]
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
# Collect extracted info from the retrieved web pages # Collect extracted info from the retrieved web pages
@ -139,10 +144,13 @@ async def read_webpages(
return response return response
async def read_webpage_and_extract_content(subquery: str, url: str) -> Tuple[str, Union[None, str], str]: async def read_webpage_and_extract_content(
subquery: str, url: str, content: str = None
) -> Tuple[str, Union[None, str], str]:
try: try:
with timer(f"Reading web page at '{url}' took", logger): if is_none_or_empty(content):
content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_at_url(url) with timer(f"Reading web page at '{url}' took", logger):
content = await read_webpage_with_olostep(url) if OLOSTEP_API_KEY else await read_webpage_with_jina(url)
with timer(f"Extracting relevant information from web page at '{url}' took", logger): with timer(f"Extracting relevant information from web page at '{url}' took", logger):
extracted_info = await extract_relevant_info(subquery, content) extracted_info = await extract_relevant_info(subquery, content)
return subquery, extracted_info, url return subquery, extracted_info, url
@ -177,5 +185,41 @@ async def read_webpage_with_olostep(web_url: str) -> str:
return response_json["markdown_content"] return response_json["markdown_content"]
def online_search_enabled(): async def read_webpage_with_jina(web_url: str) -> str:
return SERPER_DEV_API_KEY is not None jina_reader_api_url = f"{JINA_READER_API_URL}/{web_url}"
headers = {"Accept": "application/json", "X-Timeout": "30"}
if JINA_API_KEY:
headers["Authorization"] = f"Bearer {JINA_API_KEY}"
async with aiohttp.ClientSession() as session:
async with session.get(jina_reader_api_url, headers=headers) as response:
response.raise_for_status()
response_json = await response.json()
return response_json["data"]["content"]
async def search_with_jina(query: str) -> Tuple[str, Dict[str, List[Dict]]]:
encoded_query = urllib.parse.quote(query)
jina_search_api_url = f"{JINA_SEARCH_API_URL}/{encoded_query}"
headers = {"Accept": "application/json"}
if JINA_API_KEY:
headers["Authorization"] = f"Bearer {JINA_API_KEY}"
async with aiohttp.ClientSession() as session:
async with session.get(jina_search_api_url, headers=headers) as response:
if response.status != 200:
logger.error(await response.text())
return query, {}
response_json = await response.json()
parsed_response = [
{
"title": item["title"],
"content": item.get("content"),
# rename description -> snippet for consistency
"snippet": item["description"],
# rename url -> link for consistency
"link": item["url"],
}
for item in response_json["data"]
]
return query, {"organic": parsed_response}

View file

@ -29,11 +29,7 @@ from khoj.processor.conversation.prompts import (
) )
from khoj.processor.conversation.utils import save_to_conversation_log from khoj.processor.conversation.utils import save_to_conversation_log
from khoj.processor.speech.text_to_speech import generate_text_to_speech from khoj.processor.speech.text_to_speech import generate_text_to_speech
from khoj.processor.tools.online_search import ( from khoj.processor.tools.online_search import read_webpages, search_online
online_search_enabled,
read_webpages,
search_online,
)
from khoj.routers.api import extract_references_and_questions from khoj.routers.api import extract_references_and_questions
from khoj.routers.helpers import ( from khoj.routers.helpers import (
ApiUserRateLimiter, ApiUserRateLimiter,
@ -767,22 +763,16 @@ async def websocket_endpoint(
conversation_commands.remove(ConversationCommand.Notes) conversation_commands.remove(ConversationCommand.Notes)
if ConversationCommand.Online in conversation_commands: if ConversationCommand.Online in conversation_commands:
if not online_search_enabled(): try:
conversation_commands.remove(ConversationCommand.Online) online_results = await search_online(
# If online search is not enabled, try to read webpages directly defiltered_query, meta_log, location, send_status_update, custom_filters
if ConversationCommand.Webpage not in conversation_commands: )
conversation_commands.append(ConversationCommand.Webpage) except ValueError as e:
else: logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
try: await send_complete_llm_response(
online_results = await search_online( f"Error searching online: {e}. Attempting to respond without online results"
defiltered_query, meta_log, location, send_status_update, custom_filters )
) continue
except ValueError as e:
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
await send_complete_llm_response(
f"Error searching online: {e}. Attempting to respond without online results"
)
continue
if ConversationCommand.Webpage in conversation_commands: if ConversationCommand.Webpage in conversation_commands:
try: try:
@ -1067,18 +1057,10 @@ async def chat(
conversation_commands.remove(ConversationCommand.Notes) conversation_commands.remove(ConversationCommand.Notes)
if ConversationCommand.Online in conversation_commands: if ConversationCommand.Online in conversation_commands:
if not online_search_enabled(): try:
conversation_commands.remove(ConversationCommand.Online) online_results = await search_online(defiltered_query, meta_log, location, custom_filters=_custom_filters)
# If online search is not enabled, try to read webpages directly except ValueError as e:
if ConversationCommand.Webpage not in conversation_commands: logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
conversation_commands.append(ConversationCommand.Webpage)
else:
try:
online_results = await search_online(
defiltered_query, meta_log, location, custom_filters=_custom_filters
)
except ValueError as e:
logger.warning(f"Error searching online: {e}. Attempting to respond without online results")
if ConversationCommand.Webpage in conversation_commands: if ConversationCommand.Webpage in conversation_commands:
try: try: