Streamr is a decentralized, real-time data network that operates on a peer-to-peer (P2P) computer network where each node connects with other nodes without a central server. Streamr uses a publish-subscribe system where one peer writes data to the network, and others read the data. This continuous flow of data allows for efficient and effective real-time data transport and communication.
This tutorial will walk you through the Streamr network and how to get started. We will create a simple Next.js application that will consume data from a public stream and create and write to one. We will plot and visualize this data using a line chart and become acquainted with the Streamr network.
The Functionalities
Streamr's network consists of both off-chain and on-chain segments. The network serves as a peer-to-peer data channel for off-chain publishing and subscribing (pub/sub). It uses the blockchain for access control, utilizing Ethereum-based cryptography and identity. The on-chain smart contract registry is deployed on the Polygon Network. So you will need Matic (the native currency of the Polygon chain) when creating a stream.
Streamr Nodes
There are two types of nodes; light nodes and broker nodes.
The light node is imported into our application as a library and runs locally as part of our application. Using the light node our application becomes part of the network.
The broker node runs separately, and our application connects to it remotely using supported protocols like HTTP, WebSockets, and MQTT.
In this tutorial, we will run a light node that we will install into the application.
Tech Stack
- NextJS
- Javascript
- Streamr Light Node (streamr-client)
- chart.js
- react-chartjs-2
- streamr-client-react
- Ethereum wallet account
- CSS
Requirements
You should install Node.js, NPM, or Yarn on your machine to follow along in this tutorial. If you don't have Node.js, you can install it from the official website.
This tutorial assumes you are familiar with React and JavaScript and have an Ethereum wallet account on MetaMask or any wallet provider.
Strap up, and let's proceed to the next section and see all this in action!
Code Setup
Navigate to your project directory and install Nextjs; NextJS is the frontend framework of the application that we are building.
npx create-next-app@latest
Install the Streamr light node client. This installs the light node into our application, making it part of the Streamr network.
npm install streamr-client
Install Streamr React Client. This package enables the use of React hooks. We will use the useClient
and useSubscribe
hooks of this package in the application.
npm install streamr-client-react
Install chart.js
and its react implementation package react-chartjs-2
.
npm install chart.js react-chartjs-2
After installing all the packages your dependencies in the package.json
file should look like this:
"dependencies": {
"chart.js": "^4.2.1",
"next": "13.2.4",
"react-chartjs-2": "^5.2.0",
"react-dom": "18.2.0",
"streamr-client": "^8.1.0",
"streamr-client-react": "^3.0.0-hkt.1"
}
Functional Steps
This section describes the functional steps taken in building the application. The tutorial will show how to read or consume and create a stream.
Consuming a stream
Open the Next.js application we installed a few steps back. Go to the _app.js
file and replace the content inside with the following;
import "@/styles/globals.css";
import Provider from "streamr-client-react";
export default function App({ Component, pageProps }) {
let options = {}
if (typeof window !== "undefined") {
options = {
auth: { ethereum: window.ethereum },
};
}
return (
<Provider {...options}>
<Component {...pageProps} />
</Provider>
);
}
At the top of the file, we imported the Provider
component from the installed package streamr-client-react
. The Provider
is a wrapper for our entire app, making some valuable hooks like useClient
and useSubscribe
available.
Inside the function, we created an options
variable and checked if we could access the window
object (Client environment). We defined an auth
key as { ethereum: window.ethereum }
. The Streamr network uses a standard Ethereum address to authenticate if a user can read or write to a stream.
Open index.js
and replace the content with the following;
import Head from "next/head";
import { Inter } from "next/font/google";
import styles from "@/styles/Home.module.css";
import { useSubscribe } from "streamr-client-react";
import { useEffect, useState } from "react";
import { Chart } from "react-chartjs-2";
import "chart.js/auto";
import LoadingSpinner from "@/components/loading";
const inter = Inter({ subsets: ["latin"] });
const streamId =
"0x7d275b79eaed6b00eb1fe7e1174c1c6f2e711283/ethereum/gas-price";
export const options = {
responsive: true,
animation: false,
maintainAspectRatio: false,
plugins: {
legend: {
position: "top",
},
title: {
display: true,
text: "Ethereum BaseFee",
},
},
};
export default function Home() {
const [chartData, setChartData] = useState({});
const [xaxisData, setXAxisData] = useState([]);
const [yaxisData, setYAxisData] = useState([]);
const [yaxisData2, setYAxisData2] = useState([]);
const [yaxisData3, setYAxisData3] = useState([]);
const [yaxisData4, setYAxisData4] = useState([]);
const [loading, setLoading] = useState(true);
useSubscribe(streamId, {
onMessage: (msg) => {
setLoading(false);
console.log(
new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
msg.getContent()
);
const {
baseFee,
ethPrice,
eco: { feeCap },
fast: { feeCap: fastfee },
instant: { feeCap: instantfee },
} = msg.getContent();
setYAxisData((prev) => {
return [...prev, baseFee];
});
setYAxisData2((prev) => {
return [...prev, feeCap];
});
setYAxisData3((prev) => {
return [...prev, fastfee];
});
setYAxisData4((prev) => {
return [...prev, instantfee];
});
setXAxisData((prev) => {
return [
...prev,
new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
];
});
},
});
useEffect(() => {
setChartData({
labels: xaxisData,
datasets: [
{
label: "Base Fee",
data: yaxisData, //basefee
borderColor: "rgb(53, 162, 203)",
backgroundColor: "rgba(53, 162, 235, 0.5)",
},
{
label: "Econony Fee", //Economy fee
data: yaxisData2,
borderColor: "rgb(247, 70, 74)",
backgroundColor: "rgba(247, 70, 74, 0.5)",
},
{
label: "Fast Fee", //Fast fee
data: yaxisData3,
borderColor: "rgb(75, 192, 192)",
backgroundColor: "rgba(75, 192, 192, 0.5)",
},
{
label: "Instant Fee", //fast fee
data: yaxisData4,
borderColor: "rgb(255, 159, 64)",
backgroundColor: "rgba(255, 159, 64, 0.5)",
},
],
});
}, [xaxisData, yaxisData, yaxisData2, yaxisData3, yaxisData4]);
return (
<>
<Head>
<title>Getting started with Streamr</title>
<meta name="description" content="Generated by create next app" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<link rel="icon" href="/favicon.ico" />
</Head>
<main className={styles.main}>
<h1>Getting Started With Streamr</h1>
<div className={styles.center}>{loading && <LoadingSpinner />}</div>
<div className={styles.grid}>
{xaxisData &&
yaxisData &&
xaxisData.length > 0 &&
yaxisData.length > 0 && (
<div style={{ width: "1200px", height: "500px" }}>
<Chart type="line" data={chartData} />
</div>
)}
</div>
</main>
</>
);
}
At the top of the file, we imported useState
and useEffect
. The streamId
of the stream we are consuming is defined in a const
variable called streamId
const streamId = "0x7d275b79eaed6b00eb1fe7e1174c1c6f2e711283/ethereum/gas-price"
The Streamr network uses the streamId
to identify the particular stream we are interested in. We also imported the chart.js
library we will use to display real-time data. This specific stream published data every 5 seconds.
We also imported;
import { useSubscribe } from "streamr-client-react";
We can use useSubscribe
because we have set up our Provider
component on _app.js
. Take a look at the useSubscribe
hook.
useSubscribe(streamId, {
onMessage: (msg) => {
setLoading(false);
console.log(
new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
msg.getContent()
);
const {
baseFee,
ethPrice,
eco: { feeCap },
fast: { feeCap: fastfee },
instant: { feeCap: instantfee },
} = msg.getContent();
setYAxisData((prev) => {
return [...prev, baseFee];
});
setYAxisData2((prev) => {
return [...prev, feeCap];
});
setYAxisData3((prev) => {
return [...prev, fastfee];
});
setYAxisData4((prev) => {
return [...prev, instantfee];
});
setXAxisData((prev) => {
return [
...prev,
new Date(msg.messageId.timestamp).toLocaleTimeString("en-US"),
];
});
},
});
We pass the streamId
to the function; onMessage
event callback, we destructed the incoming message using msg.getContent()
. The values of baseFee
, feeCap
etc., are saved to React state.
The useEffect
function generates the data points for the chart. useEffect
executes our function when any dependencies in the dependency array change. This continuously redraws our chart.
To start the application type on the project terminal:
npm run dev
We could use this stream to monitor Ethereum prices and act if the price gets to a particular value.
Creating a Stream
In this section, we will create a stream from scratch and write data to the stream. We need some Matic in our wallet to update Streamr's stream registry to create a stream.
Creating a stream will be done on the server side of the application. An HTTP call from the client will trigger it.
Let's get started on creating the server-side method to handle the creation of a stream.
Open src/pages/api
and create a create-stream.js
file.
const StreamrClient = require("streamr-client");
export default async function handler(req, res) {
if (req.method === "POST") {
const { stream } = await createStreamrClient();
return res.status(200).json({ success: true , streamId: stream.id});
} else {
return res
.status(405)
.json({ message: "Method not allowed", success: false });
}
}
const createStreamrClient = async () => {
// Initialize the client with an Ethereum account
const streamr = new StreamrClient({
auth: {
privateKey: process.env.Private_Key,
},
});
// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
id: "/testing/my-created-streamr", //topic must be unique
});
console.log("stream ID ", stream.id);
return { streamr, stream };
};
At the top of the file, we required streamr-client
, which we will use to create a new StreamrClient
.
Remember, in the client environment, we used window.ethereum
for authentication, but on the backend, we will use the private key of an Ethereum address that holds Matic. We store the private key as an environment variable called Private_Key
.
Note: we can set permission on who can access or write to our stream. See the documentation here(docs.streamr.network/usage/streams/permissi..)
After creating the streamr
client, we use that streamr
instance to create a stream.
// Requires MATIC tokens (Polygon blockchain gas token)
const stream = await streamr.createStream({
id: "/testing/my-created-streamr", //topic must be unique
});
The
id
must be unique, or the transaction will fail. Theid
is prefixed with the wallet address.
We create a handler
for an HTTP call that returns the streamId
to the client.
export default async function handler(req, res) {
//rest of the handler code here
}
Creating a Page to Publish Stream Data
Open src/pages
, create a new pub-data.js
file, and put the following content inside:
import React, { useState } from "react";
import styles from "@/styles/Home.module.css";
import LoadingSpinner from "@/components/loading";
import { useClient } from "streamr-client-react";
export default function PubData() {
const [data, setData] = useState("");
const [loading, setLoading] = useState(false);
const [streamId, setStreamId] = useState(null);
const client = useClient();
const publishDatatoStream = async () => {
try {
setLoading(true);
if (streamId && data) {
await client.publish(streamId, data);
}
setLoading(false);
alert("Message published succesfully");
} catch (error) {
setLoading(false);
alert(
`Oops! Something went wrong. Please refresh and try again. Error ${error}`
);
}
};
const createStream = async () => {
try {
setLoading(true);
const response = await fetch("/api/create-stream", {
method: "POST",
body: data,
});
if (response.status !== 200) {
alert("Oops! Something went wrong. Please refresh and try again.");
setLoading(false);
} else {
alert("stream created");
let responseJSON = await response.json();
const streamId = responseJSON.streamId;
console.log("streamID ", streamId);
setStreamId(streamId);
setLoading(false);
}
// check response, if success is false, dont take them to success page
} catch (error) {
setLoading(false);
alert(
`Oops! Something went wrong. Please refresh and try again. Error ${error}`
);
}
};
return (
<div className={styles.main}>
{loading && <LoadingSpinner />}
<h1>Click To Create Stream </h1>
<button
onClick={createStream}
disabled={loading}
className={styles.button}
>
Create Stream
</button>
<div>
<p>Write Message to Publish</p>
<input
type="text"
onChange={(e) => setData(e.target.value)}
value={data}
className={styles.input}
/>
</div>
<button
onClick={publishDatatoStream}
disabled={loading}
className={styles.button}
>
Publish Message
</button>
</div>
);
}
At the top of the file, we destructed useClient
from streamr-client-react
. The useClient
hook gives us an instance of the client passed to the Provider
. Only the wallet that creates a stream can write to it by default.
In the server code, we created a stream by providing the wallet private key. On the client, we must sign a transaction before writing to that created stream.
To call the server-side API, we define a function called creatStream
. This function makes a fetch
call to the /api/create-stream
API, creating the stream. The function returns the streamId
to the client if all goes well.
Calling the createStream
function creates a stream using the Matic in the wallet used for authentication. Ensure that your wallet contains Matic, or the function call will fail.
After creating the stream, we can publish data to it. The publishDatatoStream
function posts a message to the stream. The function retrieves the stream instance from useClient
and passes the streamId
and the data
we want to publish to the stream.
const publishDatatoStream = async () => {
try { //-> code removed for brevity
if (streamId && data) {
await client.publish(streamId, data);
//-> code removed for brevity
} catch (error) {
setLoading(false);
alert(`Oops! Something went wrong. Please refresh and try again. Error ${error}`);
}
};
Before the data is published, the if-block checks for a streamId
and data
.
Remember, the streamId
is saved to state when we create the stream. The data
variable comes from the input textbox, and we save it to React state.
Write some text on the textbox and click the "Publish Message" button to call the publishDatatoStream
function.
You must sign the transaction with the wallet you used to create the stream.
What's Next
In this tutorial, we have seen how to create and read streams using the Streamr network. We could extend this tutorial only to allow some selected users/wallets to write or read our stream.
Additional Resources
The code for this tutorial can be found here
Thank you for reading.