Reference for Python Device SDK

You can use the Python package spotflow_device to integrate your Devices with the Spotflow IoT Platform. See Get Started for an introduction.

Requirements

  • Python 3.7 or newer

Installation

Install the package using the following command:

pip install --upgrade spotflow-device

Basic Usage

The following code connects the Device to the Platform and starts sending simulated sensor measurements.

You need to register with the Platform and set it up before you can use the code. See Get Started for more information on configuring the Platform, registering your Device, and viewing the received data. Don't forget to replace <Your Provisioning Token> with the actual Provisioning Token from the Platform.

import datetime
import json
import time
from spotflow_device import DeviceClient

# Connect to the Platform (starts Device Provisioning if the Device is not already registered)
client = DeviceClient.start(device_id="my-device", provisioning_token="<Your Provisioning Token>", db="spotflow.db")

# Create a sender to the default stream
sender = client.create_stream_sender(stream_group = "default-stream-group", stream = "default-stream")

for i in range(0, 60):
    # Send and display the measured data
    payload = json.dumps({
        "timestamp": datetime.datetime.now().astimezone().isoformat(),
        "temperatureCelsius": 21 + (i * 0.05),
        "humidityPercent": 50 + (i * 0.1)
    })
    sender.send_message(payload.encode())
    print(payload)

    # Pause till next iteration
    time.sleep(5)

Logging

The package uses the standard Python logging module. Use the following code to log all events to the standard output:

import logging

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

Reference

class SpotflowError(builtins.Exception):

Any function, method, or property can raise this exception when it cannot continue because of an error. Errors that happen during the background communication are not raised; they are logged using the package logging.

class ProvisioningOperation:

The summary of an ongoing Provisioning Operation.

If you specify a custom callback to DeviceClient.start, you'll receive a ProvisioningOperation as its argument.

id: str

(Read-only) The ID of this Provisioning Operation.

verification_code: str

(Read-only) The verification code of this Provisioning Operation.

expiration_time: str

(Read-only) The expiration time of this Provisioning Operation. The operation is no longer valid after that.

The date/time format is RFC 3339.
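
As a minimal sketch of such a custom callback, the function below prints the operation details and the approximate time left until expiration. The names parse_rfc3339 and display_provisioning_operation are hypothetical helpers, not part of the SDK; the sketch assumes only the three read-only attributes documented above.

```python
from datetime import datetime, timezone


def parse_rfc3339(value: str) -> datetime:
    """Parse an RFC 3339 timestamp such as '2024-05-01T12:00:00Z'."""
    # datetime.fromisoformat accepts a trailing 'Z' only since Python 3.11,
    # so normalize it to an explicit UTC offset first. Older Python versions
    # may also reject unusual fractional-second precision.
    if value.endswith(("Z", "z")):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


def display_provisioning_operation(operation) -> None:
    """Hypothetical callback: print the details of a ProvisioningOperation."""
    expires = parse_rfc3339(operation.expiration_time)
    remaining = expires - datetime.now(timezone.utc)
    minutes = max(0, int(remaining.total_seconds() // 60))
    print(f"Provisioning Operation ID: {operation.id}")
    print(f"Verification code: {operation.verification_code}")
    print(f"Expires at {expires.isoformat()} (in about {minutes} min)")
```

You would pass the function as display_provisioning_operation_callback to DeviceClient.start.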

class Compression:

An enum that specifies the compression to use for sending a message. There are three options:

  • UNCOMPRESSED - Don't compress the message.
  • FASTEST - Compress the message using the fastest compression algorithm settings.
  • SMALLEST_SIZE - Compress the message using the algorithm settings that produce the smallest size. Beware that this may be significantly slower than the fastest compression. We recommend testing the performance of your application with this setting before using it in production.

class DeviceClient:

A client connected to the Platform. Create its instance using DeviceClient.start.

The client stores all outgoing communication in the local database file and then sends it asynchronously in a background thread. Thanks to that, it works even when the connection is unreliable. Similarly, the client stores all incoming communication in the local database file and deletes it only after the application processes it.

def start(device_id: Optional[str], provisioning_token: str, db: str, instance: Optional[str] = None, display_provisioning_operation_callback: Optional[Callable[[spotflow_device.ProvisioningOperation], NoneType]] = None, desired_properties_updated_callback: Optional[Callable[[spotflow_device.DesiredProperties], NoneType]] = None) -> spotflow_device.DeviceClient:

Start communicating with the Platform. Options:

  • device_id: The unique ID of the Device you are running the code from. If you don't specify it here, you'll need to either store it in the Provisioning Token or choose it during the approval of the Provisioning Operation.
  • provisioning_token: The Provisioning Token used to start Device Provisioning.
  • db: The path to the local database file where the Device SDK stores the connection credentials and temporarily persists incoming and outgoing messages. This method creates the file if it doesn't exist. The file must end with the suffix ".db", for example, "spotflow.db". If you don't use an absolute path, the file is created relative to the current working directory.
  • instance: The URI/hostname of the Platform instance to which the Device will connect. If your company uses a dedicated instance of the Platform, such as acme.spotflow.io, specify it here. The default value is api.eu1.spotflow.io.
  • display_provisioning_operation_callback: The function that displays the details of the Provisioning Operation. Set it to override the default message that is written to the standard output.
  • desired_properties_updated_callback: The function that is called right after DeviceClient.start with the current version of the Desired Properties and then whenever the Device receives their update from the Platform. The Device configuration tutorial shows how to use this option. The function is called in a separate thread, so make sure that you properly synchronize access to your shared resources. The whole interface of the Device SDK is thread-safe, so it's safe to use it in the function.

If the Device is not yet registered in the Platform, or its Registration Token has expired, this method performs Device Provisioning and waits for the approval. See Get Started for instructions on how to create the Provisioning Token, start Device Provisioning, and approve the Device.

If the Registration Token from the last run is still valid, this method succeeds even without the connection to the Internet. The Device SDK will store all outgoing communication in the local database file and send it once it connects to the Platform.
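
Because desired_properties_updated_callback runs in a separate thread, shared state needs a lock. The following is a minimal sketch of one way to do that; _config, on_desired_properties_updated, current_config, and connect are hypothetical names, and connect requires the installed spotflow-device package and a valid Provisioning Token.

```python
import threading

# Shared state updated from the SDK's callback thread.
_config_lock = threading.Lock()
_config = {"version": None, "values": {}}


def on_desired_properties_updated(desired_properties) -> None:
    """Store the latest Desired Properties; called on a separate thread."""
    with _config_lock:
        _config["version"] = desired_properties.version
        _config["values"] = dict(desired_properties.values)


def current_config() -> dict:
    """Return a snapshot that other threads can read safely."""
    with _config_lock:
        return {"version": _config["version"], "values": dict(_config["values"])}


def connect(provisioning_token: str, db_path: str = "spotflow.db"):
    """Start the client with the callback above (needs spotflow-device)."""
    # Imported lazily so the helpers above stay usable without the package.
    from spotflow_device import DeviceClient

    return DeviceClient.start(
        device_id="my-device",
        provisioning_token=provisioning_token,
        db=db_path,
        desired_properties_updated_callback=on_desired_properties_updated,
    )
```

The main loop of the application can then call current_config() whenever it needs the latest values, without touching the SDK objects directly.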

workspace_id

(Read-only) The ID of the Workspace to which the Device belongs.

device_id: str

(Read-only) The Device ID. Note that the value might differ from the one requested in DeviceClient.start if the technician overrides it during the approval of the Provisioning Operation.

def create_stream_sender(self, stream_group: Optional[str] = None, stream: Optional[str] = None, compression: Optional[spotflow_device.Compression] = <Compression.UNCOMPRESSED: 0>) -> spotflow_device.StreamSender:

Create a StreamSender for sending Messages to a Stream that is contained in a Stream Group.

If stream_group is omitted, the Platform directs the Messages to the default Stream Group of the current Workspace. If stream is omitted, the Platform directs the Messages into the default Stream of the given Stream Group.

pending_messages_count: int

The number of Messages that have been persisted in the local database file but haven't been sent to the Platform yet.

def wait_enqueued_messages_sent(self) -> None:

Block the current thread until all the Messages that have been previously enqueued are sent to the Platform.
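
A typical place for this call is right before the application exits. The sketch below is one possible pattern; flush_and_report is a hypothetical helper that relies only on pending_messages_count and wait_enqueued_messages_sent as documented here. Since the wait blocks until everything is sent, it can presumably take a long time while the Device is offline.

```python
def flush_and_report(client) -> int:
    """Wait for the queue to drain; return how many Messages were pending before."""
    pending_before = client.pending_messages_count
    if pending_before > 0:
        print(f"Waiting for {pending_before} pending Message(s) to be sent...")
        # Blocks the current thread until all enqueued Messages are sent.
        client.wait_enqueued_messages_sent()
    return pending_before
```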

def get_desired_properties(self) -> spotflow_device.DesiredProperties:

Get the current Desired Properties.

Only the latest version is returned; any versions between the last obtained one and the current one are skipped.

def get_desired_properties_if_newer(self, version: Optional[int] = None) -> Optional[spotflow_device.DesiredProperties]:

Get the current Desired Properties if their version is higher than version or if version is None. Otherwise, return None.

Only the latest version is returned; any versions between the last obtained one and the current one are skipped.
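
If you prefer polling over a callback, you can remember the last version you applied and pass it back on each call. A minimal sketch, assuming a hypothetical helper poll_desired_properties and an application-defined apply function:

```python
def poll_desired_properties(client, last_version, apply):
    """Apply Desired Properties if they changed; return the version now in effect."""
    desired = client.get_desired_properties_if_newer(version=last_version)
    if desired is None:
        # Nothing newer than last_version is available.
        return last_version
    apply(desired.values)
    return desired.version
```

Start with last_version set to None so that the first call always returns the current Desired Properties, then feed the returned version into the next call.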

def update_reported_properties(self, properties: dict) -> None:

Enqueue an update of the Reported Properties to be sent to the Platform.

This method saves these Reported Properties persistently in the local database file. The update is sent asynchronously and may be delayed depending on Internet connectivity and other factors. To find out whether it has been sent to the Platform, check any_pending_reported_properties_updates.

any_pending_reported_properties_updates: bool

(Read-only) Whether there are any updates to Reported Properties that are yet to be sent to the Platform.
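
The two members above can be combined to wait for an update to leave the Device. The following sketch polls the property with a timeout; report_and_wait and its timeout_s and poll_s parameters are hypothetical, not part of the SDK.

```python
import time


def report_and_wait(client, properties: dict, timeout_s: float = 30.0, poll_s: float = 0.5) -> bool:
    """Enqueue a Reported Properties update; return True once it has been sent."""
    client.update_reported_properties(properties)
    deadline = time.monotonic() + timeout_s
    while client.any_pending_reported_properties_updates:
        if time.monotonic() >= deadline:
            # Still pending; the SDK keeps the update and sends it later.
            return False
        time.sleep(poll_s)
    return True
```

A False result does not mean the update is lost: it stays in the local database file and is sent when the connection allows.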

class StreamSender:

A sender of Messages to a Stream.

Create it with DeviceClient.create_stream_sender. StreamSender will send all the Messages to the given Stream.

def send_message(self, payload: str | bytes, batch_id: Optional[str] = None, message_id: Optional[str] = None, batch_slice_id: Optional[str] = None, chunk_id: Optional[str] = None) -> None:

Send a Message to the Platform.

Warning: This method blocks the current thread until the Message (and all the Messages enqueued before it) is sent to the Platform. If your Device doesn't have a stable Internet connection, consider using enqueue_message instead.

If the Stream doesn't have a Message ID Autofill Pattern, you must provide the message_id. If the Stream groups Messages into Batches and doesn't have a Batch ID Autofill Pattern, you must provide the batch_id. See User Guide for more details. Optionally, you can also provide batch_slice_id to use Batch Slices and chunk_id to use Message Chunking.

def enqueue_message(self, payload: str | bytes, batch_id: Optional[str] = None, message_id: Optional[str] = None, batch_slice_id: Optional[str] = None, chunk_id: Optional[str] = None) -> None:

Enqueue a Message to be sent to the Platform.

If the Stream doesn't have a Message ID Autofill Pattern, you must provide the message_id. If the Stream groups Messages into Batches and doesn't have a Batch ID Autofill Pattern, you must provide the batch_id. See User Guide for more details. Optionally, you can also provide batch_slice_id to use Batch Slices and chunk_id to use Message Chunking.

The method returns right after it saves the Message to the queue in the local database file. A background thread asynchronously sends the messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.
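
For a Stream without a Message ID Autofill Pattern, the application has to supply its own IDs. The sketch below generates one per Message; enqueue_reading is a hypothetical helper, and using a random UUID as the message_id is an assumption, so check the User Guide for the ID format your Stream expects.

```python
import json
import uuid


def enqueue_reading(sender, reading: dict, batch_id=None) -> str:
    """Serialize a reading to JSON, enqueue it, and return the generated Message ID."""
    message_id = uuid.uuid4().hex  # assumption: any unique string is acceptable
    sender.enqueue_message(
        json.dumps(reading).encode(),
        batch_id=batch_id,
        message_id=message_id,
    )
    return message_id
```

The call returns as soon as the Message is stored locally, so it is suitable for Devices with an unstable connection.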

def enqueue_batch_completion(self, batch_id: str) -> None:

Enqueue the manual completion of the current Batch to be sent to the Platform.

The Platform also completes the previous Batch automatically when the new one starts. Therefore, you might not need to call this method at all. See User Guide for more details.

The method returns right after it saves the batch-completing Message to the queue in the local database file. A background thread asynchronously sends the messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.
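
When you do want to close a Batch explicitly, for example after the last Message of a measurement run, the pattern might look like the sketch below. enqueue_batch is a hypothetical helper, and the zero-padded index used as message_id is an illustrative assumption.

```python
def enqueue_batch(sender, batch_id: str, payloads) -> int:
    """Enqueue all payloads into one Batch, then enqueue its completion."""
    count = 0
    for index, payload in enumerate(payloads):
        # Hypothetical ID scheme: the position of the Message in the Batch.
        sender.enqueue_message(payload, batch_id=batch_id, message_id=f"{index:06d}")
        count += 1
    sender.enqueue_batch_completion(batch_id)
    return count
```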

def enqueue_message_completion(self, batch_id: str, message_id: str) -> None:

Enqueue the manual completion of the current Message to be sent to the Platform. Use this method when Message Chunking is used.

The method returns right after it saves the message-completing Message to the queue in the local database file. A background thread asynchronously sends the Messages from the queue to the Platform. You can check the number of pending messages in the queue using DeviceClient.pending_messages_count.
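
To illustrate how chunk_id and enqueue_message_completion might fit together, the sketch below splits a large payload into fixed-size chunks. enqueue_chunked, the chunk size, and the zero-padded chunk_id format are all assumptions made for illustration; consult the User Guide for the chunk ID rules that the Platform actually applies.

```python
def enqueue_chunked(sender, payload: bytes, batch_id: str, message_id: str,
                    chunk_size: int = 64 * 1024) -> int:
    """Enqueue payload as chunks of one Message, then enqueue its completion."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # An empty payload is still sent as a single empty chunk.
    chunks = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)] or [b""]
    for index, chunk in enumerate(chunks):
        sender.enqueue_message(
            chunk,
            batch_id=batch_id,
            message_id=message_id,
            chunk_id=f"{index:04d}",  # assumed format: zero-padded chunk index
        )
    sender.enqueue_message_completion(batch_id, message_id)
    return len(chunks)
```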

class DesiredProperties:

A wrapper of Desired Properties.

version: int

The version of the current properties.

values: dict

The dictionary containing the values of the individual properties.