Tutorial: Receive Data in Your Databricks

This tutorial will show you how to route data to a Databricks Workspace. You will:

  • Create a free Databricks Workspace if you don't already have one
  • Create a Delta Egress Sink
  • Configure a Stream to forward data to the Egress Sink
  • See data sent by your Device in Databricks

(Diagram: Delta Egress Sink overview)

Requirements

  • .NET 8.
  • If you are not registered with the Spotflow IoT Platform yet, Sign Up.
  • You need to have an existing Stream. A Stream called default-stream in Stream Group default-stream-group is created automatically after you register. Create a new one if you want to use a different Stream.
  • You need to have an existing Azure Storage Account to store your data. The Storage Account needs to have Hierarchical namespace enabled.
  • Databricks runs on Microsoft Azure, AWS, or Google Cloud. You need to have an account with one of these cloud providers.

1. Create Databricks Workspace

info

You can skip this step if you already have an existing Databricks Workspace.

To give you a quick start, we've put together a short walkthrough of creating a Databricks Workspace as a 14-day free trial in Azure. For other cloud providers, the process should be similarly straightforward.

Go to the Azure Portal to create a Databricks Workspace. Log in if prompted.

  1. Fill in or select all fields on the Basics tab. Most importantly, select Trial (Premium - 14-Days Free DBUs) Pricing Tier to get a free trial. Click Review + create.

  2. If the validation succeeds, click Create.

  3. After the Databricks Workspace is created, open it in Azure Portal and click Launch Workspace on the Overview page.

  4. The Databricks web interface opens. Let's put that aside for a moment and get down to configuring the Spotflow IoT Platform to route data to your Databricks Workspace.

2. Route Data to a new Delta Egress Sink

A Delta Egress Sink represents an existing Storage Account to which the Platform can forward data. Let's create a new one and configure a stream to forward data to it.

Currently, creating a Delta Egress Sink requires an Azure Storage connection string, which is a compact representation of the target account's identifier and access key. For help with getting the connection string, see this Azure guide.
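For reference, an Azure Storage connection string is a semicolon-separated list of key-value pairs. The sketch below shows its typical shape; all values are placeholders that you need to replace with your own:

# Typical shape of an Azure Storage connection string (placeholder values)

connection_string = (
    "DefaultEndpointsProtocol=https;"
    "AccountName=<Your Storage Account Name>;"
    "AccountKey=<Your Storage Account Key>;"
    "EndpointSuffix=core.windows.net"
)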

  1. Click on Data Flows in the left sidebar.

  2. Click on the Add Route button next to the stream that should be routed to the Delta Egress Sink.

  3. Choose the Create a New Egress Sink option and click on the Next button.

  4. Fill in the name of the Egress Sink, e.g. databricks. Select Delta as the egress sink kind. To configure the connection, provide the connection string of the Azure Storage Account and the name of a container. Click Create Egress Sink and Continue.

  5. The new Delta Egress Sink is now created! All that remains is to create an egress route so that the data from your stream is forwarded to this sink. Review the default configuration and optionally customize it. Configure the path where we should store the Delta table. Click Create Egress Route to finish.

  6. The route to the new Delta Egress Sink has been created! You should see an arrow going from your stream to the Egress Sink.

3. Send Data to Platform

Follow Tutorial: Send Data to Platform to send data to the Platform.

warning

Do not forget to send data to the Stream you configured in step 2!

If you successfully followed the previous steps, your Device is sending data to the Platform, and the data is forwarded to the Delta table in your Storage Account. Now you are ready to consume the data.

4. See Data Sent by Your Device in Databricks

Now, you'll create a notebook in Databricks. Our example uses Python & PySpark, but you can use different languages if needed:

# Prepare information about the target Delta Table

storage_account_name = "<Your Storage Account Name>"
sas_token = "<Your SAS Token For Your Container>"
container_name = "<Your Container Name>"
directory_path = "<Your Delta Table Directory Path>"

# Configure PySpark

spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account_name}.dfs.core.windows.net", sas_token)

# Load the Delta Table as a PySpark data frame

df = spark.read.format("delta").load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{directory_path}")

# Transform the data frame (e.g. sort)

df_sorted = df.sort("ingress_enqueued_date_time")

# Check the result

df_sorted.toPandas()

How to get the SAS token

Either an account-level SAS or a container-level SAS can be used. If using an account-level SAS, the Container and Object resource types must be allowed. In both cases, the Read and List permissions must be granted. The SAS token can be generated using the Azure CLI, Azure Portal, Azure SDK, or manually.
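If you prefer to generate the SAS token programmatically, the following sketch shows one possible way to create a container-level SAS with Read and List permissions using the azure-storage-blob Python package. The account name, container name, account key, and expiry are placeholders; adjust them to your environment.

# Generate a container-level SAS token with Read and List permissions
# (requires the azure-storage-blob package; values below are placeholders)

from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_container_sas, ContainerSasPermissions

sas_token = generate_container_sas(
    account_name="<Your Storage Account Name>",
    container_name="<Your Container Name>",
    account_key="<Your Storage Account Key>",
    permission=ContainerSasPermissions(read=True, list=True),
    expiry=datetime.now(timezone.utc) + timedelta(hours=8),  # choose an expiry that suits you
)

print(sas_token)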

Now, go back to your Databricks Workspace (launch it from the Azure Portal) and follow these steps to create a notebook and query your data:

  1. In Databricks, click Create notebook

  2. Paste the Python code snippet you copied earlier into the notebook. Replace the placeholders <Your Storage Account Name>, <Your SAS Token For Your Container>, <Your Container Name>, and <Your Delta Table Directory Path> with the corresponding values. Click Run all.

  3. Voilà, you can explore the data.

This short code snippet loads and prints the data from the connected Delta table. You can easily extend the notebook to perform data analysis according to your needs.
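For example, the following sketch builds on the df data frame loaded above and counts the number of received messages per day using the ingress_enqueued_date_time column; any additional columns you aggregate on may be named differently in your table.

# Example extension: count received messages per day

from pyspark.sql.functions import to_date, count

daily_counts = (
    df.withColumn("ingress_date", to_date("ingress_enqueued_date_time"))
      .groupBy("ingress_date")
      .agg(count("*").alias("message_count"))
      .orderBy("ingress_date")
)

daily_counts.toPandas()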

Parsing JSON data

If the payload_line column contains a JSON string, it can easily be parsed and used, e.g., for filtering.

Suppose that the JSON string in the payload_line column looks like this:

{
  "data": 42
}

Then, the following PySpark code can be used to filter on the data property:

# Import functions needed for parsing JSON

from pyspark.sql.functions import from_json

# Load the Delta Table as a PySpark data frame as in the previous example

df = spark.read.format("delta").load(...)

# Add a new column with the parsed JSON

df_with_payload_json = df.withColumn("payload_json", from_json(df.payload_line, "MAP<STRING,INT>"))

# Filter

df_filtered = df_with_payload_json.filter(df_with_payload_json.payload_json.data >= 42)

# Check the result

df_filtered.toPandas()
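If your JSON payload has a more complex structure than a flat map of integers, an explicit struct schema can be used instead of the MAP type. The following sketch assumes the same single data property as above and is otherwise equivalent to the previous example:

# Alternative: parse the JSON with an explicit struct schema

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, IntegerType

payload_schema = StructType([StructField("data", IntegerType())])

df_with_payload_json = df.withColumn("payload_json", from_json(col("payload_line"), payload_schema))
df_filtered = df_with_payload_json.filter(col("payload_json.data") >= 42)

# Check the result

df_filtered.toPandas()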