Python

Use the Python package spotflow_device to integrate your Devices with the Spotflow IoT Platform. See Get Started for details.

Requirements

  • Python 3.7 or newer

Installation

Install the package using the following command:

pip install spotflow_device

Basic Usage

The following code connects the Device to the Platform and starts sending simulated sensor measurements.

Before you can run the code, register with the Platform and set it up. See Get Started for more information on configuring the Platform, registering your Device, and viewing the received data. Don't forget to replace <Your Provisioning Token> with the actual Provisioning Token from the Platform.

import datetime
import json
import time
from spotflow_device import ProvisioningToken, DeviceClientOptions, DeviceClient

# Connect to the Platform (starts Device Provisioning if the Device is not already registered)
options = DeviceClientOptions("spotflow.db", ProvisioningToken("<Your Provisioning Token>"), "my-device")
client = DeviceClient.start(options)

# Create a sender to the default stream
sender = client.create_stream_sender(stream_group = "default", stream = "default")

for i in range(0, 60):
    # Generate new "measured" values
    temperature = 21 + (i * 0.05)
    humidity = 50 + (i * 0.1)

    # Send and display the measured data
    payload = json.dumps({
        "timestamp": datetime.datetime.now().astimezone().isoformat(),
        "temperatureCelsius": temperature,
        "humidityPercent": humidity
    })
    sender.send_message(payload.encode())
    print(payload)

    # Pause till next iteration
    time.sleep(5)

# Wait for all the messages to be sent
while client.pending_messages_count > 0:
    time.sleep(1)
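
The final loop above waits indefinitely if the Device stays offline. The following is a minimal sketch of a bounded wait. It assumes only that the client exposes pending_messages_count as described in the Reference. The helper name wait_until_sent and its parameters are hypothetical and not part of the package.

```python
import time


def wait_until_sent(client, timeout_s: float = 60.0, poll_interval_s: float = 1.0) -> bool:
    """Poll client.pending_messages_count until it drops to zero or the timeout elapses.

    Returns True if the queue was drained, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout_s
    while client.pending_messages_count > 0:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
    return True
```

With the client from the example above, wait_until_sent(client, timeout_s=30) could replace the final loop. Messages that are still pending when it returns False remain in the local database file.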

Logging

The package uses the standard Python logging module. Use the following code to log all events to the console (logging.basicConfig writes to standard error by default):

import logging

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
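
Setting DEBUG on the root logger also makes every other library verbose. The sketch below keeps the root logger at WARNING and raises the level for a single named logger only. The helper configure_logging is hypothetical, and this page does not state which logger name the package uses, so the name is left as a parameter.

```python
import logging
import sys


def configure_logging(sdk_logger_name: str, sdk_level: int = logging.DEBUG) -> logging.Logger:
    """Log to stdout with timestamps; only the named logger gets the verbose level."""
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.WARNING,  # default level for all other loggers
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    sdk_logger = logging.getLogger(sdk_logger_name)
    sdk_logger.setLevel(sdk_level)
    return sdk_logger
```

A call such as configure_logging("spotflow_device") assumes that the logger is named after the package. The %(name)s field in the output of the simpler configuration above shows the logger names that are actually in use.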

Reference

class SpotflowError(builtins.Exception):

Any function, method, or property can raise this exception when it cannot continue because of an error. Errors that occur during background communication are logged using the package logging.

class DeviceClientOptions:

A set of options that specify how to connect to the Platform.

You must provide database_file and provisioning_token when you create the instance of the class. After you configure all the options, pass the instance of DeviceClientOptions to DeviceClient.start.

DeviceClientOptions(database_file: str, provisioning_token: spotflow_device.ProvisioningToken, device_id: Optional[str] = None)

display_provisioning_operation_callback: Optional[Callable[[spotflow_device.ProvisioningOperation], NoneType]]

The function that displays the details of the Provisioning Operation when DeviceClient.start is performing Device Provisioning. See Get Started for hands-on experience with this process.

By default, DeviceClient.start writes the following text to the standard output:

Provisioning operation initialized, waiting for approval.
Operation ID: b271c7ae-8930-40cd-83d1-623a0c92939b
Verification Code: 8rria3bv

An example of how to change this behavior:

from spotflow_device import DeviceClientOptions, ProvisioningOperation, ProvisioningToken

def italian_display_callback(operation: ProvisioningOperation):
    print('Operazione di fornitura inizializzata, in attesa di approvazione.')
    print()
    # Double quotes avoid a clash with the apostrophe in "dell'operazione"
    print(f"ID dell'operazione: {operation.id}")
    print(f"Codice di verifica: {operation.verification_code}")

options = DeviceClientOptions("spotflow.db", ProvisioningToken("<Your Provisioning Token>"), "my-device")
options.display_provisioning_operation_callback = italian_display_callback

database_file: str

The path to the local database file where the Device SDK stores the connection credentials and temporarily persists incoming and outgoing messages. DeviceClient.start creates the file if it doesn't exist.

The file must end with the suffix ".db", for example, "spotflow.db". If you don't use an absolute path, the file is created relative to the current working directory.
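
Because a relative path depends on the current working directory, it can help to build an absolute path explicitly. The sketch below does that and checks the ".db" suffix up front. The helper resolve_database_file is hypothetical. Creating the parent directory is a defensive assumption, since this page only states that the file itself is created.

```python
from pathlib import Path


def resolve_database_file(directory: str, name: str = "spotflow.db") -> str:
    """Return an absolute path to the database file, creating the parent directory if needed."""
    if not name.endswith(".db"):
        raise ValueError(f"database file name must end with '.db', got {name!r}")
    folder = Path(directory).expanduser().resolve()
    folder.mkdir(parents=True, exist_ok=True)
    return str(folder / name)
```

The returned string can be passed as the database_file argument of DeviceClientOptions.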

device_id: Optional[str]

The ID of the Device you are running the code from. If you don't specify it here, you'll need to either store it in the Provisioning Token, or choose it during the approval of the Provisioning Operation. See Device Provisioning for more details about the process.

Make sure that no two Devices in the same Workspace use the same ID. Otherwise, unexpected errors can occur during the communication with the Platform.

instance: Optional[str]

The URI or hostname of the Platform instance to which the Device connects.

If your company uses a dedicated instance of the Platform, such as acme.spotflow.io, specify it here. The default value is api.eu1.spotflow.io.

desired_properties_updated_callback: Optional[Callable[[spotflow_device.DesiredProperties], NoneType]]

The function that is called right after DeviceClient.start with the current version of the Desired Properties and then whenever the Device receives their update from the Platform.

The function is called in a separate thread, so make sure that you properly synchronize access to your shared resources. The whole interface of the Device SDK is thread-safe, so it's safe to use it in the function.

An example of how to register the callback:

from spotflow_device import DeviceClientOptions, DesiredProperties, ProvisioningToken

def on_desired_properties_updated(properties: DesiredProperties):
    print(f"Desired Properties updated to version {properties.version}: {properties.values}")

options = DeviceClientOptions("spotflow.db", ProvisioningToken("<Your Provisioning Token>"), "my-device")
options.desired_properties_updated_callback = on_desired_properties_updated
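
Since the callback runs in a separate thread, any state it shares with the rest of the application needs synchronization. One possible approach is sketched below: a small cache guarded by a lock. The class DesiredPropertiesCache is hypothetical. It relies only on the version and values attributes of DesiredProperties described in the Reference.

```python
import threading
from typing import Any, Dict, Optional, Tuple


class DesiredPropertiesCache:
    """Keeps the most recent Desired Properties behind a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._version: Optional[int] = None
        self._values: Dict[str, Any] = {}

    def on_updated(self, properties) -> None:
        """Callback body: runs on the SDK's thread, so copy the data under the lock."""
        with self._lock:
            self._version = properties.version
            self._values = dict(properties.values)

    def snapshot(self) -> Tuple[Optional[int], Dict[str, Any]]:
        """Called from application threads; returns a copy that is safe to read without the lock."""
        with self._lock:
            return self._version, dict(self._values)
```

Assuming the SDK accepts any callable, the bound method can be registered with options.desired_properties_updated_callback = cache.on_updated, and the application can call cache.snapshot() whenever it needs the current values.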

class ProvisioningToken:

A Provisioning Token. See Get Started for instructions on how to create it.

ProvisioningToken(token: str)

class ProvisioningOperation:

The summary of an ongoing Provisioning Operation.

If you specify a custom callback to DeviceClientOptions.display_provisioning_operation_callback, you'll receive a ProvisioningOperation as its argument.

id: str

(Read-only) The ID of this Provisioning Operation.

verification_code: str

(Read-only) The verification code of this Provisioning Operation.

expiration_time: str

(Read-only) The expiration time of this Provisioning Operation. The operation is no longer valid after that.

The date/time format is RFC 3339.
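
Because expiration_time is a string, a custom display callback has to parse it before it can show the remaining time. The following is a minimal sketch with two hypothetical helpers. It assumes the timestamp has either no fractional seconds or a precision that datetime.fromisoformat accepts on your Python version (3 or 6 digits before Python 3.11).

```python
from datetime import datetime, timezone
from typing import Optional


def parse_expiration_time(value: str) -> datetime:
    """Parse an RFC 3339 timestamp into a timezone-aware datetime."""
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def seconds_until(expiration: datetime, now: Optional[datetime] = None) -> float:
    """Return the number of seconds left until expiration (negative if already expired)."""
    now = now or datetime.now(timezone.utc)
    return (expiration - now).total_seconds()
```

Inside a display callback, seconds_until(parse_expiration_time(operation.expiration_time)) gives the time left for the approval.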

class Compression:

An enum that specifies the compression to use for sending a message. There are three options:

  • UNCOMPRESSED - Don't compress the message.
  • FASTEST - Compress the message using the fastest compression algorithm settings.
  • SMALLEST_SIZE - Compress the message using the algorithm settings that produce the smallest size. Beware that this may be significantly slower than the fastest compression. We recommend testing the performance of your application with this setting before using it in production.
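
This page does not say which compression algorithm the SDK uses. The sketch below therefore uses zlib from the standard library purely as a stand-in to illustrate the speed versus size trade-off on a representative payload. The function compare_compression_levels is hypothetical, and its numbers do not predict the SDK's behavior.

```python
import time
import zlib
from typing import Dict, Tuple


def compare_compression_levels(payload: bytes) -> Dict[str, Tuple[int, float]]:
    """Return {label: (compressed_size_bytes, elapsed_seconds)} for two zlib levels."""
    results = {}
    # zlib level 1 favors speed, level 9 favors size (stand-ins, not the SDK's settings)
    for label, level in (("fastest", 1), ("smallest_size", 9)):
        started = time.perf_counter()
        compressed = zlib.compress(payload, level)
        elapsed = time.perf_counter() - started
        # Sanity check: the round trip must reproduce the original payload
        assert zlib.decompress(compressed) == payload
        results[label] = (len(compressed), elapsed)
    return results
```

For a decision about production settings, measure the real application with senders created with Compression.FASTEST and Compression.SMALLEST_SIZE on the target hardware.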

class DeviceClient:

A client connected to the Platform. Create its instance using DeviceClient.start.

The client stores all outgoing communication in the local database file (specified in DeviceClientOptions.database_file) and then sends it asynchronously in a background thread. Thanks to that, it works even when the connection is unreliable. Similarly, the client also stores all incoming communication in the local database file and deletes it only after the application processes it.

def start(options: spotflow_device.DeviceClientOptions) -> spotflow_device.DeviceClient:

Start communicating with the Platform.

If the Device is not yet registered in the Platform, or its Registration Token has expired, this method performs Device Provisioning and waits for the approval. See Get Started for instructions on how to set up the Platform, start Device Provisioning, and approve the Device.

If the Registration Token from the last run is still valid, this method succeeds even without the connection to the Internet. The Device SDK will store all outgoing communication in the local database file and send it once it connects to the Platform.

def create_stream_sender(self, stream_group: Optional[str] = None, stream: Optional[str] = None, compression: Optional[spotflow_device.Compression] = Compression.UNCOMPRESSED) -> spotflow_device.StreamSender:

Create a StreamSender for sending Messages to a Stream that is contained in a Stream Group.

If stream_group is omitted, the Platform directs the Messages to the default Stream Group of the current Workspace. If stream is omitted, the Platform directs the Messages to the default Stream of the given Stream Group.

def get_desired_properties_if_newer(self, version: Optional[int] = None) -> Optional[spotflow_device.DesiredProperties]:

Get the current Desired Properties if their version is higher than version or if version is None. Otherwise, return None.

Only the latest version is returned; any versions between the last obtained one and the current one are skipped.
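
As an alternative to the callback, an application can poll for changes from its own loop. The sketch below tracks the last version it has seen and applies new values only when a newer version arrives. The helper apply_if_newer and its apply parameter are hypothetical. The client argument is expected to provide get_desired_properties_if_newer as documented above.

```python
from typing import Any, Callable, Dict, Optional


def apply_if_newer(client, last_version: Optional[int], apply: Callable[[Dict[str, Any]], None]) -> Optional[int]:
    """Fetch Desired Properties newer than last_version, apply them, and return the version now in effect."""
    properties = client.get_desired_properties_if_newer(version=last_version)
    if properties is None:
        return last_version  # nothing new since the last check
    apply(properties.values)
    return properties.version
```

Starting with last_version = None and storing the returned value after each call means the values are applied once at startup and then only when the version increases.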

def get_desired_properties(self) -> spotflow_device.DesiredProperties:

Get the current Desired Properties.

Only the latest version is returned; any versions between the last obtained one and the current one are skipped.

def update_reported_properties(self, properties: dict) -> None:

Enqueue an update of the Reported Properties to be sent to the Platform.

This method saves the Reported Properties persistently in the local database file and sends the update asynchronously, which may take a while depending on Internet connectivity and other factors. To verify that the update has been sent to the Platform, check any_pending_reported_properties_updates.
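
The enqueue-then-verify pattern described above can be sketched as follows. The helper report_and_wait and its timeout parameters are hypothetical. It uses only update_reported_properties and the any_pending_reported_properties_updates property.

```python
import time


def report_and_wait(client, properties: dict, timeout_s: float = 30.0, poll_interval_s: float = 0.5) -> bool:
    """Enqueue a Reported Properties update and wait until it leaves the local queue.

    Returns True once no update is pending, False if the timeout expired first.
    """
    client.update_reported_properties(properties)
    deadline = time.monotonic() + timeout_s
    while client.any_pending_reported_properties_updates:
        if time.monotonic() >= deadline:
            return False  # still queued in the local database file
        time.sleep(poll_interval_s)
    return True
```

A False result does not mean that the update is lost. It stays in the local database file and is sent once the connection allows it.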

pending_messages_count: int

The number of Messages that have been persisted in the local database file but haven't been sent to the Platform yet.

device_id: str

(Read-only) The Device ID. Note that the value might differ from the one requested in DeviceClientOptions if the technician overrides it during the approval of the Provisioning Operation.

workspace_id

(Read-only) The ID of the Workspace to which the Device belongs.

any_pending_reported_properties_updates: bool

(Read-only) Whether there are any updates to Reported Properties that are yet to be sent to the Platform.

class StreamSender:

A sender of Messages to a Stream.

Create it with DeviceClient.create_stream_sender. StreamSender will send all the Messages to the given Stream.

def send_message(self, payload: str | bytes, batch_id: Optional[str] = None, message_id: Optional[str] = None, batch_slice_id: Optional[str] = None, chunk_id: Optional[str] = None) -> None:

Enqueue a Message to be sent to the Platform.

If the Stream doesn't have a Message ID Autofill Pattern, you must provide the message_id. If the Stream groups Messages into Batches and doesn't have a Batch ID Autofill Pattern, you must provide the batch_id. See User Guide for more details. You can also provide batch_slice_id to use Batch Slices and chunk_id to use Message Chunking.

The method returns right after it saves the Message to the queue in the local database file. A background thread asynchronously sends the messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.
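
For a Stream without a Message ID Autofill Pattern, the caller has to supply message_id. The sketch below wraps send_message so that every Message gets a generated ID. The helper send_json is hypothetical, and the UUID-based ID scheme is only an assumption about what such a Stream accepts.

```python
import json
import uuid
from typing import Any, Dict


def send_json(sender, data: Dict[str, Any]) -> str:
    """Serialize data to JSON, send it with a freshly generated message_id, and return that ID."""
    # Assumption: a random hex string is an acceptable Message ID for the target Stream
    message_id = uuid.uuid4().hex
    payload = json.dumps(data).encode("utf-8")
    sender.send_message(payload, message_id=message_id)
    return message_id
```

Returning the ID lets the application log it or correlate it with data shown in the Platform later.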

def complete_batch(self, batch_id: str) -> None:

Enqueue the manual completion of the current Batch to be sent to the Platform.

The Platform also completes the previous Batch automatically when the new one starts. Therefore, you might not need to call this method at all. See User Guide for more details.

The method returns right after it saves the batch-completing Message to the queue in the local database file. A background thread asynchronously sends the messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.

def complete_message(self, batch_id: str, message_id: str) -> None:

Enqueue the manual completion of the current Message to be sent to the Platform. Use this method when you use Message Chunking.

The method returns right after it saves the message-completing Message to the queue in the local database file. A background thread asynchronously sends the Messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.
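
One possible way to combine chunk_id with complete_message is sketched below: a large payload is split into fixed-size chunks that share the same batch_id and message_id, and the Message is completed after the last chunk. The helpers iter_chunks and send_chunked, the chunk size, and the zero-padded chunk ID format are all assumptions. See User Guide for the authoritative rules on Message Chunking.

```python
from typing import Iterator


def iter_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
    """Yield consecutive slices of data, each at most chunk_size bytes long."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for offset in range(0, len(data), chunk_size):
        yield data[offset:offset + chunk_size]


def send_chunked(sender, data: bytes, batch_id: str, message_id: str, chunk_size: int = 64 * 1024) -> int:
    """Send data as several chunks of one Message, then mark the Message complete.

    Returns the number of chunks that were enqueued.
    """
    count = 0
    for index, chunk in enumerate(iter_chunks(data, chunk_size)):
        # Assumption: zero-padded indices are acceptable chunk IDs
        sender.send_message(chunk, batch_id=batch_id, message_id=message_id, chunk_id=f"{index:06d}")
        count += 1
    sender.complete_message(batch_id, message_id)
    return count
```

Both calls only enqueue work, so DeviceClient.pending_messages_count still indicates when everything has actually been sent.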

class DesiredProperties:

A wrapper of Desired Properties.

version: int

The version of the current properties.

values: dict

The dictionary containing the values of the individual properties.