Skip to main content

Tutorial: Receive Data in Your Event Hub

This tutorial will show you how to route data to an Event Hub and consume it. You will:

  • Create an Event Hub Egress Sink
  • Configure a Stream to forward data to the Egress Sink
  • Send data to the Platform and consume it from the Event Hub

Event Hub Egress Sink Overview

Requirements

  • .NET 8.
  • If you are not registered to the Spotflow IoT Platform yet, Sign Up.
  • You need to have an existing Stream. A Stream called default-stream in Stream Group default-stream-group is created automatically after you register. Create a new one if you want to use a different Stream.
  • You need to have an existing Event Hub and Storage Account for checkpointing.

1. Route Data to a New Event Hub Egress Sink

Event Hub Egress Sink represents an existing Event Hub to which the Platform can forward data. Let's create a new one and configure a stream to forward data to it.

  1. Click on Data Flows in the left sidebar.

  2. Click on the Add Route button next to the stream that should be routed to the Event Hub.

  3. Choose the Create a New Egress Sink option and click on the Next button.

  4. Fill the name of the Egress Sink, e.g. my-eventhub. Select Azure Event Hub as the egress sink kind. And finally, paste the connection string to your eventhub into the respective input field. Click Create Egress Sink and Continue.

  5. The new Event Hub Egress Sink is now created! At this point, it is only needed to create an egress route so the data from your stream are forwarded to this sink. Review the default configuration and optionally customize it. Click Create Egress Route to finish.

  6. The route to the new Event Hub Egress Sink has been created! You should see an arrow going from your stream to the Egress Sink.

2. Send Data to Platform

Follow Tutorial: Send Data to Platform to send data to the Platform.

warning

Do not forget to send data to the Stream you configured in step 2!

If you successfully followed the previous steps, your Device is sending data to the Platform and the data is forwarded to the Event Hub. Now you are ready to consume the data.

3. Consume Data

The way you process data is dependent on your needs. But we've created a minimal program to help you with consuming data from your Event Hub - it just reads individual messages and prints the actual payload.

Get the source code from our public repository.

Spotflow.EventHubConsumer.cs
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

using Microsoft.Extensions.Logging;

using System.Collections.Concurrent;
using System.Text;

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var eventHubsConnectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
var eventHubName = "<< NAME OF THE EVENT HUB >>";
var consumerGroup = "<< NAME OF THE EVENT HUB CONSUMER GROUP >>";

var storageClient = new BlobContainerClient(
storageConnectionString,
blobContainerName);

var processor = new EventProcessorClient(
storageClient,
consumerGroup,
eventHubsConnectionString,
eventHubName);

using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("EventHubConsumer");

var partitionEventCount = new ConcurrentDictionary<string, int>();

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60));

processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;

try
{
await processor.StartProcessingAsync(cts.Token);
await Task.Delay(Timeout.Infinite, cts.Token);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
// This is expected if the cancellation token is
// signaled.
}
finally
{
// This may take up to the length of time defined
// as part of the configured TryTimeout of the processor;
// by default, this is 60 seconds.

await processor.StopProcessingAsync();

processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}

async Task processEventHandler(ProcessEventArgs args)
{
try
{
// If the cancellation token is signaled, then the
// processor has been asked to stop. It will invoke
// this handler with any events that were in flight;
// these will not be lost if not processed.
//
// It is up to the handler to decide whether to take
// action to process the event or to cancel immediately.

if (args.CancellationToken.IsCancellationRequested)
{
return;
}

var partition = args.Partition.PartitionId;
var eventBody = args.Data.EventBody.ToArray();
logger.LogInformation("Event from partition {partition} with length {eventBodyLength}.", partition, eventBody.Length);
var message = Encoding.UTF8.GetString(eventBody);
logger.LogInformation(message);

var eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
key: partition,
addValue: 1,
updateValueFactory: (_, currentCount) => currentCount + 1);

if (eventsSinceLastCheckpoint >= 50)
{
await args.UpdateCheckpointAsync();
partitionEventCount[partition] = 0;
}
}
catch (Exception ex)
{
// It is very important that you always guard against
// exceptions in your handler code; the processor does
// not have enough understanding of your code to
// determine the correct action to take. Any
// exceptions from your handlers go uncaught by
// the processor and will NOT be redirected to
// the error handler.
}
}

Task processErrorHandler(ProcessErrorEventArgs args)
{
try
{
logger.LogError("Error in the EventProcessorClient. Operation: '{operation}' Exception: {exception}", args.Operation, args.Exception);
}
catch (Exception ex)
{
// It is very important that you always guard against
// exceptions in your handler code; the processor does
// not have enough understanding of your code to
// determine the correct action to take. Any
// exceptions from your handlers go uncaught by
// the processor and will NOT be handled in any
// way.
}

return Task.CompletedTask;
}

Depending on what data you sent to the Platform, the output of the program might look like this:

info: EventHubConsumer[0]
Event from partition 0 with length 78.
info: EventHubConsumer[0]
{"timestamp": "2024-05-28T15:26:41.954036+02:00", "temperatureCelsius": 21.05}
info: EventHubConsumer[0]
Event from partition 0 with length 77.
info: EventHubConsumer[0]
{"timestamp": "2024-05-28T15:26:43.577170+02:00", "temperatureCelsius": 21.1}
info: EventHubConsumer[0]
Event from partition 0 with length 78.
info: EventHubConsumer[0]
{"timestamp": "2024-05-28T15:26:44.798378+02:00", "temperatureCelsius": 21.15}
info: EventHubConsumer[0]
Event from partition 0 with length 77.
info: EventHubConsumer[0]
{"timestamp": "2024-05-28T15:26:46.020595+02:00", "temperatureCelsius": 21.2}