Mirror of https://github.com/khoj-ai/khoj.git
Allow just one worker to be the background schedule leader (#836)
* Add a leader election mechanism to circumvent runtime issues with multiple schedulers. This reduces the load on the DB and the risk of issues on the service side by limiting job execution to a single elected leader at any given time. The leader is responsible for executing all of the scheduled jobs, though every worker can still add and remove jobs.
* Set a max duration for the schedule leader position (12 hrs), and log an error if an automation is not added successfully.
parent 80fe5ce182, commit c83b8f2768
6 changed files with 70 additions and 2 deletions
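The mechanism described in the commit message reduces to a small pattern: whichever worker manages to create the shared lock row becomes the leader and runs its scheduler, while everyone else starts paused. The sketch below is an illustrative reduction, not code from this commit — acquire_lock is a hypothetical stand-in for ProcessLockAdapters.set_process_lock, assumed to insert a row whose name column is unique and to raise IntegrityError when another worker already holds it.

# Hedged sketch of the leader election pattern; acquire_lock is hypothetical.
from apscheduler.schedulers.background import BackgroundScheduler
from django.db.utils import IntegrityError


def acquire_lock(name: str, max_duration_in_seconds: int):
    # Hypothetical: INSERT a lock row with a unique name, or raise IntegrityError.
    raise NotImplementedError


def start_background_scheduler(scheduler: BackgroundScheduler):
    try:
        lock = acquire_lock("schedule_leader", max_duration_in_seconds=12 * 60 * 60)
    except IntegrityError:
        # Lost the election: keep a paused scheduler so this worker can still
        # add and remove jobs in the shared job store without executing them.
        scheduler.start(paused=True)
        return None
    # Won the election: this worker executes the scheduled jobs.
    scheduler.start()
    return lock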
@@ -386,3 +386,13 @@ def upload_telemetry():
 def delete_old_user_requests():
     num_deleted = delete_user_requests()
     logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests")
+
+
+@schedule.repeat(schedule.every(17).minutes)
+def wakeup_scheduler():
+    # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers.
+    if state.schedule_leader_process_lock:
+        state.scheduler.wakeup()
+    else:
+        # Make sure the other workers don't run the scheduled tasks
+        state.scheduler.pause()
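For context on the pause()/wakeup() calls above: every worker points APScheduler at the same database-backed job store, so a follower can add a job that the leader's in-process scheduler has not yet noticed; waking the leader periodically makes it re-read the store, while followers defensively stay paused. A minimal sketch of those scheduler controls, assuming a SQLAlchemy job store in place of khoj's DjangoJobStore:

# Sketch only: SQLAlchemyJobStore stands in for DjangoJobStore; is_leader stands in for
# checking state.schedule_leader_process_lock.
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler


def send_reminder():
    print("reminder fired")


def make_scheduler(is_leader: bool) -> BackgroundScheduler:
    scheduler = BackgroundScheduler()
    # Every worker attaches the same shared job store.
    scheduler.add_jobstore(SQLAlchemyJobStore(url="sqlite:///jobs.sqlite"), "default")
    # Followers start paused: they can add and remove jobs but never execute them.
    scheduler.start(paused=not is_leader)
    return scheduler


follower = make_scheduler(is_leader=False)
follower.add_job(send_reminder, "interval", minutes=30, id="reminder", replace_existing=True)

leader = make_scheduler(is_leader=True)
# The leader only learns about jobs other workers inserted when it next wakes up,
# which is why the hunk above wakes it on a 17 minute cadence.
leader.wakeup()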
src/khoj/database/migrations/0050_alter_processlock_name.py (new file, 25 lines)
@@ -0,0 +1,25 @@
+# Generated by Django 4.2.11 on 2024-06-28 06:11
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("database", "0049_datastore"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="processlock",
+            name="name",
+            field=models.CharField(
+                choices=[
+                    ("index_content", "Index Content"),
+                    ("scheduled_job", "Scheduled Job"),
+                    ("schedule_leader", "Schedule Leader"),
+                ],
+                max_length=200,
+                unique=True,
+            ),
+        ),
+    ]
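The unique=True added to ProcessLock.name here is what makes leader election atomic: when several workers race to insert the same lock row, the database lets exactly one INSERT through and rejects the rest with an integrity error. A standalone illustration of that effect, with sqlite3 standing in for the service's database and the table reduced to just the unique name column:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processlock (name TEXT UNIQUE)")


def try_become_leader() -> bool:
    try:
        # Exactly one concurrent caller can insert this row.
        conn.execute("INSERT INTO processlock (name) VALUES ('schedule_leader')")
        return True
    except sqlite3.IntegrityError:
        return False


print(try_become_leader())  # True: the first insert wins the election
print(try_become_leader())  # False: the unique constraint rejects every later attempt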
@@ -120,6 +120,7 @@ class ProcessLock(BaseModel):
     class Operation(models.TextChoices):
         INDEX_CONTENT = "index_content"
         SCHEDULED_JOB = "scheduled_job"
+        SCHEDULE_LEADER = "schedule_leader"

     # We need to make sure that some operations are thread-safe. To do so, add locks for potentially shared operations.
     # For example, we need to make sure that only one process is updating the embeddings at a time.
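A side note on the new SCHEDULE_LEADER member: Django TextChoices members behave like plain strings, which is why main.py below can bind ProcessLock.Operation.SCHEDULE_LEADER to SCHEDULE_LEADER_NAME and pass it wherever the lock's CharField value is expected. A small standalone check (not khoj code):

from django.db import models


class Operation(models.TextChoices):
    INDEX_CONTENT = "index_content"
    SCHEDULED_JOB = "scheduled_job"
    SCHEDULE_LEADER = "schedule_leader"


# Members compare equal to their string value and carry a generated label, matching the
# ("schedule_leader", "Schedule Leader") pair in the migration above.
assert Operation.SCHEDULE_LEADER == "schedule_leader"
assert Operation.SCHEDULE_LEADER.label == "Schedule Leader"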
@@ -94,11 +94,22 @@ from khoj.configure import configure_routes, initialize_server, configure_middle
 from khoj.utils import state
 from khoj.utils.cli import cli
 from khoj.utils.initialization import initialization
+from khoj.database.adapters import ProcessLockAdapters
+from khoj.database.models import ProcessLock
+
+from django.db.utils import IntegrityError
+
+SCHEDULE_LEADER_NAME = ProcessLock.Operation.SCHEDULE_LEADER


 def shutdown_scheduler():
     logger.info("🌑 Shutting down Khoj")
-    # state.scheduler.shutdown()
+
+    if state.schedule_leader_process_lock:
+        logger.info("🔓 Schedule Leader released")
+        ProcessLockAdapters.remove_process_lock(state.schedule_leader_process_lock)
+
+    state.scheduler.shutdown()


 def run(should_start_server=True):
@@ -146,7 +157,25 @@ def run(should_start_server=True):
         }
     )
     state.scheduler.add_jobstore(DjangoJobStore(), "default")
-    state.scheduler.start()
+
+    # We use this mechanism to only elect one schedule leader in a distributed environment. This one will be responsible for actually executing the scheduled tasks. The others will still be capable of adding and removing tasks, but they will not execute them. This is to decrease the overall burden on the database and the system.
+    try:
+        schedule_leader_process_lock = ProcessLockAdapters.get_process_lock(SCHEDULE_LEADER_NAME)
+        if schedule_leader_process_lock:
+            logger.info("🔒 Schedule Leader is already running")
+            state.scheduler.start(paused=True)
+        else:
+            logger.info("🔒 Schedule Leader elected")
+            created_process_lock = ProcessLockAdapters.set_process_lock(
+                SCHEDULE_LEADER_NAME, max_duration_in_seconds=43200
+            )
+            state.scheduler.start()
+            state.schedule_leader_process_lock = created_process_lock
+    except IntegrityError:
+        logger.info("🔒 Schedule Leader running elsewhere")
+        state.scheduler.start(paused=True)
+    finally:
+        logger.info("Started Background Scheduler")

     # Start Server
     configure_routes(app)
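On max_duration_in_seconds=43200: if the elected leader dies without running shutdown_scheduler, its lock row would otherwise block new elections indefinitely, so the position expires after 12 hours. How that expiry is enforced lives inside ProcessLockAdapters; the check below is only a hedged sketch of the general idea, not khoj's implementation:

from datetime import datetime, timedelta, timezone


def is_lock_stale(started_at: datetime, max_duration_in_seconds: int) -> bool:
    # A lock held longer than its max duration is treated as abandoned and reclaimable.
    return datetime.now(timezone.utc) - started_at > timedelta(seconds=max_duration_in_seconds)


# With max_duration_in_seconds=43200, a crashed leader's lock becomes reclaimable after
# at most twelve hours.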
@@ -569,6 +569,7 @@ def edit_job(
     try:
         automation: Job = AutomationAdapters.get_automation(user, automation_id)
     except ValueError as e:
+        logger.error(f"Error editing automation {automation_id} for {user.email}: {e}", exc_info=True)
         return Response(content="Invalid automation", status_code=403)

     # Normalize query parameters
@@ -8,6 +8,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from openai import OpenAI
 from whisper import Whisper

+from khoj.database.models import ProcessLock
 from khoj.processor.embeddings import CrossEncoderModel, EmbeddingsModel
 from khoj.utils import config as utils_config
 from khoj.utils.config import OfflineChatProcessorModel, SearchModels
@@ -31,6 +32,7 @@ query_cache: Dict[str, LRU] = defaultdict(LRU)
 chat_lock = threading.Lock()
 SearchType = utils_config.SearchType
 scheduler: BackgroundScheduler = None
+schedule_leader_process_lock: ProcessLock = None
 telemetry: List[Dict[str, str]] = []
 khoj_version: str = None
 device = get_device()