LLM performance metric tracking ()

* WIP performance metric tracking

* fix: patch UI trying to .toFixed() null metric
Anthropic tracking migration
cleanup logs

* Apipie implementation, not tested

* Cleanup Anthropic notes, Add support for AzureOpenAI tracking

* bedrock token metric tracking

* Cohere support

* feat: improve default stream handler to track usage for providers that are actually OpenAI-compliant in their usage reporting
add deepseek support

* feat: Add FireworksAI token usage reporting
fix: improve handler when usage: null is reported (why?)

* Add token reporting for GenericOpenAI

* token reporting for koboldcpp + lmstudio

* lint

* support Groq token tracking

* HF token tracking

* token tracking for togetherai

* LiteLLM token tracking

* linting + Mistral token tracking support

* XAI token metric reporting

* native provider runner

* LocalAI token tracking

* Novita token tracking

* OpenRouter token tracking

* Apipie stream metrics

* textgenwebui token tracking

* perplexity token reporting

* ollama token reporting

* lint

* put back comment

* Rip out LangChain Ollama wrapper and use the official library

* patch images with new ollama lib

* improve ollama offline message

* fix image handling in ollama llm provider

* lint

* NVIDIA NIM token tracking

* update OpenAI compatibility responses

* UI/UX show/hide metrics on click for user preference

* update bedrock client
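
All providers listed above now resolve chat completions to { textResponse, metrics } and report stream usage through the performance monitor; the metrics object shares one shape across providers (a representative example, values illustrative):

// Shape of the metrics object attached to each chat/stream response (values illustrative)
const metrics = {
  prompt_tokens: 421,      // tokens sent to the model
  completion_tokens: 188,  // tokens generated by the model
  total_tokens: 609,       // prompt + completion
  outputTps: 42.7,         // completion_tokens / duration
  duration: 4.4,           // seconds spent generating the response
};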

---------

Co-authored-by: shatfield4 <seanhatfield5@gmail.com>
Timothy Carambat 2024-12-16 14:31:17 -08:00 committed by GitHub
parent 15abc3f803
commit dd7c4675d3
42 changed files with 1769 additions and 565 deletions
frontend/src
components/WorkspaceChat/ChatContainer
ChatHistory
HistoricalMessage
Actions
RenderMetrics
index.jsx
index.jsx
index.jsx
ChatTooltips
index.jsx
utils/chat
server
.env.example
endpoints/api/admin
utils
AiProviders
anthropic
apipie
azureOpenAi
bedrock
cohere
deepseek
fireworksAi
gemini
genericOpenAi
groq
huggingface
koboldCPP
liteLLM
lmStudio
localAi
mistral
native
novita
nvidiaNim
ollama
openAi
openRouter
perplexity
textGenWebUI
togetherAi
xai
chats
helpers

View file

@ -0,0 +1,118 @@
import { numberWithCommas } from "@/utils/numbers";
import React, { useEffect, useState, useContext } from "react";
const MetricsContext = React.createContext();
const SHOW_METRICS_KEY = "anythingllm_show_chat_metrics";
const SHOW_METRICS_EVENT = "anythingllm_show_metrics_change";
/**
* @param {number} duration - duration in milliseconds
* @returns {string}
*/
function formatDuration(duration) {
try {
return duration < 1
? `${(duration * 1000).toFixed(0)}ms`
: `${duration.toFixed(3)}s`;
} catch {
return "";
}
}
/**
* Format the output TPS to a string
* @param {number} outputTps - output TPS
* @returns {string}
*/
function formatTps(outputTps) {
try {
return outputTps < 1000
? outputTps.toFixed(2)
: numberWithCommas(outputTps.toFixed(0));
} catch {
return "";
}
}
/**
* Get the show metrics setting from localStorage `anythingllm_show_chat_metrics` key
* @returns {boolean}
*/
function getAutoShowMetrics() {
return window?.localStorage?.getItem(SHOW_METRICS_KEY) === "true";
}
/**
* Toggle the show metrics setting in localStorage `anythingllm_show_chat_metrics` key
* @returns {void}
*/
function toggleAutoShowMetrics() {
const currentValue = getAutoShowMetrics() || false;
window?.localStorage?.setItem(SHOW_METRICS_KEY, !currentValue);
window.dispatchEvent(
new CustomEvent(SHOW_METRICS_EVENT, {
detail: { showMetricsAutomatically: !currentValue },
})
);
return !currentValue;
}
/**
* Provider for the metrics context that controls the visibility of the metrics
* per-chat based on the user's preference.
* @param {React.ReactNode} children
* @returns {React.ReactNode}
*/
export function MetricsProvider({ children }) {
const [showMetricsAutomatically, setShowMetricsAutomatically] =
useState(getAutoShowMetrics());
useEffect(() => {
function handleShowingMetricsEvent(e) {
if (!e?.detail?.hasOwnProperty("showMetricsAutomatically")) return;
setShowMetricsAutomatically(e.detail.showMetricsAutomatically);
}
console.log("Adding event listener for metrics visibility");
window.addEventListener(SHOW_METRICS_EVENT, handleShowingMetricsEvent);
return () =>
window.removeEventListener(SHOW_METRICS_EVENT, handleShowingMetricsEvent);
}, []);
return (
<MetricsContext.Provider
value={{ showMetricsAutomatically, setShowMetricsAutomatically }}
>
{children}
</MetricsContext.Provider>
);
}
/**
* Render the metrics for a given chat, if available
* @param {{metrics: {duration: number, outputTps: number}}} props
* @returns {React.ReactNode|null}
*/
export default function RenderMetrics({ metrics = {} }) {
// Inherit the showMetricsAutomatically state from the MetricsProvider so the state is shared across all chats
const { showMetricsAutomatically, setShowMetricsAutomatically } =
useContext(MetricsContext);
if (!metrics?.duration || !metrics?.outputTps) return null;
return (
<button
type="button"
onClick={() => setShowMetricsAutomatically(toggleAutoShowMetrics())}
data-tooltip-id="metrics-visibility"
data-tooltip-content={
showMetricsAutomatically
? "Click to only show metrics when hovering"
: "Click to show metrics as soon as they are available"
}
className={`border-none flex justify-end items-center gap-x-[8px] ${showMetricsAutomatically ? "opacity-100" : "opacity-0"} md:group-hover:opacity-100 transition-all duration-300`}
>
<p className="cursor-pointer text-xs font-mono text-theme-text-secondary opacity-50">
{formatDuration(metrics.duration)} ({formatTps(metrics.outputTps)}{" "}
tok/s)
</p>
</button>
);
}
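
A few illustrative inputs and outputs for the two formatters above (assuming numberWithCommas inserts thousands separators):

// Illustrative behavior of the helpers above (assumes numberWithCommas adds thousands separators)
formatDuration(0.5);    // "500ms"  - sub-second durations are shown in milliseconds
formatDuration(2.3456); // "2.346s" - otherwise seconds with three decimals
formatTps(87.333);      // "87.33"  - rates below 1000 keep two decimals
formatTps(1523.7);      // "1,524"  - higher rates are rounded and comma-grouped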

View file

@ -3,6 +3,7 @@ import useCopyText from "@/hooks/useCopyText";
import { Check, ThumbsUp, ArrowsClockwise, Copy } from "@phosphor-icons/react";
import Workspace from "@/models/workspace";
import { EditMessageAction } from "./EditMessage";
import RenderMetrics from "./RenderMetrics";
import ActionMenu from "./ActionMenu";
const Actions = ({
@ -15,6 +16,7 @@ const Actions = ({
forkThread,
isEditing,
role,
metrics = {},
}) => {
const [selectedFeedback, setSelectedFeedback] = useState(feedbackScore);
const handleFeedback = async (newFeedback) => {
@ -58,6 +60,7 @@ const Actions = ({
/>
</div>
</div>
<RenderMetrics metrics={metrics} />
</div>
);
};

View file

@ -26,6 +26,7 @@ const HistoricalMessage = ({
regenerateMessage,
saveEditedMessage,
forkThread,
metrics = {},
}) => {
const { isEditing } = useEditMessage({ chatId, role });
const { isDeleted, completeDelete, onEndAnimation } = useWatchDeleteMessage({
@ -117,6 +118,7 @@ const HistoricalMessage = ({
isEditing={isEditing}
role={role}
forkThread={forkThread}
metrics={metrics}
/>
</div>
{role === "assistant" && <Citations sources={sources} />}

View file

@ -227,6 +227,7 @@ export default function ChatHistory({
isLastMessage={isLastBotReply}
saveEditedMessage={saveEditedMessage}
forkThread={forkThread}
metrics={props.metrics}
/>
);
})}

View file

@ -61,6 +61,12 @@ export function ChatTooltips() {
// as the citation modal is z-indexed above the chat history
className="tooltip !text-xs z-[100]"
/>
<Tooltip
id="metrics-visibility"
place="bottom"
delayShow={300}
className="tooltip !text-xs"
/>
</>
);
}

View file

@ -18,6 +18,7 @@ import SpeechRecognition, {
useSpeechRecognition,
} from "react-speech-recognition";
import { ChatTooltips } from "./ChatTooltips";
import { MetricsProvider } from "./ChatHistory/HistoricalMessage/Actions/RenderMetrics";
export default function ChatContainer({ workspace, knownHistory = [] }) {
const { threadSlug = null } = useParams();
@ -268,14 +269,16 @@ export default function ChatContainer({ workspace, knownHistory = [] }) {
>
{isMobile && <SidebarMobileHeader />}
<DnDFileUploaderWrapper>
<ChatHistory
history={chatHistory}
workspace={workspace}
sendCommand={sendCommand}
updateHistory={setChatHistory}
regenerateAssistantMessage={regenerateAssistantMessage}
hasAttachments={files.length > 0}
/>
<MetricsProvider>
<ChatHistory
history={chatHistory}
workspace={workspace}
sendCommand={sendCommand}
updateHistory={setChatHistory}
regenerateAssistantMessage={regenerateAssistantMessage}
hasAttachments={files.length > 0}
/>
</MetricsProvider>
<PromptInput
submit={handleSubmit}
onChange={handleMessageChange}

View file

@ -19,6 +19,7 @@ export default function handleChat(
close,
chatId = null,
action = null,
metrics = {},
} = chatResult;
if (type === "abort" || type === "statusResponse") {
@ -35,6 +36,7 @@ export default function handleChat(
error,
animate: false,
pending: false,
metrics,
},
]);
_chatHistory.push({
@ -47,6 +49,7 @@ export default function handleChat(
error,
animate: false,
pending: false,
metrics,
});
} else if (type === "textResponse") {
setLoadingResponse(false);
@ -62,6 +65,7 @@ export default function handleChat(
animate: !close,
pending: false,
chatId,
metrics,
},
]);
_chatHistory.push({
@ -74,21 +78,42 @@ export default function handleChat(
animate: !close,
pending: false,
chatId,
metrics,
});
} else if (type === "textResponseChunk") {
} else if (
type === "textResponseChunk" ||
type === "finalizeResponseStream"
) {
const chatIdx = _chatHistory.findIndex((chat) => chat.uuid === uuid);
if (chatIdx !== -1) {
const existingHistory = { ..._chatHistory[chatIdx] };
const updatedHistory = {
...existingHistory,
content: existingHistory.content + textResponse,
sources,
error,
closed: close,
animate: !close,
pending: false,
chatId,
};
let updatedHistory;
// If the response is finalized, we can set the loading state to false.
// and append the metrics to the history.
if (type === "finalizeResponseStream") {
updatedHistory = {
...existingHistory,
closed: close,
animate: !close,
pending: false,
chatId,
metrics,
};
setLoadingResponse(false);
} else {
updatedHistory = {
...existingHistory,
content: existingHistory.content + textResponse,
sources,
error,
closed: close,
animate: !close,
pending: false,
chatId,
metrics,
};
}
_chatHistory[chatIdx] = updatedHistory;
} else {
_chatHistory.push({
@ -101,6 +126,7 @@ export default function handleChat(
animate: !close,
pending: false,
chatId,
metrics,
});
}
setChatHistory([..._chatHistory]);
@ -125,6 +151,7 @@ export default function handleChat(
error: null,
animate: false,
pending: false,
metrics,
};
_chatHistory[chatIdx] = updatedHistory;
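
For reference, a finalizeResponseStream chunk — the new event type handled above — would look roughly like this (field names taken from the destructuring at the top of handleChat; values are made up):

// Illustrative finalizeResponseStream chunk as consumed by handleChat (values illustrative)
const finalChunk = {
  uuid: "stream-message-uuid",   // matches the in-flight message being finalized
  type: "finalizeResponseStream",
  textResponse: "",              // no new text; content already arrived in textResponseChunk events
  close: true,
  chatId: 123,
  metrics: {
    prompt_tokens: 421,
    completion_tokens: 188,
    total_tokens: 609,
    outputTps: 42.7,
    duration: 4.4,
  },
};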

View file

@ -52,6 +52,10 @@ SIG_SALT='salt' # Please generate random string at least 32 chars long.
# PERPLEXITY_API_KEY='my-perplexity-key'
# PERPLEXITY_MODEL_PREF='codellama-34b-instruct'
# LLM_PROVIDER='deepseek'
# DEEPSEEK_API_KEY=YOUR_API_KEY
# DEEPSEEK_MODEL_PREF='deepseek-chat'
# LLM_PROVIDER='openrouter'
# OPENROUTER_API_KEY='my-openrouter-key'
# OPENROUTER_MODEL_PREF='openrouter/auto'

View file

@ -610,24 +610,20 @@ function apiAdminEndpoints(app) {
const workspaceUsers = await Workspace.workspaceUsers(workspace.id);
if (!workspace) {
response
.status(404)
.json({
success: false,
error: `Workspace ${workspaceSlug} not found`,
users: workspaceUsers,
});
response.status(404).json({
success: false,
error: `Workspace ${workspaceSlug} not found`,
users: workspaceUsers,
});
return;
}
if (userIds.length === 0) {
response
.status(404)
.json({
success: false,
error: `No valid user IDs provided.`,
users: workspaceUsers,
});
response.status(404).json({
success: false,
error: `No valid user IDs provided.`,
users: workspaceUsers,
});
return;
}
@ -637,13 +633,11 @@ function apiAdminEndpoints(app) {
workspace.id,
userIds
);
return response
.status(200)
.json({
success,
error,
users: await Workspace.workspaceUsers(workspace.id),
});
return response.status(200).json({
success,
error,
users: await Workspace.workspaceUsers(workspace.id),
});
}
// Add new users to the workspace if they are not already in the workspace
@ -653,13 +647,11 @@ function apiAdminEndpoints(app) {
);
if (usersToAdd.length > 0)
await WorkspaceUser.createManyUsers(usersToAdd, workspace.id);
response
.status(200)
.json({
success: true,
error: null,
users: await Workspace.workspaceUsers(workspace.id),
});
response.status(200).json({
success: true,
error: null,
users: await Workspace.workspaceUsers(workspace.id),
});
} catch (e) {
console.error(e);
response.sendStatus(500).end();

View file

@ -5,6 +5,9 @@ const {
} = require("../../helpers/chat/responses");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { MODEL_MAP } = require("../modelMap");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
class AnthropicLLM {
constructor(embedder = null, modelPreference = null) {
@ -111,18 +114,31 @@ class AnthropicLLM {
);
try {
const response = await this.anthropic.messages.create({
model: this.model,
max_tokens: 4096,
system: messages[0].content, // Strip out the system message
messages: messages.slice(1), // Pop off the system message
temperature: Number(temperature ?? this.defaultTemp),
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.anthropic.messages.create({
model: this.model,
max_tokens: 4096,
system: messages[0].content, // Strip out the system message
messages: messages.slice(1), // Pop off the system message
temperature: Number(temperature ?? this.defaultTemp),
})
);
return response.content[0].text;
const promptTokens = result.output.usage.input_tokens;
const completionTokens = result.output.usage.output_tokens;
return {
textResponse: result.output.content[0].text,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
} catch (error) {
console.log(error);
return error;
return { textResponse: error, metrics: {} };
}
}
@ -132,26 +148,45 @@ class AnthropicLLM {
`Anthropic chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.anthropic.messages.stream({
model: this.model,
max_tokens: 4096,
system: messages[0].content, // Strip out the system message
messages: messages.slice(1), // Pop off the system message
temperature: Number(temperature ?? this.defaultTemp),
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.anthropic.messages.stream({
model: this.model,
max_tokens: 4096,
system: messages[0].content, // Strip out the system message
messages: messages.slice(1), // Pop off the system message
temperature: Number(temperature ?? this.defaultTemp),
}),
messages,
false
);
return measuredStreamRequest;
}
/**
* Handles the stream response from the Anthropic API.
* @param {Object} response - the response object
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the Anthropic API w/tracking
* @param {Object} responseProps - the response properties
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
return new Promise((resolve) => {
let fullText = "";
const { uuid = v4(), sources = [] } = responseProps;
let usage = {
prompt_tokens: 0,
completion_tokens: 0,
};
// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
stream.on("error", (event) => {
@ -173,11 +208,18 @@ class AnthropicLLM {
error: parseErrorMsg(event),
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
});
stream.on("streamEvent", (message) => {
const data = message;
if (data.type === "message_start")
usage.prompt_tokens = data?.message?.usage?.input_tokens;
if (data.type === "message_delta")
usage.completion_tokens = data?.usage?.output_tokens;
if (
data.type === "content_block_delta" &&
data.delta.type === "text_delta"
@ -208,6 +250,7 @@ class AnthropicLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
}
});
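
The LLMPerformanceMonitor helper lives under server/utils/helpers/chat (per the require path) and its diff is not part of this excerpt. Judging from the call sites above, measureAsyncFunction wraps a pending promise and returns the resolved value plus the elapsed wall-clock time in seconds. A minimal sketch under that assumption, not the shipped implementation:

// Minimal sketch of LLMPerformanceMonitor.measureAsyncFunction based on its call sites (assumed, not the shipped code)
class LLMPerformanceMonitorSketch {
  static async measureAsyncFunction(promise) {
    const start = Date.now();
    const output = await promise;                  // the provider's raw response
    const duration = (Date.now() - start) / 1000;  // seconds, so completion_tokens / duration = tokens/sec
    return { output, duration };
  }
}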

View file

@ -4,10 +4,13 @@ const {
writeResponseChunk,
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const fs = require("fs");
const path = require("path");
const { safeJsonParse } = require("../../http");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const cacheFolder = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "models", "apipie")
@ -188,19 +191,35 @@ class ApiPieLLM {
`ApiPie chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps:
(result.output.usage?.completion_tokens || 0) / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -209,13 +228,16 @@ class ApiPieLLM {
`ApiPie chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {
@ -228,7 +250,12 @@ class ApiPieLLM {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
try {
@ -258,6 +285,9 @@ class ApiPieLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}
@ -271,6 +301,9 @@ class ApiPieLLM {
error: e.message,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
});

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
writeResponseChunk,
clientAbortedHandler,
@ -114,11 +117,28 @@ class AzureOpenAiLLM {
"No OPEN_MODEL_PREF ENV defined. This must the name of a deployment on your Azure account for an LLM chat model like GPT-3.5."
);
const data = await this.openai.getChatCompletions(this.model, messages, {
temperature,
});
if (!data.hasOwnProperty("choices")) return null;
return data.choices[0].message.content;
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.getChatCompletions(this.model, messages, {
temperature,
})
);
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.promptTokens || 0,
completion_tokens: result.output.usage.completionTokens || 0,
total_tokens: result.output.usage.totalTokens || 0,
outputTps: result.output.usage.completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = [], { temperature = 0.7 }) {
@ -127,28 +147,43 @@ class AzureOpenAiLLM {
"No OPEN_MODEL_PREF ENV defined. This must the name of a deployment on your Azure account for an LLM chat model like GPT-3.5."
);
const stream = await this.openai.streamChatCompletions(
this.model,
messages,
{
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
await this.openai.streamChatCompletions(this.model, messages, {
temperature,
n: 1,
}
}),
messages
);
return stream;
return measuredStreamRequest;
}
/**
* Handles the stream response from the AzureOpenAI API.
* Azure does not return usage metrics in the stream response, but roughly 1 message = 1 token,
* so we can estimate the completion tokens by counting the streamed messages.
* @param {Object} response - the response object
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the AzureOpenAI API w/tracking
* @param {Object} responseProps - the response properties
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
let usage = {
completion_tokens: 0,
};
// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
for await (const event of stream) {
@ -156,6 +191,8 @@ class AzureOpenAiLLM {
const delta = choice.delta?.content;
if (!delta) continue;
fullText += delta;
usage.completion_tokens++;
writeResponseChunk(response, {
uuid,
sources: [],
@ -176,6 +213,7 @@ class AzureOpenAiLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
});
}
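
measureStream is likewise only visible here through its call sites: it wraps the provider stream, exposes endMeasurement(usage) for handlers like the one above, and appears to fold the collected usage into the same metrics shape once the stream finishes; the third argument seems to control whether prompt tokens are estimated locally. A rough, self-contained sketch under those assumptions (the name measureStreamSketch and the chars/4 estimate are illustrative, not the shipped code):

// Rough sketch of LLMPerformanceMonitor.measureStream, inferred from its call sites (assumed, not the shipped code)
async function measureStreamSketch(streamPromise, messages = [], countPromptTokensLocally = true) {
  const stream = await streamPromise;   // the provider's async-iterable stream
  const start = Date.now();
  stream.messages = messages;           // some handlers (e.g. KoboldCPP) read stream.messages for estimates
  stream.endMeasurement = (usage = {}) => {
    const duration = (Date.now() - start) / 1000; // seconds
    const completion_tokens = usage.completion_tokens || 0;
    // If the provider did not report prompt_tokens, estimate them from the outgoing messages
    // (the real helper presumably uses a proper tokenizer; this is a crude chars/4 stand-in).
    const prompt_tokens =
      usage.prompt_tokens ??
      (countPromptTokensLocally
        ? messages.reduce((n, m) => n + Math.ceil(String(m?.content || "").length / 4), 0)
        : 0);
    stream.metrics = {
      prompt_tokens,
      completion_tokens,
      total_tokens: prompt_tokens + completion_tokens,
      outputTps: duration > 0 ? completion_tokens / duration : 0,
      duration,
    };
    return stream.metrics;
  };
  return stream;
}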

View file

@ -4,6 +4,9 @@ const {
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
// Docs: https://js.langchain.com/v0.2/docs/integrations/chat/bedrock_converse
class AWSBedrockLLM {
@ -82,7 +85,7 @@ class AWSBedrockLLM {
}
// For streaming we use Langchain's wrapper to handle weird chunks
// or otherwise absorb headaches that can arise from Ollama models
// or otherwise absorb headaches that can arise from Bedrock models
#convertToLangchainPrototypes(chats = []) {
const {
HumanMessage,
@ -219,40 +222,73 @@ class AWSBedrockLLM {
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const model = this.#bedrockClient({ temperature });
const textResponse = await model
.pipe(new StringOutputParser())
.invoke(this.#convertToLangchainPrototypes(messages))
.catch((e) => {
throw new Error(
`AWSBedrock::getChatCompletion failed to communicate with Ollama. ${e.message}`
);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
model
.pipe(new StringOutputParser())
.invoke(this.#convertToLangchainPrototypes(messages))
.catch((e) => {
throw new Error(
`AWSBedrock::getChatCompletion failed to communicate with Bedrock client. ${e.message}`
);
})
);
if (!textResponse || !textResponse.length)
throw new Error(`AWSBedrock::getChatCompletion text response was empty.`);
if (!result.output || result.output.length === 0) return null;
return textResponse;
// Langchain does not return the usage metrics in the response so we estimate them
const promptTokens = LLMPerformanceMonitor.countTokens(messages);
const completionTokens = LLMPerformanceMonitor.countTokens([
{ content: result.output },
]);
return {
textResponse: result.output,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const model = this.#bedrockClient({ temperature });
const stream = await model
.pipe(new StringOutputParser())
.stream(this.#convertToLangchainPrototypes(messages));
return stream;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
model
.pipe(new StringOutputParser())
.stream(this.#convertToLangchainPrototypes(messages)),
messages
);
return measuredStreamRequest;
}
/**
* Handles the stream response from the AWS Bedrock API.
* Bedrock does not support usage metrics in the stream response so we need to estimate them.
* @param {Object} response - the response object
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the AWS Bedrock API w/tracking
* @param {Object} responseProps - the response properties
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
let usage = {
completion_tokens: 0,
};
// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
try {
@ -266,6 +302,7 @@ class AWSBedrockLLM {
? chunk.content
: chunk;
fullText += content;
if (!!content) usage.completion_tokens++; // Dont count empty chunks
writeResponseChunk(response, {
uuid,
sources: [],
@ -285,6 +322,7 @@ class AWSBedrockLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
} catch (error) {
writeResponseChunk(response, {
@ -298,6 +336,7 @@ class AWSBedrockLLM {
}`,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
}
});
}
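
LLMPerformanceMonitor.countTokens is also referenced without its source in this excerpt. The call sites pass either a message array or, in a few spots, a raw string, and it is used wherever a provider (Bedrock, Gemini, KoboldCPP, LocalAI, Novita, the native runner) does not report usage itself. A hedged stand-in; the real helper presumably uses a proper tokenizer rather than this chars/4 rule of thumb:

// Stand-in for LLMPerformanceMonitor.countTokens, inferred from how it is called (illustrative only)
function countTokensSketch(input = []) {
  // Accept both shapes seen in the call sites: [{ content: "..." }, ...] or a plain string.
  const text = Array.isArray(input)
    ? input.map((m) => String(m?.content ?? "")).join(" ")
    : String(input ?? "");
  return Math.ceil(text.length / 4); // ~4 characters per token is a common rough estimate
}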

View file

@ -2,6 +2,9 @@ const { v4 } = require("uuid");
const { writeResponseChunk } = require("../../helpers/chat/responses");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const { MODEL_MAP } = require("../modelMap");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
class CohereLLM {
constructor(embedder = null) {
@ -101,15 +104,33 @@ class CohereLLM {
const message = messages[messages.length - 1].content; // Get the last message
const cohereHistory = this.#convertChatHistoryCohere(messages.slice(0, -1)); // Remove the last message and convert to Cohere
const chat = await this.cohere.chat({
model: this.model,
message: message,
chatHistory: cohereHistory,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.cohere.chat({
model: this.model,
message: message,
chatHistory: cohereHistory,
temperature,
})
);
if (!chat.hasOwnProperty("text")) return null;
return chat.text;
if (
!result.output.hasOwnProperty("text") ||
result.output.text.length === 0
)
return null;
const promptTokens = result.output.meta?.tokens?.inputTokens || 0;
const completionTokens = result.output.meta?.tokens?.outputTokens || 0;
return {
textResponse: result.output.text,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -120,21 +141,35 @@ class CohereLLM {
const message = messages[messages.length - 1].content; // Get the last message
const cohereHistory = this.#convertChatHistoryCohere(messages.slice(0, -1)); // Remove the last message and convert to Cohere
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.cohere.chatStream({
model: this.model,
message: message,
chatHistory: cohereHistory,
temperature,
}),
messages,
false
);
const stream = await this.cohere.chatStream({
model: this.model,
message: message,
chatHistory: cohereHistory,
temperature,
});
return { type: "stream", stream: stream };
return measuredStreamRequest;
}
/**
* Handles the stream response from the Cohere API.
* @param {Object} response - the response object
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream - the stream response from the Cohere API w/tracking
* @param {Object} responseProps - the response properties
* @returns {Promise<string>}
*/
async handleStream(response, stream, responseProps) {
return new Promise(async (resolve) => {
let fullText = "";
const { uuid = v4(), sources = [] } = responseProps;
let fullText = "";
let usage = {
prompt_tokens: 0,
completion_tokens: 0,
};
const handleAbort = () => {
writeResponseChunk(response, {
@ -146,12 +181,19 @@ class CohereLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream.endMeasurement(usage);
resolve(fullText);
};
response.on("close", handleAbort);
try {
for await (const chat of stream.stream) {
for await (const chat of stream) {
if (chat.eventType === "stream-end") {
const usageMetrics = chat?.response?.meta?.tokens || {};
usage.prompt_tokens = usageMetrics.inputTokens || 0;
usage.completion_tokens = usageMetrics.outputTokens || 0;
}
if (chat.eventType === "text-generation") {
const text = chat.text;
fullText += text;
@ -176,6 +218,7 @@ class CohereLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream.endMeasurement(usage);
resolve(fullText);
} catch (error) {
writeResponseChunk(response, {
@ -187,6 +230,7 @@ class CohereLLM {
error: error.message,
});
response.removeListener("close", handleAbort);
stream.endMeasurement(usage);
resolve(fullText);
}
});

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -74,19 +77,34 @@ class DeepSeekLLM {
`DeepSeek chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -95,13 +113,18 @@ class DeepSeekLLM {
`DeepSeek chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -84,15 +87,30 @@ class FireworksAiLLM {
`FireworksAI chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -101,13 +119,17 @@ class FireworksAiLLM {
`FireworksAI chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
writeResponseChunk,
clientAbortedHandler,
@ -227,13 +230,29 @@ class GeminiLLM {
history: this.formatMessages(messages),
safetySettings: this.#safetySettings(),
});
const result = await chatThread.sendMessage(prompt);
const response = result.response;
const responseText = response.text();
const { output: result, duration } =
await LLMPerformanceMonitor.measureAsyncFunction(
chatThread.sendMessage(prompt)
);
const responseText = result.response.text();
if (!responseText) throw new Error("Gemini: No response could be parsed.");
return responseText;
const promptTokens = LLMPerformanceMonitor.countTokens(messages);
const completionTokens = LLMPerformanceMonitor.countTokens([
{ content: responseText },
]);
return {
textResponse: responseText,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: (promptTokens + completionTokens) / duration,
duration,
},
};
}
async streamGetChatCompletion(messages = [], _opts = {}) {
@ -249,11 +268,14 @@ class GeminiLLM {
history: this.formatMessages(messages),
safetySettings: this.#safetySettings(),
});
const responseStream = await chatThread.sendMessageStream(prompt);
if (!responseStream.stream)
throw new Error("Could not stream response stream from Gemini.");
const responseStream = await LLMPerformanceMonitor.measureStream(
(await chatThread.sendMessageStream(prompt)).stream,
messages
);
return responseStream.stream;
if (!responseStream)
throw new Error("Could not stream response stream from Gemini.");
return responseStream;
}
async compressMessages(promptArgs = {}, rawHistory = []) {
@ -264,6 +286,10 @@ class GeminiLLM {
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
// Usage is not available for Gemini streams
// so we need to calculate the completion tokens manually
// because 1 chunk != 1 token in gemini responses and it buffers
// many tokens before sending them to the client as a "chunk"
return new Promise(async (resolve) => {
let fullText = "";
@ -272,7 +298,14 @@ class GeminiLLM {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens([
{ content: fullText },
]),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
for await (const chunk of stream) {
@ -292,6 +325,7 @@ class GeminiLLM {
close: true,
error: e.message,
});
stream?.endMeasurement({ completion_tokens: 0 });
resolve(e.message);
return;
}
@ -316,6 +350,11 @@ class GeminiLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens([
{ content: fullText },
]),
});
resolve(fullText);
});
}

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -139,31 +142,52 @@ class GenericOpenAiLLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
max_tokens: this.maxTokens,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
max_tokens: this.maxTokens,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output?.usage?.prompt_tokens || 0,
completion_tokens: result.output?.usage?.completion_tokens || 0,
total_tokens: result.output?.usage?.total_tokens || 0,
outputTps:
(result.output?.usage?.completion_tokens || 0) / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
max_tokens: this.maxTokens,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
max_tokens: this.maxTokens,
}),
messages
// runPromptTokenCalculation: true - There is no way to know if the connected generic provider returns
// correct usage metrics, if any at all, since any provider could be connected.
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -170,19 +173,36 @@ class GroqLLM {
`GroqAI:chatCompletion: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps:
result.output.usage.completion_tokens /
result.output.usage.completion_time,
duration: result.output.usage.total_time,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -191,13 +211,18 @@ class GroqLLM {
`GroqAI:streamChatCompletion: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -87,25 +90,48 @@ class HuggingFaceLLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps:
(result.output.usage?.completion_tokens || 0) / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -3,6 +3,9 @@ const {
clientAbortedHandler,
writeResponseChunk,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const { v4: uuidv4 } = require("uuid");
class KoboldCPPLLM {
@ -122,38 +125,71 @@ class KoboldCPPLLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
const promptTokens = LLMPerformanceMonitor.countTokens(messages);
const completionTokens = LLMPerformanceMonitor.countTokens([
{ content: result.output.choices[0].message.content },
]);
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
// Custom handler for KoboldCPP stream responses
return new Promise(async (resolve) => {
let fullText = "";
const handleAbort = () => clientAbortedHandler(resolve, fullText);
let usage = {
prompt_tokens: LLMPerformanceMonitor.countTokens(stream.messages || []),
completion_tokens: 0,
};
const handleAbort = () => {
usage.completion_tokens = LLMPerformanceMonitor.countTokens([
{ content: fullText },
]);
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
for await (const chunk of stream) {
@ -187,6 +223,10 @@ class KoboldCPPLLM {
error: false,
});
response.removeListener("close", handleAbort);
usage.completion_tokens = LLMPerformanceMonitor.countTokens([
{ content: fullText },
]);
stream?.endMeasurement(usage);
resolve(fullText);
}
}

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -121,31 +124,53 @@ class LiteLLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
max_tokens: parseInt(this.maxTokens), // LiteLLM requires int
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
max_tokens: parseInt(this.maxTokens), // LiteLLM requires int
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps:
(result.output.usage?.completion_tokens || 0) / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
max_tokens: parseInt(this.maxTokens), // LiteLLM requires int
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
max_tokens: parseInt(this.maxTokens), // LiteLLM requires int
}),
messages
// runPromptTokenCalculation: true - We manually count the tokens because they may or may not be provided in the stream
// responses depending on the LLM connected. If they are provided we counted for nothing, but that is better than reporting nothing.
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
// hybrid of openAi LLM chat completion for LMStudio
class LMStudioLLM {
@ -128,15 +131,30 @@ class LMStudioLLM {
`LMStudio chat: ${this.model} is not valid or defined model for chat completion!`
);
const result = await this.lmstudio.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.lmstudio.chat.completions.create({
model: this.model,
messages,
temperature,
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps: result.output.usage?.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -145,13 +163,16 @@ class LMStudioLLM {
`LMStudio chat: ${this.model} is not valid or defined model for chat completion!`
);
const streamRequest = await this.lmstudio.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.lmstudio.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -114,15 +117,35 @@ class LocalAiLLM {
`LocalAI chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
const promptTokens = LLMPerformanceMonitor.countTokens(messages);
const completionTokens = LLMPerformanceMonitor.countTokens(
result.output.choices[0].message.content
);
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -131,13 +154,16 @@ class LocalAiLLM {
`LocalAi chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -103,15 +106,34 @@ class MistralLLM {
`Mistral chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -120,13 +142,17 @@ class MistralLLM {
`Mistral chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -5,6 +5,9 @@ const {
writeResponseChunk,
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
// Docs: https://js.langchain.com/docs/integrations/chat/llama_cpp
const ChatLlamaCpp = (...args) =>
@ -126,16 +129,44 @@ class NativeLLM {
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const model = await this.#llamaClient({ temperature });
const response = await model.call(messages);
return response.content;
const result = await LLMPerformanceMonitor.measureAsyncFunction(
model.call(messages)
);
if (!result.output?.content) return null;
const promptTokens = LLMPerformanceMonitor.countTokens(messages);
const completionTokens = LLMPerformanceMonitor.countTokens(
result.output.content
);
return {
textResponse: result.output.content,
metrics: {
prompt_tokens: promptTokens,
completion_tokens: completionTokens,
total_tokens: promptTokens + completionTokens,
outputTps: completionTokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const model = await this.#llamaClient({ temperature });
const responseStream = await model.stream(messages);
return responseStream;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
model.stream(messages),
messages
);
return measuredStreamRequest;
}
/**
* Handles the default stream response for a chat.
* @param {import("express").Response} response
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
* @param {Object} responseProps
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
@ -146,7 +177,12 @@ class NativeLLM {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
for await (const chunk of stream) {
@ -176,6 +212,9 @@ class NativeLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
});
}

View file

@ -7,6 +7,9 @@ const {
const fs = require("fs");
const path = require("path");
const { safeJsonParse } = require("../../http");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const cacheFolder = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "models", "novita")
@ -188,19 +191,34 @@ class NovitaLLM {
`Novita chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -209,15 +227,25 @@ class NovitaLLM {
`Novita chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
/**
* Handles the default stream response for a chat.
* @param {import("express").Response} response
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
* @param {Object} responseProps
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const timeoutThresholdMs = this.timeout;
const { uuid = uuidv4(), sources = [] } = responseProps;
@ -230,7 +258,12 @@ class NovitaLLM {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
// NOTICE: Not all Novita models will return a stop reason
@ -259,6 +292,9 @@ class NovitaLLM {
});
clearInterval(timeoutCheck);
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}, 500);
@ -291,6 +327,9 @@ class NovitaLLM {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}
@ -304,6 +343,9 @@ class NovitaLLM {
error: e.message,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
});

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -153,15 +156,34 @@ class NvidiaNimLLM {
`Nvidia NIM chat: ${this.model} is not a valid or defined model for chat completion!`
);
const result = await this.nvidiaNim.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.nvidiaNim.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -170,13 +192,16 @@ class NvidiaNimLLM {
`Nvidia NIM chat: ${this.model} is not a valid or defined model for chat completion!`
);
const streamRequest = await this.nvidiaNim.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.nvidiaNim.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,9 +1,12 @@
const { StringOutputParser } = require("@langchain/core/output_parsers");
const {
writeResponseChunk,
clientAbortedHandler,
} = require("../../helpers/chat/responses");
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const { Ollama } = require("ollama");
// Docs: https://github.com/jmorganca/ollama/blob/main/docs/api.md
class OllamaAILLM {
@ -23,49 +26,11 @@ class OllamaAILLM {
user: this.promptWindowLimit() * 0.7,
};
this.client = new Ollama({ host: this.basePath });
this.embedder = embedder ?? new NativeEmbedder();
this.defaultTemp = 0.7;
}
#ollamaClient({ temperature = 0.07 }) {
const { ChatOllama } = require("@langchain/community/chat_models/ollama");
return new ChatOllama({
baseUrl: this.basePath,
model: this.model,
keepAlive: this.keepAlive,
useMLock: true,
// There are currently only two performance settings so if its not "base" - its max context.
...(this.performanceMode === "base"
? {}
: { numCtx: this.promptWindowLimit() }),
temperature,
});
}
// For streaming we use Langchain's wrapper to handle weird chunks
// or otherwise absorb headaches that can arise from Ollama models
#convertToLangchainPrototypes(chats = []) {
const {
HumanMessage,
SystemMessage,
AIMessage,
} = require("@langchain/core/messages");
const langchainChats = [];
const roleToMessageMap = {
system: SystemMessage,
user: HumanMessage,
assistant: AIMessage,
};
for (const chat of chats) {
if (!roleToMessageMap.hasOwnProperty(chat.role)) continue;
const MessageClass = roleToMessageMap[chat.role];
langchainChats.push(new MessageClass({ content: chat.content }));
}
return langchainChats;
}
#appendContext(contextTexts = []) {
if (!contextTexts || !contextTexts.length) return "";
return (
@ -105,21 +70,29 @@ class OllamaAILLM {
/**
* Generates appropriate content array for a message + attachments.
* @param {{userPrompt:string, attachments: import("../../helpers").Attachment[]}}
* @returns {string|object[]}
* @returns {{content: string, images: string[]}}
*/
#generateContent({ userPrompt, attachments = [] }) {
if (!attachments.length) {
return { content: userPrompt };
}
if (!attachments.length) return { content: userPrompt };
const images = attachments.map(
(attachment) => attachment.contentString.split("base64,").slice(-1)[0]
);
return { content: userPrompt, images };
}
const content = [{ type: "text", text: userPrompt }];
for (let attachment of attachments) {
content.push({
type: "image_url",
image_url: attachment.contentString,
});
/**
* Handles errors from the Ollama API to make them more user friendly.
* @param {Error} e
*/
#errorHandler(e) {
switch (e.message) {
case "fetch failed":
throw new Error(
"Your Ollama instance could not be reached or is not responding. Please make sure it is running the API server and your connection information is correct in AnythingLLM."
);
default:
return e;
}
return { content: content.flat() };
}
/**
@ -149,41 +122,103 @@ class OllamaAILLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const model = this.#ollamaClient({ temperature });
const textResponse = await model
.pipe(new StringOutputParser())
.invoke(this.#convertToLangchainPrototypes(messages))
.catch((e) => {
throw new Error(
`Ollama::getChatCompletion failed to communicate with Ollama. ${e.message}`
);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.client
.chat({
model: this.model,
stream: false,
messages,
keep_alive: this.keepAlive,
options: {
temperature,
useMLock: true,
// There are currently only two performance settings, so if it's not "base" it's max context.
...(this.performanceMode === "base"
? {}
: { numCtx: this.promptWindowLimit() }),
},
})
.then((res) => {
return {
content: res.message.content,
usage: {
prompt_tokens: res.prompt_eval_count,
completion_tokens: res.eval_count,
total_tokens: res.prompt_eval_count + res.eval_count,
},
};
})
.catch((e) => {
throw new Error(
`Ollama::getChatCompletion failed to communicate with Ollama. ${this.#errorHandler(e).message}`
);
})
);
if (!textResponse || !textResponse.length)
if (!result.output.content || !result.output.content.length)
throw new Error(`Ollama::getChatCompletion text response was empty.`);
return textResponse;
return {
textResponse: result.output.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens,
completion_tokens: result.output.usage.completion_tokens,
total_tokens: result.output.usage.total_tokens,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const model = this.#ollamaClient({ temperature });
const stream = await model
.pipe(new StringOutputParser())
.stream(this.#convertToLangchainPrototypes(messages));
return stream;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.client.chat({
model: this.model,
stream: true,
messages,
keep_alive: this.keepAlive,
options: {
temperature,
useMLock: true,
// There are currently only two performance settings, so if it's not "base" it's max context.
...(this.performanceMode === "base"
? {}
: { numCtx: this.promptWindowLimit() }),
},
}),
messages,
false
).catch((e) => {
throw this.#errorHandler(e);
});
return measuredStreamRequest;
}
/**
* Handles streaming responses from Ollama.
* @param {import("express").Response} response
* @param {import("../../helpers/chat/LLMPerformanceMonitor").MonitoredStream} stream
* @param {import("express").Request} request
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
return new Promise(async (resolve) => {
let fullText = "";
let usage = {
prompt_tokens: 0,
completion_tokens: 0,
};
// Establish listener to early-abort a streaming response
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
try {
@ -193,30 +228,36 @@ class OllamaAILLM {
"Stream returned undefined chunk. Aborting reply - check model provider logs."
);
const content = chunk.hasOwnProperty("content")
? chunk.content
: chunk;
fullText += content;
writeResponseChunk(response, {
uuid,
sources: [],
type: "textResponseChunk",
textResponse: content,
close: false,
error: false,
});
}
if (chunk.done) {
usage.prompt_tokens = chunk.prompt_eval_count;
usage.completion_tokens = chunk.eval_count;
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
break;
}
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: "",
close: true,
error: false,
});
response.removeListener("close", handleAbort);
resolve(fullText);
if (chunk.hasOwnProperty("message")) {
const content = chunk.message.content;
fullText += content;
writeResponseChunk(response, {
uuid,
sources,
type: "textResponseChunk",
textResponse: content,
close: false,
error: false,
});
}
}
} catch (error) {
writeResponseChunk(response, {
uuid,
@ -229,6 +270,8 @@ class OllamaAILLM {
}`,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
}
});
}
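For reference, a minimal standalone sketch of how the official `ollama` client surfaces the token counts the handler above consumes; the host URL, model name, and prompt are placeholder assumptions, not values from this commit.

// Sketch only - host, model, and prompt are assumed placeholders.
const { Ollama } = require("ollama");

async function demo() {
  const client = new Ollama({ host: "http://127.0.0.1:11434" });
  const stream = await client.chat({
    model: "llama3",
    stream: true,
    messages: [{ role: "user", content: "Say hello." }],
  });

  let fullText = "";
  const usage = { prompt_tokens: 0, completion_tokens: 0 };
  for await (const chunk of stream) {
    // Content arrives on chunk.message.content; token counts arrive on the final chunk.
    if (chunk.message?.content) fullText += chunk.message.content;
    if (chunk.done) {
      usage.prompt_tokens = chunk.prompt_eval_count;
      usage.completion_tokens = chunk.eval_count;
    }
  }
  console.log({ fullText, usage });
}

demo().catch(console.error);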

View file

@ -3,6 +3,9 @@ const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
const { MODEL_MAP } = require("../modelMap");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
class OpenAiLLM {
constructor(embedder = null, modelPreference = null) {
@ -132,19 +135,34 @@ class OpenAiLLM {
`OpenAI chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -153,13 +171,19 @@ class OpenAiLLM {
`OpenAI chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature: this.isO1Model ? 1 : temperature, // o1 models only accept temperature 1
}),
messages
// runPromptTokenCalculation: true - we count the prompt tokens manually because OpenAI does not report them
// in the stream on the API version we currently use (no `stream_options` support).
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -7,6 +7,9 @@ const {
const fs = require("fs");
const path = require("path");
const { safeJsonParse } = require("../../http");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const cacheFolder = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, "models", "openrouter")
@ -190,19 +193,34 @@ class OpenRouterLLM {
`OpenRouter chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -211,15 +229,32 @@ class OpenRouterLLM {
`OpenRouter chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
// We have to count the tokens manually.
// OpenRouter fronts many providers and their usage reporting differs slightly:
// some return chunk.usage on STOP, some send it after the stop chunk - it's inconsistent.
// So reported metrics may be inaccurate, since we cannot reliably capture usage
// before the stream resolves - so we simply treat this functionality
// as unavailable.
);
return measuredStreamRequest;
}
/**
* Handles the default stream response for a chat.
* @param {import("express").Response} response
* @param {import('../../helpers/chat/LLMPerformanceMonitor').MonitoredStream} stream
* @param {Object} responseProps
* @returns {Promise<string>}
*/
handleStream(response, stream, responseProps) {
const timeoutThresholdMs = this.timeout;
const { uuid = uuidv4(), sources = [] } = responseProps;
@ -232,7 +267,12 @@ class OpenRouterLLM {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
// NOTICE: Not all OpenRouter models will return a stop reason
@ -261,6 +301,9 @@ class OpenRouterLLM {
});
clearInterval(timeoutCheck);
response.removeListener("close", handleAbort);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}, 500);
@ -293,6 +336,10 @@ class OpenRouterLLM {
error: false,
});
response.removeListener("close", handleAbort);
clearInterval(timeoutCheck);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
}
@ -306,6 +353,10 @@ class OpenRouterLLM {
error: e.message,
});
response.removeListener("close", handleAbort);
clearInterval(timeoutCheck);
stream?.endMeasurement({
completion_tokens: LLMPerformanceMonitor.countTokens(fullText),
});
resolve(fullText);
}
});

View file

@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
function perplexityModels() {
const { MODELS } = require("./models.js");
@ -86,19 +89,34 @@ class PerplexityLLM {
`Perplexity chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps: result.output.usage?.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -107,13 +125,16 @@ class PerplexityLLM {
`Perplexity chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
class TextGenWebUILLM {
constructor(embedder = null) {
@ -119,29 +122,47 @@ class TextGenWebUILLM {
}
async getChatCompletion(messages = null, { temperature = 0.7 }) {
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps: result.output.usage?.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
});
return streamRequest;
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -2,6 +2,9 @@ const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
function togetherAiModels() {
const { MODELS } = require("./models.js");
@ -109,15 +112,34 @@ class TogetherAiLLM {
`TogetherAI chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions.create({
model: this.model,
messages,
temperature,
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage?.prompt_tokens || 0,
completion_tokens: result.output.usage?.completion_tokens || 0,
total_tokens: result.output.usage?.total_tokens || 0,
outputTps: result.output.usage?.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -126,13 +148,17 @@ class TogetherAiLLM {
`TogetherAI chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -1,4 +1,7 @@
const { NativeEmbedder } = require("../../EmbeddingEngines/native");
const {
LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");
const {
handleDefaultStreamResponseV2,
} = require("../../helpers/chat/responses");
@ -114,19 +117,34 @@ class XAiLLM {
`xAI chat: ${this.model} is not valid for chat completion!`
);
const result = await this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
});
const result = await LLMPerformanceMonitor.measureAsyncFunction(
this.openai.chat.completions
.create({
model: this.model,
messages,
temperature,
})
.catch((e) => {
throw new Error(e.message);
})
);
if (!result.hasOwnProperty("choices") || result.choices.length === 0)
if (
!result.output.hasOwnProperty("choices") ||
result.output.choices.length === 0
)
return null;
return result.choices[0].message.content;
return {
textResponse: result.output.choices[0].message.content,
metrics: {
prompt_tokens: result.output.usage.prompt_tokens || 0,
completion_tokens: result.output.usage.completion_tokens || 0,
total_tokens: result.output.usage.total_tokens || 0,
outputTps: result.output.usage.completion_tokens / result.duration,
duration: result.duration,
},
};
}
async streamGetChatCompletion(messages = null, { temperature = 0.7 }) {
@ -135,13 +153,18 @@ class XAiLLM {
`xAI chat: ${this.model} is not valid for chat completion!`
);
const streamRequest = await this.openai.chat.completions.create({
model: this.model,
stream: true,
const measuredStreamRequest = await LLMPerformanceMonitor.measureStream(
this.openai.chat.completions.create({
model: this.model,
stream: true,
messages,
temperature,
}),
messages,
temperature,
});
return streamRequest;
false
);
return measuredStreamRequest;
}
handleStream(response, stream, responseProps) {

View file

@ -18,6 +18,7 @@ const { Telemetry } = require("../../models/telemetry");
* @property {object[]} sources
* @property {boolean} close
* @property {string|null} error
* @property {object} metrics
*/
/**
@ -120,6 +121,7 @@ async function chatSync({
text: textResponse,
sources: [],
type: chatMode,
metrics: {},
},
include: false,
apiSessionId: sessionId,
@ -132,6 +134,7 @@ async function chatSync({
close: true,
error: null,
textResponse,
metrics: {},
};
}
@ -193,6 +196,7 @@ async function chatSync({
sources: [],
close: true,
error: vectorSearchResults.message,
metrics: {},
};
}
@ -228,6 +232,7 @@ async function chatSync({
text: textResponse,
sources: [],
type: chatMode,
metrics: {},
},
threadId: thread?.id || null,
include: false,
@ -242,6 +247,7 @@ async function chatSync({
close: true,
error: null,
textResponse,
metrics: {},
};
}
@ -259,9 +265,10 @@ async function chatSync({
);
// Send the text completion.
const textResponse = await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
const { textResponse, metrics: performanceMetrics } =
await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
if (!textResponse) {
return {
@ -271,13 +278,19 @@ async function chatSync({
sources: [],
close: true,
error: "No text completion could be completed with this input.",
metrics: performanceMetrics,
};
}
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: textResponse, sources, type: chatMode },
response: {
text: textResponse,
sources,
type: chatMode,
metrics: performanceMetrics,
},
threadId: thread?.id || null,
apiSessionId: sessionId,
user,
@ -291,6 +304,7 @@ async function chatSync({
chatId: chat.id,
textResponse,
sources,
metrics: performanceMetrics,
};
}
@ -396,6 +410,7 @@ async function streamChat({
attachments: [],
close: true,
error: null,
metrics: {},
});
await WorkspaceChats.new({
workspaceId: workspace.id,
@ -405,6 +420,7 @@ async function streamChat({
sources: [],
type: chatMode,
attachments: [],
metrics: {},
},
threadId: thread?.id || null,
apiSessionId: sessionId,
@ -418,6 +434,7 @@ async function streamChat({
// 1. Chatting in "chat" mode and may or may _not_ have embeddings
// 2. Chatting in "query" mode and has at least 1 embedding
let completeText;
let metrics = {};
let contextTexts = [];
let sources = [];
let pinnedDocIdentifiers = [];
@ -479,6 +496,7 @@ async function streamChat({
sources: [],
close: true,
error: vectorSearchResults.message,
metrics: {},
});
return;
}
@ -514,6 +532,7 @@ async function streamChat({
sources: [],
close: true,
error: null,
metrics: {},
});
await WorkspaceChats.new({
@ -524,6 +543,7 @@ async function streamChat({
sources: [],
type: chatMode,
attachments: [],
metrics: {},
},
threadId: thread?.id || null,
apiSessionId: sessionId,
@ -552,9 +572,12 @@ async function streamChat({
console.log(
`\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
);
completeText = await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
const { textResponse, metrics: performanceMetrics } =
await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
completeText = textResponse;
metrics = performanceMetrics;
writeResponseChunk(response, {
uuid,
sources,
@ -562,6 +585,7 @@ async function streamChat({
textResponse: completeText,
close: true,
error: false,
metrics,
});
} else {
const stream = await LLMConnector.streamGetChatCompletion(messages, {
@ -571,13 +595,14 @@ async function streamChat({
uuid,
sources,
});
metrics = stream.metrics;
}
if (completeText?.length > 0) {
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: completeText, sources, type: chatMode },
response: { text: completeText, sources, type: chatMode, metrics },
threadId: thread?.id || null,
apiSessionId: sessionId,
user,
@ -589,6 +614,7 @@ async function streamChat({
close: true,
error: false,
chatId: chat.id,
metrics,
});
return;
}

View file

@ -54,6 +54,7 @@ async function streamChatWithForEmbed(
}
let completeText;
let metrics = {};
let contextTexts = [];
let sources = [];
let pinnedDocIdentifiers = [];
@ -164,9 +165,12 @@ async function streamChatWithForEmbed(
console.log(
`\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
);
completeText = await LLMConnector.getChatCompletion(messages, {
temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
const { textResponse, metrics: performanceMetrics } =
await LLMConnector.getChatCompletion(messages, {
temperature: embed.workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
completeText = textResponse;
metrics = performanceMetrics;
writeResponseChunk(response, {
uuid,
sources: [],
@ -183,12 +187,13 @@ async function streamChatWithForEmbed(
uuid,
sources: [],
});
metrics = stream.metrics;
}
await EmbedChats.new({
embedId: embed.id,
prompt: message,
response: { text: completeText, type: chatMode, sources },
response: { text: completeText, type: chatMode, sources, metrics },
connection_information: response.locals.connection
? {
...response.locals.connection,

View file

@ -156,10 +156,13 @@ async function chatSync({
});
// Send the text completion.
const textResponse = await LLMConnector.getChatCompletion(messages, {
temperature:
temperature ?? workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
const { textResponse, metrics } = await LLMConnector.getChatCompletion(
messages,
{
temperature:
temperature ?? workspace?.openAiTemp ?? LLMConnector.defaultTemp,
}
);
if (!textResponse) {
return formatJSON(
@ -171,14 +174,14 @@ async function chatSync({
error: "No text completion could be completed with this input.",
textResponse: null,
},
{ model: workspace.slug, finish_reason: "no_content" }
{ model: workspace.slug, finish_reason: "no_content", usage: metrics }
);
}
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: prompt,
response: { text: textResponse, sources, type: chatMode },
response: { text: textResponse, sources, type: chatMode, metrics },
});
return formatJSON(
@ -191,7 +194,7 @@ async function chatSync({
textResponse,
sources,
},
{ model: workspace.slug, finish_reason: "stop" }
{ model: workspace.slug, finish_reason: "stop", usage: metrics }
);
}
@ -414,7 +417,12 @@ async function streamChat({
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: prompt,
response: { text: completeText, sources, type: chatMode },
response: {
text: completeText,
sources,
type: chatMode,
metrics: stream.metrics,
},
});
writeResponseChunk(
@ -428,7 +436,12 @@ async function streamChat({
chatId: chat.id,
textResponse: "",
},
{ chunked: true, model: workspace.slug, finish_reason: "stop" }
{
chunked: true,
model: workspace.slug,
finish_reason: "stop",
usage: stream.metrics,
}
)
);
return;
@ -444,13 +457,21 @@ async function streamChat({
error: false,
textResponse: "",
},
{ chunked: true, model: workspace.slug, finish_reason: "stop" }
{
chunked: true,
model: workspace.slug,
finish_reason: "stop",
usage: stream.metrics,
}
)
);
return;
}
function formatJSON(chat, { chunked = false, model, finish_reason = null }) {
function formatJSON(
chat,
{ chunked = false, model, finish_reason = null, usage = {} }
) {
const data = {
id: chat.uuid ?? chat.id,
object: "chat.completion",
@ -466,6 +487,7 @@ function formatJSON(chat, { chunked = false, model, finish_reason = null }) {
finish_reason: finish_reason,
},
],
usage,
};
return data;
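With `usage` threaded through `formatJSON`, an OpenAI-compatible response body comes out shaped roughly like the sketch below. Every value is illustrative, and the fields not visible in this hunk (`index`, `message`) are assumed to follow the standard chat.completion schema.

// Illustrative shape only - id, slug, text, and all numbers are made up.
const exampleResponse = {
  id: "d3f6c1a2-0000-0000-0000-000000000000",
  object: "chat.completion",
  model: "my-workspace", // the workspace slug
  choices: [
    {
      index: 0,
      message: { role: "assistant", content: "The answer text goes here." },
      finish_reason: "stop",
    },
  ],
  usage: {
    prompt_tokens: 421,
    completion_tokens: 188,
    total_tokens: 609,
    outputTps: 34.2, // completion_tokens / duration
    duration: 5.47, // seconds
  },
};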

View file

@ -94,6 +94,7 @@ async function streamChatWithWorkspace(
// 1. Chatting in "chat" mode and may or may _not_ have embeddings
// 2. Chatting in "query" mode and has at least 1 embedding
let completeText;
let metrics = {};
let contextTexts = [];
let sources = [];
let pinnedDocIdentifiers = [];
@ -226,9 +227,13 @@ async function streamChatWithWorkspace(
console.log(
`\x1b[31m[STREAMING DISABLED]\x1b[0m Streaming is not available for ${LLMConnector.constructor.name}. Will use regular chat method.`
);
completeText = await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
const { textResponse, metrics: performanceMetrics } =
await LLMConnector.getChatCompletion(messages, {
temperature: workspace?.openAiTemp ?? LLMConnector.defaultTemp,
});
completeText = textResponse;
metrics = performanceMetrics;
writeResponseChunk(response, {
uuid,
sources,
@ -236,6 +241,7 @@ async function streamChatWithWorkspace(
textResponse: completeText,
close: true,
error: false,
metrics,
});
} else {
const stream = await LLMConnector.streamGetChatCompletion(messages, {
@ -245,13 +251,20 @@ async function streamChatWithWorkspace(
uuid,
sources,
});
metrics = stream.metrics;
}
if (completeText?.length > 0) {
const { chat } = await WorkspaceChats.new({
workspaceId: workspace.id,
prompt: message,
response: { text: completeText, sources, type: chatMode, attachments },
response: {
text: completeText,
sources,
type: chatMode,
attachments,
metrics,
},
threadId: thread?.id || null,
user,
});
@ -262,6 +275,7 @@ async function streamChatWithWorkspace(
close: true,
error: false,
chatId: chat.id,
metrics,
});
return;
}
@ -271,6 +285,7 @@ async function streamChatWithWorkspace(
type: "finalizeResponseStream",
close: true,
error: false,
metrics,
});
return;
}

View file

@ -0,0 +1,101 @@
const { TokenManager } = require("../tiktoken");
/**
* @typedef {import("openai/streaming").Stream<import("openai").OpenAI.ChatCompletionChunk>} OpenAICompatibleStream
* @typedef {(reportedUsage: {[key: string]: number, completion_tokens?: number, prompt_tokens?: number}) => StreamMetrics} EndMeasurementFunction
* @typedef {Array<{content: string}>} Messages
*/
/**
* @typedef {Object} StreamMetrics
* @property {number} prompt_tokens - the number of tokens in the prompt
* @property {number} completion_tokens - the number of tokens in the completion
* @property {number} total_tokens - the total number of tokens
* @property {number} outputTps - the tokens per second of the output
* @property {number} duration - the duration of the stream
*/
/**
* @typedef {Object} MonitoredStream
* @property {number} start - the start time of the stream
* @property {number} duration - the duration of the stream
* @property {StreamMetrics} metrics - the metrics of the stream
* @property {EndMeasurementFunction} endMeasurement - the method to end the stream and calculate the metrics
*/
class LLMPerformanceMonitor {
static tokenManager = new TokenManager();
/**
* Counts the tokens in the messages.
* @param {Array<{content: string}>} messages - the messages sent to the LLM so we can calculate the prompt tokens since most providers do not return this on stream
* @returns {number}
*/
static countTokens(messages = []) {
try {
return this.tokenManager.statsFrom(messages);
} catch (e) {
return 0;
}
}
/**
* Awaits an in-flight call and measures its duration (in seconds).
* @param {Promise<any>} func - the pending promise to measure
* @returns {Promise<{output: any, duration: number}>}
*/
static measureAsyncFunction(func) {
return (async () => {
const start = Date.now();
const output = await func; // is a promise
const end = Date.now();
return { output, duration: (end - start) / 1000 };
})();
}
/**
* Wraps a completion stream and attaches a start time and duration property to the stream.
* Also attaches an `endMeasurement` method to the stream that calculates the stream's duration and metrics.
* @param {Promise<OpenAICompatibleStream>} func
* @param {Messages} messages - the messages sent to the LLM so we can calculate the prompt tokens since most providers do not return this on stream
* @param {boolean} runPromptTokenCalculation - whether to run the prompt token calculation to estimate the `prompt_tokens` metric. This is useful for providers that do not return this on stream.
* @returns {Promise<MonitoredStream>}
*/
static async measureStream(
func,
messages = [],
runPromptTokenCalculation = true
) {
const stream = await func;
stream.start = Date.now();
stream.duration = 0;
stream.metrics = {
completion_tokens: 0,
prompt_tokens: runPromptTokenCalculation ? this.countTokens(messages) : 0,
total_tokens: 0,
outputTps: 0,
duration: 0,
};
stream.endMeasurement = (reportedUsage = {}) => {
const end = Date.now();
const duration = (end - stream.start) / 1000;
// Merge the reported usage into the existing metrics so the totals
// and TPS below are computed from the most accurate counts available.
stream.metrics = {
...stream.metrics,
...reportedUsage,
};
stream.metrics.total_tokens =
stream.metrics.prompt_tokens + (stream.metrics.completion_tokens || 0);
stream.metrics.outputTps = stream.metrics.completion_tokens / duration;
stream.metrics.duration = duration;
return stream.metrics;
};
return stream;
}
}
module.exports = {
LLMPerformanceMonitor,
};
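Taken together, a provider integrates these helpers in two places: wrap the non-streaming call with `measureAsyncFunction` and the streaming call with `measureStream`, then let the stream handler call `endMeasurement`. The sketch below condenses that pattern; `client` stands in for any OpenAI-compatible SDK instance, and the require path assumes the file sits alongside the other providers.

// Sketch only: `client` is a stand-in for any OpenAI-compatible SDK instance.
const {
  LLMPerformanceMonitor,
} = require("../../helpers/chat/LLMPerformanceMonitor");

async function getChatCompletion(client, model, messages, temperature) {
  // Non-streaming: wrap the pending promise; `duration` comes back in seconds.
  const result = await LLMPerformanceMonitor.measureAsyncFunction(
    client.chat.completions.create({ model, messages, temperature })
  );
  const usage = result.output.usage ?? {};
  return {
    textResponse: result.output.choices[0].message.content,
    metrics: {
      prompt_tokens: usage.prompt_tokens || 0,
      completion_tokens: usage.completion_tokens || 0,
      total_tokens: usage.total_tokens || 0,
      outputTps: (usage.completion_tokens || 0) / result.duration,
      duration: result.duration,
    },
  };
}

async function streamGetChatCompletion(client, model, messages, temperature) {
  // Streaming: the returned stream carries `metrics` and `endMeasurement`,
  // which the stream handler calls once the final chunk (or an abort) arrives.
  return await LLMPerformanceMonitor.measureStream(
    client.chat.completions.create({ model, stream: true, messages, temperature }),
    messages // used to estimate prompt_tokens when the provider reports none
  );
}

module.exports = { getChatCompletion, streamGetChatCompletion };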

View file

@ -9,9 +9,29 @@ function clientAbortedHandler(resolve, fullText) {
return;
}
/**
* Handles the default stream response for a chat.
* @param {import("express").Response} response
* @param {import('./LLMPerformanceMonitor').MonitoredStream} stream
* @param {Object} responseProps
* @returns {Promise<string>}
*/
function handleDefaultStreamResponseV2(response, stream, responseProps) {
const { uuid = uuidv4(), sources = [] } = responseProps;
// Why are we doing this?
// OpenAI does support usage metrics in the stream response, but:
// 1. This parameter is not available in our current API version (TODO: update)
// 2. The usage metrics are not available in _every_ provider that uses this function
// 3. We need to track the usage metrics for every provider that uses this function - not just OpenAI
// Other keys are added by the LLMPerformanceMonitor.measureStream method
let hasUsageMetrics = false;
let usage = {
// prompt_tokens may already be set here when the provider reports it; otherwise it is estimated
// when the stream is created via `LLMPerformanceMonitor.measureStream` in the provider's `streamGetChatCompletion`.
completion_tokens: 0,
};
return new Promise(async (resolve) => {
let fullText = "";
@ -19,7 +39,10 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) {
// in case things go sideways or the user does not like the response.
// We preserve the generated text but continue as if chat was completed
// to preserve previously generated content.
const handleAbort = () => clientAbortedHandler(resolve, fullText);
const handleAbort = () => {
stream?.endMeasurement(usage);
clientAbortedHandler(resolve, fullText);
};
response.on("close", handleAbort);
// Now handle the chunks from the streamed response and append to fullText.
@ -28,8 +51,28 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) {
const message = chunk?.choices?.[0];
const token = message?.delta?.content;
// If the chunk carries usage metrics, use them directly instead of estimating,
// but only assign values when the keys match exactly what we expect.
if (
chunk.hasOwnProperty("usage") && // exists
!!chunk.usage && // is not null
Object.values(chunk.usage).length > 0 // has values
) {
if (chunk.usage.hasOwnProperty("prompt_tokens")) {
usage.prompt_tokens = Number(chunk.usage.prompt_tokens);
}
if (chunk.usage.hasOwnProperty("completion_tokens")) {
hasUsageMetrics = true; // to stop estimating counter
usage.completion_tokens = Number(chunk.usage.completion_tokens);
}
}
if (token) {
fullText += token;
// If we never saw a usage metric, estimate completion tokens by counting streamed chunks
if (!hasUsageMetrics) usage.completion_tokens++;
writeResponseChunk(response, {
uuid,
sources: [],
@ -56,6 +99,7 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) {
error: false,
});
response.removeListener("close", handleAbort);
stream?.endMeasurement(usage);
resolve(fullText);
break; // Break streaming when a valid finish_reason is first encountered
}
@ -70,6 +114,7 @@ function handleDefaultStreamResponseV2(response, stream, responseProps) {
close: true,
error: e.message,
});
stream?.endMeasurement(usage);
resolve(fullText); // Return what we currently have - if anything.
}
});
@ -111,6 +156,7 @@ function convertToChatHistory(history = []) {
chatId: id,
sentAt: moment(createdAt).unix(),
feedbackScore,
metrics: data?.metrics || {},
},
]);
}
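For context, the two chunk shapes `handleDefaultStreamResponseV2` distinguishes look roughly like this; values are illustrative, and many OpenAI-compatible providers only ever send the first form, which is why the per-chunk estimate exists.

// A normal delta chunk - when no usage is ever reported, each one
// increments the estimated completion_tokens counter.
const deltaChunk = {
  choices: [{ delta: { content: "Hel" }, finish_reason: null }],
  usage: null,
};

// A usage-bearing chunk (often the final one) - switches the handler to
// the provider-reported counts instead of the estimate.
const finalChunk = {
  choices: [{ delta: {}, finish_reason: "stop" }],
  usage: { prompt_tokens: 421, completion_tokens: 188, total_tokens: 609 },
};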

View file

@ -6,14 +6,38 @@
* @property {string} contentString - full base64 encoded string of file
*/
/**
* @typedef {Object} ResponseMetrics
* @property {number} prompt_tokens - The number of prompt tokens used
* @property {number} completion_tokens - The number of completion tokens used
* @property {number} total_tokens - The total number of tokens used
* @property {number} outputTps - The output tokens per second
* @property {number} duration - The duration of the request in seconds
*
* @typedef {Object} ChatMessage
* @property {string} role - The role of the message sender (e.g. 'user', 'assistant', 'system')
* @property {string} content - The content of the message
*
* @typedef {Object} ChatCompletionResponse
* @property {string} textResponse - The text response from the LLM
* @property {ResponseMetrics} metrics - The response metrics
*
* @typedef {Object} ChatCompletionOptions
* @property {number} temperature - The sampling temperature for the LLM response
*
* @typedef {function(Array<ChatMessage>, ChatCompletionOptions): Promise<ChatCompletionResponse>} getChatCompletionFunction
*
* @typedef {function(Array<ChatMessage>, ChatCompletionOptions): Promise<import("./chat/LLMPerformanceMonitor").MonitoredStream>} streamGetChatCompletionFunction
*/
/**
* @typedef {Object} BaseLLMProvider - A basic llm provider object
* @property {Function} streamingEnabled - Checks if streaming is enabled for chat completions.
* @property {Function} promptWindowLimit - Returns the token limit for the current model.
* @property {Function} isValidChatCompletionModel - Validates if the provided model is suitable for chat completion.
* @property {Function} constructPrompt - Constructs a formatted prompt for the chat completion request.
* @property {Function} getChatCompletion - Gets a chat completion response from OpenAI.
* @property {Function} streamGetChatCompletion - Streams a chat completion response from OpenAI.
* @property {getChatCompletionFunction} getChatCompletion - Gets a chat completion response from the provider.
* @property {streamGetChatCompletionFunction} streamGetChatCompletion - Streams a chat completion response from the provider.
* @property {Function} handleStream - Handles the streaming response.
* @property {Function} embedTextInput - Embeds the provided text input using the specified embedder.
* @property {Function} embedChunks - Embeds multiple chunks of text using the specified embedder.