WorkflowChatTransport
The `@workflow/ai` package is currently in active development and should be considered experimental.

A chat transport implementation for the AI SDK that provides reliable message streaming with automatic reconnection to interrupted streams. This transport is a drop-in replacement for the default AI SDK transport, enabling seamless recovery from network issues, page refreshes, or Vercel Function timeouts.
`WorkflowChatTransport` implements the `ChatTransport` interface from the AI SDK and is designed to work with workflow-based chat applications. It requires endpoints that return the `x-workflow-run-id` header to enable stream resumption.
```tsx
import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
export default function Chat() {
const { messages, sendMessage } = useChat({
transport: new WorkflowChatTransport(),
});
return (
<div>
{messages.map((m) => (
        <div key={m.id}>
          {/* UIMessage content lives in parts; render the text parts */}
          {m.parts.map((part, i) =>
            part.type === 'text' ? <span key={i}>{part.text}</span> : null,
          )}
        </div>
))}
</div>
);
}
```

API Signature
Class
| Name | Type | Description |
|---|---|---|
| api | any | |
| fetch | any | |
| onChatSendMessage | any | |
| onChatEnd | any | |
| maxConsecutiveErrors | any | |
| prepareSendMessagesRequest | any | |
| prepareReconnectToStreamRequest | any | |
| sendMessages | (options: SendMessagesOptions<UI_MESSAGE> & ChatRequestOptions) => Promise<ReadableStream<UIMessageChunk>> | Sends messages to the chat endpoint and returns a stream of response chunks. Handles the entire chat lifecycle: sending messages to the /api/chat endpoint, streaming response chunks, and automatically reconnecting if the stream is interrupted. |
| sendMessagesIterator | any | |
| reconnectToStream | (options: ReconnectToStreamOptions & ChatRequestOptions) => Promise<ReadableStream<UIMessageChunk> \| null> | Reconnects to an existing chat stream that was previously interrupted. Useful for resuming a chat session after network issues, page refreshes, or Vercel Function timeouts. |
| reconnectToStreamIterator | any | |
| onFinish | any | |
WorkflowChatTransportOptions
| Name | Type | Description |
|---|---|---|
| api | string | API endpoint for chat requests. Defaults to /api/chat if not provided. |
| fetch | { (input: RequestInfo \| URL, init?: RequestInit \| undefined): Promise<Response>; (input: string \| Request \| URL, init?: RequestInit \| undefined): Promise<...>; } | Custom fetch implementation to use for HTTP requests. Defaults to the global fetch function if not provided. |
| onChatSendMessage | OnChatSendMessage<UI_MESSAGE> | Callback invoked after successfully sending messages to the chat endpoint. Useful for tracking chat history and inspecting response headers. |
| onChatEnd | OnChatEnd | Callback invoked when a chat stream ends (receives a "finish" chunk). Useful for cleanup operations or state updates. |
| maxConsecutiveErrors | number | Maximum number of consecutive errors allowed during reconnection attempts. Defaults to 3 if not provided. |
| prepareSendMessagesRequest | PrepareSendMessagesRequest<UI_MESSAGE> | Function to prepare the request for sending messages. Allows customizing the API endpoint, headers, credentials, and body. |
| prepareReconnectToStreamRequest | PrepareReconnectToStreamRequest | Function to prepare the request for reconnecting to a stream. Allows customizing the API endpoint, headers, and credentials. |
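The `fetch` option accepts any function with the signature of the global fetch. A minimal sketch, assuming you only want to observe the requests the transport makes (the logging itself is illustrative):

```ts
import { WorkflowChatTransport } from '@workflow/ai';

// Sketch: wrap the global fetch to log each request the transport issues.
// Any function matching the global fetch signature can be supplied here.
const transport = new WorkflowChatTransport({
  fetch: async (input, init) => {
    console.log('chat request:', String(input));
    const response = await fetch(input, init);
    console.log('chat response status:', response.status);
    return response;
  },
});
```

The resulting transport is passed to useChat exactly as in the examples below.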
Key Features
- Automatic Reconnection: Automatically recovers from interrupted streams with configurable retry limits
- Workflow Integration: Seamlessly works with workflow-based endpoints that provide the `x-workflow-run-id` header
- Customizable Requests: Allows intercepting and modifying requests via `prepareSendMessagesRequest` and `prepareReconnectToStreamRequest`
- Stream Callbacks: Provides hooks for tracking the chat lifecycle via `onChatSendMessage` and `onChatEnd`
- Custom Fetch: Supports custom fetch implementations for advanced use cases
Good to Know
- The transport expects chat endpoints to return the `x-workflow-run-id` header in the response to enable stream resumption (see the sketch after this list)
- By default, the transport posts to `/api/chat` and reconnects via `/api/chat/{runId}/stream`
- The `onChatSendMessage` callback receives the full response object, allowing you to extract and store the workflow run ID for session resumption
- Stream interruptions are automatically detected when a "finish" chunk is not received in the initial response
- The `maxConsecutiveErrors` option controls how many reconnection attempts are made before giving up (default: 3)
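On the server side, the contract the transport relies on is small: the chat endpoint must stream UI message chunks and include the `x-workflow-run-id` header in its response. The sketch below shows that shape for a Next.js route handler; `startChatWorkflow` is a hypothetical helper standing in for however your workflow starts the run and exposes its chunk stream.

```ts
// app/api/chat/route.ts
// Sketch of the response contract WorkflowChatTransport expects from the
// chat endpoint. `startChatWorkflow` is a hypothetical helper standing in
// for whatever starts your workflow run and returns its UI message stream.
declare function startChatWorkflow(
  messages: unknown[],
): Promise<{ runId: string; stream: ReadableStream<Uint8Array> }>;

export async function POST(request: Request) {
  const { messages } = await request.json();
  const { runId, stream } = await startChatWorkflow(messages);

  return new Response(stream, {
    headers: {
      'content-type': 'text/event-stream',
      // Required: lets the transport resume this run via
      // /api/chat/{runId}/stream if the stream is interrupted.
      'x-workflow-run-id': runId,
    },
  });
}
```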
Examples
Basic Chat Setup
```tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
import { useState } from 'react';
export default function BasicChat() {
const [input, setInput] = useState('');
const { messages, sendMessage } = useChat({
transport: new WorkflowChatTransport(),
});
return (
<div>
<div className="space-y-4">
{messages.map((m) => (
<div key={m.id}>
            <strong>{m.role}:</strong>{' '}
            {m.parts.map((part, i) =>
              part.type === 'text' ? <span key={i}>{part.text}</span> : null,
            )}
</div>
))}
</div>
<form
onSubmit={(e) => {
e.preventDefault();
sendMessage({ text: input });
setInput('');
}}
>
<input
value={input}
placeholder="Say something..."
onChange={(e) => setInput(e.currentTarget.value)}
/>
</form>
</div>
);
}
```

With Session Persistence and Resumption
```tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
import { useMemo, useState } from 'react';
export default function ChatWithResumption() {
const [input, setInput] = useState('');
const activeWorkflowRunId = useMemo(() => {
if (typeof window === 'undefined') return;
return localStorage.getItem('active-workflow-run-id') ?? undefined;
}, []);
const { messages, sendMessage } = useChat({
resume: !!activeWorkflowRunId,
transport: new WorkflowChatTransport({
onChatSendMessage: (response, options) => {
// Save chat history to localStorage
localStorage.setItem(
'chat-history',
JSON.stringify(options.messages)
);
// Extract and store the workflow run ID for session resumption
const workflowRunId = response.headers.get('x-workflow-run-id');
if (workflowRunId) {
localStorage.setItem('active-workflow-run-id', workflowRunId);
}
},
onChatEnd: ({ chatId, chunkIndex }) => {
console.log(`Chat ${chatId} completed with ${chunkIndex} chunks`);
// Clear the active run ID when chat completes
localStorage.removeItem('active-workflow-run-id');
},
}),
});
return (
<div>
<div className="space-y-4">
{messages.map((m) => (
<div key={m.id}>
            <strong>{m.role}:</strong>{' '}
            {m.parts.map((part, i) =>
              part.type === 'text' ? <span key={i}>{part.text}</span> : null,
            )}
</div>
))}
</div>
<form
onSubmit={(e) => {
e.preventDefault();
sendMessage({ text: input });
setInput('');
}}
>
<input
value={input}
placeholder="Say something..."
onChange={(e) => setInput(e.currentTarget.value)}
/>
</form>
</div>
);
}
```

With Custom Request Configuration
```tsx
'use client';
import { useChat } from '@ai-sdk/react';
import { WorkflowChatTransport } from '@workflow/ai';
import { useState } from 'react';
export default function ChatWithCustomConfig() {
const [input, setInput] = useState('');
const { messages, sendMessage } = useChat({
transport: new WorkflowChatTransport({
prepareSendMessagesRequest: async (config) => {
return {
...config,
api: '/api/chat',
headers: {
...config.headers,
'Authorization': `Bearer ${process.env.NEXT_PUBLIC_API_TOKEN}`,
'X-Custom-Header': 'custom-value',
},
credentials: 'include',
};
},
prepareReconnectToStreamRequest: async (config) => {
return {
...config,
headers: {
...config.headers,
'Authorization': `Bearer ${process.env.NEXT_PUBLIC_API_TOKEN}`,
},
credentials: 'include',
};
},
maxConsecutiveErrors: 5,
}),
});
return (
<div>
<div className="space-y-4">
{messages.map((m) => (
<div key={m.id}>
            <strong>{m.role}:</strong>{' '}
            {m.parts.map((part, i) =>
              part.type === 'text' ? <span key={i}>{part.text}</span> : null,
            )}
</div>
))}
</div>
<form
onSubmit={(e) => {
e.preventDefault();
sendMessage({ text: input });
setInput('');
}}
>
<input
value={input}
placeholder="Say something..."
onChange={(e) => setInput(e.currentTarget.value)}
/>
</form>
</div>
);
}
```

See Also
- DurableAgent - Building durable AI agents within workflows
- AI SDK `useChat` Documentation - Using `useChat` with custom transports
- Workflows and Steps - Understanding workflow fundamentals
- "flight-booking-app" Example - An example application which uses `WorkflowChatTransport`