From 3b0783aab9a9486c9d202b84d2ec02eb99c7f880 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sun, 1 Jan 2023 16:22:35 -0300 Subject: [PATCH 1/3] Automate updating embeddings, search index on an hourly schedule - Use the schedule PyPI package - Use QTimer to poll schedule.run_pending() regularly for jobs to run --- setup.py | 1 + src/configure.py | 9 +++++++++ src/main.py | 5 +++-- src/routers/api.py | 2 ++ 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 558a23ab..add6e6e9 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,7 @@ setup( "dateparser == 1.1.1", "pyqt6 == 6.3.1", "defusedxml == 0.7.1", + 'schedule == 1.1.0', ], include_package_data=True, entry_points={"console_scripts": ["khoj = src.main:run"]}, diff --git a/src/configure.py b/src/configure.py index f38b7d11..81952280 100644 --- a/src/configure.py +++ b/src/configure.py @@ -3,6 +3,9 @@ import sys import logging import json +# External Packages +import schedule + # Internal Packages from src.processor.ledger.beancount_to_jsonl import BeancountToJsonl from src.processor.markdown.markdown_to_jsonl import MarkdownToJsonl @@ -37,6 +40,12 @@ def configure_server(args, required=False): state.processor_config = configure_processor(args.config.processor) +@schedule.repeat(schedule.every(1).hour) +def update_search_index(): + state.model = configure_search(state.model, state.config, regenerate=False) + logger.info("Search Index updated via Scheduler") + + def configure_search(model: SearchModels, config: FullConfig, regenerate: bool, t: SearchType = None): # Initialize Org Notes Search if (t == SearchType.Org or t == None) and config.content_type.org: diff --git a/src/main.py b/src/main.py index 6b81bfba..a44ef3ca 100644 --- a/src/main.py +++ b/src/main.py @@ -16,6 +16,7 @@ from fastapi import FastAPI from fastapi.staticfiles import StaticFiles from PyQt6 import QtWidgets from PyQt6.QtCore import QThread, QTimer +import schedule # Internal Packages from 
src.configure import configure_server @@ -99,10 +100,10 @@ def run(): # Setup Signal Handlers signal.signal(signal.SIGINT, sigint_handler) - # Invoke python Interpreter every 500ms to handle signals + # Invoke Python interpreter every 500ms to handle signals, run scheduled tasks timer = QTimer() timer.start(500) - timer.timeout.connect(lambda: None) + timer.timeout.connect(schedule.run_pending) # Start Application server.start() diff --git a/src/routers/api.py b/src/routers/api.py index c8347f03..313e48c9 100644 --- a/src/routers/api.py +++ b/src/routers/api.py @@ -126,4 +126,6 @@ def search(q: str, n: Optional[int] = 5, t: Optional[SearchType] = None, r: Opti @api.get('/update') def update(t: Optional[SearchType] = None, force: Optional[bool] = False): state.model = configure_search(state.model, state.config, regenerate=force, t=t) + logger.info("Search Index updated via API call") + return {'status': 'ok', 'message': 'index updated'} From 701d92e17b90a9a8add094b8b437b2f5b13aad40 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sun, 1 Jan 2023 16:40:47 -0300 Subject: [PATCH 2/3] Lock the index before updating it via API or Scheduler - There are 3 paths to updating/setting the index (stored in state.model) - App start - API - Scheduler - Put all updates to the index behind a lock. 
This is needed as multiple update paths could (potentially) run at the same time (via API or Scheduler) --- src/configure.py | 4 ++++ src/routers/api.py | 2 ++ src/utils/state.py | 2 ++ 3 files changed, 8 insertions(+) diff --git a/src/configure.py b/src/configure.py index 81952280..45e1661e 100644 --- a/src/configure.py +++ b/src/configure.py @@ -34,7 +34,9 @@ def configure_server(args, required=False): state.config = args.config # Initialize the search model from Config + state.search_index_lock.acquire() state.model = configure_search(state.model, state.config, args.regenerate) + state.search_index_lock.release() # Initialize Processor from Config state.processor_config = configure_processor(args.config.processor) @@ -42,7 +44,9 @@ def configure_server(args, required=False): @schedule.repeat(schedule.every(1).hour) def update_search_index(): + state.search_index_lock.acquire() state.model = configure_search(state.model, state.config, regenerate=False) + state.search_index_lock.release() logger.info("Search Index updated via Scheduler") diff --git a/src/routers/api.py b/src/routers/api.py index 313e48c9..f92a6f1e 100644 --- a/src/routers/api.py +++ b/src/routers/api.py @@ -125,7 +125,9 @@ def search(q: str, n: Optional[int] = 5, t: Optional[SearchType] = None, r: Opti @api.get('/update') def update(t: Optional[SearchType] = None, force: Optional[bool] = False): + state.search_index_lock.acquire() state.model = configure_search(state.model, state.config, regenerate=force, t=t) + state.search_index_lock.release() logger.info("Search Index updated via API call") return {'status': 'ok', 'message': 'index updated'} diff --git a/src/utils/state.py b/src/utils/state.py index 283d2b5a..0e323b89 100644 --- a/src/utils/state.py +++ b/src/utils/state.py @@ -1,4 +1,5 @@ # Standard Packages +import threading from packaging import version # External Packages @@ -20,6 +21,7 @@ host: str = None port: int = None cli_args: list[str] = None query_cache = LRU() +search_index_lock = 
threading.Lock() if torch.cuda.is_available(): # Use CUDA GPU From c5359539155e193589954cfd08758f672809ccc6 Mon Sep 17 00:00:00 2001 From: Debanjum Singh Solanky Date: Sun, 1 Jan 2023 20:48:13 -0300 Subject: [PATCH 3/3] Update index automatically in non-GUI mode too - Poll scheduler every minute using threading.Timer - Use a 60-second polling interval to avoid fork bombing - Schedule the next poll via the same poll scheduler - Allow clean program interrupt by running the scheduler in daemon mode --- src/main.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main.py b/src/main.py index a44ef3ca..20ef8121 100644 --- a/src/main.py +++ b/src/main.py @@ -3,6 +3,7 @@ import os import signal import sys import logging +import threading import warnings from platform import system @@ -73,6 +74,8 @@ def run(): logger.info("Starting Khoj...") if args.no_gui: + # Setup task scheduler + poll_task_scheduler() # Start Server configure_server(args, required=True) start_server(app, host=args.host, port=args.port, socket=args.socket) @@ -143,6 +146,13 @@ def start_server(app, host=None, port=None, socket=None): uvicorn.run(app, host=host, port=port) +def poll_task_scheduler(): + timer_thread = threading.Timer(60.0, poll_task_scheduler) + timer_thread.daemon = True + timer_thread.start() + schedule.run_pending() + + class ServerThread(QThread): def __init__(self, app, host=None, port=None, socket=None): super(ServerThread, self).__init__()