Migrate the shared chat page to also use the new SSE streaming format

This commit is contained in:
sabaimran 2024-08-01 13:14:09 +05:30
parent 833553c3a3
commit bfeb64b48f

View file

@ -10,12 +10,12 @@ import Loading from '../../components/loading/loading';
import 'katex/dist/katex.min.css';
import { welcomeConsole } from '../../common/utils';
import { useIPLocationData, welcomeConsole } from '../../common/utils';
import { useAuthenticatedData } from '@/app/common/auth';
import ChatInputArea, { ChatOptions } from '@/app/components/chatInputArea/chatInputArea';
import { StreamMessage } from '@/app/components/chatMessage/chatMessage';
import { handleCompiledReferences, handleImageResponse, setupWebSocket } from '@/app/common/chatFunctions';
import { convertMessageChunkToJson, handleCompiledReferences, handleImageResponse, RawReferenceData, setupWebSocket } from '@/app/common/chatFunctions';
import { AgentData } from '@/app/agents/page';
@ -95,7 +95,6 @@ export default function SharedChat() {
const [isLoading, setLoading] = useState(true);
const [title, setTitle] = useState('Khoj AI - Chat');
const [conversationId, setConversationID] = useState<string | undefined>(undefined);
const [chatWS, setChatWS] = useState<WebSocket | null>(null);
const [messages, setMessages] = useState<StreamMessage[]>([]);
const [queryToProcess, setQueryToProcess] = useState<string>('');
const [processQuerySignal, setProcessQuerySignal] = useState(false);
@ -103,76 +102,11 @@ export default function SharedChat() {
const [isMobileWidth, setIsMobileWidth] = useState(false);
const [paramSlug, setParamSlug] = useState<string | undefined>(undefined);
const locationData = useIPLocationData();
const authenticatedData = useAuthenticatedData();
welcomeConsole();
const handleWebSocketMessage = (event: MessageEvent) => {
let chunk = event.data;
let currentMessage = messages.find(message => !message.completed);
if (!currentMessage) {
console.error("No current message found");
return;
}
// Process WebSocket streamed data
if (chunk === "start_llm_response") {
console.log("Started streaming", new Date());
} else if (chunk === "end_llm_response") {
currentMessage.completed = true;
} else {
// Get the current message
// Process and update state with the new message
if (chunk.includes("application/json")) {
chunk = JSON.parse(chunk);
}
const contentType = chunk["content-type"];
if (contentType === "application/json") {
try {
if (chunk.image || chunk.detail) {
let responseWithReference = handleImageResponse(chunk);
console.log("Image response", responseWithReference);
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else if (chunk.type == "status") {
currentMessage.trainOfThought.push(chunk.message);
} else if (chunk.type == "rate_limit") {
console.log("Rate limit message", chunk);
currentMessage.rawResponse = chunk.message;
} else {
console.log("any message", chunk);
}
} catch (error) {
console.error("Error processing message", error);
currentMessage.completed = true;
} finally {
// no-op
}
} else {
// Update the current message with the new chunk
if (chunk && chunk.includes("### compiled references:")) {
let responseWithReference = handleCompiledReferences(chunk, "");
currentMessage.rawResponse += responseWithReference.response;
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else {
// If the chunk is not a JSON object, just display it as is
currentMessage.rawResponse += chunk;
}
}
};
// Update the state with the new message, currentMessage
setMessages([...messages]);
}
useEffect(() => {
fetch('/api/chat/options')
@ -201,6 +135,7 @@ export default function SharedChat() {
useEffect(() => {
if (queryToProcess && !conversationId) {
// If the user has not yet started conversing in the chat, create a new conversation
fetch(`/api/chat/share/fork?public_conversation_slug=${paramSlug}`, {
method: 'POST',
headers: {
@ -219,7 +154,7 @@ export default function SharedChat() {
}
if (chatWS && queryToProcess) {
if (queryToProcess) {
// Add a new object to the state
const newStreamMessage: StreamMessage = {
rawResponse: "",
@ -232,40 +167,68 @@ export default function SharedChat() {
}
setMessages(prevMessages => [...prevMessages, newStreamMessage]);
setProcessQuerySignal(true);
} else {
if (!chatWS) {
console.error("No WebSocket connection available");
}
if (!queryToProcess) {
console.error("No query to process");
}
}
}, [queryToProcess]);
useEffect(() => {
if (processQuerySignal && chatWS) {
setProcessQuerySignal(false);
chatWS.onmessage = handleWebSocketMessage;
chatWS?.send(queryToProcess);
if (processQuerySignal) {
chat();
}
}, [processQuerySignal]);
useEffect(() => {
if (chatWS) {
chatWS.onmessage = handleWebSocketMessage;
async function readChatStream(response: Response) {
if (!response.ok) throw new Error(response.statusText);
if (!response.body) throw new Error("Response body is null");
const reader = response.body.getReader();
const decoder = new TextDecoder();
const eventDelimiter = '␃🔚␗';
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
setQueryToProcess('');
setProcessQuerySignal(false);
break;
}
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
let newEventIndex;
while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) {
const event = buffer.slice(0, newEventIndex);
buffer = buffer.slice(newEventIndex + eventDelimiter.length);
if (event) {
processMessageChunk(event);
}
}
}
}, [chatWS]);
}
async function chat() {
if (!queryToProcess || !conversationId) return;
let chatAPI = `/api/chat?q=${encodeURIComponent(queryToProcess)}&conversation_id=${conversationId}&stream=true&client=web`;
if (locationData) {
chatAPI += `&region=${locationData.region}&country=${locationData.country}&city=${locationData.city}&timezone=${locationData.timezone}`;
}
const response = await fetch(chatAPI);
try {
await readChatStream(response);
} catch (err) {
console.log(err);
}
}
useEffect(() => {
(async () => {
if (conversationId) {
const newWS = await setupWebSocket(conversationId, queryToProcess);
if (!newWS) {
console.error("No WebSocket connection available");
return;
}
setChatWS(newWS);
// Add a new object to the state
const newStreamMessage: StreamMessage = {
rawResponse: "",
@ -276,11 +239,66 @@ export default function SharedChat() {
timestamp: (new Date()).toISOString(),
rawQuery: queryToProcess || "",
}
setProcessQuerySignal(true);
setMessages(prevMessages => [...prevMessages, newStreamMessage]);
}
})();
}, [conversationId]);
function processMessageChunk(rawChunk: string) {
const chunk = convertMessageChunkToJson(rawChunk);
const currentMessage = messages.find(message => !message.completed);
if (!currentMessage) {
return;
}
if (!chunk || !chunk.type) {
return;
}
if (chunk.type === "status") {
const statusMessage = chunk.data as string;
currentMessage.trainOfThought.push(statusMessage);
} else if (chunk.type === "references") {
const references = chunk.data as RawReferenceData;
if (references.context) {
currentMessage.context = references.context;
}
if (references.onlineContext) {
currentMessage.onlineContext = references.onlineContext;
}
} else if (chunk.type === "message") {
const chunkData = chunk.data;
if (chunkData !== null && typeof chunkData === 'object') {
try {
const jsonData = chunkData as any;
if (jsonData.image || jsonData.detail) {
let responseWithReference = handleImageResponse(chunk.data, true);
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else if (jsonData.response) {
currentMessage.rawResponse = jsonData.response;
}
else {
console.log("any message", chunk);
}
} catch (e) {
currentMessage.rawResponse += chunkData;
}
} else {
currentMessage.rawResponse += chunkData;
}
} else if (chunk.type === "end_llm_response") {
currentMessage.completed = true;
}
setMessages([...messages]);
}
if (isLoading) {
return <Loading />;
}
@ -301,7 +319,6 @@ export default function SharedChat() {
</title>
<div className={styles.sidePanel}>
<SidePanel
webSocketConnected={!!conversationId ? (chatWS != null) : true}
conversationId={conversationId ?? null}
uploadedFiles={uploadedFiles}
isMobileWidth={isMobileWidth}