And a connections clean up decorator to all scheduled tasks

This commit is contained in:
sabaimran 2024-11-18 17:19:36 -08:00
parent 817601872f
commit 8bdd0b26d3

View file

@ -3,6 +3,7 @@ import logging
import os import os
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from functools import wraps
from typing import Optional from typing import Optional
import openai import openai
@ -198,6 +199,26 @@ def initialize_server(config: Optional[FullConfig]):
raise e raise e
def clean_connections(func):
"""
A decorator that ensures that Django database connections that have become unusable, or are obsolete, are closed
before and after a method is executed (see: https://docs.djangoproject.com/en/dev/ref/databases/#general-notes
for background).
"""
@wraps(func)
def func_wrapper(*args, **kwargs):
close_old_connections()
try:
result = func(*args, **kwargs)
finally:
close_old_connections()
return result
return func_wrapper
def configure_server( def configure_server(
config: FullConfig, config: FullConfig,
regenerate: bool = False, regenerate: bool = False,
@ -349,6 +370,7 @@ def update_content_index():
@schedule.repeat(schedule.every(22).to(25).hours) @schedule.repeat(schedule.every(22).to(25).hours)
@clean_connections
def update_content_index_regularly(): def update_content_index_regularly():
ProcessLockAdapters.run_with_lock( ProcessLockAdapters.run_with_lock(
update_content_index, ProcessLock.Operation.INDEX_CONTENT, max_duration_in_seconds=60 * 60 * 2 update_content_index, ProcessLock.Operation.INDEX_CONTENT, max_duration_in_seconds=60 * 60 * 2
@ -364,6 +386,7 @@ def configure_search_types():
@schedule.repeat(schedule.every(2).minutes) @schedule.repeat(schedule.every(2).minutes)
@clean_connections
def upload_telemetry(): def upload_telemetry():
if telemetry_disabled(state.config.app, state.telemetry_disabled) or not state.telemetry: if telemetry_disabled(state.config.app, state.telemetry_disabled) or not state.telemetry:
return return
@ -389,12 +412,14 @@ def upload_telemetry():
@schedule.repeat(schedule.every(31).minutes) @schedule.repeat(schedule.every(31).minutes)
@clean_connections
def delete_old_user_requests(): def delete_old_user_requests():
num_deleted = delete_user_requests() num_deleted = delete_user_requests()
logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests") logger.debug(f"🗑️ Deleted {num_deleted[0]} day-old user requests")
@schedule.repeat(schedule.every(17).minutes) @schedule.repeat(schedule.every(17).minutes)
@clean_connections
def wakeup_scheduler(): def wakeup_scheduler():
# Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers. # Wake up the scheduler to ensure it runs the scheduled tasks. This is because the elected leader may not always be aware of tasks scheduled on other workers.
TWELVE_HOURS = 43200 TWELVE_HOURS = 43200