Lock the index before updating it via API or Scheduler

- There are 3 paths to updating/setting the index (stored in state.model):
  - App start
  - API
  - Scheduler

- Put all updates to the index behind a lock, as multiple update paths
  could (potentially) run at the same time (via API or Scheduler); see the
  sketch after the commit metadata below
Debanjum Singh Solanky 2023-01-01 16:40:47 -03:00
parent 3b0783aab9
commit 701d92e17b
3 changed files with 8 additions and 0 deletions
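
To make the race concrete before the diffs: both the hourly scheduler job and the /update API handler rebuild and reassign the shared state.model, so without a lock two rebuilds could interleave. Below is a minimal, self-contained sketch of the pattern this commit introduces; rebuild_index is a hypothetical stand-in for configure_search and the module layout is simplified (the real call sites are in the diffs that follow).

import threading

# Shared module-level state, mirroring the state module touched in the last diff
model = None
search_index_lock = threading.Lock()

def rebuild_index(current_model, regenerate=False):
    # Hypothetical stand-in for configure_search(): a potentially slow index rebuild
    return object()

def update_via_api(force=False):
    # API path: take the lock before reassigning the shared model
    global model
    search_index_lock.acquire()
    model = rebuild_index(model, regenerate=force)
    search_index_lock.release()

def update_via_scheduler():
    # Scheduler path: same lock, so it cannot interleave with the API path
    global model
    search_index_lock.acquire()
    model = rebuild_index(model, regenerate=False)
    search_index_lock.release()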


@@ -34,7 +34,9 @@ def configure_server(args, required=False):
     state.config = args.config
 
     # Initialize the search model from Config
+    state.search_index_lock.acquire()
     state.model = configure_search(state.model, state.config, args.regenerate)
+    state.search_index_lock.release()
 
     # Initialize Processor from Config
     state.processor_config = configure_processor(args.config.processor)

@@ -42,7 +44,9 @@ def configure_server(args, required=False):
 
 
 @schedule.repeat(schedule.every(1).hour)
 def update_search_index():
+    state.search_index_lock.acquire()
     state.model = configure_search(state.model, state.config, regenerate=False)
+    state.search_index_lock.release()
     logger.info("Search Index updated via Scheduler")


@@ -125,7 +125,9 @@ def search(q: str, n: Optional[int] = 5, t: Optional[SearchType] = None, r: Opti
 
 
 @api.get('/update')
 def update(t: Optional[SearchType] = None, force: Optional[bool] = False):
+    state.search_index_lock.acquire()
     state.model = configure_search(state.model, state.config, regenerate=force, t=t)
+    state.search_index_lock.release()
     logger.info("Search Index updated via API call")
     return {'status': 'ok', 'message': 'index updated'}
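
One general Python note on the acquire()/release() pattern used in this handler (not a change made in this commit): if configure_search() raises, release() is never reached and every later update blocks on the lock forever. The with statement is the exception-safe equivalent; the sketch below is self-contained, with rebuild_index again standing in for configure_search.

import threading

search_index_lock = threading.Lock()
model = None

def rebuild_index(current_model, regenerate=False):
    # Stand-in for configure_search(); may raise on a bad config
    return object()

def update(force=False):
    global model
    # Equivalent to acquire()/release(), but the lock is released even if
    # rebuild_index() raises, so a failed update cannot wedge later ones
    with search_index_lock:
        model = rebuild_index(model, regenerate=force)
    return {'status': 'ok', 'message': 'index updated'}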


@@ -1,4 +1,5 @@
 # Standard Packages
+import threading
 from packaging import version
 
 # External Packages

@@ -20,6 +21,7 @@ host: str = None
 port: int = None
 cli_args: list[str] = None
 query_cache = LRU()
+search_index_lock = threading.Lock()
 
 if torch.cuda.is_available():
     # Use CUDA GPU