Add unit tests for agents

- Add permutations of testing for with, without knowledge base. Private, public, different users.
This commit is contained in:
sabaimran 2024-10-20 20:04:50 -07:00
parent fc70f25583
commit a979457442
4 changed files with 237 additions and 2 deletions

View file

@ -640,6 +640,16 @@ class AgentAdapters:
agents = await sync_to_async(AgentAdapters.get_all_accessible_agents)(user) agents = await sync_to_async(AgentAdapters.get_all_accessible_agents)(user)
return await sync_to_async(list)(agents) return await sync_to_async(list)(agents)
@staticmethod
async def ais_agent_accessible(agent: Agent, user: KhojUser) -> bool:
if agent.privacy_level == Agent.PrivacyLevel.PUBLIC:
return True
if agent.creator == user:
return True
if agent.privacy_level == Agent.PrivacyLevel.PROTECTED:
return True
return False
@staticmethod @staticmethod
def get_conversation_agent_by_id(agent_id: int): def get_conversation_agent_by_id(agent_id: int):
agent = Agent.objects.filter(id=agent_id).first() agent = Agent.objects.filter(id=agent_id).first()

View file

@ -21,6 +21,7 @@ from starlette.authentication import has_required_scope, requires
from khoj.configure import initialize_content from khoj.configure import initialize_content
from khoj.database import adapters from khoj.database import adapters
from khoj.database.adapters import ( from khoj.database.adapters import (
AgentAdapters,
AutomationAdapters, AutomationAdapters,
ConversationAdapters, ConversationAdapters,
EntryAdapters, EntryAdapters,
@ -114,10 +115,16 @@ async def execute_search(
dedupe: Optional[bool] = True, dedupe: Optional[bool] = True,
agent: Optional[Agent] = None, agent: Optional[Agent] = None,
): ):
start_time = time.time()
# Run validation checks # Run validation checks
results: List[SearchResponse] = [] results: List[SearchResponse] = []
start_time = time.time()
# Ensure the agent, if present, is accessible by the user
if user and agent and not await AgentAdapters.ais_agent_accessible(agent, user):
logger.error(f"Agent {agent.slug} is not accessible by user {user}")
return results
if q is None or q == "": if q is None or q == "":
logger.warning(f"No query param (q) passed in API call to initiate search") logger.warning(f"No query param (q) passed in API call to initiate search")
return results return results

View file

@ -178,6 +178,13 @@ def api_user4(default_user4):
) )
@pytest.mark.django_db
@pytest.fixture
def default_openai_chat_model_option():
chat_model = ChatModelOptionsFactory(chat_model="gpt-4o-mini", model_type="openai")
return chat_model
@pytest.mark.django_db @pytest.mark.django_db
@pytest.fixture @pytest.fixture
def offline_agent(): def offline_agent():

211
tests/test_agents.py Normal file
View file

@ -0,0 +1,211 @@
# tests/test_agents.py
import os
import pytest
from asgiref.sync import sync_to_async
from khoj.database.adapters import AgentAdapters
from khoj.database.models import Agent, ChatModelOptions, Entry, KhojUser
from khoj.routers.api import execute_search
from khoj.utils.helpers import get_absolute_path
from tests.helpers import ChatModelOptionsFactory
def test_create_default_agent(default_user: KhojUser):
ChatModelOptionsFactory()
agent = AgentAdapters.create_default_agent(default_user)
assert agent is not None
assert agent.input_tools == []
assert agent.output_modes == []
assert agent.privacy_level == Agent.PrivacyLevel.PUBLIC
assert agent.managed_by_admin == True
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_create_or_update_agent(default_user: KhojUser, default_openai_chat_model_option: ChatModelOptions):
new_agent = await AgentAdapters.aupdate_agent(
default_user,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[],
[],
[],
)
assert new_agent is not None
assert new_agent.name == "Test Agent"
assert new_agent.privacy_level == Agent.PrivacyLevel.PRIVATE
assert new_agent.creator == default_user
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_create_or_update_agent_with_knowledge_base(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
entries = await sync_to_async(list)(Entry.objects.filter(agent=new_agent))
file_names = set()
for entry in entries:
file_names.add(entry.file_path)
assert new_agent is not None
assert new_agent.name == "Test Agent"
assert new_agent.privacy_level == Agent.PrivacyLevel.PRIVATE
assert new_agent.creator == default_user2
assert len(entries) > 0
assert full_filename in file_names
assert len(file_names) == 1
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_create_or_update_agent_with_knowledge_base_and_search(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
search_result = await execute_search(user=default_user2, q="having kids", agent=new_agent)
assert len(search_result) == 5
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_agent_with_knowledge_base_and_search_not_creator(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client, default_user3: KhojUser
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PUBLIC,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
search_result = await execute_search(user=default_user3, q="having kids", agent=new_agent)
assert len(search_result) == 5
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_agent_with_knowledge_base_and_search_not_creator_and_private(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client, default_user3: KhojUser
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
search_result = await execute_search(user=default_user3, q="having kids", agent=new_agent)
assert len(search_result) == 0
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_agent_with_knowledge_base_and_search_not_creator_and_private_accessible_to_none(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PRIVATE,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
search_result = await execute_search(user=None, q="having kids", agent=new_agent)
assert len(search_result) == 5
@pytest.mark.anyio
@pytest.mark.django_db(transaction=True)
async def test_multiple_agents_with_knowledge_base_and_users(
default_user2: KhojUser, default_openai_chat_model_option: ChatModelOptions, chat_client, default_user3: KhojUser
):
full_filename = get_absolute_path("tests/data/markdown/having_kids.markdown")
new_agent = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent",
"Test Personality",
Agent.PrivacyLevel.PUBLIC,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename],
[],
[],
)
full_filename2 = get_absolute_path("tests/data/markdown/Namita.markdown")
new_agent2 = await AgentAdapters.aupdate_agent(
default_user2,
"Test Agent 2",
"Test Personality",
Agent.PrivacyLevel.PUBLIC,
"icon",
"color",
default_openai_chat_model_option.chat_model,
[full_filename2],
[],
[],
)
search_result = await execute_search(user=default_user3, q="having kids", agent=new_agent2)
search_result2 = await execute_search(user=default_user3, q="Namita", agent=new_agent2)
assert len(search_result) == 0
assert len(search_result2) == 1