Push Files to Index from Emacs, Obsidian & Desktop Clients using Multi-Part Forms (#499)

### Overview
- Add ability to push data to index from the Emacs, Obsidian client
- Switch to standard mechanism of syncing files via HTTP multi-part/form. Previously we were streaming the data as JSON
  - Benefits of new mechanism
    - No manual parsing of files to send or receive on clients or server is required as most have in-built mechanisms to send multi-part/form requests
    - The whole response is not required to be kept in memory to parse content as JSON. As individual files arrive they're automatically pushed to disk to conserve memory if required
    - Binary files don't need to be encoded on client and decoded on server

### Code Details
### Major
- Use multi-part form to receive files to index on server
- Use multi-part form to send files to index on desktop client
- Send files to index on server from the khoj.el emacs client
  - Send content for indexing on server at a regular interval from khoj.el
- Send files to index on server from the khoj obsidian client
- Update tests to test multi-part/form method of pushing files to index

#### Minor
- Put indexer API endpoint under /api path segment
- Explicitly make GET request to /config/data from khoj.el:khoj-server-configure method
- Improve emoji, message on content index updated via logger
- Don't call khoj server on khoj.el load, only once khoj invoked explicitly by user
- Improve indexing of binary files
  - Let fs_syncer pass PDF files directly as binary before indexing
  - Use encoding of each file set in indexer request to read file 
- Add CORS policy to khoj server. Allow requests from khoj apps, obsidian & localhost
- Update indexer API endpoint URL to` index/update` from `indexer/batch`

Resolves #471 #243
This commit is contained in:
Debanjum 2023-10-17 06:05:15 -07:00 committed by GitHub
commit ecc6fbfeb2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 445 additions and 307 deletions

View file

@ -40,6 +40,7 @@ dependencies = [
"dateparser >= 1.1.1",
"defusedxml == 0.7.1",
"fastapi == 0.77.1",
"python-multipart >= 0.0.5",
"jinja2 == 3.1.2",
"openai >= 0.27.0, < 1.0.0",
"tiktoken >= 0.3.2",

View file

@ -8,7 +8,6 @@ const {dialog} = require('electron');
const cron = require('cron').CronJob;
const axios = require('axios');
const { Readable } = require('stream');
const KHOJ_URL = 'http://127.0.0.1:42110'
@ -65,7 +64,7 @@ const schema = {
var state = {}
const store = new Store({schema});
const store = new Store({ schema });
console.log(store);
@ -86,37 +85,48 @@ function handleSetTitle (event, title) {
});
}
function filenameToMimeType (filename) {
const extension = filename.split('.').pop();
switch (extension) {
case 'pdf':
return 'application/pdf';
case 'png':
return 'image/png';
case 'jpg':
case 'jpeg':
return 'image/jpeg';
case 'md':
case 'markdown':
return 'text/markdown';
case 'org':
return 'text/org';
default:
return 'text/plain';
}
}
function pushDataToKhoj (regenerate = false) {
let filesToPush = [];
const files = store.get('files');
const folders = store.get('folders');
state = {
completed: true
const files = store.get('files') || [];
const folders = store.get('folders') || [];
state = { completed: true }
for (const file of files) {
filesToPush.push(file.path);
}
if (files) {
for (file of files) {
filesToPush.push(file.path);
}
}
if (folders) {
for (folder of folders) {
const files = fs.readdirSync(folder.path, { withFileTypes: true });
for (file of files) {
if (file.isFile() && validFileTypes.includes(file.name.split('.').pop())) {
filesToPush.push(path.join(folder.path, file.name));
}
for (const folder of folders) {
const files = fs.readdirSync(folder.path, { withFileTypes: true });
for (const file of files) {
if (file.isFile() && validFileTypes.includes(file.name.split('.').pop())) {
filesToPush.push(path.join(folder.path, file.name));
}
}
}
let data = {
files: []
}
const lastSync = store.get('lastSync') || [];
for (file of filesToPush) {
const formData = new FormData();
for (const file of filesToPush) {
const stats = fs.statSync(file);
if (!regenerate) {
if (stats.mtime.toISOString() < lastSync.find((syncedFile) => syncedFile.path === file)?.datetime) {
@ -125,18 +135,10 @@ function pushDataToKhoj (regenerate = false) {
}
try {
let rawData;
// If the file is a PDF or IMG file, read it as a binary file
if (binaryFileTypes.includes(file.split('.').pop())) {
rawData = fs.readFileSync(file).toString('base64');
} else {
rawData = fs.readFileSync(file, 'utf8');
}
data.files.push({
path: file,
content: rawData
});
encoding = binaryFileTypes.includes(file.split('.').pop()) ? "binary" : "utf8";
mimeType = filenameToMimeType(file) + (encoding === "utf8" ? "; charset=UTF-8" : "");
fileObj = new Blob([fs.createReadStream(file, encoding)], { type: mimeType });
formData.append('files', fileObj, file);
state[file] = {
success: true,
}
@ -151,44 +153,37 @@ function pushDataToKhoj (regenerate = false) {
for (const syncedFile of lastSync) {
if (!filesToPush.includes(syncedFile.path)) {
data.files.push({
path: syncedFile.path,
content: ""
});
fileObj = new Blob([""], { type: filenameToMimeType(syncedFile.path) });
formData.append('files', fileObj, syncedFile.path);
}
}
const headers = { 'x-api-key': 'secret', 'Content-Type': 'application/json' };
const stream = new Readable({
read() {
this.push(JSON.stringify(data));
this.push(null);
}
});
const hostURL = store.get('hostURL') || KHOJ_URL;
axios.post(`${hostURL}/v1/indexer/batch?regenerate=${regenerate}`, stream, { headers })
.then(response => {
console.log(response.data);
const win = BrowserWindow.getAllWindows()[0];
win.webContents.send('update-state', state);
let lastSync = [];
for (const file of filesToPush) {
lastSync.push({
path: file,
datetime: new Date().toISOString()
});
}
store.set('lastSync', lastSync);
})
.catch(error => {
console.error(error);
state['completed'] = false
const win = BrowserWindow.getAllWindows()[0];
win.webContents.send('update-state', state);
});
if (!!formData?.entries()?.next().value) {
const hostURL = store.get('hostURL') || KHOJ_URL;
const headers = {
'x-api-key': 'secret'
};
axios.post(`${hostURL}/api/v1/index/update?force=${regenerate}&client=desktop`, formData, { headers })
.then(response => {
console.log(response.data);
const win = BrowserWindow.getAllWindows()[0];
win.webContents.send('update-state', state);
let lastSync = [];
for (const file of filesToPush) {
lastSync.push({
path: file,
datetime: new Date().toISOString()
});
}
store.set('lastSync', lastSync);
})
.catch(error => {
console.error(error);
state['completed'] = false
const win = BrowserWindow.getAllWindows()[0];
win.webContents.send('update-state', state);
});
}
}
pushDataToKhoj();

View file

@ -93,6 +93,15 @@
:group 'khoj
:type 'number)
(defcustom khoj-server-api-key "secret"
"API Key to Khoj server."
:group 'khoj
:type 'string)
(defcustom khoj-index-interval 3600
"Interval (in seconds) to wait before updating content index."
:group 'khoj
:type 'number)
(defcustom khoj-default-content-type "org"
"The default content type to perform search on."
@ -125,6 +134,12 @@
(defvar khoj--search-on-idle-timer nil
"Idle timer to trigger incremental search.")
(defvar khoj--index-timer nil
"Timer to trigger content indexing.")
(defvar khoj--indexed-files '()
"Files that were indexed in previous content indexing run.")
(declare-function org-element-property "org-mode" (PROPERTY ELEMENT))
(declare-function org-element-type "org-mode" (ELEMENT))
(declare-function markdown-mode "markdown-mode" ())
@ -375,9 +390,10 @@ CONFIG is json obtained from Khoj config API."
(string-join "/"))))
(defun khoj--server-configure ()
"Configure the the Khoj server for search and chat."
"Configure the Khoj server for search and chat."
(interactive)
(let* ((org-directory-regexes (or (mapcar (lambda (dir) (format "%s/**/*.org" dir)) khoj-org-directories) json-null))
(url-request-method "GET")
(current-config
(with-temp-buffer
(url-insert-file-contents (format "%s/api/config/data" khoj-server-url))
@ -389,7 +405,6 @@ CONFIG is json obtained from Khoj config API."
(default-index-dir (khoj--get-directory-from-config default-config '(content-type org embeddings-file)))
(default-chat-dir (khoj--get-directory-from-config default-config '(processor conversation conversation-logfile)))
(chat-model (or khoj-chat-model (alist-get 'chat-model (alist-get 'openai (alist-get 'conversation (alist-get 'processor default-config))))))
(default-model (alist-get 'model (alist-get 'conversation (alist-get 'processor default-config))))
(enable-offline-chat (or khoj-chat-offline (alist-get 'enable-offline-chat (alist-get 'conversation (alist-get 'processor default-config)))))
(config (or current-config default-config)))
@ -519,9 +534,75 @@ CONFIG is json obtained from Khoj config API."
(khoj--server-configure))))
;; -----------------------------------------------
;; Extract and Render Entries of each Content Type
;; -----------------------------------------------
;; -------------------
;; Khoj Index Content
;; -------------------
(defun khoj--server-index-files (&optional force content-type file-paths)
"Send files at `FILE-PATHS' to the Khoj server to index for search and chat.
`FORCE' re-indexes all files of `CONTENT-TYPE' even if they are already indexed."
(interactive)
(let ((boundary (format "-------------------------%d" (random (expt 10 10))))
(files-to-index (or file-paths
(append (mapcan (lambda (dir) (directory-files-recursively dir "\\.org$")) khoj-org-directories) khoj-org-files)))
(type-query (if (or (equal content-type "all") (not content-type)) "" (format "t=%s" content-type)))
(inhibit-message t)
(message-log-max nil))
(let ((url-request-method "POST")
(url-request-data (khoj--render-files-as-request-body files-to-index khoj--indexed-files boundary))
(url-request-extra-headers `(("content-type" . ,(format "multipart/form-data; boundary=%s" boundary))
("x-api-key" . ,khoj-server-api-key))))
(with-current-buffer
(url-retrieve (format "%s/api/v1/index/update?%s&force=%s&client=emacs" khoj-server-url type-query (or force "false"))
;; render response from indexing API endpoint on server
(lambda (status)
(if (not status)
(message "khoj.el: %scontent index %supdated" (if content-type (format "%s " content-type) "") (if force "force " ""))
(with-current-buffer (current-buffer)
(goto-char "\n\n")
(message "khoj.el: Failed to %supdate %s content index. Status: %s. Response: %s"
(if force "force " "")
content-type
status
(string-trim (buffer-substring-no-properties (point) (point-max)))))))
nil t t)))
(setq khoj--indexed-files files-to-index)))
(defun khoj--render-files-as-request-body (files-to-index previously-indexed-files boundary)
"Render `FILES-TO-INDEX', `PREVIOUSLY-INDEXED-FILES' as multi-part form body.
Use `BOUNDARY' to separate files. This is sent to Khoj server as a POST request."
(with-temp-buffer
(set-buffer-multibyte nil)
(insert "\n")
(dolist (file-to-index files-to-index)
(insert (format "--%s\r\n" boundary))
(insert (format "Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n" file-to-index))
(insert "Content-Type: text/org\r\n\r\n")
(insert (with-temp-buffer
(insert-file-contents-literally file-to-index)
(buffer-string)))
(insert "\r\n"))
(dolist (file-to-index previously-indexed-files)
(when (not (member file-to-index files-to-index))
(insert (format "--%s\r\n" boundary))
(insert (format "Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n" file-to-index))
(insert "Content-Type: text/org\r\n\r\n")
(insert "")
(insert "\r\n")))
(insert (format "--%s--\r\n" boundary))
(buffer-string)))
;; Cancel any running indexing timer, first
(when khoj--index-timer
(cancel-timer khoj--index-timer))
;; Send files to index on server every `khoj-index-interval' seconds
(setq khoj--index-timer
(run-with-timer 60 khoj-index-interval 'khoj--server-index-files))
;; -------------------------------------------
;; Render Response from Khoj server for Emacs
;; -------------------------------------------
(defun khoj--extract-entries-as-markdown (json-response query)
"Convert JSON-RESPONSE, QUERY from API to markdown entries."
@ -1029,17 +1110,20 @@ Paragraph only starts at first text after blank line."
;; Khoj Menu
;; ---------
(transient-define-argument khoj--content-type-switch ()
:class 'transient-switches
:argument-format "--content-type=%s"
:argument-regexp ".+"
;; set content type to: last used > based on current buffer > default type
:init-value (lambda (obj) (oset obj value (format "--content-type=%s" (or khoj--content-type (khoj--buffer-name-to-content-type (buffer-name))))))
;; dynamically set choices to content types enabled on khoj backend
:choices (or (ignore-errors (mapcar #'symbol-name (khoj--get-enabled-content-types))) '("all" "org" "markdown" "pdf" "image")))
(defun khoj--setup-and-show-menu ()
"Create Transient menu for khoj and show it."
;; Create the Khoj Transient menu
(transient-define-argument khoj--content-type-switch ()
:class 'transient-switches
:argument-format "--content-type=%s"
:argument-regexp ".+"
;; set content type to: last used > based on current buffer > default type
:init-value (lambda (obj) (oset obj value (format "--content-type=%s" (or khoj--content-type (khoj--buffer-name-to-content-type (buffer-name))))))
;; dynamically set choices to content types enabled on khoj backend
:choices (or (ignore-errors (mapcar #'symbol-name (khoj--get-enabled-content-types))) '("all" "org" "markdown" "pdf" "image")))
(transient-define-suffix khoj--search-command (&optional args)
(interactive (list (transient-args transient-current-command)))
(transient-define-suffix khoj--search-command (&optional args)
(interactive (list (transient-args transient-current-command)))
(progn
;; set content type to: specified > last used > based on current buffer > default type
(setq khoj--content-type (or (transient-arg-value "--content-type=" args) (khoj--buffer-name-to-content-type (buffer-name))))
@ -1048,9 +1132,9 @@ Paragraph only starts at first text after blank line."
;; trigger incremental search
(call-interactively #'khoj-incremental)))
(transient-define-suffix khoj--find-similar-command (&optional args)
"Find items similar to current item at point."
(interactive (list (transient-args transient-current-command)))
(transient-define-suffix khoj--find-similar-command (&optional args)
"Find items similar to current item at point."
(interactive (list (transient-args transient-current-command)))
(progn
;; set content type to: specified > last used > based on current buffer > default type
(setq khoj--content-type (or (transient-arg-value "--content-type=" args) (khoj--buffer-name-to-content-type (buffer-name))))
@ -1058,37 +1142,38 @@ Paragraph only starts at first text after blank line."
(setq khoj-results-count (or (transient-arg-value "--results-count=" args) khoj-results-count))
(khoj--find-similar khoj--content-type)))
(transient-define-suffix khoj--update-command (&optional args)
"Call khoj API to update index of specified content type."
(interactive (list (transient-args transient-current-command)))
(let* ((force-update (if (member "--force-update" args) "true" "false"))
;; set content type to: specified > last used > based on current buffer > default type
(content-type (or (transient-arg-value "--content-type=" args) (khoj--buffer-name-to-content-type (buffer-name))))
(type-query (if (equal content-type "all") "" (format "t=%s" content-type)))
(update-url (format "%s/api/update?%s&force=%s&client=emacs" khoj-server-url type-query force-update))
(url-request-method "GET"))
(progn
(setq khoj--content-type content-type)
(url-retrieve update-url (lambda (_) (message "khoj.el: %s index %supdated!" content-type (if (member "--force-update" args) "force " "")))))))
(transient-define-suffix khoj--update-command (&optional args)
"Call khoj API to update index of specified content type."
(interactive (list (transient-args transient-current-command)))
(let* ((force-update (if (member "--force-update" args) "true" "false"))
;; set content type to: specified > last used > based on current buffer > default type
(content-type (or (transient-arg-value "--content-type=" args) (khoj--buffer-name-to-content-type (buffer-name))))
(url-request-method "GET"))
(progn
(setq khoj--content-type content-type)
(khoj--server-index-files force-update content-type))))
(transient-define-suffix khoj--chat-command (&optional _)
"Command to Chat with Khoj."
(interactive (list (transient-args transient-current-command)))
(khoj--chat))
(transient-define-suffix khoj--chat-command (&optional _)
"Command to Chat with Khoj."
(interactive (list (transient-args transient-current-command)))
(khoj--chat))
(transient-define-prefix khoj--menu ()
"Create Khoj Menu to Configure and Execute Commands."
[["Configure Search"
("n" "Results Count" "--results-count=" :init-value (lambda (obj) (oset obj value (format "%s" khoj-results-count))))
("t" "Content Type" khoj--content-type-switch)]
["Configure Update"
("-f" "Force Update" "--force-update")]]
[["Act"
("c" "Chat" khoj--chat-command)
("s" "Search" khoj--search-command)
("f" "Find Similar" khoj--find-similar-command)
("u" "Update" khoj--update-command)
("q" "Quit" transient-quit-one)]])
(transient-define-prefix khoj--menu ()
"Create Khoj Menu to Configure and Execute Commands."
[["Configure Search"
("n" "Results Count" "--results-count=" :init-value (lambda (obj) (oset obj value (format "%s" khoj-results-count))))
("t" "Content Type" khoj--content-type-switch)]
["Configure Update"
("-f" "Force Update" "--force-update")]]
[["Act"
("c" "Chat" khoj--chat-command)
("s" "Search" khoj--search-command)
("f" "Find Similar" khoj--find-similar-command)
("u" "Update" khoj--update-command)
("q" "Quit" transient-quit-one)]])
;; Show the Khoj Transient menu
(khoj--menu))
;; ----------
@ -1101,7 +1186,7 @@ Paragraph only starts at first text after blank line."
(interactive)
(when khoj-auto-setup
(khoj-setup t))
(khoj--menu))
(khoj--setup-and-show-menu))
(provide 'khoj)

View file

@ -206,6 +206,64 @@ Rule everything\n")
"Rule everything"))
))
;; -------------------------------------
;; Test Helpers to Index Content
;; -------------------------------------
(ert-deftest khoj-tests--render-files-to-add-request-body ()
"Test files are formatted into a multi-part http request body"
(let ((upgrade-file (make-temp-file "upgrade" nil ".org" "# Become God\n## Upgrade\n\nPenance to Immortality\n\n"))
(act-file (make-temp-file "act" nil ".org" "## Act\n\nRule everything\n\n")))
(unwind-protect
(progn
(should
(equal
(khoj--render-files-as-request-body (list upgrade-file act-file) '() "khoj")
(format
"\n--khoj\r\n\
Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n\
Content-Type: text/org\r\n\r\n\
# Become God\n\
## Upgrade\n\n\
Penance to Immortality\n\n\r
--khoj\r\n\
Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n\
Content-Type: text/org\r\n\r\n\
## Act\n\n\
Rule everything\n\n\r\n\
--khoj--\r\n" upgrade-file act-file))))
(delete-file upgrade-file)
(delete-file act-file))))
(ert-deftest khoj-tests--render-files-to-add-delete-in-request-body ()
"Test files are formatted into a multi-part http request body"
(let ((upgrade-file (make-temp-file "upgrade" nil ".org" "# Become God\n## Upgrade\n\nPenance to Immortality\n\n"))
(act-file (make-temp-file "act" nil ".org" "## Act\n\nRule everything\n\n")))
(unwind-protect
(progn
(should
(equal
(khoj--render-files-as-request-body (list upgrade-file act-file) (list upgrade-file act-file "/tmp/deleted-file.org") "khoj")
(format
"\n--khoj\r\n\
Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n\
Content-Type: text/org\r\n\r\n\
# Become God\n\
## Upgrade\n\n\
Penance to Immortality\n\n\r
--khoj\r\n\
Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n\
Content-Type: text/org\r\n\r\n\
## Act\n\n\
Rule everything\n\n\r
--khoj\r\n\
Content-Disposition: form-data; name=\"files\"; filename=\"%s\"\r\n\
Content-Type: text/org\r\n\r\n\
\r
--khoj--\r\n" upgrade-file act-file "/tmp/deleted-file.org"))))
(delete-file upgrade-file)
(delete-file act-file))))
(provide 'khoj-tests)

View file

@ -1,12 +1,13 @@
import { Notice, Plugin } from 'obsidian';
import { Notice, Plugin, TFile } from 'obsidian';
import { KhojSetting, KhojSettingTab, DEFAULT_SETTINGS } from 'src/settings'
import { KhojSearchModal } from 'src/search_modal'
import { KhojChatModal } from 'src/chat_modal'
import { configureKhojBackend } from './utils';
import { configureKhojBackend, updateContentIndex } from './utils';
export default class Khoj extends Plugin {
settings: KhojSetting;
indexingTimer: NodeJS.Timeout;
async onload() {
await this.loadSettings();
@ -54,6 +55,15 @@ export default class Khoj extends Plugin {
// Add a settings tab so the user can configure khoj
this.addSettingTab(new KhojSettingTab(this.app, this));
// Add scheduled job to update index every 60 minutes
this.indexingTimer = setInterval(async () => {
if (this.settings.autoConfigure) {
this.settings.lastSyncedFiles = await updateContentIndex(
this.app.vault, this.settings, this.settings.lastSyncedFiles
);
}
}, 60 * 60 * 1000);
}
async loadSettings() {
@ -72,4 +82,12 @@ export default class Khoj extends Plugin {
}
this.saveData(this.settings);
}
async onunload() {
// Remove scheduled job to update index at regular cadence
if (this.indexingTimer)
clearInterval(this.indexingTimer);
this.unload();
}
}

View file

@ -1,5 +1,6 @@
import { App, Notice, PluginSettingTab, request, Setting } from 'obsidian';
import { App, Notice, PluginSettingTab, request, Setting, TFile } from 'obsidian';
import Khoj from 'src/main';
import { updateContentIndex } from './utils';
export interface KhojSetting {
enableOfflineChat: boolean;
@ -8,6 +9,7 @@ export interface KhojSetting {
khojUrl: string;
connectedToBackend: boolean;
autoConfigure: boolean;
lastSyncedFiles: TFile[];
}
export const DEFAULT_SETTINGS: KhojSetting = {
@ -17,6 +19,7 @@ export const DEFAULT_SETTINGS: KhojSetting = {
connectedToBackend: false,
autoConfigure: true,
openaiApiKey: '',
lastSyncedFiles: []
}
export class KhojSettingTab extends PluginSettingTab {
@ -118,8 +121,9 @@ export class KhojSettingTab extends PluginSettingTab {
}, 300);
this.plugin.registerInterval(progress_indicator);
await request(`${this.plugin.settings.khojUrl}/api/update?t=markdown&force=true&client=obsidian`);
await request(`${this.plugin.settings.khojUrl}/api/update?t=pdf&force=true&client=obsidian`);
this.plugin.settings.lastSyncedFiles = await updateContentIndex(
this.app.vault, this.plugin.settings, this.plugin.settings.lastSyncedFiles, true
);
new Notice('✅ Updated Khoj index.');
// Reset button once index is updated

View file

@ -1,4 +1,4 @@
import { FileSystemAdapter, Notice, RequestUrlParam, request, Vault, Modal } from 'obsidian';
import { FileSystemAdapter, Notice, RequestUrlParam, request, Vault, Modal, TFile } from 'obsidian';
import { KhojSetting } from 'src/settings'
export function getVaultAbsolutePath(vault: Vault): string {
@ -22,10 +22,70 @@ interface ProcessorData {
};
}
function fileExtensionToMimeType (extension: string): string {
switch (extension) {
case 'pdf':
return 'application/pdf';
case 'png':
return 'image/png';
case 'jpg':
case 'jpeg':
return 'image/jpeg';
case 'md':
case 'markdown':
return 'text/markdown';
case 'org':
return 'text/org';
default:
return 'text/plain';
}
}
export async function updateContentIndex(vault: Vault, setting: KhojSetting, lastSyncedFiles: TFile[], regenerate: boolean = false): Promise<TFile[]> {
// Get all markdown, pdf files in the vault
console.log(`Khoj: Updating Khoj content index...`)
const files = vault.getFiles().filter(file => file.extension === 'md' || file.extension === 'pdf');
const binaryFileTypes = ['pdf', 'png', 'jpg', 'jpeg']
let countOfFilesToIndex = 0;
let countOfFilesToDelete = 0;
// Add all files to index as multipart form data
const formData = new FormData();
for (const file of files) {
countOfFilesToIndex++;
const encoding = binaryFileTypes.includes(file.extension) ? "binary" : "utf8";
const mimeType = fileExtensionToMimeType(file.extension) + (encoding === "utf8" ? "; charset=UTF-8" : "");
const fileContent = await vault.read(file);
formData.append('files', new Blob([fileContent], { type: mimeType }), file.path);
}
// Add any previously synced files to be deleted to multipart form data
for (const lastSyncedFile of lastSyncedFiles) {
if (!files.includes(lastSyncedFile)) {
countOfFilesToDelete++;
formData.append('files', new Blob([]), lastSyncedFile.path);
}
}
// Call Khoj backend to update index with all markdown, pdf files
const response = await fetch(`${setting.khojUrl}/api/v1/index/update?force=${regenerate}&client=obsidian`, {
method: 'POST',
headers: {
'x-api-key': 'secret',
},
body: formData,
});
if (!response.ok) {
new Notice(`Failed to update Khoj content index. Ensure Khoj server connected or raise issue on Khoj Discord/Github\nError: ${response.statusText}`);
} else {
console.log(`✅ Refreshed Khoj content index. Updated: ${countOfFilesToIndex} files, Deleted: ${countOfFilesToDelete} files.`);
}
return files;
}
export async function configureKhojBackend(vault: Vault, setting: KhojSetting, notify: boolean = true) {
let vaultPath = getVaultAbsolutePath(vault);
let mdInVault = `${vaultPath}/**/*.md`;
let pdfInVault = `${vaultPath}/**/*.pdf`;
let khojConfigUrl = `${setting.khojUrl}/api/config/data`;
// Check if khoj backend is configured, note if cannot connect to backend
@ -43,11 +103,8 @@ export async function configureKhojBackend(vault: Vault, setting: KhojSetting, n
if (!setting.connectedToBackend) return;
// Set index name from the path of the current vault
let indexName = vaultPath.replace(/\//g, '_').replace(/\\/g, '_').replace(/ /g, '_').replace(/:/g, '_');
// Get default config fields from khoj backend
let defaultConfig = await request(`${khojConfigUrl}/default`).then(response => JSON.parse(response));
let khojDefaultMdIndexDirectory = getIndexDirectoryFromBackendConfig(defaultConfig["content-type"]["markdown"]["embeddings-file"]);
let khojDefaultPdfIndexDirectory = getIndexDirectoryFromBackendConfig(defaultConfig["content-type"]["pdf"]["embeddings-file"]);
let khojDefaultChatDirectory = getIndexDirectoryFromBackendConfig(defaultConfig["processor"]["conversation"]["conversation-logfile"]);
let khojDefaultChatModelName = defaultConfig["processor"]["conversation"]["openai"]["chat-model"];
@ -55,99 +112,7 @@ export async function configureKhojBackend(vault: Vault, setting: KhojSetting, n
await request(khoj_already_configured ? khojConfigUrl : `${khojConfigUrl}/default`)
.then(response => JSON.parse(response))
.then(data => {
khoj_already_configured = data["content-type"] != null;
// If khoj backend not configured yet
if (!khoj_already_configured) {
// Create khoj content-type config with only markdown configured
data["content-type"] = {
"markdown": {
"input-filter": [mdInVault],
"input-files": null,
"embeddings-file": `${khojDefaultMdIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojDefaultMdIndexDirectory}/${indexName}.jsonl.gz`,
}
}
const hasPdfFiles = app.vault.getFiles().some(file => file.extension === 'pdf');
if (hasPdfFiles) {
data["content-type"]["pdf"] = {
"input-filter": [pdfInVault],
"input-files": null,
"embeddings-file": `${khojDefaultPdfIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojDefaultPdfIndexDirectory}/${indexName}.jsonl.gz`,
}
}
}
// Else if khoj config has no markdown content config
else if (!data["content-type"]["markdown"]) {
// Add markdown config to khoj content-type config
// Set markdown config to index markdown files in configured obsidian vault
data["content-type"]["markdown"] = {
"input-filter": [mdInVault],
"input-files": null,
"embeddings-file": `${khojDefaultMdIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojDefaultMdIndexDirectory}/${indexName}.jsonl.gz`,
}
}
// Else if khoj is not configured to index markdown files in configured obsidian vault
else if (
data["content-type"]["markdown"]["input-files"] != null ||
data["content-type"]["markdown"]["input-filter"] == null ||
data["content-type"]["markdown"]["input-filter"].length != 1 ||
data["content-type"]["markdown"]["input-filter"][0] !== mdInVault) {
// Update markdown config in khoj content-type config
// Set markdown config to only index markdown files in configured obsidian vault
let khojMdIndexDirectory = getIndexDirectoryFromBackendConfig(data["content-type"]["markdown"]["embeddings-file"]);
data["content-type"]["markdown"] = {
"input-filter": [mdInVault],
"input-files": null,
"embeddings-file": `${khojMdIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojMdIndexDirectory}/${indexName}.jsonl.gz`,
}
}
if (khoj_already_configured && !data["content-type"]["pdf"]) {
const hasPdfFiles = app.vault.getFiles().some(file => file.extension === 'pdf');
if (hasPdfFiles) {
data["content-type"]["pdf"] = {
"input-filter": [pdfInVault],
"input-files": null,
"embeddings-file": `${khojDefaultPdfIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojDefaultPdfIndexDirectory}/${indexName}.jsonl.gz`,
}
} else {
data["content-type"]["pdf"] = null;
}
}
// Else if khoj is not configured to index pdf files in configured obsidian vault
else if (khoj_already_configured &&
(
data["content-type"]["pdf"]["input-files"] != null ||
data["content-type"]["pdf"]["input-filter"] == null ||
data["content-type"]["pdf"]["input-filter"].length != 1 ||
data["content-type"]["pdf"]["input-filter"][0] !== pdfInVault)) {
let hasPdfFiles = app.vault.getFiles().some(file => file.extension === 'pdf');
if (hasPdfFiles) {
// Update pdf config in khoj content-type config
// Set pdf config to only index pdf files in configured obsidian vault
let khojPdfIndexDirectory = getIndexDirectoryFromBackendConfig(data["content-type"]["pdf"]["embeddings-file"]);
data["content-type"]["pdf"] = {
"input-filter": [pdfInVault],
"input-files": null,
"embeddings-file": `${khojPdfIndexDirectory}/${indexName}.pt`,
"compressed-jsonl": `${khojPdfIndexDirectory}/${indexName}.jsonl.gz`,
}
} else {
data["content-type"]["pdf"] = null;
}
}
let conversationLogFile = data?.["processor"]?.["conversation"]?.["conversation-logfile"] ?? `${khojDefaultChatDirectory}/conversation.json`;
let processorData: ProcessorData = {
"conversation": {
"conversation-logfile": conversationLogFile,
@ -158,9 +123,7 @@ export async function configureKhojBackend(vault: Vault, setting: KhojSetting, n
// If the Open AI API Key was configured in the plugin settings
if (!!setting.openaiApiKey) {
let openAIChatModel = data?.["processor"]?.["conversation"]?.["openai"]?.["chat-model"] ?? khojDefaultChatModelName;
processorData = {
"conversation": {
"conversation-logfile": conversationLogFile,

View file

@ -103,7 +103,7 @@ def configure_routes(app):
app.mount("/static", StaticFiles(directory=constants.web_directory), name="static")
app.include_router(api, prefix="/api")
app.include_router(api_beta, prefix="/api/beta")
app.include_router(indexer, prefix="/v1/indexer")
app.include_router(indexer, prefix="/api/v1/index")
app.include_router(web_client)
@ -117,7 +117,7 @@ if not state.demo:
state.content_index = configure_content(
state.content_index, state.config.content_type, all_files, state.search_models
)
logger.info("📬 Content index updated via Scheduler")
logger.info("📪 Content index updated via Scheduler")
except Exception as e:
logger.error(f"🚨 Error updating content index via Scheduler: {e}", exc_info=True)

View file

@ -20,6 +20,7 @@ warnings.filterwarnings("ignore", message=r"legacy way to download files from th
# External Packages
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from rich.logging import RichHandler
import schedule
@ -31,6 +32,15 @@ from khoj.utils.cli import cli
# Initialize the Application Server
app = FastAPI()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["app://obsidian.md", "http://localhost:*", "https://app.khoj.dev/*", "app://khoj.dev"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Set Locale
locale.setlocale(locale.LC_ALL, "")

View file

@ -65,7 +65,7 @@ class PdfToJsonl(TextToJsonl):
# Write the PDF file to a temporary file, as it is stored in byte format in the pdf_file object and the PyPDFLoader expects a file path
tmp_file = f"tmp_pdf_file.pdf"
with open(f"{tmp_file}", "wb") as f:
bytes = base64.b64decode(pdf_files[pdf_file])
bytes = pdf_files[pdf_file]
f.write(bytes)
loader = PyMuPDFLoader(f"{tmp_file}")
pdf_entries_per_file = [page.page_content for page in loader.load()]

View file

@ -625,7 +625,7 @@ def update(
if state.processor_config:
components.append("Conversation processor")
components_msg = ", ".join(components)
logger.info(f"📬 {components_msg} updated via API")
logger.info(f"📪 {components_msg} updated via API")
update_telemetry_state(
request=request,

View file

@ -1,11 +1,11 @@
# Standard Packages
import logging
import sys
from typing import Optional, Union, Dict
# External Packages
from fastapi import APIRouter, HTTPException, Header, Request, Body, Response
from fastapi import APIRouter, HTTPException, Header, Request, Response, UploadFile
from pydantic import BaseModel
from khoj.routers.helpers import update_telemetry_state
# Internal Packages
from khoj.utils import state, constants
@ -56,42 +56,30 @@ class IndexerInput(BaseModel):
plaintext: Optional[dict[str, str]] = None
@indexer.post("/batch")
async def index_batch(
@indexer.post("/update")
async def update(
request: Request,
files: list[UploadFile],
x_api_key: str = Header(None),
regenerate: bool = False,
search_type: Optional[Union[state.SearchType, str]] = None,
force: bool = False,
t: Optional[Union[state.SearchType, str]] = None,
client: Optional[str] = None,
user_agent: Optional[str] = Header(None),
referer: Optional[str] = Header(None),
host: Optional[str] = Header(None),
):
if x_api_key != "secret":
raise HTTPException(status_code=401, detail="Invalid API Key")
state.config_lock.acquire()
try:
logger.info(f"Received batch indexing request")
index_batch_request_acc = b""
async for chunk in request.stream():
index_batch_request_acc += chunk
data_bytes = sys.getsizeof(index_batch_request_acc)
unit = "KB"
data_size = data_bytes / 1024
if data_size > 1000:
unit = "MB"
data_size = data_size / 1024
if data_size > 1000:
unit = "GB"
data_size = data_size / 1024
data_size_metric = f"{data_size:.2f} {unit}"
logger.info(f"Received {data_size_metric} of data")
index_batch_request = IndexBatchRequest.parse_raw(index_batch_request_acc)
logger.info(f"Received {len(index_batch_request.files)} files")
logger.info(f"📬 Updating content index via API call by {client}")
org_files: Dict[str, str] = {}
markdown_files: Dict[str, str] = {}
pdf_files: Dict[str, str] = {}
plaintext_files: Dict[str, str] = {}
for file in index_batch_request.files:
file_type = get_file_type(file.path)
for file in files:
file_type, encoding = get_file_type(file.content_type)
dict_to_update = None
if file_type == "org":
dict_to_update = org_files
@ -103,9 +91,11 @@ async def index_batch(
dict_to_update = plaintext_files
if dict_to_update is not None:
dict_to_update[file.path] = file.content
dict_to_update[file.filename] = (
file.file.read().decode("utf-8") if encoding == "utf-8" else file.file.read()
)
else:
logger.info(f"Skipping unsupported streamed file: {file.path}")
logger.warning(f"Skipped indexing unsupported file type sent by {client} client: {file.filename}")
indexer_input = IndexerInput(
org=org_files,
@ -115,7 +105,7 @@ async def index_batch(
)
if state.config == None:
logger.info("First run, initializing state.")
logger.info("📬 Initializing content index on first run.")
default_full_config = FullConfig(
content_type=None,
search_type=SearchConfig.parse_obj(constants.default_config["search-type"]),
@ -142,15 +132,29 @@ async def index_batch(
state.config.content_type,
indexer_input.dict(),
state.search_models,
regenerate=regenerate,
t=search_type,
regenerate=force,
t=t,
full_corpus=False,
)
except Exception as e:
logger.error(f"Failed to process batch indexing request: {e}", exc_info=True)
logger.error(
f"🚨 Failed to {force} update {t} content index triggered via API call by {client}: {e}", exc_info=True
)
finally:
state.config_lock.release()
update_telemetry_state(
request=request,
telemetry_type="api",
api="index/update",
client=client,
user_agent=user_agent,
referer=referer,
host=host,
)
logger.info(f"📪 Content index updated via API call by {client}")
return Response(content="OK", status_code=200)

View file

@ -210,7 +210,7 @@ def get_pdf_files(config: TextContentConfig):
for file in all_pdf_files:
with open(file, "rb") as f:
try:
filename_to_content_map[file] = base64.b64encode(f.read()).decode("utf-8")
filename_to_content_map[file] = f.read()
except Exception as e:
logger.warning(f"Unable to read file: {file} as PDF. Skipping file.")
logger.warning(e, exc_info=True)

View file

@ -66,20 +66,25 @@ def merge_dicts(priority_dict: dict, default_dict: dict):
return merged_dict
def get_file_type(filepath: str) -> str:
"Get file type from file path"
file_type = Path(filepath).suffix[1:]
def get_file_type(file_type: str) -> tuple[str, str]:
"Get file type from file mime type"
if file_type in ["md", "markdown"]:
return "markdown"
elif file_type in ["org", "orgmode"]:
return "org"
elif file_type in ["txt", "text", "html", "xml", "htm", "rst"]:
return "plaintext"
elif file_type in ["pdf"]:
return "pdf"
return file_type
encoding = file_type.split("=")[1].strip().lower() if ";" in file_type else None
file_type = file_type.split(";")[0].strip() if ";" in file_type else file_type
if file_type in ["text/markdown"]:
return "markdown", encoding
elif file_type in ["text/org"]:
return "org", encoding
elif file_type in ["application/pdf"]:
return "pdf", encoding
elif file_type in ["image/jpeg"]:
return "jpeg", encoding
elif file_type in ["image/png"]:
return "png", encoding
elif file_type in ["text/plain", "text/html", "application/xml", "text/x-rst"]:
return "plaintext", encoding
else:
return "other", encoding
def load_model(

View file

@ -6,6 +6,7 @@ from urllib.parse import quote
# External Packages
from fastapi.testclient import TestClient
import pytest
# Internal Packages
from khoj.main import app
@ -60,13 +61,13 @@ def test_regenerate_with_invalid_content_type(client):
# ----------------------------------------------------------------------------------------------------
def test_index_batch(client):
def test_index_update(client):
# Arrange
request_body = get_sample_files_data()
files = get_sample_files_data()
headers = {"x-api-key": "secret"}
# Act
response = client.post("/v1/indexer/batch", json=request_body, headers=headers)
response = client.post("/api/v1/index/update", files=files, headers=headers)
# Assert
assert response.status_code == 200
@ -76,12 +77,11 @@ def test_index_batch(client):
def test_regenerate_with_valid_content_type(client):
for content_type in ["all", "org", "markdown", "image", "pdf", "notion", "plugin1"]:
# Arrange
request_body = get_sample_files_data()
files = get_sample_files_data()
headers = {"x-api-key": "secret"}
# Act
response = client.post(f"/v1/indexer/batch?search_type={content_type}", json=request_body, headers=headers)
response = client.post(f"/api/v1/index/update?t={content_type}", files=files, headers=headers)
# Assert
assert response.status_code == 200, f"Returned status: {response.status_code} for content type: {content_type}"
@ -92,17 +92,17 @@ def test_regenerate_with_github_fails_without_pat(client):
response = client.get(f"/api/update?force=true&t=github")
# Arrange
request_body = get_sample_files_data()
files = get_sample_files_data()
headers = {"x-api-key": "secret"}
# Act
response = client.post(f"/v1/indexer/batch?search_type=github", json=request_body, headers=headers)
response = client.post(f"/api/v1/index/update?t=github", files=files, headers=headers)
# Assert
assert response.status_code == 200, f"Returned status: {response.status_code} for content type: github"
# ----------------------------------------------------------------------------------------------------
@pytest.mark.skip(reason="Flaky test on parallel test runs")
def test_get_configured_types_via_api(client):
# Act
response = client.get(f"/api/config/types")
@ -288,24 +288,20 @@ def test_notes_search_with_exclude_filter(
def get_sample_files_data():
return {
"org": {
"path/to/filename.org": "* practicing piano",
"path/to/filename1.org": "** top 3 reasons why I moved to SF",
"path/to/filename2.org": "* how to build a search engine",
},
"pdf": {
"path/to/filename.pdf": "Moore's law does not apply to consumer hardware",
"path/to/filename1.pdf": "The sun is a ball of helium",
"path/to/filename2.pdf": "Effect of sunshine on baseline human happiness",
},
"plaintext": {
"path/to/filename.txt": "data,column,value",
"path/to/filename1.txt": "<html>my first web page</html>",
"path/to/filename2.txt": "2021-02-02 Journal Entry",
},
"markdown": {
"path/to/filename.md": "# Notes from client call",
"path/to/filename1.md": "## Studying anthropological records from the Fatimid caliphate",
"path/to/filename2.md": "**Understanding science through the lens of art**",
},
"files": ("path/to/filename.org", "* practicing piano", "text/org"),
"files": ("path/to/filename1.org", "** top 3 reasons why I moved to SF", "text/org"),
"files": ("path/to/filename2.org", "* how to build a search engine", "text/org"),
"files": ("path/to/filename.pdf", "Moore's law does not apply to consumer hardware", "application/pdf"),
"files": ("path/to/filename1.pdf", "The sun is a ball of helium", "application/pdf"),
"files": ("path/to/filename2.pdf", "Effect of sunshine on baseline human happiness", "application/pdf"),
"files": ("path/to/filename.txt", "data,column,value", "text/plain"),
"files": ("path/to/filename1.txt", "<html>my first web page</html>", "text/plain"),
"files": ("path/to/filename2.txt", "2021-02-02 Journal Entry", "text/plain"),
"files": ("path/to/filename.md", "# Notes from client call", "text/markdown"),
"files": (
"path/to/filename1.md",
"## Studying anthropological records from the Fatimid caliphate",
"text/markdown",
),
"files": ("path/to/filename2.md", "**Understanding science through the lens of art**", "text/markdown"),
}

View file

@ -1,7 +1,6 @@
# Standard Packages
import json
import os
import base64
# Internal Packages
from khoj.processor.pdf.pdf_to_jsonl import PdfToJsonl
@ -16,7 +15,7 @@ def test_single_page_pdf_to_jsonl():
# Extract Entries from specified Pdf files
# Read singlepage.pdf into memory as bytes
with open("tests/data/pdf/singlepage.pdf", "rb") as f:
pdf_bytes = base64.b64encode(f.read()).decode("utf-8")
pdf_bytes = f.read()
data = {"tests/data/pdf/singlepage.pdf": pdf_bytes}
entries, entry_to_file_map = PdfToJsonl.extract_pdf_entries(pdf_files=data)
@ -36,7 +35,7 @@ def test_multi_page_pdf_to_jsonl():
# Act
# Extract Entries from specified Pdf files
with open("tests/data/pdf/multipage.pdf", "rb") as f:
pdf_bytes = base64.b64encode(f.read()).decode("utf-8")
pdf_bytes = f.read()
data = {"tests/data/pdf/multipage.pdf": pdf_bytes}
entries, entry_to_file_map = PdfToJsonl.extract_pdf_entries(pdf_files=data)