diff --git a/src/khoj/configure.py b/src/khoj/configure.py
index eca7f3e6..7e413194 100644
--- a/src/khoj/configure.py
+++ b/src/khoj/configure.py
@@ -386,3 +386,14 @@ def upload_telemetry():
 def delete_old_user_requests():
     num_deleted = delete_user_requests()
     logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests")
+
+
+@schedule.repeat(schedule.every(17).minutes)
+def wakeup_scheduler():
+    """Wake the background scheduler on the schedule leader; pause it on every other worker."""
+    # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers.
+    if state.schedule_leader_process_lock:
+        state.scheduler.wakeup()
+    else:
+        # Make sure the other workers don't run the scheduled tasks
+        state.scheduler.pause()
diff --git a/src/khoj/database/migrations/0050_alter_processlock_name.py b/src/khoj/database/migrations/0050_alter_processlock_name.py
new file mode 100644
index 00000000..8ce68db6
--- /dev/null
+++ b/src/khoj/database/migrations/0050_alter_processlock_name.py
@@ -0,0 +1,25 @@
+# Generated by Django 4.2.11 on 2024-06-28 06:11
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("database", "0049_datastore"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="processlock",
+            name="name",
+            field=models.CharField(
+                choices=[
+                    ("index_content", "Index Content"),
+                    ("scheduled_job", "Scheduled Job"),
+                    ("schedule_leader", "Schedule Leader"),
+                ],
+                max_length=200,
+                unique=True,
+            ),
+        ),
+    ]
diff --git a/src/khoj/database/models/__init__.py b/src/khoj/database/models/__init__.py
index 708d5caf..174d5090 100644
--- a/src/khoj/database/models/__init__.py
+++ b/src/khoj/database/models/__init__.py
@@ -120,6 +120,7 @@ class ProcessLock(BaseModel):
     class Operation(models.TextChoices):
         INDEX_CONTENT = "index_content"
         SCHEDULED_JOB = "scheduled_job"
+        SCHEDULE_LEADER = "schedule_leader"
 
     # We need to make sure that some operations are thread-safe. To do so, add locks for potentially shared operations.
     # For example, we need to make sure that only one process is updating the embeddings at a time.
diff --git a/src/khoj/main.py b/src/khoj/main.py
index 7a73e0b7..9180d9e3 100644
--- a/src/khoj/main.py
+++ b/src/khoj/main.py
@@ -94,11 +94,27 @@ from khoj.configure import configure_routes, initialize_server, configure_middle
 from khoj.utils import state
 from khoj.utils.cli import cli
 from khoj.utils.initialization import initialization
+from khoj.database.adapters import ProcessLockAdapters
+from khoj.database.models import ProcessLock
+
+from django.db.utils import IntegrityError
+
+SCHEDULE_LEADER_NAME = ProcessLock.Operation.SCHEDULE_LEADER
 
 
 def shutdown_scheduler():
+    """Release the schedule leader lock, if this process holds it, then shut down the scheduler."""
     logger.info("🌑 Shutting down Khoj")
-    # state.scheduler.shutdown()
+
+    try:
+        if state.schedule_leader_process_lock:
+            ProcessLockAdapters.remove_process_lock(state.schedule_leader_process_lock)
+            state.schedule_leader_process_lock = None
+            # Log only after the lock has actually been removed
+            logger.info("🔓 Schedule Leader released")
+    finally:
+        # Shut down the scheduler even if releasing the lock failed
+        state.scheduler.shutdown()
 
 
 def run(should_start_server=True):
@@ -146,7 +162,25 @@ def run(should_start_server=True):
         }
     )
     state.scheduler.add_jobstore(DjangoJobStore(), "default")
-    state.scheduler.start()
+
+    # We use this mechanism to only elect one schedule leader in a distributed environment. This one will be responsible for actually executing the scheduled tasks. The others will still be capable of adding and removing tasks, but they will not execute them. This is to decrease the overall burden on the database and the system.
+    try:
+        schedule_leader_process_lock = ProcessLockAdapters.get_process_lock(SCHEDULE_LEADER_NAME)
+        if schedule_leader_process_lock:
+            logger.info("🔒 Schedule Leader is already running")
+            state.scheduler.start(paused=True)
+        else:
+            logger.info("🔒 Schedule Leader elected")
+            created_process_lock = ProcessLockAdapters.set_process_lock(
+                SCHEDULE_LEADER_NAME, max_duration_in_seconds=43200  # 12 hours
+            )
+            state.scheduler.start()
+            state.schedule_leader_process_lock = created_process_lock
+    except IntegrityError:
+        logger.info("🔒 Schedule Leader running elsewhere")
+        state.scheduler.start(paused=True)
+    # Logged after the try statement so it is not emitted when an unhandled error aborts startup
+    logger.info("Started Background Scheduler")
 
     # Start Server
     configure_routes(app)
diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py
index babac509..cbe19891 100644
--- a/src/khoj/routers/api.py
+++ b/src/khoj/routers/api.py
@@ -569,6 +569,7 @@ def edit_job(
     try:
         automation: Job = AutomationAdapters.get_automation(user, automation_id)
     except ValueError as e:
+        logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True)
         return Response(content="Invalid automation", status_code=403)
 
     # Normalize query parameters
diff --git a/src/khoj/utils/state.py b/src/khoj/utils/state.py
index 7439929f..4a8b36b5 100644
--- a/src/khoj/utils/state.py
+++ b/src/khoj/utils/state.py
@@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from openai import OpenAI
 from whisper import Whisper
 
+from khoj.database.models import ProcessLock
 from khoj.processor.embeddings import CrossEncoderModel, EmbeddingsModel
 from khoj.utils import config as utils_config
 from khoj.utils.config import OfflineChatProcessorModel, SearchModels
@@ -31,6 +32,7 @@ query_cache: Dict[str, LRU] = defaultdict(LRU)
 chat_lock = threading.Lock()
 SearchType = utils_config.SearchType
 scheduler: BackgroundScheduler = None
+schedule_leader_process_lock: ProcessLock = None
 telemetry: List[Dict[str, str]] = []
 khoj_version: str = None
 device = get_device()