import secrets

import numpy as np
import psutil
import pytest
from scipy.stats import linregress

from khoj.processor.embeddings import EmbeddingsModel
from khoj.utils import helpers


def test_get_from_null_dict():
    """Check `helpers.get_from_dict` on empty, flat and nested dictionaries."""
    # null handling
    assert helpers.get_from_dict({}) == {}
    assert helpers.get_from_dict({}, None) is None

    # key present in nested dictionary
    # 1-level dictionary
    assert helpers.get_from_dict({"a": 1, "b": 2}, "a") == 1
    assert helpers.get_from_dict({"a": 1, "b": 2}, "c") is None

    # 2-level dictionary
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "a") == {"a_a": 1}
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "a", "a_a") == 1

    # key not present in nested dictionary
    # 2-level dictionary
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "b", "b_a") is None


def test_merge_dicts():
    """Check `helpers.merge_dicts` lets the priority dict win over the default dict."""
    # basic merge of dicts with non-overlapping keys
    assert helpers.merge_dicts(priority_dict={"a": 1}, default_dict={"b": 2}) == {"a": 1, "b": 2}

    # use default dict items when not present in priority dict
    assert helpers.merge_dicts(priority_dict={}, default_dict={"b": 2}) == {"b": 2}

    # do not override existing key in priority_dict with default dict
    assert helpers.merge_dicts(priority_dict={"a": 1}, default_dict={"a": 2}) == {"a": 1}


def test_lru_cache():
    """Check `helpers.LRU` drops the least recently used item once over capacity."""
    # Test initializing cache
    cache = helpers.LRU({"a": 1, "b": 2}, capacity=2)
    assert cache == {"a": 1, "b": 2}

    # Test capacity overflow
    cache["c"] = 3
    assert cache == {"b": 2, "c": 3}

    # Test delete least recently used item from LRU cache on capacity overflow
    cache["b"]  # accessing 'b' makes it the most recently used item
    cache["d"] = 4  # so 'c' is deleted from the cache instead of 'b'
    assert cache == {"b": 2, "d": 4}


@pytest.mark.skip(reason="Memory leak exists on GPU, MPS devices")
def test_encode_docs_memory_leak():
    """Fail when resident memory grows steadily while repeatedly embedding documents.

    The process RSS (in MB) is sampled after every batch and a straight line is
    fitted through the samples; the slope of that line is the growth per iteration.
    """
    # Arrange
    iterations = 50
    batch_size = 20
    embeddings_model = EmbeddingsModel()
    memory_usage_trend = []
    device = f"{helpers.get_device()}".upper()
    # The process handle does not change between iterations, so create it once
    process = psutil.Process()
    bytes_per_mb = 1024 * 1024

    # Act
    # Encode random strings repeatedly and record memory usage trend
    for iteration in range(iterations):
        random_docs = [" ".join(secrets.token_hex(5) for _ in range(10)) for _ in range(batch_size)]
        # Keep the latest result referenced until the next iteration, so memory
        # is sampled under the same conditions as before this cleanup
        _latest_embeddings = [embeddings_model.embed_documents(random_docs)]
        memory_usage_trend.append(process.memory_info().rss / bytes_per_mb)
        print(f"{iteration:02d}, {memory_usage_trend[-1]:.2f}", flush=True)

    # Calculate slope of line fitting memory usage history
    memory_usage_trend = np.array(memory_usage_trend)
    slope = linregress(np.arange(len(memory_usage_trend)), memory_usage_trend).slope
    print(f"Memory usage increased at ~{slope:.2f} MB per iteration on {device}")

    # Assert
    # If slope is positive memory utilization is increasing
    # Positive threshold of 2, from observing memory usage trend on MPS vs CPU device
    assert slope < 2, f"Memory leak suspected on {device}. Memory usage increased at ~{slope:.2f} MB per iteration"