mirror of
https://github.com/khoj-ai/khoj.git
synced 2024-11-27 17:35:07 +01:00
Improve rendering task scheduled settings view and message
- Render crontime string in natural language in message & settings UI - Show more fields in tasks web config UI - Add link to the tasks settings page in task scheduled chat response - Improve task variables names Rename executing_query to query_to_run. scheduling_query to scheduling_request
This commit is contained in:
parent
d341b1efe8
commit
230d160602
5 changed files with 50 additions and 28 deletions
|
@ -82,6 +82,7 @@ dependencies = [
|
||||||
"huggingface-hub >= 0.22.2",
|
"huggingface-hub >= 0.22.2",
|
||||||
"apscheduler ~= 3.10.0",
|
"apscheduler ~= 3.10.0",
|
||||||
"pytz ~= 2024.1",
|
"pytz ~= 2024.1",
|
||||||
|
"cron-descriptor == 1.4.3",
|
||||||
]
|
]
|
||||||
dynamic = ["version"]
|
dynamic = ["version"]
|
||||||
|
|
||||||
|
|
|
@ -286,7 +286,9 @@
|
||||||
<thead>
|
<thead>
|
||||||
<tr>
|
<tr>
|
||||||
<th scope="col">Name</th>
|
<th scope="col">Name</th>
|
||||||
<th scope="col">Next Run</th>
|
<th scope="col">Scheduling Request</th>
|
||||||
|
<th scope="col">Query to Run</th>
|
||||||
|
<th scope="col">Schedule</th>
|
||||||
<th scope="col">Actions</th>
|
<th scope="col">Actions</th>
|
||||||
</tr>
|
</tr>
|
||||||
</thead>
|
</thead>
|
||||||
|
@ -674,12 +676,17 @@
|
||||||
|
|
||||||
function generateTaskRow(taskObj) {
|
function generateTaskRow(taskObj) {
|
||||||
let taskId = taskObj.id;
|
let taskId = taskObj.id;
|
||||||
let taskName = taskObj.name;
|
let taskSchedulingRequest = taskObj.scheduling_request;
|
||||||
let taskNextRun = taskObj.next;
|
let taskQuery = taskObj.query_to_run;
|
||||||
|
let taskSubject = taskObj.subject;
|
||||||
|
let taskNextRun = `Next run at ${taskObj.next}`;
|
||||||
|
let taskSchedule = taskObj.schedule;
|
||||||
return `
|
return `
|
||||||
<tr id="scheduled-task-item-${taskId}">
|
<tr id="scheduled-task-item-${taskId}">
|
||||||
<td><b>${taskName}</b></td>
|
<td><b>${taskSubject}</b></td>
|
||||||
<td id="scheduled-task-${taskId}">${taskNextRun}</td>
|
<td><b>${taskSchedulingRequest}</b></td>
|
||||||
|
<td><b>${taskQuery}</b></td>
|
||||||
|
<td id="scheduled-task-${taskId}" title="${taskNextRun}">${taskSchedule}</td>
|
||||||
<td>
|
<td>
|
||||||
<img onclick="deleteTask('${taskId}')" class="configured-icon api-key-action enabled" src="/static/assets/icons/trash-solid.svg" alt="Delete Task" title="Delete Task">
|
<img onclick="deleteTask('${taskId}')" class="configured-icon api-key-action enabled" src="/static/assets/icons/trash-solid.svg" alt="Delete Task" title="Delete Task">
|
||||||
</td>
|
</td>
|
||||||
|
|
|
@ -8,6 +8,7 @@ import time
|
||||||
import uuid
|
import uuid
|
||||||
from typing import Any, Callable, List, Optional, Union
|
from typing import Any, Callable, List, Optional, Union
|
||||||
|
|
||||||
|
import cron_descriptor
|
||||||
from apscheduler.job import Job
|
from apscheduler.job import Job
|
||||||
from asgiref.sync import sync_to_async
|
from asgiref.sync import sync_to_async
|
||||||
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
|
from fastapi import APIRouter, Depends, File, HTTPException, Request, UploadFile
|
||||||
|
@ -401,11 +402,17 @@ def get_jobs(request: Request) -> Response:
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
if task.id.startswith(f"job_{user.uuid}_"):
|
if task.id.startswith(f"job_{user.uuid}_"):
|
||||||
task_metadata = json.loads(task.name)
|
task_metadata = json.loads(task.name)
|
||||||
|
schedule = (
|
||||||
|
f'{cron_descriptor.get_description(task_metadata["crontime"])} {task.next_run_time.strftime("%Z")}'
|
||||||
|
)
|
||||||
tasks_info.append(
|
tasks_info.append(
|
||||||
{
|
{
|
||||||
"id": task.id,
|
"id": task.id,
|
||||||
"name": re.sub(r"^/task\s*", "", task_metadata["inferred_query"]),
|
"subject": task_metadata["subject"],
|
||||||
"next": task.next_run_time.strftime("%Y-%m-%d %H:%M"),
|
"query_to_run": re.sub(r"^/task\s*", "", task_metadata["query_to_run"]),
|
||||||
|
"scheduling_request": task_metadata["scheduling_request"],
|
||||||
|
"schedule": schedule,
|
||||||
|
"next": task.next_run_time.strftime("%Y-%m-%d %I:%M %p %Z"),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ from datetime import datetime
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
from urllib.parse import unquote
|
from urllib.parse import unquote
|
||||||
|
|
||||||
|
import cron_descriptor
|
||||||
from asgiref.sync import sync_to_async
|
from asgiref.sync import sync_to_async
|
||||||
from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket
|
from fastapi import APIRouter, Depends, HTTPException, Request, WebSocket
|
||||||
from fastapi.requests import Request
|
from fastapi.requests import Request
|
||||||
|
@ -398,16 +399,19 @@ async def websocket_endpoint(
|
||||||
await send_complete_llm_response(f"Unable to schedule task. Ensure the task doesn't already exist.")
|
await send_complete_llm_response(f"Unable to schedule task. Ensure the task doesn't already exist.")
|
||||||
continue
|
continue
|
||||||
# Display next run time in user timezone instead of UTC
|
# Display next run time in user timezone instead of UTC
|
||||||
next_run_time = job.next_run_time.strftime("%Y-%m-%d %H:%M %Z (%z)")
|
schedule = f'{cron_descriptor.get_description(crontime)} {job.next_run_time.strftime("%Z")}'
|
||||||
|
next_run_time = job.next_run_time.strftime("%Y-%m-%d %H:%M %Z")
|
||||||
# Remove /task prefix from inferred_query
|
# Remove /task prefix from inferred_query
|
||||||
unprefixed_inferred_query = re.sub(r"^\/task\s*", "", inferred_query)
|
unprefixed_inferred_query = re.sub(r"^\/task\s*", "", inferred_query)
|
||||||
# Create the scheduled task response
|
# Create the scheduled task response
|
||||||
llm_response = f"""
|
llm_response = f"""
|
||||||
### 🕒 Scheduled Task
|
### 🕒 Scheduled Task
|
||||||
- Query: **"{unprefixed_inferred_query}"**
|
|
||||||
- Subject: **{subject}**
|
- Subject: **{subject}**
|
||||||
- Schedule: `{crontime}`
|
- Query: "{unprefixed_inferred_query}"
|
||||||
- Next Run At: **{next_run_time}**.
|
- Schedule: `{schedule}`
|
||||||
|
- Next Run At: {next_run_time}
|
||||||
|
|
||||||
|
Manage your tasks [here](/config#tasks).
|
||||||
""".strip()
|
""".strip()
|
||||||
|
|
||||||
await sync_to_async(save_to_conversation_log)(
|
await sync_to_async(save_to_conversation_log)(
|
||||||
|
@ -649,16 +653,19 @@ async def chat(
|
||||||
status_code=500,
|
status_code=500,
|
||||||
)
|
)
|
||||||
# Display next run time in user timezone instead of UTC
|
# Display next run time in user timezone instead of UTC
|
||||||
next_run_time = job.next_run_time.strftime("%Y-%m-%d %H:%M %Z (%z)")
|
schedule = f'{cron_descriptor.get_description(crontime)} {job.next_run_time.strftime("%Z")}'
|
||||||
|
next_run_time = job.next_run_time.strftime("%Y-%m-%d %H:%M %Z")
|
||||||
# Remove /task prefix from inferred_query
|
# Remove /task prefix from inferred_query
|
||||||
unprefixed_inferred_query = re.sub(r"^\/task\s*", "", inferred_query)
|
unprefixed_inferred_query = re.sub(r"^\/task\s*", "", inferred_query)
|
||||||
# Create the scheduled task response
|
# Create the scheduled task response
|
||||||
llm_response = f"""
|
llm_response = f"""
|
||||||
### 🕒 Scheduled Task
|
### 🕒 Scheduled Task
|
||||||
- Query: **"{unprefixed_inferred_query}"**
|
|
||||||
- Subject: **{subject}**
|
- Subject: **{subject}**
|
||||||
- Schedule: `{crontime}`
|
- Query: "{unprefixed_inferred_query}"
|
||||||
- Next Run At: **{next_run_time}**.'
|
- Schedule: `{schedule}`
|
||||||
|
- Next Run At: {next_run_time}
|
||||||
|
|
||||||
|
Manage your tasks [here](/config#tasks).
|
||||||
""".strip()
|
""".strip()
|
||||||
|
|
||||||
await sync_to_async(save_to_conversation_log)(
|
await sync_to_async(save_to_conversation_log)(
|
||||||
|
|
|
@ -872,13 +872,13 @@ def should_notify(original_query: str, executed_query: str, ai_response: str) ->
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def scheduled_chat(executing_query: str, scheduling_query: str, subject: str, user: KhojUser, calling_url: URL):
|
def scheduled_chat(query_to_run: str, scheduling_request: str, subject: str, user: KhojUser, calling_url: URL):
|
||||||
# Extract relevant params from the original URL
|
# Extract relevant params from the original URL
|
||||||
scheme = "http" if not calling_url.is_secure else "https"
|
scheme = "http" if not calling_url.is_secure else "https"
|
||||||
query_dict = parse_qs(calling_url.query)
|
query_dict = parse_qs(calling_url.query)
|
||||||
|
|
||||||
# Replace the original scheduling query with the scheduled query
|
# Replace the original scheduling query with the scheduled query
|
||||||
query_dict["q"] = [executing_query]
|
query_dict["q"] = [query_to_run]
|
||||||
|
|
||||||
# Construct the URL to call the chat API with the scheduled query string
|
# Construct the URL to call the chat API with the scheduled query string
|
||||||
encoded_query = urlencode(query_dict, doseq=True)
|
encoded_query = urlencode(query_dict, doseq=True)
|
||||||
|
@ -904,7 +904,7 @@ def scheduled_chat(executing_query: str, scheduling_query: str, subject: str, us
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Extract the AI response from the chat API response
|
# Extract the AI response from the chat API response
|
||||||
cleaned_query = re.sub(r"^/task\s*", "", scheduling_query).strip()
|
cleaned_query = re.sub(r"^/task\s*", "", query_to_run).strip()
|
||||||
if raw_response.headers.get("Content-Type") == "application/json":
|
if raw_response.headers.get("Content-Type") == "application/json":
|
||||||
response_map = raw_response.json()
|
response_map = raw_response.json()
|
||||||
ai_response = response_map.get("response") or response_map.get("image")
|
ai_response = response_map.get("response") or response_map.get("image")
|
||||||
|
@ -912,9 +912,9 @@ def scheduled_chat(executing_query: str, scheduling_query: str, subject: str, us
|
||||||
ai_response = raw_response.text
|
ai_response = raw_response.text
|
||||||
|
|
||||||
# Notify user if the AI response is satisfactory
|
# Notify user if the AI response is satisfactory
|
||||||
if should_notify(original_query=scheduling_query, executed_query=cleaned_query, ai_response=ai_response):
|
if should_notify(original_query=scheduling_request, executed_query=cleaned_query, ai_response=ai_response):
|
||||||
if is_resend_enabled():
|
if is_resend_enabled():
|
||||||
send_task_email(user.get_short_name(), user.email, scheduling_query, ai_response, subject)
|
send_task_email(user.get_short_name(), user.email, scheduling_request, ai_response, subject)
|
||||||
else:
|
else:
|
||||||
return raw_response
|
return raw_response
|
||||||
|
|
||||||
|
@ -923,14 +923,14 @@ async def create_scheduled_task(
|
||||||
q: str, location: LocationData, timezone: str, user: KhojUser, calling_url: URL, meta_log: dict = {}
|
q: str, location: LocationData, timezone: str, user: KhojUser, calling_url: URL, meta_log: dict = {}
|
||||||
):
|
):
|
||||||
user_timezone = pytz.timezone(timezone)
|
user_timezone = pytz.timezone(timezone)
|
||||||
crontime, inferred_query, subject = await schedule_query(q, location, meta_log)
|
crontime_string, query_to_run, subject = await schedule_query(q, location, meta_log)
|
||||||
trigger = CronTrigger.from_crontab(crontime, user_timezone)
|
trigger = CronTrigger.from_crontab(crontime_string, user_timezone)
|
||||||
# Generate id and metadata used by task scheduler and process locks for the task runs
|
# Generate id and metadata used by task scheduler and process locks for the task runs
|
||||||
job_metadata = json.dumps(
|
job_metadata = json.dumps(
|
||||||
{"inferred_query": inferred_query, "original_query": q, "subject": subject, "crontime": crontime}
|
{"query_to_run": query_to_run, "scheduling_request": q, "subject": subject, "crontime": crontime_string}
|
||||||
)
|
)
|
||||||
query_id = hashlib.md5(f"{inferred_query}".encode("utf-8")).hexdigest()
|
query_id = hashlib.md5(f"{query_to_run}".encode("utf-8")).hexdigest()
|
||||||
job_id = f"job_{user.uuid}_{crontime}_{query_id}"
|
job_id = f"job_{user.uuid}_{crontime_string}_{query_id}"
|
||||||
job = state.scheduler.add_job(
|
job = state.scheduler.add_job(
|
||||||
run_with_process_lock,
|
run_with_process_lock,
|
||||||
trigger=trigger,
|
trigger=trigger,
|
||||||
|
@ -939,8 +939,8 @@ async def create_scheduled_task(
|
||||||
f"{ProcessLock.Operation.SCHEDULED_JOB}_{user.uuid}_{query_id}",
|
f"{ProcessLock.Operation.SCHEDULED_JOB}_{user.uuid}_{query_id}",
|
||||||
),
|
),
|
||||||
kwargs={
|
kwargs={
|
||||||
"executing_query": inferred_query,
|
"query_to_run": query_to_run,
|
||||||
"scheduling_query": q,
|
"scheduling_request": q,
|
||||||
"subject": subject,
|
"subject": subject,
|
||||||
"user": user,
|
"user": user,
|
||||||
"calling_url": calling_url,
|
"calling_url": calling_url,
|
||||||
|
@ -950,4 +950,4 @@ async def create_scheduled_task(
|
||||||
max_instances=2, # Allow second instance to kill any previous instance with stale lock
|
max_instances=2, # Allow second instance to kill any previous instance with stale lock
|
||||||
jitter=30,
|
jitter=30,
|
||||||
)
|
)
|
||||||
return job, crontime, inferred_query, subject
|
return job, crontime_string, query_to_run, subject
|
||||||
|
|
Loading…
Reference in a new issue