Refactor handleStream to LLM Classes ()

Timothy Carambat 2024-02-07 08:15:14 -08:00 committed by GitHub
parent e2a6a2d6c7
commit aca5940650
12 changed files with 374 additions and 307 deletions
server/utils/AiProviders/azureOpenAi
server/utils/AiProviders/gemini
server/utils/AiProviders/huggingface
server/utils/AiProviders/lmStudio
server/utils/AiProviders/localAi
server/utils/AiProviders/mistral
server/utils/AiProviders/native
server/utils/AiProviders/ollama
server/utils/AiProviders/openAi
server/utils/AiProviders/togetherAi
server/utils/chats
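
A note on the shape of the change before the per-file hunks: instead of returning tagged stream objects ({ type: "azureStream", stream } and friends) for a central switch in chats/stream.js, every provider class now implements its own handleStream(response, stream, responseProps) and resolves the full reply text. A minimal sketch of that contract; ExampleLLM is a hypothetical class used only for illustration, and the token-per-chunk stream shape is the simplest case, mirroring the native and Ollama handlers below:

// Sketch only: "ExampleLLM" is not one of the classes in this commit.
// Each real provider receives the Express response, the provider-specific stream
// returned by streamGetChatCompletion(), and { uuid, sources }, streams
// textResponseChunk frames to the client, and resolves the accumulated reply text.
const { v4: uuidv4 } = require("uuid");
const { writeResponseChunk } = require("../../chats/stream");

class ExampleLLM {
  async handleStream(response, stream, responseProps) {
    const { uuid = uuidv4(), sources = [] } = responseProps;
    let fullText = "";
    for await (const token of stream) {
      fullText += token;
      writeResponseChunk(response, {
        uuid,
        sources: [],
        type: "textResponseChunk",
        textResponse: token,
        close: false,
        error: false,
      });
    }
    writeResponseChunk(response, {
      uuid,
      sources,
      type: "textResponseChunk",
      textResponse: "",
      close: true,
      error: false,
    });
    return fullText;
  }
}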

server/utils/AiProviders/azureOpenAi

@@ -1,5 +1,6 @@
const { AzureOpenAiEmbedder } = require("../../EmbeddingEngines/azureOpenAi");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
class AzureOpenAiLLM {
constructor(embedder = null, _modelPreference = null) {
@@ -135,7 +136,7 @@ class AzureOpenAiLLM {
n: 1,
}
);
return { type: "azureStream", stream };
return stream;
}
async getChatCompletion(messages = [], { temperature = 0.7 }) {
@@ -165,7 +166,40 @@ class AzureOpenAiLLM {
n: 1,
}
);
return { type: "azureStream", stream };
return stream;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
for await (const event of stream) {
for (const choice of event.choices) {
const delta = choice.delta?.content;
if (!delta) continue;
fullText += delta;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: delta,
close: false,
error: false,
});
}
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
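
writeResponseChunk is imported from ../../chats/stream in each provider, but its body does not appear in this diff. For orientation only, a hedged sketch of what such a helper typically looks like when streaming server-sent events over an Express response; this is an assumption about the helper, not the file's actual code:

// Assumption: the real helper lives in server/utils/chats/stream.js and may differ.
function writeResponseChunk(response, data) {
  // Each chunk goes out as one SSE "data:" frame the frontend can parse as JSON.
  response.write(`data: ${JSON.stringify(data)}\n\n`);
}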

server/utils/AiProviders/gemini

@@ -1,4 +1,5 @@
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
class GeminiLLM {
constructor(embedder = null, modelPreference = null) {
@@ -164,7 +165,7 @@ class GeminiLLM {
if (!responseStream.stream)
throw new Error("Could not stream response stream from Gemini.");
return { type: "geminiStream", ...responseStream };
return responseStream.stream;
}
async streamGetChatCompletion(messages = [], _opts = {}) {
@@ -183,7 +184,7 @@ class GeminiLLM {
if (!responseStream.stream)
throw new Error("Could not stream response stream from Gemini.");
return { type: "geminiStream", ...responseStream };
return responseStream.stream;
}
async compressMessages(promptArgs = {}, rawHistory = []) {
@@ -192,6 +193,35 @@ class GeminiLLM {
return await messageArrayCompressor(this, messageArray, rawHistory);
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
for await (const chunk of stream) {
fullText += chunk.text();
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: chunk.text(),
close: false,
error: false,
});
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);
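
The Gemini handler above consumes chunks that expose their text through a .text() method rather than a plain delta field. A self-contained illustration of that consumption pattern using fabricated chunks, not a live Gemini stream:

// Fabricated async-iterable of Gemini-style chunks, each exposing its text via .text().
async function* fakeGeminiStream() {
  yield { text: () => "Hello, " };
  yield { text: () => "Gemini!" };
}

(async () => {
  let fullText = "";
  for await (const chunk of fakeGeminiStream()) {
    const text = chunk.text(); // the real handler calls chunk.text() for each streamed write
    fullText += text;
  }
  console.log(fullText); // "Hello, Gemini!"
})();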

server/utils/AiProviders/huggingface

@@ -1,6 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
class HuggingFaceLLM {
constructor(embedder = null, _modelPreference = null) {
@@ -138,7 +139,7 @@ class HuggingFaceLLM {
},
{ responseType: "stream" }
);
return { type: "huggingFaceStream", stream: streamRequest };
return streamRequest;
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
@@ -162,7 +163,115 @@ class HuggingFaceLLM {
},
{ responseType: "stream" }
);
return { type: "huggingFaceStream", stream: streamRequest };
return streamRequest;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data:/, "");
if (message !== "[DONE]") {
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {
console.log("Failed to parse message", message);
}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let error = null;
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
error = json?.error || null;
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (!!error) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: null,
close: true,
error,
});
resolve("");
return;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
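
The stitching logic above (and its twin in the togetherAi handler further down) exists because a single axios "data" event can end mid-JSON. A tiny self-contained illustration of the same stitch-then-parse idea, simplified from the handler and run against made-up payloads rather than a live HuggingFace stream:

// Illustration only: two fake SSE events where one JSON payload is split across them.
const events = [
  'data:{"choices":[{"delta":{"content":"Hel',   // first half of a JSON chunk
  'lo"},"finish_reason":null}]}\ndata:[DONE]',   // second half, then the terminator
];

let chunk = "";    // carries an incomplete JSON fragment between events
let fullText = "";

for (const data of events) {
  const lines = data.split("\n").filter((line) => line.trim() !== "");
  for (const line of lines) {
    const message = chunk + line.replace(/^data:/, "");
    if (message === "[DONE]") break;
    try {
      const json = JSON.parse(message);
      chunk = ""; // complete payload, reset the carry buffer
      fullText += json?.choices?.[0]?.delta?.content ?? "";
    } catch {
      chunk = message; // incomplete payload, keep it and wait for the next event
    }
  }
}

console.log(fullText); // "Hello"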

server/utils/AiProviders/lmStudio

@@ -1,4 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
// hybrid of openAi LLM chat completion for LMStudio
class LMStudioLLM {
@@ -174,6 +175,10 @@ class LMStudioLLM {
return streamRequest;
}
handleStream(response, stream, responseProps) {
return handleDefaultStreamResponse(response, stream, responseProps);
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);

server/utils/AiProviders/localAi

@@ -1,4 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
class LocalAiLLM {
constructor(embedder = null, modelPreference = null) {
@@ -174,6 +175,10 @@ class LocalAiLLM {
return streamRequest;
}
handleStream(response, stream, responseProps) {
return handleDefaultStreamResponse(response, stream, responseProps);
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);

server/utils/AiProviders/mistral

@@ -1,4 +1,5 @@
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
class MistralLLM {
constructor(embedder = null, modelPreference = null) {
@@ -164,6 +165,10 @@ class MistralLLM {
return streamRequest;
}
handleStream(response, stream, responseProps) {
return handleDefaultStreamResponse(response, stream, responseProps);
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);

server/utils/AiProviders/native

@@ -2,6 +2,7 @@ const fs = require("fs");
const path = require("path");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
// Docs: https://api.js.langchain.com/classes/chat_models_llama_cpp.ChatLlamaCpp.html
const ChatLlamaCpp = (...args) =>
@@ -170,6 +171,41 @@ class NativeLLM {
return responseStream;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
for await (const chunk of stream) {
if (chunk === undefined)
throw new Error(
"Stream returned undefined chunk. Aborting reply - check model provider logs."
);
const content = chunk.hasOwnProperty("content") ? chunk.content : chunk;
fullText += content;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: content,
close: false,
error: false,
});
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);
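
The ternary on chunk.hasOwnProperty("content") above exists because LangChain-style streams yield message chunks carrying a .content field while other iterators yield plain strings. A two-shape illustration with fabricated chunks:

// Fabricated examples of the two chunk shapes the native and Ollama handlers accept.
const langchainStyleChunk = { content: "Hello " }; // e.g. an AIMessageChunk-like object
const plainStringChunk = "world";

const asText = (chunk) =>
  chunk.hasOwnProperty("content") ? chunk.content : chunk;

console.log(asText(langchainStyleChunk) + asText(plainStringChunk)); // "Hello world"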

server/utils/AiProviders/ollama

@@ -1,5 +1,6 @@
const { chatPrompt } = require("../../chats");
const { StringOutputParser } = require("langchain/schema/output_parser");
const { writeResponseChunk } = require("../../chats/stream");
// Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md
class OllamaAILLM {
@@ -165,6 +166,41 @@ class OllamaAILLM {
return stream;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
for await (const chunk of stream) {
if (chunk === undefined)
throw new Error(
"Stream returned undefined chunk. Aborting reply - check model provider logs."
);
const content = chunk.hasOwnProperty("content") ? chunk.content : chunk;
fullText += content;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: content,
close: false,
error: false,
});
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);
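
Because every handleStream now resolves the full reply text, the methods above can be smoke-tested without a real HTTP response. A hedged sketch using a minimal fake Express-like response and the async-iterable chunk pattern shared by the native and Ollama handlers; FakeResponse, fakeStream, and smokeTest are inventions for illustration:

// Minimal stand-in for the Express response that writeResponseChunk writes SSE frames to.
class FakeResponse {
  constructor() {
    this.frames = [];
  }
  write(frame) {
    this.frames.push(frame); // collect frames instead of sending them to a client
  }
}

// Fake async-iterable "stream" of string chunks, like those the native/Ollama handlers consume.
async function* fakeStream() {
  yield "Hello, ";
  yield "world!";
}

// llm is any provider instance from this commit, supplied by the caller (e.g. an OllamaAILLM).
async function smokeTest(llm) {
  const response = new FakeResponse();
  const fullText = await llm.handleStream(response, fakeStream(), {
    uuid: "test-uuid",
    sources: [],
  });
  console.log(fullText);               // the accumulated reply
  console.log(response.frames.length); // one frame per chunk plus the closing frame
}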

server/utils/AiProviders/openAi

@@ -1,5 +1,6 @@
const { OpenAiEmbedder } = require("../../EmbeddingEngines/openAi");
const { chatPrompt } = require("../../chats");
const { handleDefaultStreamResponse } = require("../../chats/stream");
class OpenAiLLM {
constructor(embedder = null, modelPreference = null) {
@@ -222,6 +223,10 @@ class OpenAiLLM {
return streamRequest;
}
handleStream(response, stream, responseProps) {
return handleDefaultStreamResponse(response, stream, responseProps);
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
async embedTextInput(textInput) {
return await this.embedder.embedTextInput(textInput);

server/utils/AiProviders/togetherAi

@@ -1,4 +1,5 @@
const { chatPrompt } = require("../../chats");
const { writeResponseChunk } = require("../../chats/stream");
function togetherAiModels() {
const { MODELS } = require("./models.js");
@@ -141,7 +142,7 @@ class TogetherAiLLM {
},
{ responseType: "stream" }
);
return { type: "togetherAiStream", stream: streamRequest };
return streamRequest;
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
@@ -175,7 +176,99 @@ class TogetherAiLLM {
},
{ responseType: "stream" }
);
return { type: "togetherAiStream", stream: streamRequest };
return streamRequest;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data: /, "");
if (message !== "[DONE]") {
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
// Simple wrapper for dynamic embedder & normalize interface for all LLM implementations
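
Aside from not extracting an error field, the only material difference from the HuggingFace handler earlier is the SSE prefix that gets stripped: here the regex is /^data: / with a space, where HuggingFace used /^data:/ without one. A one-liner per style with fabricated sample lines:

// Fabricated sample lines showing the prefix each handler strips before JSON.parse.
const togetherAiLine = 'data: {"choices":[]}';  // space after the colon
const huggingFaceLine = 'data:{"choices":[]}';  // no space after the colon

console.log(togetherAiLine.replace(/^data: /, ""));  // '{"choices":[]}'
console.log(huggingFaceLine.replace(/^data:/, ""));  // '{"choices":[]}'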

server/utils/chats (streamChatWithForEmbed)

@@ -1,7 +1,7 @@
const { v4: uuidv4 } = require("uuid");
const { getVectorDbClass, getLLMProvider } = require("../helpers");
const { chatPrompt, convertToPromptHistory } = require(".");
const { writeResponseChunk, handleStreamResponses } = require("./stream");
const { writeResponseChunk } = require("./stream");
const { EmbedChats } = require("../../models/embedChats");
async function streamChatWithForEmbed(
@@ -150,7 +150,7 @@ async function streamChatWithForEmbed(
const stream = await LLMConnector.streamGetChatCompletion(messages, {
temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
completeText = await handleStreamResponses(response, stream, {
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources: [],
});
@@ -227,7 +227,7 @@ async function streamEmptyEmbeddingChat({
embed.workspace,
rawHistory
);
completeText = await handleStreamResponses(response, stream, {
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources: [],
});
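
As the two call sites above show, the embed chat flow no longer needs to know which provider produced the stream. A condensed, hypothetical wrapper illustrating the dispatch; streamReply is not a function in this commit, and error handling and persistence are omitted, but the names it uses (getLLMProvider, streamGetChatCompletion, defaultTemp, handleStream) all come from this hunk and the provider hunks above:

// Condensed sketch of the dispatch pattern used by streamChatWithForEmbed above.
const { getLLMProvider } = require("../helpers");

async function streamReply(response, uuid, messages, sources = []) {
  const LLMConnector = getLLMProvider(); // resolves whichever provider is configured
  const stream = await LLMConnector.streamGetChatCompletion(messages, {
    temperature: LLMConnector.defaultTemp,
  });
  // Every provider implements handleStream itself now, so no stream-type switching happens here.
  return await LLMConnector.handleStream(response, stream, { uuid, sources });
}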

server/utils/chats (streamChatWithWorkspace and stream helpers)

@@ -156,7 +156,7 @@ async function streamChatWithWorkspace(
const stream = await LLMConnector.streamGetChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
completeText = await handleStreamResponses(response, stream, {
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources,
});
@@ -214,7 +214,7 @@ async function streamEmptyEmbeddingChat({
workspace,
rawHistory
);
completeText = await handleStreamResponses(response, stream, {
completeText = await LLMConnector.handleStream(response, stream, {
uuid,
sources: [],
});
@@ -229,301 +229,10 @@ async function streamEmptyEmbeddingChat({
return;
}
// TODO: Refactor this implementation
function handleStreamResponses(response, stream, responseProps) {
// The default way to handle a stream response. Functions best with OpenAI.
function handleDefaultStreamResponse(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
// Gemini likes to return a stream asyncIterator which will
// be a totally different object than other models.
if (stream?.type === "geminiStream") {
return new Promise(async (resolve) => {
let fullText = "";
for await (const chunk of stream.stream) {
fullText += chunk.text();
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: chunk.text(),
close: false,
error: false,
});
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
if (stream?.type === "azureStream") {
return new Promise(async (resolve) => {
let fullText = "";
for await (const event of stream.stream) {
for (const choice of event.choices) {
const delta = choice.delta?.content;
if (!delta) continue;
fullText += delta;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: delta,
close: false,
error: false,
});
}
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
if (stream.type === "togetherAiStream") {
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data: /, "");
if (message !== "[DONE]") {
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
if (stream.type === "huggingFaceStream") {
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
stream.stream.data.on("data", (data) => {
const lines = data
?.toString()
?.split("\n")
.filter((line) => line.trim() !== "");
for (const line of lines) {
let validJSON = false;
const message = chunk + line.replace(/^data:/, "");
if (message !== "[DONE]") {
// JSON chunk is incomplete and has not ended yet
// so we need to stitch it together. You would think JSON
// chunks would only come complete - but they don't!
try {
JSON.parse(message);
validJSON = true;
} catch {
console.log("Failed to parse message", message);
}
if (!validJSON) {
// It can be possible that the chunk decoding is running away
// and the message chunk fails to append due to string length.
// In this case abort the chunk and reset so we can continue.
// ref: https://github.com/Mintplex-Labs/anything-llm/issues/416
try {
chunk += message;
} catch (e) {
console.error(`Chunk appending error`, e);
chunk = "";
}
continue;
} else {
chunk = "";
}
}
if (message == "[DONE]") {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
} else {
let error = null;
let finishReason = null;
let token = "";
try {
const json = JSON.parse(message);
error = json?.error || null;
token = json?.choices?.[0]?.delta?.content;
finishReason = json?.choices?.[0]?.finish_reason || null;
} catch {
continue;
}
if (!!error) {
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: null,
close: true,
error,
});
resolve("");
return;
}
if (token) {
fullText += token;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: token,
close: false,
error: false,
});
}
if (finishReason !== null) {
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
}
}
}
});
});
}
// If stream is not a regular OpenAI Stream (like if using native model, Ollama, or most LangChain interfaces)
// we can just iterate the stream content instead.
if (!stream.hasOwnProperty("data")) {
return new Promise(async (resolve) => {
let fullText = "";
for await (const chunk of stream) {
if (chunk === undefined)
throw new Error(
"Stream returned undefined chunk. Aborting reply - check model provider logs."
);
const content = chunk.hasOwnProperty("content") ? chunk.content : chunk;
fullText += content;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: content,
close: false,
error: false,
});
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
resolve(fullText);
});
}
return new Promise((resolve) => {
let fullText = "";
let chunk = "";
@@ -615,5 +324,5 @@ module.exports = {
VALID_CHAT_MODE,
streamChatWithWorkspace,
writeResponseChunk,
handleStreamResponses,
handleDefaultStreamResponse,
};
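
With handleStreamResponses gone, a provider added after this commit either delegates to the exported default handler (as the lmStudio, localAi, mistral, and openAi classes do above) or ships its own handleStream. A minimal delegating skeleton; the class name is hypothetical and exists only to illustrate the pattern:

const { handleDefaultStreamResponse } = require("../../chats/stream");

// Hypothetical provider showing the delegation pattern established by this commit.
class MyOpenAiCompatibleLLM {
  handleStream(response, stream, responseProps) {
    // Suitable when the upstream API emits OpenAI-style SSE chunks.
    return handleDefaultStreamResponse(response, stream, responseProps);
  }
}

module.exports = { MyOpenAiCompatibleLLM };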