Getting Started with Streamr

Streamr is a decentralized, real-time data network that operates on a peer-to-peer (P2P) computer network where each node connects with other nodes without a central server. Streamr uses a publish-subscribe system where one peer writes data to the network, and others read the data. This continuous flow of data allows for efficient and effective real-time data transport and communication.
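To make the publish-subscribe idea concrete, here is a minimal in-memory sketch. It is not how Streamr works internally (Streamr delivers messages across a peer-to-peer network); the class name TinyPubSub and its methods are made up for illustration.

```javascript
// Minimal in-memory publish-subscribe sketch (illustrative only).
class TinyPubSub {
  constructor() {
    this.subscribers = new Map(); // streamId -> Set of callbacks
  }

  subscribe(streamId, onMessage) {
    if (!this.subscribers.has(streamId)) {
      this.subscribers.set(streamId, new Set());
    }
    this.subscribers.get(streamId).add(onMessage);
    // Return an unsubscribe function so callers can stop listening.
    return () => this.subscribers.get(streamId).delete(onMessage);
  }

  publish(streamId, content) {
    const callbacks = this.subscribers.get(streamId) || new Set();
    callbacks.forEach((callback) => callback(content));
    return callbacks.size; // number of subscribers that received the message
  }
}
```

A publisher never needs to know who is listening: it only names the stream, and every current subscriber of that stream receives the message.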

This tutorial will walk you through the Streamr network and how to get started. We will build a simple Next.js application that consumes data from a public stream, then creates a new stream and writes to it. Along the way, we will plot the incoming data on a line chart and get acquainted with the Streamr network.

The Functionalities

Streamr's network consists of both off-chain and on-chain segments. The network serves as a peer-to-peer data channel for off-chain publishing and subscribing (pub/sub). It uses the blockchain for access control, utilizing Ethereum-based cryptography and identity. The on-chain smart contract registry is deployed on the Polygon network, so you will need Matic (the native currency of the Polygon chain) to create a stream.

Streamr Nodes

There are two types of nodes: light nodes and broker nodes.

The light node is imported into our application as a library and runs locally as part of it. By using the light node, our application becomes part of the network.

The broker node runs separately, and our application connects to it remotely using supported protocols like HTTP, WebSockets, and MQTT.

In this tutorial, we will run a light node that we will install into the application.

Tech Stack

  • Next.js
  • JavaScript
  • Streamr Light Node (streamr-client)
  • chart.js
  • react-chartjs-2
  • streamr-client-react
  • Ethereum wallet account
  • CSS

Requirements

To follow along, you need Node.js and a package manager (npm or Yarn) installed on your machine. If you don't have Node.js, you can install it from the official website.

This tutorial assumes you are familiar with React and JavaScript and have an Ethereum wallet account on MetaMask or any wallet provider.

Strap up, and let's proceed to the next section and see all this in action!

Code Setup

Navigate to your project directory and create a Next.js app. Next.js is the frontend framework of the application we are building.

npx create-next-app@latest

Install the Streamr light node client. This installs the light node into our application, making it part of the Streamr network.

npm install streamr-client

Install the Streamr React client. This package provides React hooks for Streamr; we will use its useClient and useSubscribe hooks in the application.

npm install streamr-client-react

Install chart.js and its react implementation package react-chartjs-2.

npm install chart.js react-chartjs-2

After installing all the packages, the dependencies in your package.json file should look like this:

 "dependencies": {
    "chart.js": "^4.2.1",
    "next": "13.2.4",
    "react-chartjs-2": "^5.2.0",
    "react-dom": "18.2.0",
    "streamr-client": "^8.1.0",
    "streamr-client-react": "^3.0.0-hkt.1"
  }

Functional Steps

This section describes the functional steps taken in building the application. The tutorial will show how to read or consume and create a stream.

Consuming a stream

Open the Next.js application we created a few steps back. Go to the _app.js file and replace its content with the following:

import "@/styles/globals.css";
import Provider from "streamr-client-react";

export default function App({ Component, pageProps }) {
  let options = {}
  if (typeof window !== "undefined") {
    options = {
      auth: { ethereum: window.ethereum },
    };
  }

  return (
    <Provider {...options}>
      <Component {...pageProps} />
    </Provider>
  );
}

At the top of the file, we imported the Provider component from the installed package streamr-client-react. The Provider is a wrapper for our entire app, making some valuable hooks like useClient and useSubscribe available.

Inside the function, we created an options variable and checked whether we can access the window object (that is, whether the code is running in the browser). If so, we set the auth key to { ethereum: window.ethereum }. The Streamr network uses a standard Ethereum address to determine whether a user may read from or write to a stream.
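The same check can be pulled into a small helper, which also covers the case where the page runs in a browser that has no wallet extension installed. This is only a sketch: buildProviderOptions is a hypothetical name, and returning empty options when no wallet is found is an assumption that mirrors the server-side default above.

```javascript
// Hypothetical helper: build the Provider options from a global object.
// Pass `window` in the browser; pass `undefined` during server-side rendering.
function buildProviderOptions(globalObject) {
  const ethereum = globalObject && globalObject.ethereum;
  if (!ethereum) {
    // No browser, or no injected wallet: fall back to empty options.
    return {};
  }
  return { auth: { ethereum } };
}
```

In _app.js it could be called as buildProviderOptions(typeof window !== "undefined" ? window : undefined).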

Open index.js and replace the content with the following:

import Head from "next/head";
import { Inter } from "next/font/google";
import styles from "@/styles/Home.module.css";
import { useSubscribe } from "streamr-client-react";
import { useEffect, useState } from "react";
import { Chart } from "react-chartjs-2";
import "chart.js/auto";
import LoadingSpinner from "@/components/loading";

const inter = Inter({ subsets: ["latin"] });

const streamId =
  "0x7d275b79eaed6b00eb1fe7e1174c1c6f2e711283/ethereum/gas-price";

export const options = {
  responsive: true,
  animation: false,
  maintainAspectRatio: false,
  plugins: {
    legend: {
      position: "top",
    },
    title: {
      display: true,
      text: "Ethereum BaseFee",
    },
  },
};

export default function Home() {
  const [chartData, setChartData] = useState({});
  const [xaxisData, setXAxisData] = useState([]);
  const [yaxisData, setYAxisData] = useState([]);
  const [yaxisData2, setYAxisData2] = useState([]);
  const [yaxisData3, setYAxisData3] = useState([]);
  const [yaxisData4, setYAxisData4] = useState([]);
  const [loading, setLoading] = useState(true);

  useSubscribe(streamId, {
    onMessage: (msg) => {
      setLoading(false);
      console.log(
        new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
        msg.getContent()
      );
      const {
        baseFee,
        ethPrice,
        eco: { feeCap },
        fast: { feeCap: fastfee },
        instant: { feeCap: instantfee },
      } = msg.getContent();
      setYAxisData((prev) => {
        return [...prev, baseFee];
      });

      setYAxisData2((prev) => {
        return [...prev, feeCap];
      });

      setYAxisData3((prev) => {
        return [...prev, fastfee];
      });

      setYAxisData4((prev) => {
        return [...prev, instantfee];
      });

      setXAxisData((prev) => {
        return [
          ...prev,
          new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
        ];
      });
    },
  });

  useEffect(() => {
    setChartData({
      labels: xaxisData,
      datasets: [
        {
          label: "Base Fee",
          data: yaxisData, //basefee
          borderColor: "rgb(53, 162, 203)",
          backgroundColor: "rgba(53, 162, 235, 0.5)",
        },

        {
          label: "Economy Fee", // economy fee
          data: yaxisData2,
          borderColor: "rgb(247, 70, 74)",
          backgroundColor: "rgba(247, 70, 74, 0.5)",
        },

        {
          label: "Fast Fee", //Fast fee
          data: yaxisData3,
          borderColor: "rgb(75, 192, 192)",
          backgroundColor: "rgba(75, 192, 192, 0.5)",
        },

        {
          label: "Instant Fee", // instant fee
          data: yaxisData4,
          borderColor: "rgb(255, 159, 64)",
          backgroundColor: "rgba(255, 159, 64, 0.5)",
        },
      ],
    });
  }, [xaxisData, yaxisData, yaxisData2, yaxisData3, yaxisData4]);

  return (
    <>
      <Head>
        <title>Getting started with Streamr</title>
        <meta name="description" content="Generated by create next app" />
        <meta name="viewport" content="width=device-width, initial-scale=1" />
        <link rel="icon" href="/favicon.ico" />
      </Head>
      <main className={styles.main}>
        <h1>Getting Started With Streamr</h1>

        <div className={styles.center}>{loading && <LoadingSpinner />}</div>

        <div className={styles.grid}>
          {xaxisData &&
            yaxisData &&
            xaxisData.length > 0 &&
            yaxisData.length > 0 && (
              <div style={{ width: "1200px", height: "500px" }}>
                <Chart type="line" data={chartData} options={options} />
              </div>
            )}
        </div>
      </main>
    </>
  );
}

At the top of the file, we imported useState and useEffect. The ID of the stream we are consuming is defined in a constant called streamId:

const streamId = "0x7d275b79eaed6b00eb1fe7e1174c1c6f2e711283/ethereum/gas-price"

The Streamr network uses the streamId to identify the particular stream we are interested in. We also imported the chart.js library, which we will use to display real-time data. This specific stream publishes data every 5 seconds.
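The stream ID above has two parts: an Ethereum address, then a slash-separated path. A minimal sketch for splitting them apart follows; parseStreamId and the field names owner and path are hypothetical and not part of the Streamr client.

```javascript
// Hypothetical helper: split "<address>/<path>" into its two parts.
function parseStreamId(streamId) {
  const firstSlash = streamId.indexOf("/");
  if (firstSlash <= 0) {
    throw new Error(`Expected "<address>/<path>", got "${streamId}"`);
  }
  return {
    owner: streamId.slice(0, firstSlash), // everything before the first slash
    path: streamId.slice(firstSlash), // keeps the leading slash
  };
}
```

For the gas-price stream, this gives the address as owner and "/ethereum/gas-price" as path.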

We also imported the useSubscribe hook:

import { useSubscribe } from "streamr-client-react";

We can use useSubscribe because we have set up our Provider component on _app.js. Take a look at the useSubscribe hook.

 useSubscribe(streamId, {
    onMessage: (msg) => {
      setLoading(false);
      console.log(
        new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
        msg.getContent()
      );
      const {
        baseFee,
        ethPrice,
        eco: { feeCap },
        fast: { feeCap: fastfee },
        instant: { feeCap: instantfee },
      } = msg.getContent();
      setYAxisData((prev) => {
        return [...prev, baseFee];
      });

      setYAxisData2((prev) => {
        return [...prev, feeCap];
      });

      setYAxisData3((prev) => {
        return [...prev, fastfee];
      });

      setYAxisData4((prev) => {
        return [...prev, instantfee];
      });

      setXAxisData((prev) => {
        return [
          ...prev,
          new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
        ];
      });
    },
  });

We pass the streamId and an options object to the hook. Inside the onMessage callback, we destructure the incoming message content returned by msg.getContent(). The values of baseFee, feeCap, and so on are appended to React state.
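Because a message arrives every few seconds, the state arrays grow for as long as the page stays open. One way to bound them is to keep all values for a message in a single object and trim the list to a fixed window. The sketch below assumes the message shape destructured above; appendSample and MAX_POINTS are hypothetical names, and the window size is arbitrary.

```javascript
const MAX_POINTS = 60; // assumed window: about 5 minutes at one message per 5 s

// Returns a new array with the latest sample appended, trimmed to MAX_POINTS.
function appendSample(series, content, timestamp) {
  const point = {
    time: new Date(timestamp).toLocaleTimeString("en-US"),
    baseFee: content.baseFee,
    eco: content.eco.feeCap,
    fast: content.fast.feeCap,
    instant: content.instant.feeCap,
  };
  // Copy instead of mutating so React sees a new array reference.
  return [...series, point].slice(-MAX_POINTS);
}
```

With a single series state, the callback body would reduce to setSeries((prev) => appendSample(prev, msg.getContent(), msg.messageId.timestamp)).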

The useEffect function generates the data points for the chart. useEffect executes our function when any dependencies in the dependency array change. This continuously redraws our chart.
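Since the chart data is derived entirely from the other state values, it can also be computed by a plain function, which is easier to test than an effect. The following is a sketch of that alternative, not the tutorial's code: buildChartData is a hypothetical name, and each dataset uses a single color for both its border and its fill.

```javascript
// Hypothetical helper: turn the label and fee arrays into chart.js data.
function buildChartData(labels, baseFees, ecoFees, fastFees, instantFees) {
  const dataset = (label, data, rgb) => ({
    label,
    data,
    borderColor: `rgb(${rgb})`,
    backgroundColor: `rgba(${rgb}, 0.5)`,
  });
  return {
    labels,
    datasets: [
      dataset("Base Fee", baseFees, "53, 162, 235"),
      dataset("Economy Fee", ecoFees, "247, 70, 74"),
      dataset("Fast Fee", fastFees, "75, 192, 192"),
      dataset("Instant Fee", instantFees, "255, 159, 64"),
    ],
  };
}
```

Inside the component, the result could be memoized with React's useMemo and the same dependency array, which removes the need for the chartData state.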

To start the application, run the following in the project terminal:

npm run dev

We could use this stream to monitor Ethereum gas prices and act when a price reaches a particular value.
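As a sketch of what "acting on a value" might look like, the helper below reports the moment a value drops to or below a threshold, so an alert fires once per crossing instead of on every message. The name crossedBelow is hypothetical, and the threshold must be in whatever unit the stream publishes, which this tutorial does not specify.

```javascript
// True only on the message where the value first reaches or drops below
// the threshold; false while it stays below or stays above.
function crossedBelow(previous, current, threshold) {
  return previous !== undefined && previous > threshold && current <= threshold;
}
```

Inside onMessage, the previous baseFee could be kept in a ref and compared with the incoming one.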

Creating a Stream

In this section, we will create a stream from scratch and write data to the stream. We need some Matic in our wallet to update Streamr's stream registry to create a stream.

Creating a stream will be done on the server side of the application. An HTTP call from the client will trigger it.

Let's get started on creating the server-side method to handle the creation of a stream.

Open src/pages/api and create a create-stream.js file.

const StreamrClient = require("streamr-client");

export default async function handler(req, res) {
    if (req.method === "POST") {
      const { stream } = await createStreamrClient();
      return res.status(200).json({ success: true , streamId: stream.id});
    } else {
      return res
        .status(405)
        .json({ message: "Method not allowed", success: false });
    }
  }

const createStreamrClient = async () => {
  // Initialize the client with an Ethereum account
  const streamr = new StreamrClient({
    auth: {
      privateKey: process.env.Private_Key,
    },
  });
  // Requires MATIC tokens (Polygon blockchain gas token)
  const stream = await streamr.createStream({
    id: "/testing/my-created-streamr", //topic must be unique
  });

  console.log("stream ID ", stream.id);

  return { streamr, stream };
};

At the top of the file, we required streamr-client, which we will use to create a new StreamrClient.

Remember, in the client environment, we used window.ethereum for authentication, but on the backend, we will use the private key of an Ethereum address that holds Matic. We store the private key as an environment variable called Private_Key.
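A missing or malformed key is an easy mistake to make here, so it can help to check the variable before constructing the client. The sketch below assumes the key is a 32-byte hex string with an optional 0x prefix; assertPrivateKey is a hypothetical helper, and the error message deliberately avoids printing the key.

```javascript
// Hypothetical helper: fail fast if the private key is absent or malformed.
function assertPrivateKey(value) {
  if (typeof value !== "string" || !/^(0x)?[0-9a-fA-F]{64}$/.test(value)) {
    throw new Error("Private_Key is missing or is not a 32-byte hex string");
  }
  return value;
}
```

It could wrap the environment lookup, for example privateKey: assertPrivateKey(process.env.Private_Key).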

Note: we can set permissions on who can read from or write to our stream. See the documentation here (docs.streamr.network/usage/streams/permissi..)
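A rough sketch of granting permissions is shown below. It assumes the stream object exposes a grantPermissions method and that streamr-client exports a StreamPermission enum with PUBLISH and SUBSCRIBE members, which is how I understand the client's documentation; verify this against the permissions page for your installed version. Both are passed in as parameters, and the function names are hypothetical.

```javascript
// `stream` is the object returned by createStream; `StreamPermission` is
// assumed to be the enum exported by streamr-client.
async function allowPublisher(stream, StreamPermission, userAddress) {
  // Let one specific wallet address publish to the stream.
  await stream.grantPermissions({
    user: userAddress,
    permissions: [StreamPermission.PUBLISH],
  });
}

async function allowPublicRead(stream, StreamPermission) {
  // Let anyone subscribe to the stream.
  await stream.grantPermissions({
    public: true,
    permissions: [StreamPermission.SUBSCRIBE],
  });
}
```

Since access control lives on-chain, expect permission changes to need Matic for gas, just like creating the stream.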

After creating the streamr client, we use that streamr instance to create a stream.

  // Requires MATIC tokens (Polygon blockchain gas token)
  const stream = await streamr.createStream({
    id: "/testing/my-created-streamr", //topic must be unique
  });

The id must be unique, or the transaction will fail. The id is prefixed with the wallet address.
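The prefixing can be pictured with a small sketch. It assumes that an id starting with a slash is treated as a path and that the address is lowercased when the full id is formed; the lowercasing is my assumption, and resolveStreamId and the example address are hypothetical.

```javascript
// Hypothetical helper: expand a "/path" into "<address>/path".
function resolveStreamId(idOrPath, walletAddress) {
  // A leading slash marks a path that still needs the owner prefix.
  if (idOrPath.startsWith("/")) {
    return walletAddress.toLowerCase() + idOrPath;
  }
  return idOrPath; // already a full id
}
```

In practice, read the final id from stream.id, as the handler does, instead of computing it yourself.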

We create a handler for an HTTP call that returns the streamId to the client.

export default async function handler(req, res) {
 //rest of the handler code here
}
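The handler above lets any error from createStream escape, for instance when the id already exists or the wallet has no Matic. A hedged variant that returns a JSON error could look like the sketch below. makeHandler is a hypothetical wrapper, the stream-creating function is injected so the handler can be tried without a network connection, and the 500 response shape is an assumption.

```javascript
// `createStream` is injected; in the tutorial it would be createStreamrClient.
function makeHandler(createStream) {
  return async function handler(req, res) {
    if (req.method !== "POST") {
      return res
        .status(405)
        .json({ message: "Method not allowed", success: false });
    }
    try {
      const { stream } = await createStream();
      return res.status(200).json({ success: true, streamId: stream.id });
    } catch (error) {
      // For example: a duplicate stream id, or no Matic to pay for gas.
      console.error("createStream failed:", error);
      return res
        .status(500)
        .json({ message: "Could not create stream", success: false });
    }
  };
}
```

In create-stream.js this would be exported as export default makeHandler(createStreamrClient).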

Creating a Page to Publish Stream Data

Open src/pages, create a new pub-data.js file, and put the following content inside:

import React, { useState } from "react";
import styles from "@/styles/Home.module.css";
import LoadingSpinner from "@/components/loading";
import { useClient } from "streamr-client-react";

export default function PubData() {
  const [data, setData] = useState("");
  const [loading, setLoading] = useState(false);
  const [streamId, setStreamId] = useState(null);

  const client = useClient();

  const publishDatatoStream = async () => {
    try {
      setLoading(true);
      if (streamId && data) {
        await client.publish(streamId, data);
        alert("Message published successfully");
      } else {
        alert("Create a stream and enter a message before publishing");
      }
      setLoading(false);
    } catch (error) {
      setLoading(false);
      alert(
        `Oops! Something went wrong. Please refresh and try again. Error ${error}`
      );
    }
  };

  const createStream = async () => {
    try {
      setLoading(true);
      const response = await fetch("/api/create-stream", {
        method: "POST",
        body: data,
      });
      if (response.status !== 200) {
        alert("Oops! Something went wrong. Please refresh and try again.");
        setLoading(false);
      } else {
        alert("stream created");
        let responseJSON = await response.json();
        const streamId = responseJSON.streamId;
        console.log("streamID ", streamId);
        setStreamId(streamId);
        setLoading(false);
      }
      // check response, if success is false, dont take them to success page
    } catch (error) {
      setLoading(false);
      alert(
        `Oops! Something went wrong. Please refresh and try again. Error ${error}`
      );
    }
  };

  return (
    <div className={styles.main}>
      {loading && <LoadingSpinner />}

      <h1>Click To Create Stream </h1>

      <button
        onClick={createStream}
        disabled={loading}
        className={styles.button}
      >
        Create Stream
      </button>
      <div>
        <p>Write Message to Publish</p>
        <input
          type="text"
          onChange={(e) => setData(e.target.value)}
          value={data}
          className={styles.input}
        />
      </div>

      <button
        onClick={publishDatatoStream}
        disabled={loading}
        className={styles.button}
      >
        Publish Message
      </button>
    </div>
  );
}

At the top of the file, we imported useClient from streamr-client-react. The useClient hook gives us the client instance passed to the Provider. By default, only the wallet that creates a stream can write to it.

In the server code, we created a stream by providing the wallet private key. On the client, we must sign a transaction before writing to that created stream.

To call the server-side API, we define a function called createStream. This function makes a fetch call to the /api/create-stream API, which creates the stream. If all goes well, the API returns the streamId, and we save it to state.

Calling the createStream function creates a stream using the Matic in the wallet used for authentication. Ensure that your wallet contains Matic, or the function call will fail.

After creating the stream, we can publish data to it. The publishDatatoStream function posts a message to the stream. It uses the client instance returned by useClient and passes it the streamId and the data we want to publish.

const publishDatatoStream = async () => {
  try {
    // ...code removed for brevity
    if (streamId && data) {
      await client.publish(streamId, data);
    }
    // ...code removed for brevity
  } catch (error) {
    setLoading(false);
    alert(`Oops! Something went wrong. Please refresh and try again. Error ${error}`);
  }
};

Before the data is published, the if-block checks for a streamId and data.

Remember, the streamId is saved to state when we create the stream. The data variable comes from the input textbox, and we save it to React state.
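The page publishes the raw input string. The gas-price stream we consumed earlier carries structured objects, so you may prefer to wrap the text in an object before publishing. The sketch below is one way to do that; buildMessage and the field names text and sentAt are hypothetical, not a format Streamr requires.

```javascript
// Hypothetical helper: wrap the input text in a small JSON-friendly object.
// Returns null when there is nothing worth publishing.
function buildMessage(text, now = Date.now()) {
  const trimmed = text.trim();
  if (trimmed === "") {
    return null;
  }
  return { text: trimmed, sentAt: now };
}
```

The publish call would then send buildMessage(data) instead of data, and skip publishing when the result is null.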

Type some text in the text box and click the "Publish Message" button to call the publishDatatoStream function.

You must sign the transaction with the wallet you used to create the stream.

What's Next

In this tutorial, we have seen how to read from, create, and write to streams on the Streamr network. As a next step, we could extend the application so that only selected users or wallets are allowed to read from or write to our stream.

Additional Resources

The code for this tutorial can be found here

Streamr documentation

React Chart.js

Thank you for reading.