From 4471c1e37fdf09d7c8bd20a9e35f29bc72b7dead Mon Sep 17 00:00:00 2001 From: sabaimran Date: Tue, 9 Jul 2024 12:22:58 +0530 Subject: [PATCH] Apply mitigations for piling up open connections - Because we're using a FastAPI api framework with a Django ORM, we're running into some interesting conditions around connection pooling and clean-up. We're ending up with a large pile-up of open, stale connections to the DB recurringly when the server has been running for a while. To mitigate this problem, given starlette and django run in different python threads, add a middleware that will go and call the connection clean up method in each of the threads. --- pyproject.toml | 2 +- src/khoj/app/settings.py | 2 ++ src/khoj/configure.py | 48 +++++++++++++++++++++++++++++++--------- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index de82586f..36e6f579 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,7 @@ dependencies = [ "tenacity == 8.3.0", "anyio == 3.7.1", "pymupdf >= 1.23.5", - "django == 4.2.11", + "django == 5.0.6", "authlib == 1.2.1", "llama-cpp-python == 0.2.76", "itsdangerous == 2.1.2", diff --git a/src/khoj/app/settings.py b/src/khoj/app/settings.py index 99b784bc..54882e06 100644 --- a/src/khoj/app/settings.py +++ b/src/khoj/app/settings.py @@ -110,6 +110,8 @@ TEMPLATES = [ ASGI_APPLICATION = "app.asgi.application" +CLOSE_CONNECTIONS_AFTER_REQUEST = True + # Database # https://docs.djangoproject.com/en/4.2/ref/settings/#databases DATA_UPLOAD_MAX_NUMBER_FIELDS = 20000 diff --git a/src/khoj/configure.py b/src/khoj/configure.py index 7e413194..6c4d8484 100644 --- a/src/khoj/configure.py +++ b/src/khoj/configure.py @@ -1,13 +1,16 @@ import json import logging import os -from datetime import datetime, timedelta +from datetime import datetime from enum import Enum from typing import Optional import openai import requests import schedule +from asgiref.sync import sync_to_async +from django.conf import settings +from django.db import close_old_connections, connections from django.utils.timezone import make_aware from fastapi import Response from starlette.authentication import ( @@ -16,8 +19,10 @@ from starlette.authentication import ( SimpleUser, UnauthenticatedUser, ) +from starlette.concurrency import run_in_threadpool from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware +from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.sessions import SessionMiddleware from starlette.requests import HTTPConnection from starlette.types import ASGIApp, Receive, Scope, Send @@ -42,7 +47,7 @@ from khoj.routers.twilio import is_twilio_enabled from khoj.utils import constants, state from khoj.utils.config import SearchType from khoj.utils.fs_syncer import collect_files -from khoj.utils.helpers import is_none_or_empty, telemetry_disabled, timer +from khoj.utils.helpers import is_none_or_empty, telemetry_disabled from khoj.utils.rawconfig import FullConfig logger = logging.getLogger(__name__) @@ -55,6 +60,35 @@ class AuthenticatedKhojUser(SimpleUser): super().__init__(user.username) +class AsyncCloseConnectionsMiddleware(BaseHTTPMiddleware): + """ + Using this middleware to call close_old_connections() twice is a pretty yucky hack, + as it appears that run_in_threadpool (used by Starlette/FastAPI) and sync_to_async + (used by Django) have divergent behavior, ultimately acquiring the incorrect thread + in mixed sync/async which has the effect of duplicating connections. + We could fix the duplicate connections too if we normalized the thread behavior, + but at minimum we need to clean up connections in each case to prevent persistent + "InterfaceError: connection already closed" errors when the database connection is + reset via a database restart or something -- so here we are! + If we always use smart_sync_to_async(), this double calling isn't necessary, but + depending on what levels of abstraction we introduce, we might silently break the + assumptions. Better to be safe than sorry! + Attribution: https://gist.github.com/bryanhelmig/6fb091f23c1a4b7462dddce51cfaa1ca + """ + + async def dispatch(self, request, call_next): + await run_in_threadpool(close_old_connections) + await sync_to_async(close_old_connections)() + try: + response = await call_next(request) + finally: + # in tests, use @override_settings(CLOSE_CONNECTIONS_AFTER_REQUEST=True) + if getattr(settings, "CLOSE_CONNECTIONS_AFTER_REQUEST", False): + await run_in_threadpool(connections.close_all) + await sync_to_async(connections.close_all)() + return response + + class UserAuthenticationBackend(AuthenticationBackend): def __init__( self, @@ -77,7 +111,6 @@ class UserAuthenticationBackend(AuthenticationBackend): Subscription.objects.create(user=default_user, type="standard", renewal_date=renewal_date) async def authenticate(self, request: HTTPConnection): - # Request from Web client current_user = request.session.get("user") if current_user and current_user.get("email"): user = ( @@ -318,6 +351,7 @@ def configure_middleware(app): super().__init__(app) self.app = app + app.add_middleware(AsyncCloseConnectionsMiddleware) app.add_middleware(AuthenticationMiddleware, backend=UserAuthenticationBackend()) app.add_middleware(NextJsMiddleware) app.add_middleware(SessionMiddleware, secret_key=os.environ.get("KHOJ_DJANGO_SECRET_KEY", "!secret")) @@ -341,14 +375,6 @@ def update_content_index_regularly(): ) -@schedule.repeat(schedule.every(30).to(59).minutes) -def close_all_db_connections(): - from django import db - - db.close_old_connections() - logger.info("🔌 Closed all database connections for explicit recycling.") - - def configure_search_types(): # Extract core search types core_search_types = {e.name: e.value for e in SearchType}