# Standard Packages
import secrets

# External Packages
import numpy as np
import psutil
import pytest
from scipy.stats import linregress

# Internal Packages
from khoj.processor.embeddings import EmbeddingsModel
from khoj.utils import helpers


def test_get_from_null_dict():
    # null handling
    assert helpers.get_from_dict(dict()) == dict()
    assert helpers.get_from_dict(dict(), None) is None

    # key present in nested dictionary
    # 1-level dictionary
    assert helpers.get_from_dict({"a": 1, "b": 2}, "a") == 1
    assert helpers.get_from_dict({"a": 1, "b": 2}, "c") is None

    # 2-level dictionary
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "a") == {"a_a": 1}
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "a", "a_a") == 1

    # key not present in nested dictionary
    # 2-level dictionary
    assert helpers.get_from_dict({"a": {"a_a": 1}, "b": 2}, "b", "b_a") is None
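

# The nested-key lookups tested above assume roughly the following semantics:
# walk the given keys with dict.get and return None as soon as a key is missing.
# A minimal sketch for reference only, not khoj's actual helpers.get_from_dict.
def _get_from_dict_sketch(dictionary, *keys):
    value = dictionary
    for key in keys:
        # Descend one level per key; anything non-dict ends the walk with None
        value = value.get(key) if isinstance(value, dict) else None
        if value is None:
            return None
    return value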


def test_merge_dicts():
    # basic merge of dicts with non-overlapping keys
    assert helpers.merge_dicts(priority_dict={"a": 1}, default_dict={"b": 2}) == {"a": 1, "b": 2}

    # use default dict items when not present in priority dict
    assert helpers.merge_dicts(priority_dict={}, default_dict={"b": 2}) == {"b": 2}

    # do not override existing key in priority_dict with default dict
    assert helpers.merge_dicts(priority_dict={"a": 1}, default_dict={"a": 2}) == {"a": 1}
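

# A minimal sketch of the merge precedence tested above (assumed behaviour, not
# khoj's actual helpers.merge_dicts): entries from default_dict only fill in
# keys that priority_dict does not already define.
def _merge_dicts_sketch(priority_dict, default_dict):
    # priority_dict is unpacked last, so its values win on key collisions
    return {**default_dict, **priority_dict}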


def test_lru_cache():
    # Test initializing cache
    cache = helpers.LRU({"a": 1, "b": 2}, capacity=2)
    assert cache == {"a": 1, "b": 2}

    # Test capacity overflow
    cache["c"] = 3
    assert cache == {"b": 2, "c": 3}

    # Test delete least recently used item from LRU cache on capacity overflow
    cache["b"]  # accessing 'b' makes it the most recently used item
    cache["d"] = 4  # so 'c' is deleted from the cache instead of 'b'
    assert cache == {"b": 2, "d": 4}
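

# helpers.LRU is exercised above as a dict with bounded capacity that evicts the
# least recently used entry. A minimal sketch of such a structure (assumed shape,
# not khoj's actual implementation), relying on Python 3.7+ dict insertion order:
class _LRUSketch(dict):
    def __init__(self, *args, capacity=128, **kwargs):
        self.capacity = capacity
        super().__init__(*args, **kwargs)

    def __getitem__(self, key):
        # Re-insert the key to mark it as most recently used
        value = super().__getitem__(key)
        super().__delitem__(key)
        super().__setitem__(key, value)
        return value

    def __setitem__(self, key, value):
        if key in self:
            super().__delitem__(key)
        super().__setitem__(key, value)
        if len(self) > self.capacity:
            # Evict the least recently used (oldest inserted) entry
            super().__delitem__(next(iter(self)))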


@pytest.mark.skip(reason="Memory leak exists on GPU, MPS devices")
def test_encode_docs_memory_leak():
    # Arrange
    iterations = 50
    batch_size = 20
    embeddings_model = EmbeddingsModel()
    memory_usage_trend = []
    device = f"{helpers.get_device()}".upper()

    # Act
    # Encode random strings repeatedly and record memory usage trend
    for iteration in range(iterations):
        random_docs = [" ".join(secrets.token_hex(5) for _ in range(10)) for _ in range(batch_size)]
        a = [embeddings_model.embed_documents(random_docs)]
        memory_usage_trend += [psutil.Process().memory_info().rss / (1024 * 1024)]
        print(f"{iteration:02d}, {memory_usage_trend[-1]:.2f}", flush=True)

    # Calculate slope of line fitting memory usage history
    memory_usage_trend = np.array(memory_usage_trend)
    slope, _, _, _, _ = linregress(np.arange(len(memory_usage_trend)), memory_usage_trend)
    print(f"Memory usage increased at ~{slope:.2f} MB per iteration on {device}")

    # Assert
    # If slope is positive, memory utilization is increasing
    # Positive threshold of 2 MB/iteration, from observing memory usage trend on MPS vs CPU devices
    assert slope < 2, f"Memory leak suspected on {device}. Memory usage increased at ~{slope:.2f} MB per iteration"
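

# For reference, a self-contained illustration of the slope-based check used in
# test_encode_docs_memory_leak, run on synthetic numbers (illustrative values
# only, not real measurements): a flat memory series yields a slope of ~0, while
# a series growing by 5 MB per iteration yields a clearly positive slope.
def _memory_trend_slope_sketch():
    iterations = np.arange(50)
    flat = np.full(50, 100.0)  # stable memory usage, in MB
    leaking = 100.0 + 5.0 * iterations  # grows by 5 MB every iteration
    flat_slope, *_ = linregress(iterations, flat)
    leak_slope, *_ = linregress(iterations, leaking)
    assert abs(flat_slope) < 1e-6 and leak_slope > 2
    return flat_slope, leak_slope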