diff --git a/src/khoj/configure.py b/src/khoj/configure.py index 56e5f209..0ed5aa63 100644 --- a/src/khoj/configure.py +++ b/src/khoj/configure.py @@ -3,6 +3,7 @@ import logging import os from datetime import datetime from enum import Enum +from functools import wraps from typing import Optional import openai @@ -198,6 +199,26 @@ def initialize_server(config: Optional[FullConfig]): raise e +def clean_connections(func): + """ + A decorator that ensures that Django database connections that have become unusable, or are obsolete, are closed + before and after a method is executed (see: https://docs.djangoproject.com/en/dev/ref/databases/#general-notes + for background). + """ + + @wraps(func) + def func_wrapper(*args, **kwargs): + close_old_connections() + try: + result = func(*args, **kwargs) + finally: + close_old_connections() + + return result + + return func_wrapper + + def configure_server( config: FullConfig, regenerate: bool = False, @@ -349,6 +370,7 @@ def update_content_index(): @schedule.repeat(schedule.every(22).to(25).hours) +@clean_connections def update_content_index_regularly(): ProcessLockAdapters.run_with_lock( update_content_index, ProcessLock.Operation.INDEX_CONTENT, max_duration_in_seconds=60 * 60 * 2 @@ -364,6 +386,7 @@ def configure_search_types(): @schedule.repeat(schedule.every(2).minutes) +@clean_connections def upload_telemetry(): if telemetry_disabled(state.config.app, state.telemetry_disabled) or not state.telemetry: return @@ -389,12 +412,14 @@ def upload_telemetry(): @schedule.repeat(schedule.every(31).minutes) +@clean_connections def delete_old_user_requests(): num_deleted = delete_user_requests() logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests") @schedule.repeat(schedule.every(17).minutes) +@clean_connections def wakeup_scheduler(): # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers. TWELVE_HOURS = 43200