mirror of
https://github.com/khoj-ai/khoj.git
synced 2024-12-23 12:48:09 +00:00
ff5c10c221
Increase defense-in-depth by reducing paths to create, read, update or delete entries, files and conversations in DB when user is unset.
520 lines
19 KiB
Python
520 lines
19 KiB
Python
# Standard Modules
|
|
import os
|
|
from urllib.parse import quote
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
from PIL import Image
|
|
|
|
from khoj.configure import configure_routes, configure_search_types
|
|
from khoj.database.adapters import EntryAdapters
|
|
from khoj.database.models import KhojApiUser, KhojUser
|
|
from khoj.processor.content.org_mode.org_to_entries import OrgToEntries
|
|
from khoj.search_type import text_search
|
|
from khoj.utils import state
|
|
from khoj.utils.rawconfig import ContentConfig, SearchConfig
|
|
|
|
|
|
# Test
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_search_with_no_auth_key(client):
    """A search request sent without any Authorization header should get a 403."""
    # Arrange: URL-encode the query; deliberately attach no credentials
    encoded_query = quote("How to call Khoj from Emacs?")
    search_url = f"/api/search?q={encoded_query}"

    # Act
    result = client.get(search_url)

    # Assert
    assert result.status_code == 403
|
|
|
|
|
|
@pytest.mark.django_db(transaction=True)
def test_search_with_invalid_auth_key(client):
    """A search request carrying an invalid bearer token should get a 403."""
    # Arrange
    bad_credentials = {"Authorization": "Bearer invalid-token"}
    encoded_query = quote("How to call Khoj from Emacs?")
    search_url = f"/api/search?q={encoded_query}"

    # Act
    result = client.get(search_url, headers=bad_credentials)

    # Assert
    assert result.status_code == 403
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_search_with_invalid_content_type(client):
    """An authenticated search for an unknown content type should get a 422."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    encoded_query = quote("How to call Khoj from Emacs?")
    search_url = f"/api/search?q={encoded_query}&t=invalid_content_type"

    # Act
    result = client.get(search_url, headers=credentials)

    # Assert
    assert result.status_code == 422
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_search_with_valid_content_type(client):
    """Each listed content type should be accepted by the search endpoint with a 200."""
    headers = {"Authorization": "Bearer kk-secret"}
    # Each content type appears exactly once so no identical request is repeated.
    content_types = ["all", "org", "markdown", "image", "pdf", "github", "notion", "plaintext", "docx"]

    for content_type in content_types:
        # Act
        response = client.get(f"/api/search?q=random&t={content_type}", headers=headers)

        # Assert
        assert response.status_code == 200, f"Returned status: {response.status_code} for content type: {content_type}"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_with_no_auth_key(client):
    """Uploading content without an Authorization header should get a 403."""
    # Arrange
    upload_payload = get_sample_files_data()

    # Act: no headers are sent, so the request is unauthenticated
    result = client.patch("/api/content", files=upload_payload)

    # Assert
    assert result.status_code == 403
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_with_invalid_auth_key(client):
    """Uploading content with an invalid bearer token should get a 403."""
    # Arrange
    bad_credentials = {"Authorization": "Bearer kk-invalid-token"}
    upload_payload = get_sample_files_data()

    # Act
    result = client.patch("/api/content", files=upload_payload, headers=bad_credentials)

    # Assert
    assert result.status_code == 403
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_update_with_invalid_content_type(client):
    """An authenticated update request for an unknown content type should get a 422."""
    # Arrange
    headers = {"Authorization": "Bearer kk-secret"}

    # Act: plain string literal; the URL has no interpolated parts
    response = client.get("/api/update?t=invalid_content_type", headers=headers)

    # Assert
    assert response.status_code == 422
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_regenerate_with_invalid_content_type(client):
    """A forced update request for an unknown content type should get a 422."""
    # Arrange
    headers = {"Authorization": "Bearer kk-secret"}

    # Act: plain string literal; the URL has no interpolated parts
    response = client.get("/api/update?force=true&t=invalid_content_type", headers=headers)

    # Assert
    assert response.status_code == 422
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_big_files(client):
    """With billing enabled, uploading the big sample file should get a 429."""
    # Arrange
    # NOTE(review): this global flag is not restored after the test; confirm fixtures reset it.
    state.billing_enabled = True
    oversized_upload = get_big_size_sample_files_data()

    # Credential for the default_user, who is subscribed
    credentials = {"Authorization": "Bearer kk-secret"}

    # Act
    result = client.patch("/api/content", files=oversized_upload, headers=credentials)

    # Assert
    assert result.status_code == 429
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_medium_file_unsubscribed(client, api_user4: KhojApiUser):
    """With billing enabled, api_user4 uploading the medium sample file should get a 429."""
    # Arrange
    # NOTE(review): this global flag is not restored after the test; confirm fixtures reset it.
    state.billing_enabled = True
    credentials = {"Authorization": f"Bearer {api_user4.token}"}
    medium_upload = get_medium_size_sample_files_data()

    # Act
    result = client.patch("/api/content", files=medium_upload, headers=credentials)

    # Assert
    assert result.status_code == 429
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_normal_file_unsubscribed(client, api_user4: KhojApiUser):
    """With billing enabled, api_user4 uploading the regular sample files should get a 200."""
    # Arrange
    # NOTE(review): this global flag is not restored after the test; confirm fixtures reset it.
    state.billing_enabled = True
    credentials = {"Authorization": f"Bearer {api_user4.token}"}
    upload_payload = get_sample_files_data()

    # Act
    result = client.patch("/api/content", files=upload_payload, headers=credentials)

    # Assert
    assert result.status_code == 200
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_big_files_no_billing(client):
    """With billing disabled, uploading the big sample file should get a 200."""
    # Arrange
    # NOTE(review): this global flag is not restored after the test; confirm fixtures reset it.
    state.billing_enabled = False
    credentials = {"Authorization": "Bearer kk-secret"}
    oversized_upload = get_big_size_sample_files_data()

    # Act
    result = client.patch("/api/content", files=oversized_upload, headers=credentials)

    # Assert
    assert result.status_code == 200
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update(client):
    """An authenticated upload of the regular sample files should get a 200."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    upload_payload = get_sample_files_data()

    # Act
    result = client.patch("/api/content", files=upload_payload, headers=credentials)

    # Assert
    assert result.status_code == 200
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_index_update_fails_if_more_than_1000_files(client, api_user4: KhojApiUser):
    """Uploading 1000 files should succeed, while 1001 files should be rejected with a 400."""
    # Arrange
    api_token = api_user4.token
    # NOTE(review): this global flag is not restored after the test; confirm fixtures reset it.
    state.billing_enabled = True
    # 1001 small org files: one more than the limit named in the expected error message
    files = [("files", (f"path/to/filename{i}.org", f"Symphony No {i}", "text/org")) for i in range(1001)]

    headers = {"Authorization": f"Bearer {api_token}"}

    # Act
    ok_response = client.patch("/api/content", files=files[:1000], headers=headers)
    bad_response = client.patch("/api/content", files=files, headers=headers)

    # Assert
    assert ok_response.status_code == 200
    assert bad_response.status_code == 400
    # Compare the parsed payload so the check does not depend on JSON whitespace or key formatting.
    assert bad_response.json() == {"detail": "Too many files. Maximum number of files is 1000."}
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_regenerate_with_valid_content_type(client):
    """Uploading the sample files under each listed content type should get a 200."""
    credentials = {"Authorization": "Bearer kk-secret"}
    content_types = ["all", "org", "markdown", "image", "pdf", "notion"]

    for content_type in content_types:
        # Arrange: build a fresh payload for every request
        upload_payload = get_sample_files_data()
        target_url = f"/api/content?t={content_type}"

        # Act
        response = client.patch(target_url, files=upload_payload, headers=credentials)

        # Assert
        assert response.status_code == 200, f"Returned status: {response.status_code} for content type: {content_type}"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_regenerate_with_github_fails_without_pat(client):
    """Uploading the sample files with the github content type should get a 200.

    NOTE(review): the test name says "fails", yet the only assertion expects a 200;
    confirm which behaviour is intended.
    """
    # Arrange
    headers = {"Authorization": "Bearer kk-secret"}
    files = get_sample_files_data()

    # Act
    # NOTE(review): the result of this forced update request is never checked;
    # the call is kept so the test issues the same requests as before.
    client.get("/api/update?force=true&t=github", headers=headers)
    response = client.patch("/api/content?t=github", files=files, headers=headers)

    # Assert
    assert response.status_code == 200, f"Returned status: {response.status_code} for content type: github"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db
