Migrate document processor to class ()

* Migrate document processor to class

* forgot "new"
This commit is contained in:
Timothy Carambat 2024-02-16 16:32:25 -08:00 committed by GitHub
parent 51dbff0dcb
commit aad32db5e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 295 additions and 292 deletions
server
endpoints
swagger
utils

View file

@ -1,19 +1,13 @@
const { Telemetry } = require("../../../models/telemetry");
const { validApiKey } = require("../../../utils/middleware/validApiKey");
const { setupMulter } = require("../../../utils/files/multer");
const {
checkProcessorAlive,
acceptedFileTypes,
processDocument,
processLink,
processRawText,
} = require("../../../utils/files/documentProcessor");
const {
viewLocalFiles,
findDocumentInDocuments,
} = require("../../../utils/files");
const { reqBody } = require("../../../utils/http");
const { EventLogs } = require("../../../models/eventLogs");
const { CollectorApi } = require("../../../utils/collectorApi");
const { handleUploads } = setupMulter();
function apiDocumentEndpoints(app) {
@ -80,8 +74,9 @@ function apiDocumentEndpoints(app) {
}
*/
try {
const Collector = new CollectorApi();
const { originalname } = request.file;
const processingOnline = await checkProcessorAlive();
const processingOnline = await Collector.online();
if (!processingOnline) {
response
@ -95,7 +90,7 @@ function apiDocumentEndpoints(app) {
}
const { success, reason, documents } =
await processDocument(originalname);
await Collector.processDocument(originalname);
if (!success) {
response
.status(500)
@ -104,7 +99,7 @@ function apiDocumentEndpoints(app) {
return;
}
console.log(
Collector.log(
`Document ${originalname} uploaded processed and successfully. It is now available in documents.`
);
await Telemetry.sendTelemetry("document_uploaded");
@ -177,8 +172,9 @@ function apiDocumentEndpoints(app) {
}
*/
try {
const Collector = new CollectorApi();
const { link } = reqBody(request);
const processingOnline = await checkProcessorAlive();
const processingOnline = await Collector.online();
if (!processingOnline) {
response
@ -191,7 +187,8 @@ function apiDocumentEndpoints(app) {
return;
}
const { success, reason, documents } = await processLink(link);
const { success, reason, documents } =
await Collector.processLink(link);
if (!success) {
response
.status(500)
@ -200,7 +197,7 @@ function apiDocumentEndpoints(app) {
return;
}
console.log(
Collector.log(
`Link ${link} uploaded processed and successfully. It is now available in documents.`
);
await Telemetry.sendTelemetry("link_uploaded");
@ -278,9 +275,10 @@ function apiDocumentEndpoints(app) {
}
*/
try {
const Collector = new CollectorApi();
const requiredMetadata = ["title"];
const { textContent, metadata = {} } = reqBody(request);
const processingOnline = await checkProcessorAlive();
const processingOnline = await Collector.online();
if (!processingOnline) {
response
@ -322,7 +320,7 @@ function apiDocumentEndpoints(app) {
return;
}
const { success, reason, documents } = await processRawText(
const { success, reason, documents } = await Collector.processRawText(
textContent,
metadata
);
@ -334,7 +332,7 @@ function apiDocumentEndpoints(app) {
return;
}
console.log(
Collector.log(
`Document created successfully. It is now available in documents.`
);
await Telemetry.sendTelemetry("raw_document_uploaded");
@ -391,61 +389,6 @@ function apiDocumentEndpoints(app) {
}
});
app.get("/v1/document/:docName", [validApiKey], async (request, response) => {
/*
#swagger.tags = ['Documents']
#swagger.description = 'Get a single document by its unique AnythingLLM document name'
#swagger.parameters['docName'] = {
in: 'path',
description: 'Unique document name to find (name in /documents)',
required: true,
type: 'string'
}
#swagger.responses[200] = {
content: {
"application/json": {
schema: {
type: 'object',
example: {
"localFiles": {
"name": "documents",
"type": "folder",
items: [
{
"name": "my-stored-document.txt-uuid1234.json",
"type": "file",
"id": "bb07c334-4dab-4419-9462-9d00065a49a1",
"url": "file://my-stored-document.txt",
"title": "my-stored-document.txt",
"cached": false
},
]
}
}
}
}
}
}
#swagger.responses[403] = {
schema: {
"$ref": "#/definitions/InvalidAPIKey"
}
}
*/
try {
const { docName } = request.params;
const document = await findDocumentInDocuments(docName);
if (!document) {
response.sendStatus(404).end();
return;
}
response.status(200).json({ document });
} catch (e) {
console.log(e.message, e);
response.sendStatus(500).end();
}
});
app.get(
"/v1/document/accepted-file-types",
[validApiKey],
@ -489,7 +432,7 @@ function apiDocumentEndpoints(app) {
}
*/
try {
const types = await acceptedFileTypes();
const types = await new CollectorApi().acceptedFileTypes();
if (!types) {
response.sendStatus(404).end();
return;
@ -552,6 +495,63 @@ function apiDocumentEndpoints(app) {
}
}
);
// Be careful and place as last route to prevent override of the other /document/ GET
// endpoints!
app.get("/v1/document/:docName", [validApiKey], async (request, response) => {
/*
#swagger.tags = ['Documents']
#swagger.description = 'Get a single document by its unique AnythingLLM document name'
#swagger.parameters['docName'] = {
in: 'path',
description: 'Unique document name to find (name in /documents)',
required: true,
type: 'string'
}
#swagger.responses[200] = {
content: {
"application/json": {
schema: {
type: 'object',
example: {
"localFiles": {
"name": "documents",
"type": "folder",
items: [
{
"name": "my-stored-document.txt-uuid1234.json",
"type": "file",
"id": "bb07c334-4dab-4419-9462-9d00065a49a1",
"url": "file://my-stored-document.txt",
"title": "my-stored-document.txt",
"cached": false
},
]
}
}
}
}
}
}
#swagger.responses[403] = {
schema: {
"$ref": "#/definitions/InvalidAPIKey"
}
}
*/
try {
const { docName } = request.params;
const document = await findDocumentInDocuments(docName);
if (!document) {
response.sendStatus(404).end();
return;
}
response.status(200).json({ document });
} catch (e) {
console.log(e.message, e);
response.sendStatus(500).end();
}
});
}
module.exports = { apiDocumentEndpoints };

View file

@ -1,7 +1,5 @@
const { Telemetry } = require("../../models/telemetry");
const {
forwardExtensionRequest,
} = require("../../utils/files/documentProcessor");
const { CollectorApi } = require("../../utils/collectorApi");
const {
flexUserRoleValid,
ROLES,
@ -16,11 +14,12 @@ function extensionEndpoints(app) {
[validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
async (request, response) => {
try {
const responseFromProcessor = await forwardExtensionRequest({
endpoint: "/ext/github-repo/branches",
method: "POST",
body: request.body,
});
const responseFromProcessor =
await new CollectorApi().forwardExtensionRequest({
endpoint: "/ext/github-repo/branches",
method: "POST",
body: request.body,
});
response.status(200).json(responseFromProcessor);
} catch (e) {
console.error(e);
@ -34,11 +33,12 @@ function extensionEndpoints(app) {
[validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
async (request, response) => {
try {
const responseFromProcessor = await forwardExtensionRequest({
endpoint: "/ext/github-repo",
method: "POST",
body: request.body,
});
const responseFromProcessor =
await new CollectorApi().forwardExtensionRequest({
endpoint: "/ext/github-repo",
method: "POST",
body: request.body,
});
await Telemetry.sendTelemetry("extension_invoked", {
type: "github_repo",
});
@ -55,11 +55,12 @@ function extensionEndpoints(app) {
[validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
async (request, response) => {
try {
const responseFromProcessor = await forwardExtensionRequest({
endpoint: "/ext/youtube-transcript",
method: "POST",
body: request.body,
});
const responseFromProcessor =
await new CollectorApi().forwardExtensionRequest({
endpoint: "/ext/youtube-transcript",
method: "POST",
body: request.body,
});
await Telemetry.sendTelemetry("extension_invoked", {
type: "youtube_transcript",
});

View file

@ -2,10 +2,6 @@ process.env.NODE_ENV === "development"
? require("dotenv").config({ path: `.env.${process.env.NODE_ENV}` })
: require("dotenv").config();
const { viewLocalFiles, normalizePath } = require("../utils/files");
const {
checkProcessorAlive,
acceptedFileTypes,
} = require("../utils/files/documentProcessor");
const { purgeDocument, purgeFolder } = require("../utils/files/purgeDocument");
const { getVectorDbClass } = require("../utils/helpers");
const { updateENV, dumpENV } = require("../utils/helpers/updateENV");
@ -49,6 +45,7 @@ const {
exportChatsAsType,
} = require("../utils/helpers/chat/convertTo");
const { EventLogs } = require("../models/eventLogs");
const { CollectorApi } = require("../utils/collectorApi");
function systemEndpoints(app) {
if (!app) return;
@ -297,7 +294,7 @@ function systemEndpoints(app) {
[validatedRequest],
async (_, response) => {
try {
const online = await checkProcessorAlive();
const online = await new CollectorApi().online();
response.sendStatus(online ? 200 : 503);
} catch (e) {
console.log(e.message, e);
@ -311,7 +308,7 @@ function systemEndpoints(app) {
[validatedRequest],
async (_, response) => {
try {
const types = await acceptedFileTypes();
const types = await new CollectorApi().acceptedFileTypes();
if (!types) {
response.sendStatus(404).end();
return;

View file

@ -5,11 +5,6 @@ const { DocumentVectors } = require("../models/vectors");
const { WorkspaceChats } = require("../models/workspaceChats");
const { getVectorDbClass } = require("../utils/helpers");
const { setupMulter } = require("../utils/files/multer");
const {
checkProcessorAlive,
processDocument,
processLink,
} = require("../utils/files/documentProcessor");
const { validatedRequest } = require("../utils/middleware/validatedRequest");
const { Telemetry } = require("../models/telemetry");
const {
@ -22,6 +17,7 @@ const {
} = require("../models/workspacesSuggestedMessages");
const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace");
const { convertToChatHistory } = require("../utils/helpers/chat/responses");
const { CollectorApi } = require("../utils/collectorApi");
const { handleUploads } = setupMulter();
function workspaceEndpoints(app) {
@ -98,8 +94,9 @@ function workspaceEndpoints(app) {
[validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
handleUploads.single("file"),
async function (request, response) {
const Collector = new CollectorApi();
const { originalname } = request.file;
const processingOnline = await checkProcessorAlive();
const processingOnline = await Collector.online();
if (!processingOnline) {
response
@ -112,13 +109,13 @@ function workspaceEndpoints(app) {
return;
}
const { success, reason } = await processDocument(originalname);
const { success, reason } = await Collector.processDocument(originalname);
if (!success) {
response.status(500).json({ success: false, error: reason }).end();
return;
}
console.log(
Collector.log(
`Document ${originalname} uploaded processed and successfully. It is now available in documents.`
);
await Telemetry.sendTelemetry("document_uploaded");
@ -137,8 +134,9 @@ function workspaceEndpoints(app) {
"/workspace/:slug/upload-link",
[validatedRequest, flexUserRoleValid([ROLES.admin, ROLES.manager])],
async (request, response) => {
const Collector = new CollectorApi();
const { link = "" } = reqBody(request);
const processingOnline = await checkProcessorAlive();
const processingOnline = await Collector.online();
if (!processingOnline) {
response
@ -151,13 +149,13 @@ function workspaceEndpoints(app) {
return;
}
const { success, reason } = await processLink(link);
const { success, reason } = await Collector.processLink(link);
if (!success) {
response.status(500).json({ success: false, error: reason }).end();
return;
}
console.log(
Collector.log(
`Link ${link} uploaded processed and successfully. It is now available in documents.`
);
await Telemetry.sendTelemetry("link_uploaded");

View file

@ -1140,81 +1140,6 @@
}
}
},
"/v1/document/{docName}": {
"get": {
"tags": [
"Documents"
],
"description": "Get a single document by its unique AnythingLLM document name",
"parameters": [
{
"name": "docName",
"in": "path",
"required": true,
"schema": {
"type": "string"
},
"description": "Unique document name to find (name in /documents)"
},
{
"name": "Authorization",
"in": "header",
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": "OK",
"content": {
"application/json": {
"schema": {
"type": "object",
"example": {
"localFiles": {
"name": "documents",
"type": "folder",
"items": [
{
"name": "my-stored-document.txt-uuid1234.json",
"type": "file",
"id": "bb07c334-4dab-4419-9462-9d00065a49a1",
"url": "file://my-stored-document.txt",
"title": "my-stored-document.txt",
"cached": false
}
]
}
}
}
}
}
},
"403": {
"description": "Forbidden",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/InvalidAPIKey"
}
},
"application/xml": {
"schema": {
"$ref": "#/components/schemas/InvalidAPIKey"
}
}
}
},
"404": {
"description": "Not Found"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/v1/document/accepted-file-types": {
"get": {
"tags": [
@ -1340,6 +1265,81 @@
}
}
},
"/v1/document/{docName}": {
"get": {
"tags": [
"Documents"
],
"description": "Get a single document by its unique AnythingLLM document name",
"parameters": [
{
"name": "docName",
"in": "path",
"required": true,
"schema": {
"type": "string"
},
"description": "Unique document name to find (name in /documents)"
},
{
"name": "Authorization",
"in": "header",
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": "OK",
"content": {
"application/json": {
"schema": {
"type": "object",
"example": {
"localFiles": {
"name": "documents",
"type": "folder",
"items": [
{
"name": "my-stored-document.txt-uuid1234.json",
"type": "file",
"id": "bb07c334-4dab-4419-9462-9d00065a49a1",
"url": "file://my-stored-document.txt",
"title": "my-stored-document.txt",
"cached": false
}
]
}
}
}
}
}
},
"403": {
"description": "Forbidden",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/InvalidAPIKey"
}
},
"application/xml": {
"schema": {
"$ref": "#/components/schemas/InvalidAPIKey"
}
}
}
},
"404": {
"description": "Not Found"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/v1/workspace/new": {
"post": {
"tags": [

View file

@ -0,0 +1,117 @@
// When running locally, the collector is reached at 0.0.0.0:8888. When deployed inside Docker
// this endpoint is not exposed, so it is reachable only on the Docker instance's internal network
// and needs no additional security on the endpoint itself. Authentication is still enforced by the
// Express middleware before a request leaves the Node side of the application.
/**
 * HTTP client for the collector (document processor) service.
 *
 * No method rejects on a transport error or a non-OK response: such failures
 * are logged through `log` and turned into a failure value (`false`, `null`,
 * or a `{ success: false, ... }` object, depending on the method).
 */
class CollectorApi {
  /**
   * @param {string} [endpoint="http://0.0.0.0:8888"] - Base URL of the collector service.
   */
  constructor(endpoint = "http://0.0.0.0:8888") {
    this.endpoint = endpoint;
  }

  /**
   * Writes a message to stdout prefixed with a cyan `[CollectorApi]` tag.
   * @param {string} text - Message to print.
   * @param {...any} args - Extra values passed straight to `console.log`.
   */
  log(text, ...args) {
    console.log(`\x1b[36m[CollectorApi]\x1b[0m ${text}`, ...args);
  }

  /**
   * Fetches `path` relative to the endpoint and parses the JSON body.
   * @param {string} path - Path appended to `this.endpoint`.
   * @param {object} [init] - Options forwarded to `fetch`.
   * @param {string} [failureMessage] - Error message used for a non-OK response.
   * @returns {Promise<any>} Parsed JSON body.
   * @throws {Error} On network failure, non-OK status, or invalid JSON.
   */
  async #fetchJson(
    path,
    init,
    failureMessage = "Response could not be completed"
  ) {
    const res = await fetch(`${this.endpoint}${path}`, init);
    if (!res.ok) throw new Error(failureMessage);
    return await res.json();
  }

  /**
   * POSTs a JSON payload to a processing route, converting any failure into
   * the `{ success: false, reason, documents: [] }` shape callers destructure.
   * @param {string} path - Processing route on the collector.
   * @param {object} payload - Value serialized as the JSON request body.
   * @returns {Promise<object>} The collector's JSON reply, or the failure object.
   */
  async #postForDocuments(path, payload) {
    try {
      return await this.#fetchJson(path, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify(payload),
      });
    } catch (e) {
      this.log(e.message);
      return { success: false, reason: e.message, documents: [] };
    }
  }

  /**
   * Checks whether the collector answers at its base URL.
   * @returns {Promise<boolean>} The response's `ok` flag, or `false` when the request fails.
   */
  async online() {
    try {
      const res = await fetch(this.endpoint);
      return res.ok;
    } catch {
      // Being unreachable is the expected "offline" answer, not an error.
      return false;
    }
  }

  /**
   * Retrieves the JSON served by the collector's `/accepts` route.
   * @returns {Promise<any|null>} Parsed JSON, or `null` on any failure.
   */
  async acceptedFileTypes() {
    try {
      return await this.#fetchJson(
        "/accepts",
        undefined,
        "failed to GET /accepts"
      );
    } catch (e) {
      this.log(e.message);
      return null;
    }
  }

  /**
   * Asks the collector to process a file by name.
   * @param {string} [filename=""] - Name of the file to process.
   * @returns {Promise<object>} The collector's JSON reply, or
   *   `{ success: false, reason, documents: [] }` on failure or empty input.
   */
  async processDocument(filename = "") {
    // Return the standard failure shape (not a bare `false`) so callers that
    // destructure `{ success, reason }` receive a usable reason.
    if (!filename) {
      return { success: false, reason: "No filename provided.", documents: [] };
    }
    return await this.#postForDocuments("/process", { filename });
  }

  /**
   * Asks the collector to process a link.
   * @param {string} [link=""] - Link to process.
   * @returns {Promise<object>} The collector's JSON reply, or
   *   `{ success: false, reason, documents: [] }` on failure or empty input.
   */
  async processLink(link = "") {
    // Same failure shape as every other error path; see processDocument.
    if (!link) {
      return { success: false, reason: "No link provided.", documents: [] };
    }
    return await this.#postForDocuments("/process-link", { link });
  }

  /**
   * Sends raw text plus metadata to the collector for processing.
   * @param {string} [textContent=""] - Text to process.
   * @param {object} [metadata={}] - Metadata sent alongside the text.
   * @returns {Promise<object>} The collector's JSON reply, or
   *   `{ success: false, reason, documents: [] }` on failure.
   */
  async processRawText(textContent = "", metadata = {}) {
    return await this.#postForDocuments("/process-raw-text", {
      textContent,
      metadata,
    });
  }

  // We will not ever expose the document processor to the frontend API so instead we relay
  // all requests through the server. You can use this function to directly expose a specific endpoint
  // on the document processor.
  /**
   * Relays a request to an arbitrary collector route.
   * @param {object} options
   * @param {string} options.endpoint - Path appended to the collector base URL.
   * @param {string} options.method - HTTP method.
   * @param {string} options.body - Request body; must already be stringified JSON.
   * @returns {Promise<object>} The collector's JSON reply, or
   *   `{ success: false, data: {}, reason }` on failure.
   */
  async forwardExtensionRequest({ endpoint, method, body }) {
    try {
      return await this.#fetchJson(endpoint, {
        method,
        body, // Stringified JSON!
        headers: {
          "Content-Type": "application/json",
        },
      });
    } catch (e) {
      this.log(e.message);
      return { success: false, data: {}, reason: e.message };
    }
  }
}
module.exports.CollectorApi = CollectorApi;

View file

@ -1,110 +0,0 @@
// When running locally, the document processor is reached at 0.0.0.0:8888. When deployed inside
// Docker this endpoint is not exposed, so it is reachable only on the Docker instance's internal
// network and needs no additional security on the endpoint itself. Authentication is still enforced
// by the Express middleware before a request leaves the Node side of the application.
const PROCESSOR_API = "http://0.0.0.0:8888";
/**
 * Probes the document processor's base URL.
 * @returns {Promise<boolean>} The response's `ok` flag, or `false` when the
 *   request itself fails.
 */
async function checkProcessorAlive() {
  try {
    const probe = await fetch(`${PROCESSOR_API}`);
    return probe.ok;
  } catch {
    // An unreachable processor is reported as "not alive" rather than thrown.
    return false;
  }
}
/**
 * Retrieves the JSON served by the document processor's `/accepts` route.
 * @returns {Promise<any|null>} Parsed JSON, or `null` when the request fails,
 *   the response is not OK, or the body is not valid JSON.
 */
async function acceptedFileTypes() {
  try {
    const res = await fetch(`${PROCESSOR_API}/accepts`);
    if (!res.ok) throw new Error("Could not reach");
    return await res.json();
  } catch (e) {
    // Log the cause like the sibling request helpers do, instead of dropping it.
    console.log(e.message);
    return null;
  }
}
/**
 * Asks the document processor to process a file by name.
 * @param {string} [filename=""] - Name of the file to process.
 * @returns {Promise<object>} The processor's JSON reply, or
 *   `{ success: false, reason, documents: [] }` on failure or empty input.
 */
async function processDocument(filename = "") {
  // Return the standard failure shape (not a bare `false`) so callers that
  // destructure `{ success, reason }` receive a usable reason.
  if (!filename) {
    return { success: false, reason: "No filename provided.", documents: [] };
  }

  try {
    const res = await fetch(`${PROCESSOR_API}/process`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ filename }),
    });
    if (!res.ok) throw new Error("Response could not be completed");
    return await res.json();
  } catch (e) {
    console.log(e.message);
    return { success: false, reason: e.message, documents: [] };
  }
}
/**
 * Asks the document processor to process a link.
 * @param {string} [link=""] - Link to process.
 * @returns {Promise<object>} The processor's JSON reply, or
 *   `{ success: false, reason, documents: [] }` on failure or empty input.
 */
async function processLink(link = "") {
  // Return the standard failure shape (not a bare `false`) so callers that
  // destructure `{ success, reason }` receive a usable reason.
  if (!link) {
    return { success: false, reason: "No link provided.", documents: [] };
  }

  try {
    const res = await fetch(`${PROCESSOR_API}/process-link`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ link }),
    });
    if (!res.ok) throw new Error("Response could not be completed");
    return await res.json();
  } catch (e) {
    console.log(e.message);
    return { success: false, reason: e.message, documents: [] };
  }
}
/**
 * Sends raw text plus metadata to the document processor's
 * `/process-raw-text` route.
 * @param {string} [textContent=""] - Text to process.
 * @param {object} [metadata={}] - Metadata sent alongside the text.
 * @returns {Promise<object>} The processor's JSON reply, or
 *   `{ success: false, reason, documents: [] }` when the request fails.
 */
async function processRawText(textContent = "", metadata = {}) {
  const requestOptions = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ textContent, metadata }),
  };

  try {
    const reply = await fetch(
      `${PROCESSOR_API}/process-raw-text`,
      requestOptions
    );
    if (!reply.ok) {
      throw new Error("Response could not be completed");
    }
    return await reply.json();
  } catch (failure) {
    console.log(failure.message);
    return { success: false, reason: failure.message, documents: [] };
  }
}
// We will not ever expose the document processor to the frontend API so instead we relay
// all requests through the server. You can use this function to directly expose a specific endpoint
// on the document processor.
/**
 * Relays a request to an arbitrary route on the document processor.
 * @param {object} options
 * @param {string} options.endpoint - Path appended to the processor base URL.
 * @param {string} options.method - HTTP method.
 * @param {string} options.body - Request body, forwarded untouched; expected
 *   to be stringified JSON already.
 * @returns {Promise<object>} The processor's JSON reply, or
 *   `{ success: false, data: {}, reason }` when the request fails.
 */
async function forwardExtensionRequest({ endpoint, method, body }) {
  const target = `${PROCESSOR_API}${endpoint}`;
  const requestOptions = {
    method,
    body, // Stringified JSON!
    headers: {
      "Content-Type": "application/json",
    },
  };

  try {
    const reply = await fetch(target, requestOptions);
    if (!reply.ok) {
      throw new Error("Response could not be completed");
    }
    return await reply.json();
  } catch (failure) {
    console.log(failure.message);
    return { success: false, data: {}, reason: failure.message };
  }
}
module.exports = {
checkProcessorAlive,
processDocument,
processLink,
processRawText,
acceptedFileTypes,
forwardExtensionRequest,
};