diff --git a/pyproject.toml b/pyproject.toml index 304b3886..76928771 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,7 @@ dependencies = [ "websockets == 12.0", "psutil >= 5.8.0", "huggingface-hub >= 0.22.2", + "apscheduler ~= 3.10.0", ] dynamic = ["version"] diff --git a/src/khoj/main.py b/src/khoj/main.py index 745b77fb..74807137 100644 --- a/src/khoj/main.py +++ b/src/khoj/main.py @@ -23,6 +23,7 @@ warnings.filterwarnings("ignore", message=r"legacy way to download files from th import uvicorn import django +from apscheduler.schedulers.background import BackgroundScheduler from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles @@ -126,6 +127,10 @@ def run(should_start_server=True): # Setup task scheduler poll_task_scheduler() + # Setup Background Scheduler + state.scheduler = BackgroundScheduler() + state.scheduler.start() + # Start Server configure_routes(app) diff --git a/src/khoj/processor/conversation/prompts.py b/src/khoj/processor/conversation/prompts.py index f5700167..04dee1ce 100644 --- a/src/khoj/processor/conversation/prompts.py +++ b/src/khoj/processor/conversation/prompts.py @@ -10,8 +10,7 @@ You were created by Khoj Inc. with the following capabilities: - You *CAN REMEMBER ALL NOTES and PERSONAL INFORMATION FOREVER* that the user ever shares with you. - Users can share files and other information with you using the Khoj Desktop, Obsidian or Emacs app. They can also drag and drop their files into the chat window. -- You *CAN* generate images, look-up real-time information from the internet, and answer questions based on the user's notes. -- You cannot set reminders. +- You *CAN* generate images, look-up real-time information from the internet, set reminders and answer questions based on the user's notes. - Say "I don't know" or "I don't understand" if you don't know what to say or if you don't know the answer to a question. 
- Ask crisp follow-up questions to get additional context, when the answer cannot be inferred from the provided notes or past conversations. - Sometimes the user will share personal information that needs to be remembered, like an account ID or a residential address. These can be acknowledged with a simple "Got it" or "Okay". @@ -301,6 +300,22 @@ AI: I can help with that. I see online that there is a new model of the Dell XPS Q: What are the specs of the new Dell XPS 15? Khoj: default +Example: +Chat History: +User: Where did I go on my last vacation? +AI: You went to Jordan and visited Petra, the Dead Sea, and Wadi Rum. + +Q: Remind me who did I go with on that trip? +Khoj: default + +Example: +Chat History: +User: How's the weather outside? Current Location: Bali, Indonesia +AI: It's currently 28°C and partly cloudy in Bali. + +Q: Share a painting using the weather for Bali every morning. +Khoj: reminder + Now it's your turn to pick the mode you would like to use to answer the user's question. Provide your response as a string. Chat History: @@ -492,6 +507,42 @@ Khoj: """.strip() ) +# Schedule task +# -- +crontime_prompt = PromptTemplate.from_template( + """ +You are Khoj, an extremely smart and helpful task scheduling assistant +- Given a user query, you infer the date, time to run the query at as a cronjob time string (converted to UTC time zone) +- Convert the cron job time to run in UTC +- Infer user's time zone from the current location provided in their message +- Use an approximate time that makes sense, if it is not specified. +- Also extract the query to run at the scheduled time. Add any context required from the chat history to improve the query. + +# Examples: +User: Could you share a funny Calvin and Hobbes quote from my notes? +AI: Here is one I found: "It's not denial. I'm just selective about the reality I accept." +User: Hahah, nice! Show a new one every morning at 9am. 
My Current Location: Shanghai, China +Khoj: ["0 1 * * *", "Share a funny Calvin and Hobbes or Bill Watterson quote from my notes."] + +User: Share the top weekly posts on Hacker News on Monday evenings. Format it as a newsletter. My Current Location: Nairobi, Kenya +Khoj: ["30 15 * * 1", "Top posts last week on Hacker News"] + +User: What is the latest version of the Khoj python package? +AI: The latest released Khoj python package version is 1.5.0. +User: Notify me when version 2.0.0 is released. My Current Location: Mexico City, Mexico +Khoj: ["0 16 * * *", "Check if the latest released version of the Khoj python package is >= 2.0.0?"] + +User: Tell me the latest local tech news on the first Sunday of every Month. My Current Location: Dublin, Ireland +Khoj: ["0 9 1-7 * 0", "Latest tech, AI and engineering news from around Dublin, Ireland"] + +# Chat History: +{chat_history} + +User: {query}. My Current Location: {user_location} +Khoj: +""".strip() +) + # System messages to user # -- help_message = PromptTemplate.from_template( diff --git a/src/khoj/routers/api_chat.py b/src/khoj/routers/api_chat.py index 2a1bbc5e..bb164b13 100644 --- a/src/khoj/routers/api_chat.py +++ b/src/khoj/routers/api_chat.py @@ -4,7 +4,8 @@ import math from typing import Dict, Optional from urllib.parse import unquote -from asgiref.sync import sync_to_async +from apscheduler.triggers.cron import CronTrigger +from asgiref.sync import async_to_sync, sync_to_async from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket from fastapi.requests import Request from fastapi.responses import Response, StreamingResponse @@ -29,12 +30,14 @@ from khoj.routers.api import extract_references_and_questions from khoj.routers.helpers import ( ApiUserRateLimiter, CommonQueryParams, + CommonQueryParamsClass, ConversationCommandRateLimiter, agenerate_chat_response, aget_relevant_information_sources, aget_relevant_output_modes, get_conversation_command, is_ready_to_chat, + schedule_query, 
text_to_image, update_telemetry_state, validate_conversation_config, @@ -381,6 +384,55 @@ async def websocket_endpoint( await conversation_command_rate_limiter.update_and_check_if_valid(websocket, cmd) q = q.replace(f"/{cmd.value}", "").strip() + if ConversationCommand.Reminder in conversation_commands: + crontime, inferred_query = await schedule_query(q, location, meta_log) + trigger = CronTrigger.from_crontab(crontime) + common = CommonQueryParamsClass( + client=websocket.user.client_app, + user_agent=websocket.headers.get("user-agent"), + host=websocket.headers.get("host"), + ) + scope = websocket.scope.copy() + scope["path"] = "/api/chat" + scope["type"] = "http" + request = Request(scope) + + state.scheduler.add_job( + async_to_sync(chat), + trigger=trigger, + args=(request, common, inferred_query), + kwargs={ + "stream": False, + "conversation_id": conversation_id, + "city": city, + "region": region, + "country": country, + }, + id=f"job_{user.uuid}_{inferred_query}", + replace_existing=True, + ) + + llm_response = ( + f'🕒 Scheduled running Query: "{inferred_query}" on Schedule: `{crontime}` (in server timezone).' 
+ ) + await sync_to_async(save_to_conversation_log)( + q, + llm_response, + user, + meta_log, + intent_type="reminder", + client_application=websocket.user.client_app, + conversation_id=conversation_id, + ) + update_telemetry_state( + request=websocket, + telemetry_type="api", + api="chat", + **common.__dict__, + ) + await send_complete_llm_response(llm_response) + continue + compiled_references, inferred_queries, defiltered_query = await extract_references_and_questions( websocket, meta_log, q, 7, 0.18, conversation_commands, location, send_status_update ) @@ -576,6 +628,33 @@ async def chat( user_name = await aget_user_name(user) + if ConversationCommand.Reminder in conversation_commands: + crontime, inferred_query = await schedule_query(q, location, meta_log) + trigger = CronTrigger.from_crontab(crontime) + state.scheduler.add_job( + async_to_sync(chat), + trigger=trigger, + args=(request, common, inferred_query, n, d, False, title, conversation_id, city, region, country), + id=f"job_{user.uuid}_{inferred_query}", + replace_existing=True, + ) + + llm_response = f'🕒 Scheduled running Query: "{inferred_query}" on Schedule: `{crontime}` (in server timezone).' 
+ await sync_to_async(save_to_conversation_log)( + q, + llm_response, + user, + meta_log, + intent_type="reminder", + client_application=request.user.client_app, + conversation_id=conversation_id, + ) + + if stream: + return StreamingResponse(llm_response, media_type="text/event-stream", status_code=200) + else: + return Response(content=llm_response, media_type="text/plain", status_code=200) + compiled_references, inferred_queries, defiltered_query = await extract_references_and_questions( request, meta_log, q, (n or 5), (d or math.inf), conversation_commands, location ) diff --git a/src/khoj/routers/helpers.py b/src/khoj/routers/helpers.py index af33564f..1dab6c53 100644 --- a/src/khoj/routers/helpers.py +++ b/src/khoj/routers/helpers.py @@ -134,7 +134,7 @@ def update_telemetry_state( def construct_chat_history(conversation_history: dict, n: int = 4, agent_name="AI") -> str: chat_history = "" for chat in conversation_history.get("chat", [])[-n:]: - if chat["by"] == "khoj" and chat["intent"].get("type") == "remember": + if chat["by"] == "khoj" and chat["intent"].get("type") in ["remember", "reminder"]: chat_history += f"User: {chat['intent']['query']}\n" chat_history += f"{agent_name}: {chat['message']}\n" elif chat["by"] == "khoj" and ("text-to-image" in chat["intent"].get("type")): @@ -312,6 +312,34 @@ async def generate_online_subqueries(q: str, conversation_history: dict, locatio return [q] +async def schedule_query(q: str, location_data: LocationData, conversation_history: dict) -> Tuple[str, ...]: + """ + Schedule the date, time to run the query. Assume the server timezone is UTC. 
+ """ + user_location = ( + f"{location_data.city}, {location_data.region}, {location_data.country}" if location_data else "Greenwich" + ) + chat_history = construct_chat_history(conversation_history) + + crontime_prompt = prompts.crontime_prompt.format( + query=q, + user_location=user_location, + chat_history=chat_history, + ) + + raw_response = await send_message_to_model_wrapper(crontime_prompt) + + # Validate that the response is a non-empty, JSON-serializable list + try: + raw_response = raw_response.strip() + response: List[str] = json.loads(raw_response) + if not isinstance(response, list) or not response or len(response) != 2: + raise AssertionError(f"Invalid response for scheduling query : {response}") + return tuple(response) + except Exception: + raise AssertionError(f"Invalid response for scheduling query: {raw_response}") + + async def extract_relevant_info(q: str, corpus: str) -> Union[str, None]: """ Extract relevant information for a given query from the target corpus @@ -547,7 +575,7 @@ async def text_to_image( text2image_model = text_to_image_config.model_name chat_history = "" for chat in conversation_log.get("chat", [])[-4:]: - if chat["by"] == "khoj" and chat["intent"].get("type") == "remember": + if chat["by"] == "khoj" and chat["intent"].get("type") in ["remember", "reminder"]: chat_history += f"Q: {chat['intent']['query']}\n" chat_history += f"A: {chat['message']}\n" elif chat["by"] == "khoj" and "text-to-image" in chat["intent"].get("type"): diff --git a/src/khoj/utils/helpers.py b/src/khoj/utils/helpers.py index 3cb5bfac..9ff402ab 100644 --- a/src/khoj/utils/helpers.py +++ b/src/khoj/utils/helpers.py @@ -304,6 +304,7 @@ class ConversationCommand(str, Enum): Online = "online" Webpage = "webpage" Image = "image" + Reminder = "reminder" command_descriptions = { @@ -313,6 +314,7 @@ command_descriptions = { ConversationCommand.Online: "Search for information on the internet.", ConversationCommand.Webpage: "Get information from webpage links 
provided by you.", ConversationCommand.Image: "Generate images by describing your imagination in words.", + ConversationCommand.Reminder: "Schedule your query to run at a specified time or interval.", ConversationCommand.Help: "Display a help message with all available commands and other metadata.", } @@ -325,7 +327,8 @@ tool_descriptions_for_llm = { } mode_descriptions_for_llm = { - ConversationCommand.Image: "Use this if you think the user is requesting an image or visual response to their query.", + ConversationCommand.Image: "Use this if the user is requesting an image or visual response to their query.", + ConversationCommand.Reminder: "Use this if the user is requesting a response at a scheduled date or time.", ConversationCommand.Default: "Use this if the other response modes don't seem to fit the query.", } diff --git a/src/khoj/utils/state.py b/src/khoj/utils/state.py index 8270a70f..7439929f 100644 --- a/src/khoj/utils/state.py +++ b/src/khoj/utils/state.py @@ -4,6 +4,7 @@ from collections import defaultdict from pathlib import Path from typing import Any, Dict, List +from apscheduler.schedulers.background import BackgroundScheduler from openai import OpenAI from whisper import Whisper @@ -29,6 +30,7 @@ cli_args: List[str] = None query_cache: Dict[str, LRU] = defaultdict(LRU) chat_lock = threading.Lock() SearchType = utils_config.SearchType +scheduler: BackgroundScheduler = None telemetry: List[Dict[str, str]] = [] khoj_version: str = None device = get_device()