Market Data Stream Example

A two-app example demonstrating real-time data streaming between io.Connect applications using the Interop Streams API.

Architecture

  • Price Server — creates a stream and pushes price updates per instrument
  • Price Consumer — subscribes to the stream with a specific instrument branch

Price Server (Stream Publisher)

PriceServer.tsx
import { useContext, useEffect } from "react";
import { IOConnectContext } from "@interopio/react-hooks";

interface PriceUpdate {
  symbol: string;
  price: number;
  volume: number;
  timestamp: number;
}

function PriceServer() {
  const io = useContext(IOConnectContext);

  useEffect(() => {
    if (!io) return;

    let intervalId: ReturnType<typeof setInterval>;

    const init = async () => {
      const stream = await io.interop.createStream(
        { name: "MarketData.LivePrices" },
        {
          subscriptionRequestHandler: (request) => {
            const symbol = request.arguments?.symbol;
            if (symbol) {
              request.acceptOnBranch(symbol);
            } else {
              request.reject("Symbol argument required");
            }
          },
        }
      );

      const symbols = ["AAPL", "MSFT", "GOOGL", "TSLA"];
      const basePrices: Record<string, number> = {
        AAPL: 150, MSFT: 420, GOOGL: 175, TSLA: 240,
      };

      intervalId = setInterval(() => {
        symbols.forEach((symbol) => {
          const update: PriceUpdate = {
            symbol,
            price: basePrices[symbol] + (Math.random() - 0.5) * 5,
            volume: Math.floor(Math.random() * 10000),
            timestamp: Date.now(),
          };
          stream.push(update, symbol);
        });
      }, 1000);
    };
    init();

    return () => clearInterval(intervalId);
  }, [io]);

  return <p>Price stream active — broadcasting to subscribers.</p>;
}

Price Consumer (Stream Subscriber)

PriceConsumer.tsx
import { useContext, useEffect, useState } from "react";
import { IOConnectContext } from "@interopio/react-hooks";

interface PriceUpdate {
  symbol: string;
  price: number;
  volume: number;
  timestamp: number;
}

function PriceConsumer({ symbol = "AAPL" }: { symbol?: string }) {
  const io = useContext(IOConnectContext);
  const [price, setPrice] = useState<PriceUpdate | null>(null);

  useEffect(() => {
    if (!io) return;

    let subscription: { close: () => void } | undefined;

    const subscribe = async () => {
      subscription = await io.interop.subscribe("MarketData.LivePrices", {
        arguments: { symbol },
        onData: (streamData) => setPrice(streamData.data as PriceUpdate),
        onClosed: () => console.warn("Price stream closed"),
      });
    };
    subscribe();

    return () => subscription?.close();
  }, [io, symbol]);

  if (!price) return <p>Waiting for {symbol} data...</p>;

  return (
    <div>
      <h2>{price.symbol}</h2>
      <p style={{ fontSize: "2rem", fontWeight: 700 }}>
        ${price.price.toFixed(2)}
      </p>
      <p>Volume: {price.volume.toLocaleString()}</p>
      <p>Updated: {new Date(price.timestamp).toLocaleTimeString()}</p>
    </div>
  );
}
Stream BranchesThe server uses stream.push(data, branchKey) to send data to specific branches. Each subscriber receives only the data for its requested symbol.