From cd85a519806223afacd5366f9e5c8fa6e3900a58 Mon Sep 17 00:00:00 2001 From: sabaimran Date: Thu, 1 Aug 2024 12:50:43 +0530 Subject: [PATCH] Ingest new format for server sent events within the HTTP streamed response - Note that the SSR for next doesn't support rendering on the client-side, so it'll only update it one big chunk - Fix unique key error in the chatmessage history for incoming messages - Remove websocket value usage in the chat history side panel - Remove other websocket code from the chat page --- src/interface/web/app/chat/page.tsx | 220 +++++++++--------- src/interface/web/app/common/chatFunctions.ts | 48 +++- .../components/chatHistory/chatHistory.tsx | 11 +- .../chatMessage/chatMessage.module.css | 5 + .../components/chatMessage/chatMessage.tsx | 10 +- .../sidePanel/chatHistorySidePanel.tsx | 10 - 6 files changed, 173 insertions(+), 131 deletions(-) diff --git a/src/interface/web/app/chat/page.tsx b/src/interface/web/app/chat/page.tsx index f81287fd..43533ab8 100644 --- a/src/interface/web/app/chat/page.tsx +++ b/src/interface/web/app/chat/page.tsx @@ -9,12 +9,12 @@ import NavMenu from '../components/navMenu/navMenu'; import { useSearchParams } from 'next/navigation' import Loading from '../components/loading/loading'; -import { handleCompiledReferences, handleImageResponse, setupWebSocket } from '../common/chatFunctions'; +import { convertMessageChunkToJson, handleImageResponse, RawReferenceData } from '../common/chatFunctions'; import 'katex/dist/katex.min.css'; import { StreamMessage } from '../components/chatMessage/chatMessage'; -import { welcomeConsole } from '../common/utils'; +import { useIPLocationData, welcomeConsole } from '../common/utils'; import ChatInputArea, { ChatOptions } from '../components/chatInputArea/chatInputArea'; import { useAuthenticatedData } from '../common/auth'; import { AgentData } from '../agents/page'; @@ -97,83 +97,22 @@ function ChatBodyData(props: ChatBodyDataProps) { ); } + export default function Chat() { const [chatOptionsData, setChatOptionsData] = useState(null); const [isLoading, setLoading] = useState(true); const [title, setTitle] = useState('Khoj AI - Chat'); const [conversationId, setConversationID] = useState(null); - const [chatWS, setChatWS] = useState(null); const [messages, setMessages] = useState([]); const [queryToProcess, setQueryToProcess] = useState(''); const [processQuerySignal, setProcessQuerySignal] = useState(false); const [uploadedFiles, setUploadedFiles] = useState([]); const [isMobileWidth, setIsMobileWidth] = useState(false); + const locationData = useIPLocationData(); const authenticatedData = useAuthenticatedData(); welcomeConsole(); - const handleWebSocketMessage = (event: MessageEvent) => { - let chunk = event.data; - let currentMessage = messages.find(message => !message.completed); - if (!currentMessage) { - console.error("No current message found"); - return; - } - - // Process WebSocket streamed data - if (chunk === "start_llm_response") { - console.log("Started streaming", new Date()); - } else if (chunk === "end_llm_response") { - currentMessage.completed = true; - } else { - // Get the current message - // Process and update state with the new message - if (chunk.includes("application/json")) { - chunk = JSON.parse(chunk); - } - - const contentType = chunk["content-type"]; - if (contentType === "application/json") { - try { - if (chunk.image || chunk.detail) { - let responseWithReference = handleImageResponse(chunk); - console.log("Image response", responseWithReference); - if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; - if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; - if (responseWithReference.context) currentMessage.context = responseWithReference.context; - } else if (chunk.type == "status") { - currentMessage.trainOfThought.push(chunk.message); - } else if (chunk.type == "rate_limit") { - console.log("Rate limit message", chunk); - currentMessage.rawResponse = chunk.message; - } else { - console.log("any message", chunk); - } - } catch (error) { - console.error("Error processing message", error); - currentMessage.completed = true; - } finally { - // no-op - } - } else { - // Update the current message with the new chunk - if (chunk && chunk.includes("### compiled references:")) { - let responseWithReference = handleCompiledReferences(chunk, ""); - currentMessage.rawResponse += responseWithReference.response; - - if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; - if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; - if (responseWithReference.context) currentMessage.context = responseWithReference.context; - } else { - // If the chunk is not a JSON object, just display it as is - currentMessage.rawResponse += chunk; - } - } - }; - // Update the state with the new message, currentMessage - setMessages([...messages]); - } - useEffect(() => { fetch('/api/chat/options') .then(response => response.json()) @@ -198,19 +137,7 @@ export default function Chat() { }, []); useEffect(() => { - if (chatWS) { - chatWS.onmessage = handleWebSocketMessage; - } - }, [chatWS, messages]); - - //same as ChatBodyData for local storage message - useEffect(() => { - const storedMessage = localStorage.getItem("message"); - setQueryToProcess(storedMessage || ''); - }, []); - - useEffect(() => { - if (chatWS && queryToProcess) { + if (queryToProcess) { const newStreamMessage: StreamMessage = { rawResponse: "", trainOfThought: [], @@ -221,44 +148,118 @@ export default function Chat() { rawQuery: queryToProcess || "", }; setMessages(prevMessages => [...prevMessages, newStreamMessage]); - - if (chatWS.readyState === WebSocket.OPEN) { - chatWS.send(queryToProcess); - setProcessQuerySignal(true); - } - else { - console.error("WebSocket is not open. ReadyState:", chatWS.readyState); - } - - setQueryToProcess(''); + setProcessQuerySignal(true); } - }, [queryToProcess, chatWS]); + }, [queryToProcess]); useEffect(() => { - if (processQuerySignal && chatWS && chatWS.readyState === WebSocket.OPEN) { - setProcessQuerySignal(false); - chatWS.onmessage = handleWebSocketMessage; - chatWS.send(queryToProcess); - localStorage.removeItem("message"); + if (processQuerySignal) { + chat(); } - }, [processQuerySignal, chatWS]); + }, [processQuerySignal]); - useEffect(() => { - const setupWebSocketConnection = async () => { - if (conversationId && (!chatWS || chatWS.readyState === WebSocket.CLOSED)) { - if (queryToProcess) { - const newWS = await setupWebSocket(conversationId, queryToProcess); - localStorage.removeItem("message"); - setChatWS(newWS); - } - else { - const newWS = await setupWebSocket(conversationId); - setChatWS(newWS); + async function readChatStream(response: Response) { + if (!response.ok) throw new Error(response.statusText); + if (!response.body) throw new Error("Response body is null"); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + const eventDelimiter = '␃🔚␗'; + let buffer = ""; + + while (true) { + + const { done, value } = await reader.read(); + if (done) { + setQueryToProcess(''); + setProcessQuerySignal(false); + break; + } + + const chunk = decoder.decode(value, { stream: true }); + + buffer += chunk; + + let newEventIndex; + while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) { + const event = buffer.slice(0, newEventIndex); + buffer = buffer.slice(newEventIndex + eventDelimiter.length); + if (event) { + processMessageChunk(event); } } - }; - setupWebSocketConnection(); - }, [conversationId]); + + } + } + + async function chat() { + localStorage.removeItem("message"); + let chatAPI = `/api/chat?q=${encodeURIComponent(queryToProcess)}&conversation_id=${conversationId}&stream=true&client=web`; + if (locationData) { + chatAPI += `®ion=${locationData.region}&country=${locationData.country}&city=${locationData.city}&timezone=${locationData.timezone}`; + } + + const response = await fetch(chatAPI); + try { + await readChatStream(response); + } catch (err) { + console.log(err); + } + } + + function processMessageChunk(rawChunk: string) { + const chunk = convertMessageChunkToJson(rawChunk); + const currentMessage = messages.find(message => !message.completed); + + if (!currentMessage) { + return; + } + + if (!chunk || !chunk.type) { + return; + } + + if (chunk.type === "status") { + const statusMessage = chunk.data as string; + currentMessage.trainOfThought.push(statusMessage); + } else if (chunk.type === "references") { + const references = chunk.data as RawReferenceData; + + if (references.context) { + currentMessage.context = references.context; + } + + if (references.onlineContext) { + currentMessage.onlineContext = references.onlineContext; + } + } else if (chunk.type === "message") { + const chunkData = chunk.data; + + if (chunkData !== null && typeof chunkData === 'object') { + try { + const jsonData = chunkData as any; + if (jsonData.image || jsonData.detail) { + let responseWithReference = handleImageResponse(chunk.data, true); + if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response; + if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online; + if (responseWithReference.context) currentMessage.context = responseWithReference.context; + } else if (jsonData.response) { + currentMessage.rawResponse = jsonData.response; + } + else { + console.log("any message", chunk); + } + } catch (e) { + currentMessage.rawResponse += chunkData; + } + } else { + currentMessage.rawResponse += chunkData; + } + } else if (chunk.type === "end_llm_response") { + currentMessage.completed = true; + } + setMessages([...messages]); + } const handleConversationIdChange = (newConversationId: string) => { setConversationID(newConversationId); @@ -276,7 +277,6 @@ export default function Chat() {
0) { + return { + type: "message", + data: chunk + }; + } else { + return { + type: "message", + data: "" + }; + } +} + +export function handleImageResponse(imageJson: any, liveStream: boolean): ResponseWithReferences { let rawResponse = ""; @@ -123,7 +165,7 @@ export function handleImageResponse(imageJson: any) { } else if (imageJson.intentType === "text-to-image-v3") { rawResponse = `![](data:image/webp;base64,${imageJson.image})`; } - if (inferredQuery) { + if (inferredQuery && !liveStream) { rawResponse += `\n\n**Inferred Query**:\n\n${inferredQuery}`; } } diff --git a/src/interface/web/app/components/chatHistory/chatHistory.tsx b/src/interface/web/app/components/chatHistory/chatHistory.tsx index a6fde5c9..2505dbf7 100644 --- a/src/interface/web/app/components/chatHistory/chatHistory.tsx +++ b/src/interface/web/app/components/chatHistory/chatHistory.tsx @@ -17,6 +17,7 @@ import { Lightbulb } from "@phosphor-icons/react"; import ProfileCard from '../profileCard/profileCard'; import { getIconFromIconName } from '@/app/common/iconUtils'; import { AgentData } from '@/app/agents/page'; +import React from 'react'; interface ChatResponse { status: string; @@ -120,7 +121,6 @@ export default function ChatHistory(props: ChatHistoryProps) { }, [props.conversationId]); useEffect(() => { - console.log(props.incomingMessages); if (props.incomingMessages) { const lastMessage = props.incomingMessages[props.incomingMessages.length - 1]; if (lastMessage && !lastMessage.completed) { @@ -195,7 +195,7 @@ export default function ChatHistory(props: ChatHistoryProps) { setFetchingData(false); } else { if (chatData.response.agent && chatData.response.conversation_id) { - const chatMetadata ={ + const chatMetadata = { chat: [], agent: chatData.response.agent, conversation_id: chatData.response.conversation_id, @@ -256,7 +256,7 @@ export default function ChatHistory(props: ChatHistoryProps) {
- {fetchingData && } + {fetchingData && }
{(data && data.chat) && data.chat.map((chatMessage, index) => ( { return ( - <> + + - + ) }) } diff --git a/src/interface/web/app/components/chatMessage/chatMessage.module.css b/src/interface/web/app/components/chatMessage/chatMessage.module.css index 592e30af..809acc2e 100644 --- a/src/interface/web/app/components/chatMessage/chatMessage.module.css +++ b/src/interface/web/app/components/chatMessage/chatMessage.module.css @@ -123,6 +123,11 @@ div.trainOfThought.primary p { color: inherit; } +div.trainOfThoughtElement { + display: grid; + grid-template-columns: auto 1fr; +} + @media screen and (max-width: 768px) { div.youfullHistory { max-width: 90%; diff --git a/src/interface/web/app/components/chatMessage/chatMessage.tsx b/src/interface/web/app/components/chatMessage/chatMessage.tsx index 6b05fdf9..a926afb9 100644 --- a/src/interface/web/app/components/chatMessage/chatMessage.tsx +++ b/src/interface/web/app/components/chatMessage/chatMessage.tsx @@ -10,7 +10,7 @@ import 'katex/dist/katex.min.css'; import { TeaserReferencesSection, constructAllReferences } from '../referencePanel/referencePanel'; -import { ThumbsUp, ThumbsDown, Copy, Brain, Cloud, Folder, Book, Aperture, SpeakerHigh, MagnifyingGlass, Pause } from '@phosphor-icons/react'; +import { ThumbsUp, ThumbsDown, Copy, Brain, Cloud, Folder, Book, Aperture, SpeakerHigh, MagnifyingGlass, Pause, Palette } from '@phosphor-icons/react'; import * as DomPurify from 'dompurify'; import { InlineLoading } from '../loading/loading'; @@ -180,10 +180,14 @@ function chooseIconFromHeader(header: string, iconColor: string) { return ; } - if (compareHeader.includes("summary") || compareHeader.includes("summarize")) { + if (compareHeader.includes("summary") || compareHeader.includes("summarize") || compareHeader.includes("enhanc")) { return ; } + if (compareHeader.includes("paint")) { + return ; + } + return ; } @@ -195,7 +199,7 @@ export function TrainOfThought(props: TrainOfThoughtProps) { const icon = chooseIconFromHeader(header, iconColor); let markdownRendered = DomPurify.sanitize(md.render(props.message)); return ( -
+
{icon}
diff --git a/src/interface/web/app/components/sidePanel/chatHistorySidePanel.tsx b/src/interface/web/app/components/sidePanel/chatHistorySidePanel.tsx index cd0f8bea..b9209b49 100644 --- a/src/interface/web/app/components/sidePanel/chatHistorySidePanel.tsx +++ b/src/interface/web/app/components/sidePanel/chatHistorySidePanel.tsx @@ -320,7 +320,6 @@ function FilesMenu(props: FilesMenuProps) { } interface SessionsAndFilesProps { - webSocketConnected?: boolean; setEnabled: (enabled: boolean) => void; subsetOrganizedData: GroupedChatHistory | null; organizedData: GroupedChatHistory | null; @@ -591,12 +590,6 @@ function ChatSessionsModal({ data, showSidePanel }: ChatSessionsModalProps) { ); } -interface UserProfileProps { - userProfile: UserProfile; - webSocketConnected?: boolean; - collapsed: boolean; -} - const fetchChatHistory = async (url: string) => { const response = await fetch(url, { method: 'GET', @@ -618,7 +611,6 @@ export const useChatSessionsFetchRequest = (url: string) => { }; interface SidePanelProps { - webSocketConnected?: boolean; conversationId: string | null; uploadedFiles: string[]; isMobileWidth: boolean; @@ -691,7 +683,6 @@ export default function SidePanel(props: SidePanelProps) {