Ingest new format for server sent events within the HTTP streamed response

- Note that the SSR for next doesn't support rendering on the client-side, so it'll only update it one big chunk
- Fix unique key error in the chatmessage history for incoming messages
- Remove websocket value usage in the chat history side panel
- Remove other websocket code from the chat page
This commit is contained in:
sabaimran 2024-08-01 12:50:43 +05:30
parent a6339bb973
commit cd85a51980
6 changed files with 173 additions and 131 deletions

View file

@ -9,12 +9,12 @@ import NavMenu from '../components/navMenu/navMenu';
import { useSearchParams } from 'next/navigation'
import Loading from '../components/loading/loading';
import { handleCompiledReferences, handleImageResponse, setupWebSocket } from '../common/chatFunctions';
import { convertMessageChunkToJson, handleImageResponse, RawReferenceData } from '../common/chatFunctions';
import 'katex/dist/katex.min.css';
import { StreamMessage } from '../components/chatMessage/chatMessage';
import { welcomeConsole } from '../common/utils';
import { useIPLocationData, welcomeConsole } from '../common/utils';
import ChatInputArea, { ChatOptions } from '../components/chatInputArea/chatInputArea';
import { useAuthenticatedData } from '../common/auth';
import { AgentData } from '../agents/page';
@ -97,83 +97,22 @@ function ChatBodyData(props: ChatBodyDataProps) {
</>
);
}
export default function Chat() {
const [chatOptionsData, setChatOptionsData] = useState<ChatOptions | null>(null);
const [isLoading, setLoading] = useState(true);
const [title, setTitle] = useState('Khoj AI - Chat');
const [conversationId, setConversationID] = useState<string | null>(null);
const [chatWS, setChatWS] = useState<WebSocket | null>(null);
const [messages, setMessages] = useState<StreamMessage[]>([]);
const [queryToProcess, setQueryToProcess] = useState<string>('');
const [processQuerySignal, setProcessQuerySignal] = useState(false);
const [uploadedFiles, setUploadedFiles] = useState<string[]>([]);
const [isMobileWidth, setIsMobileWidth] = useState(false);
const locationData = useIPLocationData();
const authenticatedData = useAuthenticatedData();
welcomeConsole();
const handleWebSocketMessage = (event: MessageEvent) => {
let chunk = event.data;
let currentMessage = messages.find(message => !message.completed);
if (!currentMessage) {
console.error("No current message found");
return;
}
// Process WebSocket streamed data
if (chunk === "start_llm_response") {
console.log("Started streaming", new Date());
} else if (chunk === "end_llm_response") {
currentMessage.completed = true;
} else {
// Get the current message
// Process and update state with the new message
if (chunk.includes("application/json")) {
chunk = JSON.parse(chunk);
}
const contentType = chunk["content-type"];
if (contentType === "application/json") {
try {
if (chunk.image || chunk.detail) {
let responseWithReference = handleImageResponse(chunk);
console.log("Image response", responseWithReference);
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else if (chunk.type == "status") {
currentMessage.trainOfThought.push(chunk.message);
} else if (chunk.type == "rate_limit") {
console.log("Rate limit message", chunk);
currentMessage.rawResponse = chunk.message;
} else {
console.log("any message", chunk);
}
} catch (error) {
console.error("Error processing message", error);
currentMessage.completed = true;
} finally {
// no-op
}
} else {
// Update the current message with the new chunk
if (chunk && chunk.includes("### compiled references:")) {
let responseWithReference = handleCompiledReferences(chunk, "");
currentMessage.rawResponse += responseWithReference.response;
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else {
// If the chunk is not a JSON object, just display it as is
currentMessage.rawResponse += chunk;
}
}
};
// Update the state with the new message, currentMessage
setMessages([...messages]);
}
useEffect(() => {
fetch('/api/chat/options')
.then(response => response.json())
@ -198,19 +137,7 @@ export default function Chat() {
}, []);
useEffect(() => {
if (chatWS) {
chatWS.onmessage = handleWebSocketMessage;
}
}, [chatWS, messages]);
//same as ChatBodyData for local storage message
useEffect(() => {
const storedMessage = localStorage.getItem("message");
setQueryToProcess(storedMessage || '');
}, []);
useEffect(() => {
if (chatWS && queryToProcess) {
if (queryToProcess) {
const newStreamMessage: StreamMessage = {
rawResponse: "",
trainOfThought: [],
@ -221,44 +148,118 @@ export default function Chat() {
rawQuery: queryToProcess || "",
};
setMessages(prevMessages => [...prevMessages, newStreamMessage]);
if (chatWS.readyState === WebSocket.OPEN) {
chatWS.send(queryToProcess);
setProcessQuerySignal(true);
}
else {
console.error("WebSocket is not open. ReadyState:", chatWS.readyState);
}
setQueryToProcess('');
setProcessQuerySignal(true);
}
}, [queryToProcess, chatWS]);
}, [queryToProcess]);
useEffect(() => {
if (processQuerySignal && chatWS && chatWS.readyState === WebSocket.OPEN) {
setProcessQuerySignal(false);
chatWS.onmessage = handleWebSocketMessage;
chatWS.send(queryToProcess);
localStorage.removeItem("message");
if (processQuerySignal) {
chat();
}
}, [processQuerySignal, chatWS]);
}, [processQuerySignal]);
useEffect(() => {
const setupWebSocketConnection = async () => {
if (conversationId && (!chatWS || chatWS.readyState === WebSocket.CLOSED)) {
if (queryToProcess) {
const newWS = await setupWebSocket(conversationId, queryToProcess);
localStorage.removeItem("message");
setChatWS(newWS);
}
else {
const newWS = await setupWebSocket(conversationId);
setChatWS(newWS);
async function readChatStream(response: Response) {
if (!response.ok) throw new Error(response.statusText);
if (!response.body) throw new Error("Response body is null");
const reader = response.body.getReader();
const decoder = new TextDecoder();
const eventDelimiter = '␃🔚␗';
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
setQueryToProcess('');
setProcessQuerySignal(false);
break;
}
const chunk = decoder.decode(value, { stream: true });
buffer += chunk;
let newEventIndex;
while ((newEventIndex = buffer.indexOf(eventDelimiter)) !== -1) {
const event = buffer.slice(0, newEventIndex);
buffer = buffer.slice(newEventIndex + eventDelimiter.length);
if (event) {
processMessageChunk(event);
}
}
};
setupWebSocketConnection();
}, [conversationId]);
}
}
async function chat() {
localStorage.removeItem("message");
let chatAPI = `/api/chat?q=${encodeURIComponent(queryToProcess)}&conversation_id=${conversationId}&stream=true&client=web`;
if (locationData) {
chatAPI += `&region=${locationData.region}&country=${locationData.country}&city=${locationData.city}&timezone=${locationData.timezone}`;
}
const response = await fetch(chatAPI);
try {
await readChatStream(response);
} catch (err) {
console.log(err);
}
}
function processMessageChunk(rawChunk: string) {
const chunk = convertMessageChunkToJson(rawChunk);
const currentMessage = messages.find(message => !message.completed);
if (!currentMessage) {
return;
}
if (!chunk || !chunk.type) {
return;
}
if (chunk.type === "status") {
const statusMessage = chunk.data as string;
currentMessage.trainOfThought.push(statusMessage);
} else if (chunk.type === "references") {
const references = chunk.data as RawReferenceData;
if (references.context) {
currentMessage.context = references.context;
}
if (references.onlineContext) {
currentMessage.onlineContext = references.onlineContext;
}
} else if (chunk.type === "message") {
const chunkData = chunk.data;
if (chunkData !== null && typeof chunkData === 'object') {
try {
const jsonData = chunkData as any;
if (jsonData.image || jsonData.detail) {
let responseWithReference = handleImageResponse(chunk.data, true);
if (responseWithReference.response) currentMessage.rawResponse = responseWithReference.response;
if (responseWithReference.online) currentMessage.onlineContext = responseWithReference.online;
if (responseWithReference.context) currentMessage.context = responseWithReference.context;
} else if (jsonData.response) {
currentMessage.rawResponse = jsonData.response;
}
else {
console.log("any message", chunk);
}
} catch (e) {
currentMessage.rawResponse += chunkData;
}
} else {
currentMessage.rawResponse += chunkData;
}
} else if (chunk.type === "end_llm_response") {
currentMessage.completed = true;
}
setMessages([...messages]);
}
const handleConversationIdChange = (newConversationId: string) => {
setConversationID(newConversationId);
@ -276,7 +277,6 @@ export default function Chat() {
</title>
<div>
<SidePanel
webSocketConnected={chatWS !== null}
conversationId={conversationId}
uploadedFiles={uploadedFiles}
isMobileWidth={isMobileWidth}

View file

@ -1,6 +1,13 @@
import { Context, OnlineContextData } from "../components/chatMessage/chatMessage";
interface ResponseWithReferences {
export interface RawReferenceData {
context?: Context[];
onlineContext?: {
[key: string]: OnlineContextData
}
}
export interface ResponseWithReferences {
context?: Context[];
online?: {
[key: string]: OnlineContextData
@ -108,7 +115,42 @@ export const setupWebSocket = async (conversationId: string, initialMessage?: st
return chatWS;
};
export function handleImageResponse(imageJson: any) {
interface MessageChunk {
type: string;
data: string | object;
}
export function convertMessageChunkToJson(chunk: string): MessageChunk {
if (chunk.startsWith("{") && chunk.endsWith("}")) {
try {
const jsonChunk = JSON.parse(chunk);
if (!jsonChunk.type) {
return {
type: "message",
data: jsonChunk
};
}
return jsonChunk;
} catch (error) {
return {
type: "message",
data: chunk
};
}
} else if (chunk.length > 0) {
return {
type: "message",
data: chunk
};
} else {
return {
type: "message",
data: ""
};
}
}
export function handleImageResponse(imageJson: any, liveStream: boolean): ResponseWithReferences {
let rawResponse = "";
@ -123,7 +165,7 @@ export function handleImageResponse(imageJson: any) {
} else if (imageJson.intentType === "text-to-image-v3") {
rawResponse = `![](data:image/webp;base64,${imageJson.image})`;
}
if (inferredQuery) {
if (inferredQuery && !liveStream) {
rawResponse += `\n\n**Inferred Query**:\n\n${inferredQuery}`;
}
}

View file

@ -17,6 +17,7 @@ import { Lightbulb } from "@phosphor-icons/react";
import ProfileCard from '../profileCard/profileCard';
import { getIconFromIconName } from '@/app/common/iconUtils';
import { AgentData } from '@/app/agents/page';
import React from 'react';
interface ChatResponse {
status: string;
@ -120,7 +121,6 @@ export default function ChatHistory(props: ChatHistoryProps) {
}, [props.conversationId]);
useEffect(() => {
console.log(props.incomingMessages);
if (props.incomingMessages) {
const lastMessage = props.incomingMessages[props.incomingMessages.length - 1];
if (lastMessage && !lastMessage.completed) {
@ -195,7 +195,7 @@ export default function ChatHistory(props: ChatHistoryProps) {
setFetchingData(false);
} else {
if (chatData.response.agent && chatData.response.conversation_id) {
const chatMetadata ={
const chatMetadata = {
chat: [],
agent: chatData.response.agent,
conversation_id: chatData.response.conversation_id,
@ -256,7 +256,7 @@ export default function ChatHistory(props: ChatHistoryProps) {
<div ref={ref}>
<div className={styles.chatHistory} ref={chatHistoryRef}>
<div ref={sentinelRef} style={{ height: '1px' }}>
{fetchingData && <InlineLoading message="Loading Conversation" className='opacity-50'/>}
{fetchingData && <InlineLoading message="Loading Conversation" className='opacity-50' />}
</div>
{(data && data.chat) && data.chat.map((chatMessage, index) => (
<ChatMessage
@ -271,7 +271,8 @@ export default function ChatHistory(props: ChatHistoryProps) {
{
props.incomingMessages && props.incomingMessages.map((message, index) => {
return (
<>
<React.Fragment key={`incomingMessage${index}`}>
<ChatMessage
key={`${index}outgoing`}
isMobileWidth={isMobileWidth}
@ -314,7 +315,7 @@ export default function ChatHistory(props: ChatHistoryProps) {
borderLeftColor={`${data?.agent.color}-500`}
isLastMessage={true}
/>
</>
</React.Fragment>
)
})
}

View file

@ -123,6 +123,11 @@ div.trainOfThought.primary p {
color: inherit;
}
div.trainOfThoughtElement {
display: grid;
grid-template-columns: auto 1fr;
}
@media screen and (max-width: 768px) {
div.youfullHistory {
max-width: 90%;

View file

@ -10,7 +10,7 @@ import 'katex/dist/katex.min.css';
import { TeaserReferencesSection, constructAllReferences } from '../referencePanel/referencePanel';
import { ThumbsUp, ThumbsDown, Copy, Brain, Cloud, Folder, Book, Aperture, SpeakerHigh, MagnifyingGlass, Pause } from '@phosphor-icons/react';
import { ThumbsUp, ThumbsDown, Copy, Brain, Cloud, Folder, Book, Aperture, SpeakerHigh, MagnifyingGlass, Pause, Palette } from '@phosphor-icons/react';
import * as DomPurify from 'dompurify';
import { InlineLoading } from '../loading/loading';
@ -180,10 +180,14 @@ function chooseIconFromHeader(header: string, iconColor: string) {
return <MagnifyingGlass className={`${classNames}`} />;
}
if (compareHeader.includes("summary") || compareHeader.includes("summarize")) {
if (compareHeader.includes("summary") || compareHeader.includes("summarize") || compareHeader.includes("enhanc")) {
return <Aperture className={`${classNames}`} />;
}
if (compareHeader.includes("paint")) {
return <Palette className={`${classNames}`} />;
}
return <Brain className={`${classNames}`} />;
}
@ -195,7 +199,7 @@ export function TrainOfThought(props: TrainOfThoughtProps) {
const icon = chooseIconFromHeader(header, iconColor);
let markdownRendered = DomPurify.sanitize(md.render(props.message));
return (
<div className={`flex items-center ${props.primary ? 'text-gray-400' : 'text-gray-300'} ${styles.trainOfThought} ${props.primary ? styles.primary : ''}`} >
<div className={`${styles.trainOfThoughtElement} items-center ${props.primary ? 'text-gray-400' : 'text-gray-300'} ${styles.trainOfThought} ${props.primary ? styles.primary : ''}`} >
{icon}
<div dangerouslySetInnerHTML={{ __html: markdownRendered }} />
</div>

View file

@ -320,7 +320,6 @@ function FilesMenu(props: FilesMenuProps) {
}
interface SessionsAndFilesProps {
webSocketConnected?: boolean;
setEnabled: (enabled: boolean) => void;
subsetOrganizedData: GroupedChatHistory | null;
organizedData: GroupedChatHistory | null;
@ -591,12 +590,6 @@ function ChatSessionsModal({ data, showSidePanel }: ChatSessionsModalProps) {
);
}
interface UserProfileProps {
userProfile: UserProfile;
webSocketConnected?: boolean;
collapsed: boolean;
}
const fetchChatHistory = async (url: string) => {
const response = await fetch(url, {
method: 'GET',
@ -618,7 +611,6 @@ export const useChatSessionsFetchRequest = (url: string) => {
};
interface SidePanelProps {
webSocketConnected?: boolean;
conversationId: string | null;
uploadedFiles: string[];
isMobileWidth: boolean;
@ -691,7 +683,6 @@ export default function SidePanel(props: SidePanelProps) {
</DrawerHeader>
<div className={`${styles.panelWrapper}`}>
<SessionsAndFiles
webSocketConnected={props.webSocketConnected}
setEnabled={setEnabled}
subsetOrganizedData={subsetOrganizedData}
organizedData={organizedData}
@ -724,7 +715,6 @@ export default function SidePanel(props: SidePanelProps) {
authenticatedData && !props.isMobileWidth && enabled &&
<div className={`${styles.panelWrapper}`}>
<SessionsAndFiles
webSocketConnected={props.webSocketConnected}
setEnabled={setEnabled}
subsetOrganizedData={subsetOrganizedData}
organizedData={organizedData}