From c83b8f2768eb9093564da0e91a8ef4c39ba38258 Mon Sep 17 00:00:00 2001 From: sabaimran <65192171+sabaimran@users.noreply.github.com> Date: Fri, 28 Jun 2024 00:43:25 -0700 Subject: [PATCH] Allow just one worker to be the background schedule leader (#836) * Add a leader election mechanism to circumvent runtime issues for multiple schedulers - Reduce the load on the DB and risk of issues on the service side by limiting the execution environment to one elected leader at a given time. This one is responsible for managing all of the execution of the jobs, though all workers are capable of adding and removing jobs * Set a max duration for the schedule leader position (12 hrs), add some error if automation not added successfully --- src/khoj/configure.py | 10 ++++++ .../migrations/0050_alter_processlock_name.py | 25 ++++++++++++++ src/khoj/database/models/__init__.py | 1 + src/khoj/main.py | 33 +++++++++++++++++-- src/khoj/routers/api.py | 1 + src/khoj/utils/state.py | 2 ++ 6 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 src/khoj/database/migrations/0050_alter_processlock_name.py diff --git a/src/khoj/configure.py b/src/khoj/configure.py index eca7f3e6..7e413194 100644 --- a/src/khoj/configure.py +++ b/src/khoj/configure.py @@ -386,3 +386,13 @@ def upload_telemetry(): def delete_old_user_requests(): num_deleted = delete_user_requests() logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests") + + +@schedule.repeat(schedule.every(17).minutes) +def wakeup_scheduler(): + # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers. + if state.schedule_leader_process_lock: + state.scheduler.wakeup() + else: + # Make sure the other workers don't run the scheduled tasks + state.scheduler.pause() diff --git a/src/khoj/database/migrations/0050_alter_processlock_name.py b/src/khoj/database/migrations/0050_alter_processlock_name.py new file mode 100644 index 00000000..8ce68db6 --- /dev/null +++ b/src/khoj/database/migrations/0050_alter_processlock_name.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.11 on 2024-06-28 06:11 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("database", "0049_datastore"), + ] + + operations = [ + migrations.AlterField( + model_name="processlock", + name="name", + field=models.CharField( + choices=[ + ("index_content", "Index Content"), + ("scheduled_job", "Scheduled Job"), + ("schedule_leader", "Schedule Leader"), + ], + max_length=200, + unique=True, + ), + ), + ] diff --git a/src/khoj/database/models/__init__.py b/src/khoj/database/models/__init__.py index 708d5caf..174d5090 100644 --- a/src/khoj/database/models/__init__.py +++ b/src/khoj/database/models/__init__.py @@ -120,6 +120,7 @@ class ProcessLock(BaseModel): class Operation(models.TextChoices): INDEX_CONTENT = "index_content" SCHEDULED_JOB = "scheduled_job" + SCHEDULE_LEADER = "schedule_leader" # We need to make sure that some operations are thread-safe. To do so, add locks for potentially shared operations. # For example, we need to make sure that only one process is updating the embeddings at a time. diff --git a/src/khoj/main.py b/src/khoj/main.py index 7a73e0b7..9180d9e3 100644 --- a/src/khoj/main.py +++ b/src/khoj/main.py @@ -94,11 +94,22 @@ from khoj.configure import configure_routes, initialize_server, configure_middle from khoj.utils import state from khoj.utils.cli import cli from khoj.utils.initialization import initialization +from khoj.database.adapters import ProcessLockAdapters +from khoj.database.models import ProcessLock + +from django.db.utils import IntegrityError + +SCHEDULE_LEADER_NAME = ProcessLock.Operation.SCHEDULE_LEADER def shutdown_scheduler(): logger.info("🌑 Shutting down Khoj") - # state.scheduler.shutdown() + + if state.schedule_leader_process_lock: + logger.info("🔓 Schedule Leader released") + ProcessLockAdapters.remove_process_lock(state.schedule_leader_process_lock) + + state.scheduler.shutdown() def run(should_start_server=True): @@ -146,7 +157,25 @@ def run(should_start_server=True): } ) state.scheduler.add_jobstore(DjangoJobStore(), "default") - state.scheduler.start() + + # We use this mechanism to only elect one schedule leader in a distributed environment. This one will be responsible for actually executing the scheduled tasks. The others will still be capable of adding and removing tasks, but they will not execute them. This is to decrease the overall burden on the database and the system. + try: + schedule_leader_process_lock = ProcessLockAdapters.get_process_lock(SCHEDULE_LEADER_NAME) + if schedule_leader_process_lock: + logger.info("🔒 Schedule Leader is already running") + state.scheduler.start(paused=True) + else: + logger.info("🔒 Schedule Leader elected") + created_process_lock = ProcessLockAdapters.set_process_lock( + SCHEDULE_LEADER_NAME, max_duration_in_seconds=43200 + ) + state.scheduler.start() + state.schedule_leader_process_lock = created_process_lock + except IntegrityError: + logger.info("🔒 Schedule Leader running elsewhere") + state.scheduler.start(paused=True) + finally: + logger.info("Started Background Scheduler") # Start Server configure_routes(app) diff --git a/src/khoj/routers/api.py b/src/khoj/routers/api.py index babac509..cbe19891 100644 --- a/src/khoj/routers/api.py +++ b/src/khoj/routers/api.py @@ -569,6 +569,7 @@ def edit_job( try: automation: Job = AutomationAdapters.get_automation(user, automation_id) except ValueError as e: + logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True) return Response(content="Invalid automation", status_code=403) # Normalize query parameters diff --git a/src/khoj/utils/state.py b/src/khoj/utils/state.py index 7439929f..4a8b36b5 100644 --- a/src/khoj/utils/state.py +++ b/src/khoj/utils/state.py @@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler from openai import OpenAI from whisper import Whisper +from khoj.database.models import ProcessLock from khoj.processor.embeddings import CrossEncoderModel, EmbeddingsModel from khoj.utils import config as utils_config from khoj.utils.config import OfflineChatProcessorModel, SearchModels @@ -31,6 +32,7 @@ query_cache: Dict[str, LRU] = defaultdict(LRU) chat_lock = threading.Lock() SearchType = utils_config.SearchType scheduler: BackgroundScheduler = None +schedule_leader_process_lock: ProcessLock = None telemetry: List[Dict[str, str]] = [] khoj_version: str = None device = get_device()