From 64b2073e63847d70d1e2ad3c01e12c6a4ad98e60 Mon Sep 17 00:00:00 2001 From: sabaimran Date: Thu, 8 Aug 2024 12:42:59 +0530 Subject: [PATCH] In the time-based job for managing the schedule leader, and logic to create a new lock when the current one is expired. --- src/khoj/configure.py | 21 +++++++++++++++++++++ src/khoj/database/adapters/__init__.py | 10 +++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/khoj/configure.py b/src/khoj/configure.py index a77ade87..8d9c4489 100644 --- a/src/khoj/configure.py +++ b/src/khoj/configure.py @@ -417,6 +417,27 @@ def delete_old_user_requests(): @schedule.repeat(schedule.every(17).minutes) def wakeup_scheduler(): # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers. + TWELVE_HOURS = 43200 + + # If the worker currently possesses a process lock, check if it is valid. + + if state.schedule_leader_process_lock: + if not ProcessLockAdapters.is_process_locked(state.schedule_leader_process_lock): + state.schedule_leader_process_lock = None + state.scheduler.pause() + + # Get the current process lock + schedule_leader_process_lock = ProcessLockAdapters.get_process_lock(ProcessLock.Operation.SCHEDULE_LEADER) + + # Check if the process lock is still active. If not, create a new process lock. This worker will become the scheduler leader. + if not ProcessLockAdapters.is_process_locked(schedule_leader_process_lock): + schedule_leader_process_lock = ProcessLockAdapters.set_process_lock( + ProcessLock.Operation.SCHEDULE_LEADER, max_duration_in_seconds=TWELVE_HOURS + ) + state.schedule_leader_process_lock = schedule_leader_process_lock + state.scheduler.resume() + logger.info("🔔 Scheduler leader process lock acquired") + if state.schedule_leader_process_lock: state.scheduler.wakeup() else: diff --git a/src/khoj/database/adapters/__init__.py b/src/khoj/database/adapters/__init__.py index 02264c38..c25bcb51 100644 --- a/src/khoj/database/adapters/__init__.py +++ b/src/khoj/database/adapters/__init__.py @@ -449,15 +449,19 @@ class ProcessLockAdapters: return ProcessLock.objects.create(name=process_name, max_duration_in_seconds=max_duration_in_seconds) @staticmethod - def is_process_locked(process_name: str): + def is_process_locked_by_name(process_name: str): process_lock = ProcessLock.objects.filter(name=process_name).first() if not process_lock: return False + return ProcessLockAdapters.is_process_locked(process_lock) + + @staticmethod + def is_process_locked(process_lock: ProcessLock): if process_lock.started_at + timedelta(seconds=process_lock.max_duration_in_seconds) < datetime.now( tz=timezone.utc ): process_lock.delete() - logger.info(f"🔓 Deleted stale {process_name} process lock on timeout") + logger.info(f"🔓 Deleted stale {process_lock.name} process lock on timeout") return False return True @@ -468,7 +472,7 @@ class ProcessLockAdapters: @staticmethod def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600, **kwargs): # Exit early if process lock is already taken - if ProcessLockAdapters.is_process_locked(operation): + if ProcessLockAdapters.is_process_locked_by_name(operation): logger.debug(f"🔒 Skip executing {func} as {operation} lock is already taken") return