Automatically update search index hourly

- c535953 Update index automatically in non-GUI mode too
- 701d92e Lock the index before updating it via API or Scheduler
- 3b0783a Automate updating embeddings, search index on an hourly schedule

Resolves #106
This commit is contained in:
Debanjum 2023-01-02 00:37:59 +00:00 committed by GitHub
commit fe1398401d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 2 deletions

View file

@ -41,6 +41,7 @@ setup(
"dateparser == 1.1.1", "dateparser == 1.1.1",
"pyqt6 == 6.3.1", "pyqt6 == 6.3.1",
"defusedxml == 0.7.1", "defusedxml == 0.7.1",
'schedule == 1.1.0',
], ],
include_package_data=True, include_package_data=True,
entry_points={"console_scripts": ["khoj = src.main:run"]}, entry_points={"console_scripts": ["khoj = src.main:run"]},

View file

@ -3,6 +3,9 @@ import sys
import logging import logging
import json import json
# External Packages
import schedule
# Internal Packages # Internal Packages
from src.processor.ledger.beancount_to_jsonl import BeancountToJsonl from src.processor.ledger.beancount_to_jsonl import BeancountToJsonl
from src.processor.markdown.markdown_to_jsonl import MarkdownToJsonl from src.processor.markdown.markdown_to_jsonl import MarkdownToJsonl
@ -31,12 +34,22 @@ def configure_server(args, required=False):
state.config = args.config state.config = args.config
# Initialize the search model from Config # Initialize the search model from Config
state.search_index_lock.acquire()
state.model = configure_search(state.model, state.config, args.regenerate) state.model = configure_search(state.model, state.config, args.regenerate)
state.search_index_lock.release()
# Initialize Processor from Config # Initialize Processor from Config
state.processor_config = configure_processor(args.config.processor) state.processor_config = configure_processor(args.config.processor)
@schedule.repeat(schedule.every(1).hour)
def update_search_index():
    """Refresh the search index in place; registered to run once an hour.

    The job only fires when something calls ``schedule.run_pending()``.
    It rebuilds ``state.model`` from ``state.config`` with
    ``regenerate=False`` and logs once the update has completed.

    Raises:
        Whatever ``configure_search`` raises. The index lock is released
        before the exception propagates.
    """
    # Hold the lock via a context manager so it is always released, even if
    # configure_search raises; a bare acquire()/release() pair would leave the
    # lock held forever and block every later scheduled or API-driven update.
    with state.search_index_lock:
        state.model = configure_search(state.model, state.config, regenerate=False)
    logger.info("Search Index updated via Scheduler")
def configure_search(model: SearchModels, config: FullConfig, regenerate: bool, t: SearchType = None): def configure_search(model: SearchModels, config: FullConfig, regenerate: bool, t: SearchType = None):
# Initialize Org Notes Search # Initialize Org Notes Search
if (t == SearchType.Org or t == None) and config.content_type.org: if (t == SearchType.Org or t == None) and config.content_type.org:

View file

@ -3,6 +3,7 @@ import os
import signal import signal
import sys import sys
import logging import logging
import threading
import warnings import warnings
from platform import system from platform import system
@ -16,6 +17,7 @@ from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from PyQt6 import QtWidgets from PyQt6 import QtWidgets
from PyQt6.QtCore import QThread, QTimer from PyQt6.QtCore import QThread, QTimer
import schedule
# Internal Packages # Internal Packages
from src.configure import configure_server from src.configure import configure_server
@ -72,6 +74,8 @@ def run():
logger.info("Starting Khoj...") logger.info("Starting Khoj...")
if args.no_gui: if args.no_gui:
# Setup task scheduler
poll_task_scheduler()
# Start Server # Start Server
configure_server(args, required=True) configure_server(args, required=True)
start_server(app, host=args.host, port=args.port, socket=args.socket) start_server(app, host=args.host, port=args.port, socket=args.socket)
@ -99,10 +103,10 @@ def run():
# Setup Signal Handlers # Setup Signal Handlers
signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGINT, sigint_handler)
# Invoke python Interpreter every 500ms to handle signals # Invoke Python interpreter every 500ms to handle signals, run scheduled tasks
timer = QTimer() timer = QTimer()
timer.start(500) timer.start(500)
timer.timeout.connect(lambda: None) timer.timeout.connect(schedule.run_pending)
# Start Application # Start Application
server.start() server.start()
@ -142,6 +146,13 @@ def start_server(app, host=None, port=None, socket=None):
uvicorn.run(app, host=host, port=port) uvicorn.run(app, host=host, port=port)
def poll_task_scheduler():
    """Run any due scheduled jobs, then repeat every 60 seconds.

    Each call arms a fresh ``threading.Timer`` that calls this function
    again after 60 seconds, and then runs whatever jobs the ``schedule``
    library currently considers pending.
    """
    # Arm the next poll before running jobs, so polling continues even if a
    # pending job raises in this invocation.
    next_poll = threading.Timer(60.0, poll_task_scheduler)
    # Daemon thread: a waiting timer must not keep the process alive at exit.
    next_poll.daemon = True
    next_poll.start()

    schedule.run_pending()
class ServerThread(QThread): class ServerThread(QThread):
def __init__(self, app, host=None, port=None, socket=None): def __init__(self, app, host=None, port=None, socket=None):
super(ServerThread, self).__init__() super(ServerThread, self).__init__()

View file

@ -125,5 +125,9 @@ def search(q: str, n: Optional[int] = 5, t: Optional[SearchType] = None, r: Opti
@api.get('/update')
def update(t: Optional[SearchType] = None, force: Optional[bool] = False):
    """Update the search index on demand via the ``/update`` API route.

    Args:
        t: Search type to update; passed through to ``configure_search``
            as ``t``. Defaults to ``None``.
        force: Passed through to ``configure_search`` as ``regenerate``.
            Defaults to ``False``.

    Returns:
        A dict with ``status`` and ``message`` keys reporting success.

    Raises:
        Whatever ``configure_search`` raises. The index lock is released
        before the exception propagates.
    """
    # Hold the lock via a context manager so a failing update cannot leave it
    # held and block every later update that takes the same lock.
    with state.search_index_lock:
        state.model = configure_search(state.model, state.config, regenerate=force, t=t)
    logger.info("Search Index updated via API call")
    return {'status': 'ok', 'message': 'index updated'}

View file

@ -1,4 +1,5 @@
# Standard Packages # Standard Packages
import threading
from packaging import version from packaging import version
# External Packages # External Packages
@ -20,6 +21,7 @@ host: str = None
port: int = None port: int = None
cli_args: list[str] = None cli_args: list[str] = None
query_cache = LRU() query_cache = LRU()
search_index_lock = threading.Lock()
if torch.cuda.is_available(): if torch.cuda.is_available():
# Use CUDA GPU # Use CUDA GPU