Tutorial: Receive Data in Your Event Hub
This tutorial will show you how to route data to an Event Hub and consume it. You will:
- Create an Event Hub Egress Sink
- Configure a Stream to forward data to the Egress Sink
- Send data to the Platform and consume it from the Event Hub
Requirements
- .NET 8.
- If you are not yet registered with the Spotflow IoT Platform, Sign Up.
- You need to have an existing Stream. A Stream called default-stream in Stream Group default-stream-group is created automatically after you register. Create a new one if you want to use a different Stream.
- You need to have an existing Event Hub and a Storage Account for checkpointing.
1. Route Data to a New Event Hub Egress Sink
An Event Hub Egress Sink represents an existing Event Hub to which the Platform can forward data. Let's create a new one and configure a Stream to forward data to it.
You can do this in the Portal, with the CLI, or through the API; the instructions for each option follow in that order.
Portal
Click on Data Flows in the left sidebar.
Click on the Add Route button next to the Stream that should be routed to the Event Hub.
Choose the Create a New Egress Sink option and click on the Next button.
Fill in the name of the Egress Sink, e.g. my-eventhub. Select Azure Event Hub as the Egress Sink kind. Finally, paste the connection string of your Event Hub into the respective input field. Click Create Egress Sink and Continue.
The new Event Hub Egress Sink is now created! All that remains is to create an Egress Route so that the data from your Stream is forwarded to this sink. Review the default configuration and optionally customize it. Click Create Egress Route to finish.
The route to the new Event Hub Egress Sink has been created! You should see an arrow going from your Stream to the Egress Sink.
CLI
Create Event Hub Egress Sink
Assuming you have already installed the CLI and logged in, replace the placeholders <Your Egress Sink Name> and <Your Event Hub Connection String> with your Event Hub Egress Sink name and your Event Hub connection string, then run the following command to create a new Event Hub Egress Sink:
spotf egress-sink create-or-update eh -n <Your Egress Sink Name> -c "<Your Event Hub Connection String>"
The CLI will confirm the creation and display the Event Hub Egress Sink details:
Name: my-event-hub-sink
WorkspaceId: 925d28eb-e162-47db-bdd0-4a7028e177fc
Properties
├── IsEnabled: true
└── Config
    └── AzureEventHub
        ├── Endpoint: sb://myeventhub.servicebus.windows.net/
        ├── EventHubName: myeventhub
        └── SharedAccessKeyName: Write
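The details shown above appear to correspond to individual parts of the connection string you supplied. The following is a minimal sketch, not part of the Platform or the CLI, that splits a standard Azure Event Hubs connection string into its key/value pairs so you can check which values you are about to submit. The connection string and its key are made up, the helper name ParseConnectionString is hypothetical, and the mapping of EntityPath to EventHubName is an assumption based on the usual Azure connection string layout.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical connection string with a made-up key; never hard-code real secrets.
var connectionString =
    "Endpoint=sb://myeventhub.servicebus.windows.net/;" +
    "SharedAccessKeyName=Write;" +
    "SharedAccessKey=not-a-real-key=;" +
    "EntityPath=myeventhub";

var parts = ParseConnectionString(connectionString);

// Assumption: EntityPath is what the CLI displays as EventHubName.
Console.WriteLine($"Endpoint: {parts["Endpoint"]}");
Console.WriteLine($"EventHubName: {parts["EntityPath"]}");
Console.WriteLine($"SharedAccessKeyName: {parts["SharedAccessKeyName"]}");

// Splits "Key=Value;Key=Value" pairs. Only the first '=' of each pair
// separates the key from the value, because Base64 keys may end with '='.
static Dictionary<string, string> ParseConnectionString(string value) =>
    value
        .Split(';', StringSplitOptions.RemoveEmptyEntries)
        .Select(pair => pair.Split('=', 2))
        .Where(keyValue => keyValue.Length == 2)
        .ToDictionary(
            keyValue => keyValue[0].Trim(),
            keyValue => keyValue[1].Trim(),
            StringComparer.OrdinalIgnoreCase);
```

Note that the displayed details do not include the shared access key itself, only the name of the access policy.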
Create Route to the Egress Sink
Run the following command, replacing the Stream Group, Stream, Egress Route, and Egress Sink names with your own:
spotf stream route cu eh -g my-new-stream-group -s my-new-stream -n route-to-event-hub -k my-event-hub-sink
The CLI will confirm the creation and display the Egress Route details:
Name: route-to-event-hub
WorkspaceId: 925d28eb-e162-47db-bdd0-4a7028e177fc
StreamGroupName: my-new-stream-group
StreamName: my-new-stream
Properties
├── IsEnabled: true
├── EgressSinkName: my-event-hub-sink
├── Config
│   └── AzureEventHub
│       ├── ForceExternalizedPayload: false
│       └── PartitionKeyPattern
│           └── IsEnabled: false
└── Input: Messages
API
The following instructions assume that you have already obtained an API access token from the Portal and that you know your Workspace ID.
Create Event Hub Egress Sink
Replace the placeholders <Your Workspace ID>, <Your Egress Sink Name>, <Your API Access Token> and <Your Event Hub Connection String> with your Workspace ID, your Event Hub Egress Sink name, your API access token and your Event Hub connection string, then run the following command to create a new Event Hub Egress Sink:
cURL
curl -L -X PATCH 'https://api.eu1.spotflow.io/workspaces/<Your Workspace ID>/egress-sinks/<Your Egress Sink Name>' \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -H 'Authorization: Bearer <Your API Access Token>' \
  --data-raw '{
    "properties": {
      "isEnabled": true,
      "config": {
        "azureEventHub": {
          "connectionString": "<Your Event Hub Connection String>"
        }
      }
    }
  }'
PowerShell
(Invoke-WebRequest -Method Patch -Uri 'https://api.eu1.spotflow.io/workspaces/<Your Workspace ID>/egress-sinks/<Your Egress Sink Name>' `
  -Headers @{
    'Content-Type' = 'application/json'
    'Accept' = 'application/json'
    'Authorization' = 'Bearer <Your API Access Token>'
  } `
  -Body '{ "properties": { "isEnabled": true, "config": { "azureEventHub": { "connectionString": "<Your Event Hub Connection String>" }}}}').Content
The API will confirm the creation and display the Event Hub Egress Sink details:
{
  "name": "my-event-hub-sink",
  "workspaceId": "925d28eb-e162-47db-bdd0-4a7028e177fc",
  "ingressVersion": "V2",
  "version": 1,
  "properties": {
    "isEnabled": true,
    "config": {
      "azureEventHub": {
        "endpoint": "sb://myeventhub.servicebus.windows.net/",
        "eventHubName": "myeventhub",
        "sharedAccessKeyName": "Write"
      }
    },
    "protocolVersion": "V2"
  }
}
Create Route to the Egress Sink
Replace the placeholders <Your Workspace ID>, <Your Stream Group Name>, <Your Stream Name>, <Your Egress Route Name>, <Your API Access Token> and <Your Event Hub Egress Sink Name> with your Workspace ID, your Stream Group name, your Stream name, your Egress Route name, your API access token and your Event Hub Egress Sink name, then run the following command to create a new Egress Route for the Stream:
cURL
curl -L -X PATCH 'https://api.eu1.spotflow.io/workspaces/<Your Workspace ID>/stream-groups/<Your Stream Group Name>/streams/<Your Stream Name>/egress-routes/<Your Egress Route Name>' \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -H 'Authorization: Bearer <Your API Access Token>' \
  --data-raw '{
    "properties": {
      "isEnabled": true,
      "egressSinkName": "<Your Event Hub Egress Sink Name>",
      "config": {
        "azureEventHub": {
          "partitionKeyPattern": {
            "isEnabled": false
          }
        }
      },
      "input": "Messages"
    }
  }'
PowerShell
(Invoke-WebRequest -Method Patch -Uri 'https://api.eu1.spotflow.io/workspaces/<Your Workspace ID>/stream-groups/<Your Stream Group Name>/streams/<Your Stream Name>/egress-routes/<Your Egress Route Name>' `
  -Headers @{
    'Content-Type' = 'application/json'
    'Accept' = 'application/json'
    'Authorization' = 'Bearer <Your API Access Token>'
  } `
  -Body '{ "properties": { "isEnabled": true, "egressSinkName": "<Your Event Hub Egress Sink Name>", "config": { "azureEventHub": { "partitionKeyPattern": { "isEnabled": false }}}, "input": "Messages" }}').Content
The API will confirm the creation and display the Egress Route details:
{
  "name": "route-to-event-hub",
  "workspaceId": "925d28eb-e162-47db-bdd0-4a7028e177fc",
  "streamGroupName": "my-new-stream-group",
  "streamName": "my-new-stream",
  "version": 1,
  "properties": {
    "isEnabled": true,
    "egressSinkName": "my-event-hub-sink",
    "config": {
      "azureEventHub": {
        "disablePayloadEmbedding": false,
        "partitionKeyPattern": {
          "isEnabled": false
        },
        "forceExternalizedPayload": false
      }
    },
    "input": "Messages"
  }
}
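If you prefer to call the API from .NET rather than from cURL or PowerShell, the route request above could be sketched roughly as follows. This is an illustration under assumptions, not an official client: the URL, headers and body simply mirror the cURL command, the helper BuildPatch and the environment variable SPOTFLOW_SEND are hypothetical, and the Workspace ID and resource names are the sample values used in this tutorial. The request is only sent when SPOTFLOW_SEND is set to 1, so you can inspect it first.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;

// Sample values from this tutorial; replace them with your own.
var workspaceId = "925d28eb-e162-47db-bdd0-4a7028e177fc";
var accessToken = "<Your API Access Token>";

// Same JSON body as in the cURL command for the Egress Route.
var routeBody = JsonSerializer.Serialize(new
{
    properties = new
    {
        isEnabled = true,
        egressSinkName = "my-event-hub-sink",
        config = new { azureEventHub = new { partitionKeyPattern = new { isEnabled = false } } },
        input = "Messages"
    }
});

using var request = BuildPatch(
    $"workspaces/{workspaceId}/stream-groups/my-new-stream-group/streams/my-new-stream/egress-routes/route-to-event-hub",
    routeBody,
    accessToken);

Console.WriteLine($"{request.Method} {request.RequestUri}");
Console.WriteLine(routeBody);

// Hypothetical opt-in switch: nothing is sent unless SPOTFLOW_SEND=1.
if (Environment.GetEnvironmentVariable("SPOTFLOW_SEND") == "1")
{
    using var http = new HttpClient();
    using var response = await http.SendAsync(request);
    Console.WriteLine((int)response.StatusCode);
    Console.WriteLine(await response.Content.ReadAsStringAsync());
}

// Builds a PATCH request with the same headers as the cURL command.
static HttpRequestMessage BuildPatch(string relativePath, string jsonBody, string token)
{
    var message = new HttpRequestMessage(
        HttpMethod.Patch,
        new Uri(new Uri("https://api.eu1.spotflow.io/"), relativePath))
    {
        Content = new StringContent(jsonBody, Encoding.UTF8, "application/json")
    };
    message.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
    message.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
    return message;
}
```

The same helper could build the Egress Sink request by passing the egress-sinks path and the corresponding body instead.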
2. Send Data to Platform
Follow Tutorial: Send Data to Platform to send data to the Platform.
Do not forget to send data to the Stream you configured in step 1!
If you successfully followed the previous steps, your Device is sending data to the Platform and the data is forwarded to the Event Hub. Now you are ready to consume the data.
3. Consume Data
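How you process the data depends on your needs. To get you started, we've created a minimal program that consumes data from your Event Hub: it reads individual messages, prints their payload, and stores a checkpoint in the Storage Account after every 50 events per partition. The program stops after 60 seconds.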
- C#
Get the source code from our public repository.
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";
var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

// The blob container stores the checkpoints of the processor.
var storageClient = new BlobContainerClient(
    storageConnectionString,
    blobContainerName);

var processor = new EventProcessorClient(
    storageClient,
    consumerGroup,
    eventHubsConnectionString,
    eventHubName);

using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("EventHubConsumer");

// Number of events processed since the last checkpoint, per partition.
var partitionEventCount = new ConcurrentDictionary<string, int>();

// Stop the program after 60 seconds.
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60));

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

try
{
    await processor.StartProcessingAsync(cts.Token);
    await Task.Delay(Timeout.Infinite, cts.Token);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
    // This is expected if the cancellation token is
    // signaled.
}
finally
{
    // This may take up to the length of time defined
    // as part of the configured TryTimeout of the processor;
    // by default, this is 60 seconds.
    await processor.StopProcessingAsync();
    processor.ProcessEventAsync -= processEventHandler;
    processor.ProcessErrorAsync -= processErrorHandler;
}

async Task processEventHandler(ProcessEventArgs args)
{
    try
    {
        // If the cancellation token is signaled, then the
        // processor has been asked to stop. It will invoke
        // this handler with any events that were in flight;
        // these will not be lost if not processed.
        //
        // It is up to the handler to decide whether to take
        // action to process the event or to cancel immediately.
        if (args.CancellationToken.IsCancellationRequested)
        {
            return;
        }

        var partition = args.Partition.PartitionId;
        var eventBody = args.Data.EventBody.ToArray();
        logger.LogInformation("Event from partition {partition} with length {eventBodyLength}.", partition, eventBody.Length);

        // Pass the payload as an argument so that it is never
        // interpreted as a log message template.
        var message = Encoding.UTF8.GetString(eventBody);
        logger.LogInformation("{message}", message);

        var eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
            key: partition,
            addValue: 1,
            updateValueFactory: (_, currentCount) => currentCount + 1);

        // Store a checkpoint after every 50 events of a partition.
        if (eventsSinceLastCheckpoint >= 50)
        {
            await args.UpdateCheckpointAsync();
            partitionEventCount[partition] = 0;
        }
    }
    catch (Exception ex)
    {
        // It is very important that you always guard against
        // exceptions in your handler code; the processor does
        // not have enough understanding of your code to
        // determine the correct action to take. Any
        // exceptions from your handlers go uncaught by
        // the processor and will NOT be redirected to
        // the error handler.
        logger.LogError(ex, "Failed to process an event.");
    }
}

Task processErrorHandler(ProcessErrorEventArgs args)
{
    try
    {
        logger.LogError("Error in the EventProcessorClient. Operation: '{operation}' Exception: {exception}", args.Operation, args.Exception);
    }
    catch (Exception)
    {
        // It is very important that you always guard against
        // exceptions in your handler code; the processor does
        // not have enough understanding of your code to
        // determine the correct action to take. Any
        // exceptions from your handlers go uncaught by
        // the processor and will NOT be handled in any
        // way.
    }

    return Task.CompletedTask;
}
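The consumer above stores a checkpoint only after every 50 events of a partition. To make the effect of this counter easier to see, here is a small simulation of the same logic without any Azure dependency. It is only a sketch: the HandleEvent function, the event counts and the "partition@event" notation are made up for illustration, and a list entry stands in for the call to UpdateCheckpointAsync.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

const int checkpointInterval = 50;
var partitionEventCount = new ConcurrentDictionary<string, int>();
var checkpoints = new List<string>();

// Simulated input: 120 events on partition "0" and 30 events on partition "1".
for (var i = 1; i <= 120; i++) HandleEvent("0", i);
for (var i = 1; i <= 30; i++) HandleEvent("1", i);

Console.WriteLine($"Checkpoints: {string.Join(", ", checkpoints)}");
Console.WriteLine($"Events since last checkpoint: partition 0 = {partitionEventCount["0"]}, partition 1 = {partitionEventCount["1"]}");
// Prints:
// Checkpoints: 0@50, 0@100
// Events since last checkpoint: partition 0 = 20, partition 1 = 30

void HandleEvent(string partition, int eventNumber)
{
    // Same counting as in the consumer: increment the per-partition counter.
    var eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
        key: partition,
        addValue: 1,
        updateValueFactory: (_, currentCount) => currentCount + 1);

    if (eventsSinceLastCheckpoint >= checkpointInterval)
    {
        // Stand-in for "await args.UpdateCheckpointAsync()".
        checkpoints.Add($"{partition}@{eventNumber}");
        partitionEventCount[partition] = 0;
    }
}
```

Events received after the last checkpoint are read again when the consumer restarts, so in this simulation a restart would re-deliver 20 events from partition 0 and all 30 events from partition 1. A smaller interval reduces this reprocessing at the cost of more writes to the Storage Account.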
Depending on what data you sent to the Platform, the output of the Event Hub consumer program might look like this:
info: EventHubConsumer[0]
      Event from partition 0 with length 78.
info: EventHubConsumer[0]
      {"timestamp": "2024-05-28T15:26:41.954036+02:00", "temperatureCelsius": 21.05}
info: EventHubConsumer[0]
      Event from partition 0 with length 77.
info: EventHubConsumer[0]
      {"timestamp": "2024-05-28T15:26:43.577170+02:00", "temperatureCelsius": 21.1}
info: EventHubConsumer[0]
      Event from partition 0 with length 78.
info: EventHubConsumer[0]
      {"timestamp": "2024-05-28T15:26:44.798378+02:00", "temperatureCelsius": 21.15}
info: EventHubConsumer[0]
      Event from partition 0 with length 77.
info: EventHubConsumer[0]
      {"timestamp": "2024-05-28T15:26:46.020595+02:00", "temperatureCelsius": 21.2}
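Once the payload is available as a string, you will usually want to read individual values from it. The following sketch parses a payload shaped like the sample output above using System.Text.Json. The field names timestamp and temperatureCelsius are taken from that sample only; your Devices may send a different shape, and the helper TryParseReading is a hypothetical name.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

// Sample payload copied from the output above.
var message = "{\"timestamp\": \"2024-05-28T15:26:41.954036+02:00\", \"temperatureCelsius\": 21.05}";

if (TryParseReading(message, out var timestamp, out var temperatureCelsius))
{
    Console.WriteLine(string.Create(
        CultureInfo.InvariantCulture,
        $"Measured {temperatureCelsius} degrees Celsius at {timestamp:O}"));
}
else
{
    Console.WriteLine("Payload does not match the expected shape.");
}

// Returns false instead of throwing when the payload is not valid JSON
// or does not contain the two expected fields with the expected types.
static bool TryParseReading(string json, out DateTimeOffset time, out double celsius)
{
    time = default;
    celsius = default;
    try
    {
        using var document = JsonDocument.Parse(json);
        var root = document.RootElement;
        return root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("timestamp", out var timeElement)
            && timeElement.ValueKind == JsonValueKind.String
            && timeElement.TryGetDateTimeOffset(out time)
            && root.TryGetProperty("temperatureCelsius", out var celsiusElement)
            && celsiusElement.ValueKind == JsonValueKind.Number
            && celsiusElement.TryGetDouble(out celsius);
    }
    catch (JsonException)
    {
        return false;
    }
}
```

Returning false for unexpected payloads keeps a single malformed message from interrupting the processing of the remaining events.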