Market Data Stream Example
A two-app example demonstrating real-time data streaming between io.Connect applications using the Interop Streams API.
Architecture
- Price Server — creates a stream and pushes price updates per instrument
- Price Consumer — subscribes to the stream with a specific instrument branch
Price Server (Stream Publisher)
PriceServer.tsx
import { useContext, useEffect } from "react";
import { IOConnectContext } from "@interopio/react-hooks";
/** Payload of one price tick pushed on the "MarketData.LivePrices" stream. */
interface PriceUpdate {
/** Instrument symbol; the server also uses it as the branch key when pushing. */
symbol: string;
/** Simulated price: the instrument's base price plus a random offset within ±2.5. */
price: number;
/** Simulated volume: a random integer from 0 up to (but not including) 10000. */
volume: number;
/** Creation time of the update, taken from `Date.now()` (milliseconds since the Unix epoch). */
timestamp: number;
}
/**
 * Publishes simulated prices on the "MarketData.LivePrices" Interop stream.
 *
 * Once the io.Connect API is available, the component creates the stream,
 * accepts every subscriber on the branch named after its requested `symbol`
 * (rejecting requests without one), and pushes a new `PriceUpdate` for each
 * instrument once per second. On unmount, or when `io` changes, the timer is
 * stopped and the stream is closed.
 */
function PriceServer() {
  const io = useContext(IOConnectContext);

  useEffect(() => {
    if (!io) return;

    // Stream creation is asynchronous, so the cleanup below can run before it
    // resolves (fast unmount, React StrictMode). `disposed` lets the late
    // continuation tear the stream down instead of starting a timer nobody clears.
    let disposed = false;
    let intervalId: ReturnType<typeof setInterval> | undefined;
    let activeStream: { close: () => void } | undefined;

    const init = async () => {
      const stream = await io.interop.createStream(
        { name: "MarketData.LivePrices" },
        {
          subscriptionRequestHandler: (request) => {
            const symbol = request.arguments?.symbol;
            if (symbol) {
              // One branch per instrument: the subscriber only sees its own symbol.
              request.acceptOnBranch(symbol);
            } else {
              request.reject("Symbol argument required");
            }
          },
        }
      );

      if (disposed) {
        // The effect was cleaned up while we were awaiting; do not start publishing.
        stream.close();
        return;
      }
      activeStream = stream;

      const basePrices: Record<string, number> = {
        AAPL: 150, MSFT: 420, GOOGL: 175, TSLA: 240,
      };

      intervalId = setInterval(() => {
        // Iterating the entries keeps symbol and base price paired, so there is
        // no unchecked index lookup.
        for (const [symbol, basePrice] of Object.entries(basePrices)) {
          const update: PriceUpdate = {
            symbol,
            price: basePrice + (Math.random() - 0.5) * 5,
            volume: Math.floor(Math.random() * 10000),
            timestamp: Date.now(),
          };
          // The second argument is the branch key, matching acceptOnBranch above.
          stream.push(update, symbol);
        }
      }, 1000);
    };

    // Surface stream-creation failures instead of leaving an unhandled rejection.
    init().catch((error: unknown) => {
      console.error("Failed to create price stream", error);
    });

    return () => {
      disposed = true;
      if (intervalId !== undefined) clearInterval(intervalId);
      activeStream?.close();
    };
  }, [io]);

  return <p>Price stream active — broadcasting to subscribers.</p>;
}

Price Consumer (Stream Subscriber)
PriceConsumer.tsx
import { useContext, useEffect, useState } from "react";
import { IOConnectContext } from "@interopio/react-hooks";
/** Shape the consumer expects for each item received from the "MarketData.LivePrices" stream. */
interface PriceUpdate {
/** Instrument symbol; rendered as the heading. */
symbol: string;
/** Price; rendered with two decimal places after a "$" prefix. */
price: number;
/** Volume; rendered with `toLocaleString()`. */
volume: number;
/** Numeric timestamp passed to `new Date(...)` to render the update time. */
timestamp: number;
}
/**
 * Subscribes to the "MarketData.LivePrices" stream for one instrument and
 * renders the most recent update.
 *
 * @param symbol Instrument to request from the stream; defaults to "AAPL".
 *   Changing it closes the current subscription and opens a new one.
 */
function PriceConsumer({ symbol = "AAPL" }: { symbol?: string }) {
  const io = useContext(IOConnectContext);
  const [price, setPrice] = useState<PriceUpdate | null>(null);

  useEffect(() => {
    if (!io) return;

    // `subscribe` resolves asynchronously, so the cleanup can run before the
    // subscription object exists (unmount or a quick `symbol` change).
    // `cancelled` lets the late continuation close it instead of leaking it.
    let cancelled = false;
    let subscription: { close: () => void } | undefined;

    const subscribe = async () => {
      const opened = await io.interop.subscribe("MarketData.LivePrices", {
        arguments: { symbol },
        onData: (streamData) => {
          // Ignore ticks that arrive after this effect instance was cleaned up.
          if (!cancelled) setPrice(streamData.data as PriceUpdate);
        },
        onClosed: () => console.warn("Price stream closed"),
      });

      if (cancelled) {
        opened.close();
        return;
      }
      subscription = opened;
    };

    // Subscribing fails, for example, when the stream is not registered yet;
    // report it instead of leaving an unhandled rejection.
    subscribe().catch((error: unknown) => {
      console.error(`Failed to subscribe to ${symbol} prices`, error);
    });

    return () => {
      cancelled = true;
      subscription?.close();
    };
  }, [io, symbol]);

  if (!price) return <p>Waiting for {symbol} data...</p>;

  return (
    <div>
      <h2>{price.symbol}</h2>
      <p style={{ fontSize: "2rem", fontWeight: 700 }}>
        ${price.price.toFixed(2)}
      </p>
      <p>Volume: {price.volume.toLocaleString()}</p>
      <p>Updated: {new Date(price.timestamp).toLocaleTimeString()}</p>
    </div>
  );
}

Stream Branches
The server uses
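`stream.push(data, branchKey)` to send data to specific branches. Because the server accepts each subscription on the branch named after the requested symbol (`request.acceptOnBranch(symbol)`), each subscriber receives only the data for its requested symbol.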