def test_get_configured_types_via_api(client, sample_org_data, default_user3: KhojUser):
    """After indexing org data for default_user3, "org" should be their only file type.

    NOTE(review): despite the name, no HTTP request is made; the adapter is queried directly.
    """
    # Act
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user3)

    file_type_entries = EntryAdapters.get_unique_file_types(user=default_user3).all()
    enabled_types = file_type_entries.values_list("file_type", flat=True)

    # Assert
    assert list(enabled_types) == ["org"]
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_get_api_config_types(client, sample_org_data, default_user: KhojUser):
    """The content types endpoint should list "all", "org" and "plaintext" once org data is indexed."""
    # Arrange
    headers = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)

    # Act: plain string literal; the URL has no interpolated parts
    response = client.get("/api/content/types", headers=headers)

    # Assert
    assert response.status_code == 200
    # Compared as a set, so the order of the returned types does not matter
    assert set(response.json()) == {"all", "org", "plaintext"}
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_get_configured_types_with_no_content_config(fastapi_app: FastAPI):
    """With no content config and anonymous mode on, the content types endpoint should return only "all"."""
    # Arrange
    # NOTE(review): anonymous_mode is not restored after the test; confirm fixtures reset it.
    state.anonymous_mode = True
    if state.config and state.config.content_type:
        state.config.content_type = None
    # NOTE(review): assumed to run unconditionally, after the guard above; the
    # original indentation was lost, so confirm against the upstream file.
    state.search_models = configure_search_types()

    configure_routes(fastapi_app)
    client = TestClient(fastapi_app)

    # Act: plain string literal; the URL has no interpolated parts
    response = client.get("/api/content/types")

    # Assert
    assert response.status_code == 200
    assert response.json() == ["all"]
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search(client, search_config: SearchConfig, sample_org_data, default_user: KhojUser):
    """A search over the indexed org notes should return one entry containing the git clone command."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote("How to git install application?")
    search_url = f"/api/search?q={encoded_query}&n=1&t=org&r=true&max_distance=0.22"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200

    assert len(response.json()) == 1, "Expected only 1 result"
    top_entry = response.json()[0]["entry"]
    assert "git clone https://github.com/khoj-ai/khoj" in top_entry, "Expected 'git clone' in search result"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search_no_results(client, search_config: SearchConfig, sample_org_data, default_user: KhojUser):
    """A query unrelated to the indexed org notes should return an empty result list."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote("How to find my goat?")
    search_url = f"/api/search?q={encoded_query}&n=1&t=org&r=true&max_distance=0.22"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200
    assert response.json() == [], "Expected no results"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search_with_only_filters(
    client, content_config: ContentConfig, search_config: SearchConfig, sample_org_data, default_user: KhojUser
):
    """A query made up solely of word and file filters should return an entry containing "Emacs"."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote('+"Emacs" file:"*.org"')
    search_url = f"/api/search?q={encoded_query}&n=1&t=org"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200
    # the first returned entry must contain the word "Emacs"
    top_entry = response.json()[0]["entry"]
    assert "Emacs" in top_entry
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search_with_include_filter(client, sample_org_data, default_user: KhojUser):
    """A query with a +"Emacs" include filter should return an entry containing "emacs"."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote('How to git install application? +"Emacs"')
    search_url = f"/api/search?q={encoded_query}&n=1&t=org"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200
    # the first returned entry must contain lowercase "emacs" (the filter itself is capitalised)
    top_entry = response.json()[0]["entry"]
    assert "emacs" in top_entry
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search_with_exclude_filter(client, sample_org_data, default_user: KhojUser):
    """A query with a -"clone" exclude filter should return an entry without the word "clone"."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote('How to git install application? -"clone"')
    search_url = f"/api/search?q={encoded_query}&n=1&t=org"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200
    # the first returned entry must not contain the word "clone"
    top_entry = response.json()[0]["entry"]
    assert "clone" not in top_entry
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_notes_search_requires_parent_context(
    client, search_config: SearchConfig, sample_org_data, default_user: KhojUser
):
    """Searching "Install Khoj on Emacs" should return one entry containing "Emacs load path"."""
    # Arrange
    credentials = {"Authorization": "Bearer kk-secret"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote("Install Khoj on Emacs")
    search_url = f"/api/search?q={encoded_query}&n=1&t=org&r=true&max_distance=0.22"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200

    assert len(response.json()) == 1, "Expected only 1 result"
    top_entry = response.json()[0]["entry"]
    assert "Emacs load path" in top_entry, "Expected 'Emacs load path' in search result"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_different_user_data_not_accessed(client, sample_org_data, default_user: KhojUser):
    """A search using the "kk-token" bearer token should get a 403 with a "Forbidden" detail."""
    # Arrange
    # NOTE(review): the original comment called this the token for default_user2, yet the
    # expected outcome is 403 Forbidden; confirm which user, if any, owns it.
    credentials = {"Authorization": "Bearer kk-token"}
    text_search.setup(OrgToEntries, sample_org_data, regenerate=False, user=default_user)
    encoded_query = quote("How to git install application?")
    search_url = f"/api/search?q={encoded_query}&n=1&t=org"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 403
    # the body must carry only the error detail, not default_user's indexed data
    assert len(response.json()) == 1 and response.json()["detail"] == "Forbidden"
|
|
|
|
|
|
# ----------------------------------------------------------------------------------------------------
|
|
@pytest.mark.django_db(transaction=True)
def test_user_no_data_returns_empty(client, sample_org_data, api_user3: KhojApiUser):
    """A search authenticated as api_user3, for whom nothing is indexed here, should return an empty list."""
    # Arrange
    credentials = {"Authorization": f"Bearer {api_user3.token}"}
    encoded_query = quote("How to git install application?")
    search_url = f"/api/search?q={encoded_query}&n=1&t=org"

    # Act
    response = client.get(search_url, headers=credentials)

    # Assert
    assert response.status_code == 200
    # no data is expected for this user, even if other users have data
    assert len(response.json()) == 0
    assert response.json() == []
|
|
|
|
|
|
@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="requires OPENAI_API_KEY")
@pytest.mark.django_db(transaction=True)
async def test_chat_with_unauthenticated_user(chat_client_with_auth, api_user2: KhojApiUser):
    """Chat should answer 200 with api_user2's token and 403 when no credentials are sent.

    NOTE(review): declared ``async`` but awaits nothing; confirm an async pytest
    plugin is configured so this coroutine is actually executed.
    """
    # Arrange
    query = "Hello!"
    headers = {"Authorization": f"Bearer {api_user2.token}"}

    # Act: plain string literals; the URL has no interpolated parts
    auth_response = chat_client_with_auth.post("/api/chat", json={"q": query}, headers=headers)
    no_auth_response = chat_client_with_auth.post("/api/chat", json={"q": query})

    # Assert
    assert auth_response.status_code == 200
    assert no_auth_response.status_code == 403
|
|
|
|
|
|
def get_sample_files_data():
    """Return a list of ("files", (path, content, mime type)) tuples covering org, pdf, text, markdown and docx samples."""
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    # (path, content, mime type) triples, in the order they are returned
    sample_specs = [
        ("path/to/filename.org", "* practicing piano", "text/org"),
        ("path/to/filename1.org", "** top 3 reasons why I moved to SF", "text/org"),
        ("path/to/filename2.org", "* how to build a search engine", "text/org"),
        ("path/to/filename.pdf", "Moore's law does not apply to consumer hardware", "application/pdf"),
        ("path/to/filename1.pdf", "The sun is a ball of helium", "application/pdf"),
        ("path/to/filename2.pdf", "Effect of sunshine on baseline human happiness", "application/pdf"),
        ("path/to/filename.txt", "data,column,value", "text/plain"),
        ("path/to/filename1.txt", "<html>my first web page</html>", "text/plain"),
        ("path/to/filename2.txt", "2021-02-02 Journal Entry", "text/plain"),
        ("path/to/filename.md", "# Notes from client call", "text/markdown"),
        ("path/to/filename.docx", "## Studying anthropological records from the Fatimid caliphate", docx_mime),
        ("path/to/filename1.md", "## Studying anthropological records from the Fatimid caliphate", "text/markdown"),
        ("path/to/filename2.md", "**Understanding science through the lens of art**", "text/markdown"),
    ]
    # Every entry is sent under the same "files" field name
    return [("files", spec) for spec in sample_specs]
|
|
|
|
|
|
def get_big_size_sample_files_data():
    """Return a one-item upload list whose org file content is roughly 100 MB of the letter "a"."""
    payload_length = 100 * 1024 * 1024
    oversized_content = "a" * payload_length
    org_file = ("path/to/filename.org", oversized_content, "text/org")
    return [("files", org_file)]
|
|
|
|
|
|
def get_medium_size_sample_files_data():
    """Return a one-item upload list whose org file content is roughly 10 MB of the letter "a"."""
    payload_length = 10 * 1024 * 1024
    medium_content = "a" * payload_length
    org_file = ("path/to/filename.org", medium_content, "text/org")
    return [("files", org_file)]
|