In the time-based job for managing the schedule leader, and logic to create a new lock when the current one is expired.

This commit is contained in:
sabaimran 2024-08-08 12:42:59 +05:30
parent 7ee0d9067d
commit 64b2073e63
2 changed files with 28 additions and 3 deletions

View file

@ -417,6 +417,27 @@ def delete_old_user_requests():
@schedule.repeat(schedule.every(17).minutes)
def wakeup_scheduler():
# Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers.
TWELVE_HOURS = 43200
# If the worker currently possesses a process lock, check if it is valid.
if state.schedule_leader_process_lock:
if not ProcessLockAdapters.is_process_locked(state.schedule_leader_process_lock):
state.schedule_leader_process_lock = None
state.scheduler.pause()
# Get the current process lock
schedule_leader_process_lock = ProcessLockAdapters.get_process_lock(ProcessLock.Operation.SCHEDULE_LEADER)
# Check if the process lock is still active. If not, create a new process lock. This worker will become the scheduler leader.
if not ProcessLockAdapters.is_process_locked(schedule_leader_process_lock):
schedule_leader_process_lock = ProcessLockAdapters.set_process_lock(
ProcessLock.Operation.SCHEDULE_LEADER, max_duration_in_seconds=TWELVE_HOURS
)
state.schedule_leader_process_lock = schedule_leader_process_lock
state.scheduler.resume()
logger.info("🔔 Scheduler leader process lock acquired")
if state.schedule_leader_process_lock:
state.scheduler.wakeup()
else:

View file

@ -449,15 +449,19 @@ class ProcessLockAdapters:
return ProcessLock.objects.create(name=process_name, max_duration_in_seconds=max_duration_in_seconds)
@staticmethod
def is_process_locked(process_name: str):
def is_process_locked_by_name(process_name: str):
process_lock = ProcessLock.objects.filter(name=process_name).first()
if not process_lock:
return False
return ProcessLockAdapters.is_process_locked(process_lock)
@staticmethod
def is_process_locked(process_lock: ProcessLock):
if process_lock.started_at + timedelta(seconds=process_lock.max_duration_in_seconds) < datetime.now(
tz=timezone.utc
):
process_lock.delete()
logger.info(f"🔓 Deleted stale {process_name} process lock on timeout")
logger.info(f"🔓 Deleted stale {process_lock.name} process lock on timeout")
return False
return True
@ -468,7 +472,7 @@ class ProcessLockAdapters:
@staticmethod
def run_with_lock(func: Callable, operation: ProcessLock.Operation, max_duration_in_seconds: int = 600, **kwargs):
# Exit early if process lock is already taken
if ProcessLockAdapters.is_process_locked(operation):
if ProcessLockAdapters.is_process_locked_by_name(operation):
logger.debug(f"🔒 Skip executing {func} as {operation} lock is already taken")
return