River
the sane way to work with ai agent streams (type safety and stream resuming out of the box)
Install / Use
/learn @davis7dotsh/RiverREADME
river
doing ai agent streams the right way is really hard, river makes it easy...
- full stack type safety on stream chunks
- TRPC like client api to easily consume a stream
- TRPC like server api to easily create and write to a stream
- library agnostic, works with ai sdk, mastra, custom streams, or anything you want
- resumable/durable streams work out of the box with the redis provider
- tanstack start and sveltekit are currently supported, more coming soon...
table of contents
sveltekit support
<script lang="ts">
import { myRiverClient } from '$lib/river/client';
const { start, resume } = myRiverClient.aRiverStream({
onChunk: (chunk) => {
// fully type safe!
console.log('Chunk received', chunk);
},
onStart: () => {
console.log('Starting stream');
},
onSuccess: (data) => {
console.log('Finished first stream', data.totalChunks, data.totalTimeMs);
},
onFatalError: (error) => {
console.error(error);
},
onInfo: ({ encodedResumptionToken }) => {
// you can resume the stream with this token!
console.log('Resume with:', encodedResumptionToken);
}
});
</script>
tanstack start support
import { myRiverClient } from '@/lib/river/client';
const DemoComponent = () => {
const { start, resume } = myRiverClient.aRiverStream({
onChunk: (chunk) => {
// fully type safe!
console.log('Chunk received', chunk);
},
onStart: () => {
console.log('Starting stream');
},
onSuccess: (data) => {
console.log('Finished first stream', data.totalChunks, data.totalTimeMs);
},
onFatalError: (error) => {
console.error(error);
},
onInfo: ({ encodedResumptionToken }) => {
// you can resume the stream with this token!
console.log('Resume with:', encodedResumptionToken);
}
})
return (...)
}
FULL DOCUMENTATION IS COMING SOON
for now just use the below getting started guides and check out the examples for more complex usage...
examples
you can find examples for how to use river in "real world" projects here:
each one includes guides on running it locally and how to deploy it
tanstack start getting started
this is a basic example of a custom stream that will count the number of vowels, consonants, and special characters in a message (it's contrived I know, but hear me out...):
- init a tanstack start project
bun create @tanstack/start@latest
then delete basically everything in the routes directory other than the index.tsx file and __root.tsx file
- install the dependencies
river stuff
bun add @davis7dotsh/river-core@latest @davis7dotsh/river-adapter-tanstack@latest
peer deps
bun add zod neverthrow
start the dev server
bun dev
- create a river stream
// src/lib/river/streams.ts
import type { TanStackStartAdapterRequest } from '@davis7dotsh/river-adapter-tanstack';
import { createRiverStream, defaultRiverProvider } from '@davis7dotsh/river-core';
type ClassifyChunkType = {
character: string;
type: 'vowel' | 'consonant' | 'special';
};
export const streamClassifyCharacters = createRiverStream<
ClassifyChunkType,
TanStackStartAdapterRequest
>()
.input(z.object({ message: z.string() }))
.provider(defaultRiverProvider())
.runner(async ({ input, stream, abortSignal }) => {
const { message } = input;
const { appendChunk, close } = stream;
const characters = message.split('');
for (const character of characters) {
const type = character.match(/[aeiou]/i)
? 'vowel'
: character.match(/[bcdfghjklmnpqrstvwxyz]/i)
? 'consonant'
: 'special';
await appendChunk({ character, type });
await new Promise((resolve) => setTimeout(resolve, 15));
}
await close();
});
- create a river router
// src/lib/river/router.ts
import { createRiverRouter } from '@davis7dotsh/river-core';
import { streamClassifyCharacters } from './streams';
export const myRiverRouter = createRiverRouter({
classifyCharacters: streamClassifyCharacters
});
export type MyRiverRouter = typeof myRiverRouter;
- create the endpoint handler
// src/routes/api/river/index.ts
import { createFileRoute } from '@tanstack/react-router';
import { riverEndpointHandler } from '@davis7dotsh/river-adapter-tanstack';
import { myRiverRouter } from '@/lib/river/router';
const { GET, POST } = riverEndpointHandler(myRiverRouter);
export const Route = createFileRoute('/api/river/')({
server: {
handlers: {
GET,
POST
}
}
});
- create the client caller
// src/lib/river/client.ts
import { createRiverClient } from '@davis7dotsh/river-adapter-tanstack';
import { MyRiverRouter } from './router';
export const myRiverClient = createRiverClient<MyRiverRouter>('/api/river');
- use your new stream in a component
// src/routes/index.tsx
import { myRiverClient } from '@/lib/river/client';
import { createFileRoute, Link } from '@tanstack/react-router';
import { useState } from 'react';
export const Route = createFileRoute('/basic/')({
component: RouteComponent
});
function RouteComponent() {
return (
<div className="min-h-screen flex flex-col p-6">
<BasicDemo />
</div>
);
}
const BasicDemo = () => {
const [message, setMessage] = useState('Why is TypeScript a better language than Go?');
const trimmedMessage = message.trim();
const [vowelCount, setVowelCount] = useState(0);
const [consonantCount, setConsonantCount] = useState(0);
const [specialCount, setSpecialCount] = useState(0);
const [status, setStatus] = useState<'idle' | 'running' | 'success' | 'error'>('idle');
const { start } = myRiverClient.classifyCharacters.useStream({
onStart: () => {
setStatus('running');
setVowelCount(0);
setConsonantCount(0);
setSpecialCount(0);
},
onChunk: (chunk) => {
switch (chunk.type) {
case 'vowel':
setVowelCount((prev) => prev + 1);
break;
case 'consonant':
setConsonantCount((prev) => prev + 1);
break;
case 'special':
setSpecialCount((prev) => prev + 1);
break;
}
},
onError: (error) => {
console.warn(error);
},
onFatalError: (error) => {
setStatus('error');
console.error(error);
},
onSuccess: () => {
setStatus('success');
}
});
const handleSubmit = (e: React.FormEvent<HTMLFormElement>) => {
e.preventDefault();
start({ message: trimmedMessage });
};
const handleClear = () => {
setMessage('');
setVowelCount(0);
setConsonantCount(0);
setSpecialCount(0);
setStatus('idle');
};
const hasResults = vowelCount > 0 || consonantCount > 0 || specialCount > 0;
const hasContent = trimmedMessage.length > 0;
const showClear = hasResults || hasContent;
return (
<>
<div className="max-w-2xl mx-auto w-full space-y-6 p-6 pt-20">
<h1 className="text-3xl font-bold text-white mb-6">Character Classifier</h1>
<div className="grid grid-cols-3 gap-4 mb-6">
<div className="bg-neutral-800 rounded-lg p-4">
<div className="text-sm font-medium text-neutral-400 mb-1">Vowels</div>
<div className="text-3xl font-bold text-white">{vowelCount}</div>
</div>
<div className="bg-neutral-800 rounded-lg p-4">
<div className="text-sm font-medium text-neutral-400 mb-1">Consonants</div>
<div className="text-3xl font-bold text-white">{consonantCount}</div>
</div>
<div className="bg-neutral-800 rounded-lg p-4">
<div className="text-sm font-medium text-neutral-400 mb-1">Special</div>
<div className="text-3xl font-bold text-white">{specialCount}</div>
</div>
</div>
<form onSubmit={handleSubmit} className="space-y-4">
<div>
<label htmlFor="message" className="block text-sm font-medium text-neutral-300 mb-2">
Message
</label>
<textarea
id="message"
value={message}
onChange={(e) => setMessage(e.target.value)}
disabled={status === 'running'}
rows={4}
className="w-full px-4 py-3 bg-neutral-800 text-white border border-neutral-700 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500 focus:border-transparent disabled:opacity-50 disabled:cursor-not-allowed resize-none"
placeholder="Enter your message here..."
/>
</div>
<div className="flex gap-3">
<button
type="submit"
disabled={status === 'running' || !trimmedMessage}
className="flex-1 bg-blue-600 text-white px-4 py-2 rounded-lg font-medium hover:bg-blue-700 focus:outline-none focus:ring-2 focus:ring-blue-500 disabled:opacity-50 disabled:cursor-not-allowed transition-colors"
>
{status === 'running' ? 'Processing...' : 'Submit'}
</button>
{showClear && status !== 'running' && (
<button
type="button"
onClick={handleClear}
className="px-4 py-2 bg-neutral-700 text-white rounded-lg font-medium hover:bg-neutral-600 focus:outline-none focus:ring-2 focus:ring-neutral-500 transition-colors"
>
Clear
</button>
)}
</div>
{status === 'error' && (
<div className="mt-4 p-3 bg-red-900/30 border border-red-700 rounded-lg text-sm text-red-300">
An error occurred while processing. Please try again.
</div>
)}
</form>
</div>
</>
);
};
sveltekit getting started
this is a basic example of a custom stream that will count the number of vowels, consonants, and special characters in a message (it's contrived I know, but hear me out...):
- init a new sveltekit project
bunx sv create
make sure you pick minimal and typescript
(select tailwindcss, prettier, and then anything else you want)
- install the depe
Related Skills
bluebubbles
349.2kUse when you need to send or manage iMessages via BlueBubbles (recommended iMessage integration). Calls go through the generic message tool with channel="bluebubbles".
node-connect
349.2kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
slack
349.2kUse when you need to control Slack from OpenClaw via the slack tool, including reacting to messages or pinning/unpinning items in Slack channels or DMs.
frontend-design
109.5kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
