iot2mqtt package

Submodules

iot2mqtt.abstract module

This module defines various abstract representations of IoT device states and attributes. It provides a set of Pydantic models and enumerations to represent the state and configuration of different types of IoT devices, such as switches, sensors, and alarms.

Constants

  • POWER_ON: String constant representing the “ON” state.

  • POWER_OFF: String constant representing the “OFF” state.

class iot2mqtt.abstract.ADC(*, last_seen: datetime | None = None, Range: float | None = None)[source]

Bases: DeviceState

Represents the state of an ADC (Analog-to-Digital Converter) device.

Range

The range value of the ADC.

Type:

Optional[float]

voltage

The computed voltage based on the range value.

Type:

float

Range: float | None
property voltage: float

Calculate the voltage based on the range.

Returns:

The calculated voltage in volts.

Return type:

float

class iot2mqtt.abstract.AirSensor(*, last_seen: datetime | None = None, humidity: float | None = None, temperature: float | None = None)[source]

Bases: DeviceState

Represents the state of an air sensor device.

humidity

The humidity level measured by the sensor.

Type:

Optional[float]

temperature

The temperature measured by the sensor.

Type:

Optional[float]

humidity: float | None
temperature: float | None
class iot2mqtt.abstract.Alarm(*, last_seen: datetime | None = None, alarm: bool | None = None, battery_low: bool | None = None, duration: int | None = None, melody: int | None = None, volume: Literal['low', 'medium', 'high'] | None = None)[source]

Bases: DeviceState

Represents the state of an alarm device.

alarm

Indicates whether the alarm is active.

Type:

Optional[bool]

battery_low

Indicates whether the battery is low.

Type:

Optional[bool]

duration

Duration of the alarm.

Type:

Optional[int]

melody

Melody of the alarm.

Type:

Optional[int]

volume

Volume level of the alarm.

Type:

Optional[Literal[“low”, “medium”, “high”]]

alarm: bool | None
battery_low: bool | None
duration: int | None
melody: int | None
volume: Literal['low', 'medium', 'high'] | None
class iot2mqtt.abstract.AlarmButton(*, last_seen: datetime | None = None, battery: int | None = None, action: AlarmButtonlValues = None, linkquality: int | None = None)[source]

Bases: DeviceState

Represents the state of a alarm controler device.

Attributes: battery (Optional[int]): The battery level of the device. action (AlarmButtonlValues): The action performed by the alarm controler. linkquality (Optional[int]): The link quality of the device.

action: AlarmButtonlValues
battery: int | None
linkquality: int | None
class iot2mqtt.abstract.AlarmButtonlValues(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumeration representing possible alarm controler actions.

EMERGENCY_ACTION

Represents an emergency alarm action.

DISARM_ACTION

Represents a disarm action.

ARM_DAYZONES_ACTION

Represents an arm day zones action.

ARM_ALL_ACTION

Represents an arm all zones action.

ARM_ALL_ACTION = 'arm_all_zones'
ARM_DAYZONES_ACTION = 'arm_day_zones'
DISARM_ACTION = 'disarm'
EMERGENCY_ACTION = 'emergency'
class iot2mqtt.abstract.AlarmVolumes(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumeration representing possible alarm volume levels.

LOW

Low volume level.

MEDIUM

Medium volume level.

HIGH

High volume level.

HIGH = 'high'
LOW = 'low'
MEDIUM = 'medium'
class iot2mqtt.abstract.Availability(*, is_online: bool)[source]

Bases: BaseModel

Represents the availability status of a device.

is_online

Indicates whether the device is online. This field is immutable.

Type:

bool

is_online: bool
class iot2mqtt.abstract.Button(*, last_seen: datetime | None = None, action: ButtonValues = None)[source]

Bases: DeviceState

Represents the state of a button device.

action

The action performed by the button.

Type:

ButtonValues

action: ButtonValues
class iot2mqtt.abstract.ButtonValues(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Enumeration representing possible button actions.

SINGLE_ACTION

Represents a single button press action.

DOUBLE_ACTION

Represents a double button press action.

LONG_ACTION

Represents a long button press action.

DOUBLE_ACTION = 'double'
LONG_ACTION = 'long'
SINGLE_ACTION = 'single'
class iot2mqtt.abstract.DeviceState(*, last_seen: datetime | None = None)[source]

Bases: BaseModel

Root class for all device state classes.

This class serves as the base for various device state representations, providing common attributes and functionality.

last_seen

The timestamp of when the device was last seen. This field can be accessed using aliases “last_seen” or “Time”.

Type:

Optional[datetime]

last_seen: datetime | None
class iot2mqtt.abstract.DoorSensor(*, last_seen: datetime | None = None, contact: bool | None = None, battery: int | None = None, voltage: int | None = None, tamper: bool | None = None, battery_low: bool | None = None, linkquality: int | None = None)[source]

Bases: DeviceState

Represents the state of a door sensor device.

This class models the state information for door/window contact sensors, including contact status, battery information, tamper detection, and signal quality.

contact

Contact state of the sensor. - True: Contact is closed (door/window closed) - False: Contact is open (door/window open) - None: Contact state unknown

Type:

Optional[bool]

battery

Remaining battery percentage (0-100).

Type:

Optional[int]

voltage

Battery voltage in millivolts.

Type:

Optional[int]

tamper

Tamper detection status. - True: Device has been tampered - False: No tampering detected - None: Tamper status unknown

Type:

Optional[bool]

battery_low

Low battery warning indicator. - True: Battery is critically low - False: Battery level is okay - None: Battery status unknown

Type:

Optional[bool]

linkquality

Signal strength/link quality indicator (0-255).

Type:

Optional[int]

battery: int | None
battery_low: bool | None
contact: bool | None
linkquality: int | None
tamper: bool | None
voltage: int | None
class iot2mqtt.abstract.MagicCube(*, last_seen: datetime | None = None, battery: int | None = None, voltage: int | None = None, power_outage_count: int | None = None, operation_mode: Literal['action_mode', 'scene_mode'] | None = None, side: int | None = None, action_from_side: int | None = None, action_angle: float | None = None, action: Literal['shake', 'throw', 'tap', 'slide', 'flip180', 'flip90', 'hold', 'side_up', 'rotate_left', 'rotate_right', '1_min_inactivity', 'flip_to_side'] | None = None, linkquality: int | None = None)[source]

Bases: DeviceState

Represents the state of a Cube T1 Pro device from Aquara.

action: Literal['shake', 'throw', 'tap', 'slide', 'flip180', 'flip90', 'hold', 'side_up', 'rotate_left', 'rotate_right', '1_min_inactivity', 'flip_to_side'] | None
action_angle: float | None
action_from_side: int | None
battery: int | None
linkquality: int | None
operation_mode: Literal['action_mode', 'scene_mode'] | None
power_outage_count: int | None
side: int | None
voltage: int | None
class iot2mqtt.abstract.Motion(*, last_seen: datetime | None = None, occupancy: bool | None = None, tamper: bool | None = None)[source]

Bases: DeviceState

Represents the state of a motion sensor device.

occupancy

Indicates whether motion is detected.

Type:

Optional[bool]

tamper

Indicates whether the device has been tampered with.

Type:

Optional[bool]

occupancy: bool | None
tamper: bool | None
class iot2mqtt.abstract.Registry(*, device_ids: List[str] = [])[source]

Bases: BaseModel

Represents a registry of dicovered devices.

device_ids

A list of device ids.

Type:

List[str]

device_ids: List[str]
class iot2mqtt.abstract.SomfyDevice(*, last_seen: ~datetime.datetime | None = None, name: str | None = None, remoteAddress: str | None = None, direction: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, lastRollingCode: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunSensor: ~typing.Annotated[bool | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunFlag: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunny: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, windy: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None)[source]

Bases: DeviceState

Represents the state of a Somfy RTS device.

direction: <lambda>)]
lastRollingCode: <lambda>)]
name: str | None
remoteAddress: str | None
sunFlag: <lambda>)]
sunSensor: <lambda>)]
sunny: <lambda>)]
windy: <lambda>)]
class iot2mqtt.abstract.SomfyGroup(*, last_seen: ~datetime.datetime | None = None, name: str | None = None, remoteAddress: str | None = None, direction: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, lastRollingCode: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunSensor: ~typing.Annotated[bool | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunFlag: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunny: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, windy: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, groupId: int | None = None)[source]

Bases: SomfyDevice

groupId: int | None
class iot2mqtt.abstract.SomfyShade(*, last_seen: ~datetime.datetime | None = None, name: str | None = None, remoteAddress: str | None = None, direction: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, lastRollingCode: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunSensor: ~typing.Annotated[bool | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunFlag: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, sunny: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, windy: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, shadeId: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, shadeType: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, tiltType: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, flipCommands: ~typing.Annotated[bool | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, flipPosition: ~typing.Annotated[bool | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, position: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, tiltPosition: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, target: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, tiltTarget: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, mypos: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, myTiltPos: ~typing.Annotated[int | str | None, ~pydantic.functional_validators.BeforeValidator(func=~iot2mqtt.abstract.<lambda>)] = None, cmd: str | None = None, cmdAddress: str | None = None, cmdSource: str | None = None)[source]

Bases: SomfyDevice

cmd: str | None
cmdAddress: str | None
cmdSource: str | None
flipCommands: <lambda>)]
flipPosition: <lambda>)]
myTiltPos: <lambda>)]
mypos: <lambda>)]
position: <lambda>)]
shadeId: <lambda>)]
shadeType: <lambda>)]
target: <lambda>)]
tiltPosition: <lambda>)]
tiltTarget: <lambda>)]
tiltType: <lambda>)]
class iot2mqtt.abstract.SrtsA01(*, last_seen: datetime | None = None, away_preset_temperature: Annotated[float, None, Interval(gt=-10.0, ge=None, lt=35.0, le=None), None, None] | None = None, battery: int | None = None, calibrated: bool | None = None, child_lock: bool | None = None, device_temperature: float | None = None, external_temperature_input: Annotated[float, None, Interval(gt=0, ge=None, lt=55, le=None), None, None] | None = None, internal_heating_setpoint: float | None = None, linkquality: int | None = None, local_temperature: float | None = None, occupied_heating_setpoint: Annotated[float, None, Interval(gt=5, ge=None, lt=30, le=None), None, None] | None = None, power_outage_count: int | None = None, preset: Literal['manual', 'away', 'auto'] | None = None, schedule: bool | None = None, schedule_settings: str | None = None, sensor: Literal['internal', 'external'] | None = None, setup: bool | None = None, system_mode: Literal['off', 'heat'] | None = None, update: dict | None = None, valve_alarm: bool | None = None, valve_detection: bool | None = None, voltage: int | None = None, window_detection: bool | None = None, window_open: bool | None = None)[source]

Bases: DeviceState

Represents the state of a Smart radiator thermostat AQARA SRTS-A01.

away_preset_temperature: Annotated[float, None, Interval(gt=-10.0, ge=None, lt=35.0, le=None), None, None] | None
battery: int | None
calibrated: bool | None
child_lock: bool | None
device_temperature: float | None
external_temperature_input: Annotated[float, None, Interval(gt=0, ge=None, lt=55, le=None), None, None] | None
internal_heating_setpoint: float | None
linkquality: int | None
local_temperature: float | None
occupied_heating_setpoint: Annotated[float, None, Interval(gt=5, ge=None, lt=30, le=None), None, None] | None
power_outage_count: int | None
preset: Literal['manual', 'away', 'auto'] | None
schedule: bool | None
schedule_settings: str | None
sensor: Literal['internal', 'external'] | None
setup: bool | None
system_mode: Literal['off', 'heat'] | None
update: dict | None
valve_alarm: bool | None
valve_detection: bool | None
voltage: int | None
window_detection: bool | None
window_open: bool | None
class iot2mqtt.abstract.Switch(*, last_seen: datetime | None = None, power_on_behavior: str | None = None, power: Literal['ON', 'OFF'] | None)[source]

Bases: DeviceState

Represents the state of a switch device.

power_on_behavior

The behavior of the switch when power is restored.

Type:

Optional[str]

power

The current power state of the switch: “ON” or “OFF” This field can be accessed using aliases “power”, “state”, or “POWER”.

Type:

str

power: Literal['ON', 'OFF'] | None
power_on_behavior: str | None
class iot2mqtt.abstract.Switch2Channels(*, last_seen: datetime | None = None, power1: Literal['ON', 'OFF'] | None = None, power2: Literal['ON', 'OFF'] | None = None)[source]

Bases: DeviceState

Represents the state of a switch device with two channels.

power1

The current power state of the first channel: “ON” or “OFF” This field can be accessed using aliases “power1” or “POWER1”.

Type:

str

power2

The current power state of the second channel: “ON” or “OFF” This field can be accessed using aliases “power2” or “POWER2”.

Type:

str

power1: Literal['ON', 'OFF'] | None
power2: Literal['ON', 'OFF'] | None

iot2mqtt.central module

Module for managing MQTT topics and processing messages for IoT devices.

This module includes classes and functions to manage MQTT topic configurations, subscribe to topics, process incoming messages, and trigger state changes on devices. It supports multiple protocols such as Zigbee2MQTT and Tasmota.

Classes

  • Scrutinizer: Subscribes to MQTT topics and processes incoming messages.

  • DeviceAccessor: Accesses device state via MQTT.

Functions

  • get_refined_data_queue: Returns a queue of refined messages by processing raw messages from MQTT.

class iot2mqtt.central.DeviceAccessor(mqtt_client: ClientHelper)[source]

Bases: object

switch_power_change(device_ids: str, protocol: Protocol, model: Model, power_on: bool, countdown: float = 0, on_time: float = 0.0, off_time: float = 0.0) None[source]

Manage the power state change of switch devices.

This function handles the power state change of switchs, optionally scheduling the change to occur after a countdown. It also manages the timing for turning the devices on and off based on the provided on_time and off_time parameters.

Parameters:
  • device_ids (str) – A comma-separated string of switch ids.

  • protocol (dev.Protocol) – The protocol used by the device.

  • model (dev.Model) – The model of the device.

  • power_on (bool) – The desired power state (True for ON, False for OFF).

  • countdown (float, optional) – The countdown time in seconds before the power state change occurs. Defaults to 0.

  • on_time (float, optional) – The duration in seconds to keep the device ON. Defaults to DEFAULT_ON_TIME.

  • off_time (float, optional) – The duration in seconds to keep the device OFF. Defaults to DEFAULT_OFF_TIME.

Returns:

None

Note

The discovery step is not required for this function to work, but the protocol and model must be provided compared to switch_power_change_helper() function.

switch_power_change_helper(device_ids: str, power_on: bool, countdown: float = 0, on_time: float = 0.0, off_time: float = 0.0) None[source]

Helper function to change the power state of switch devices.

This function retrieves devices from the device directory based on the provided device names, and then calls the switch_power_change function to change their power state.

Parameters:
  • device_ids (str) – A comma-separated string of switch ids.

  • power_on (bool) – The desired power state. True to power on, False to power off.

  • countdown (float, optional) – The countdown period in seconds before changing the power state. Defaults to 0.

  • on_time (float, optional) – The duration in seconds for which the device should remain powered on. Defaults to DEFAULT_ON_TIME.

  • off_time (float, optional) – The duration in seconds for which the device should remain powered off. Defaults to DEFAULT_OFF_TIME.

Returns:

None

Note

The discovery step must be performed before calling this function.

trigger_change_state(device_id: str, protocol: Protocol, state: Dict) None[source]

Publish a state change message to the MQTT topic for the given device.

Parameters:
  • device_id (str) – The id of the device.

  • protocol (dev.Protocol) – The communication protocol.

  • state (Dict) – The new state to be published.

Note

Refer to the documentation of the iot2mqtt.abstract module to generate the state, by the use of the model_dump method.

trigger_get_state(device_id: str, protocol: Protocol, model: Model) None[source]

Triggers the retrieval of the current state of a device via MQTT.

This method publishes state retrieval commands to the appropriate MQTT topics based on the device model and protocol. It uses the encoder registry to get the fields that can be retrieved for the given device model and constructs the MQTT topics accordingly.

Parameters:
  • device_id (str) – The id of the device for which the state is being retrieved.

  • protocol (dev.Protocol) – The communication protocol used by the device (e.g., Z2M, TASMOTA).

  • model (dev.Model) – The model of the device.

Raises:

NotImplementedError – If the protocol is unknown or not supported.

Note

If the encoder for the given device model is not found, a debug message is logged and the method returns without publishing any messages.

class iot2mqtt.central.ESPSomfyMessageParser[source]

Bases: MessageParser

Parses incoming ESPSomfy messages.

static parse(protocol: Protocol, message_type: MessageType, topic: str, raw_payload: str) Dict | str | int | List[Dict] | None[source]

Parse the incoming MQTT message according to the specified protocol and message type.

class iot2mqtt.central.JsonMessageParser[source]

Bases: MessageParser

Parses incoming MQTT messages in JSON format.

static parse(protocol: Protocol, message_type: MessageType, topic: str, raw_payload: str) Dict | str | int | List[Dict] | None[source]

Parse the incoming MQTT message according to the specified protocol and message type.

class iot2mqtt.central.MessageParser[source]

Bases: ABC

Abstract base class for parsing incoming MQTT messages.

abstract static parse(protocol: Protocol, message_type: MessageType, topic: str, raw_payload: str) Dict | str | int | List[Dict] | None[source]

Parse the incoming MQTT message according to the specified protocol and message type.

class iot2mqtt.central.MessageStructure(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Incoming MQTT message determining the parsing of the messages.

ESPSOMFY = 'espsomfy'
JSON = 'json'
RAW = 'raw'
class iot2mqtt.central.RawMessageParser[source]

Bases: MessageParser

Parses incoming MQTT messages in raw format.

static parse(protocol: Protocol, message_type: MessageType, topic: str, raw_payload: str) Dict | str | int | List[Dict] | None[source]

Parse the incoming MQTT message according to the specified protocol and message type.

class iot2mqtt.central.Scrutinizer(mqtt_client: ClientHelper, output_queue: Queue, protocols_expected: List[Protocol] = None, queue_timeout: int = 1)[source]

Bases: object

A class responsible for subscribing to MQTT topics and processing incoming messages.

The Scrutinizer class subscribes to various MQTT topics based on protocol and message type, processes incoming messages, and places the processed messages into an output queue of raw data.

Parameters:
  • mqtt_client (mqtthelper.ClientHelper) – The MQTT client helper instance.

  • output_queue (Queue) – The queue where the raw data is placed.

  • protocols_expected (List[dev.Protocol], optional) – List of expected protocols. None for all.

  • queue_timeout (int, optional) – Timeout for queue operations in seconds. Defaults to 1.

iot2mqtt.central.get_refined_data_queue(mqtt_client: ClientHelper, protocols_expected: List[Protocol] = None, models_expected: List[Model] = None, devices_expected: List[str] = None) Queue[source]

Creates and returns a queue of refined messages by processing raw messages from MQTT.

This function sets up a message processing pipeline that: 1. Captures raw MQTT messages via a Scrutinizer 2. Processes discovery messages to identify devices 3. Resolves device models and processes availability/state messages 4. Normalizes messages into standardized formats

Parameters:
  • mqtt_client (mqtthelper.ClientHelper) – The MQTT client helper instance

  • protocols_expected (List[dev.Protocol], optional) – List of protocols to filter messages by

  • models_expected (List[dev.Model], optional) – List of device models to filter messages by

  • devices_expected (List[str], optional) – List of device IDs to filter messages by

Returns:

A queue containing the refined (processed) messages that match the specified filters

Return type:

Queue

Note

The pipeline includes a 1-second delay after discovery messages to ensure all devices are properly identified before processing state messages.

iot2mqtt.central.is_message_expected(message: Message, types_expected: List[MessageType] = None, protocols_expected: List[Protocol] = None, models_expected: List[Model] = None, devices_expected: List[str] = None) bool[source]

Validates if a message matches the expected criteria for message type, protocol, model and device.

Parameters:
  • message – The messenger.Message object to validate

  • types_expected – List of allowed message types

  • protocols_expected – List of allowed protocols

  • models_expected – List of allowed device models

  • devices_expected – List of allowed device IDs

Returns:

True if message matches all specified criteria, False otherwise

Return type:

bool

Note

If any of the expected lists are None, that criteria is not checked. The message must have non-None values for message_type, protocol and model.

iot2mqtt.dev module

This module defines various enumerations and classes related to IoT device models, protocols, and actions. It also includes custom exceptions for handling specific error cases.

Classes and Enums

  • Model: Represents a model of an IoT device.

  • ModelFactory: Singleton class responsible for managing instances of Model objects.

  • Protocol: Enumeration representing different communication protocols used by IoT devices.

  • Device: Represents a generic IoT device in the system.

  • ButtonAction: Enumeration defining button action values.

class iot2mqtt.dev.ButtonAction(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration defining button action values.

This enum defines string constants for possible button actions like single press, double press, long press, etc.

SINGLE_ACTION

Represents a single button press action.

Type:

str

DOUBLE_ACTION

Represents a double button press action.

Type:

str

LONG_ACTION

Represents a long button press action.

Type:

str

DOUBLE_ACTION = 'double'
LONG_ACTION = 'long'
SINGLE_ACTION = 'single'
class iot2mqtt.dev.Device(*, name: str = (FieldInfo(annotation=NoneType, required=True, frozen=True),), protocol: Protocol = (FieldInfo(annotation=NoneType, required=True, frozen=True),), address: str | None = (None,), model: Model | None = (None,))[source]

Bases: BaseModel

Represents a generic IoT device in the system.

This class serves as a base model for all types of IoT devices. It defines common properties and methods that all devices should have. Specific device types should inherit from this class and add their own unique properties and methods.

name

The human-readable name of the device. This field is immutable.

Type:

str

protocol

The communication protocol used by the device. This field is immutable.

Type:

Protocol

address

The network address of the device. Defaults to None.

Type:

Optional[str]

model

The model of the device. Defaults to None.

Type:

Optional[Model]

address: str | None
model: Model | None
name: str
protocol: Protocol
class iot2mqtt.dev.Model(*, tag: str)[source]

Bases: BaseModel

Represents a model of an IoT device.

Parameters:

tag (str) – The tag associated with the device model.

tag: str
class iot2mqtt.dev.ModelFactory[source]

Bases: object

ModelFactory is a singleton class responsible for managing instances of Model objects.

This class ensures that only one instance of each Model is created and provides a thread-safe mechanism for accessing these instances.

UNKNOWN: Model = Model(tag='UNKNOWN')
classmethod get(tag: str) Model[source]

Retrieve a Model instance by its tag. If the tag does not exist, create a new Model instance with the given tag.

Parameters:

tag (str) – The tag associated with the Model instance.

Returns:

The Model instance associated with the given tag, or a new instance if the tag does not exist.

Return type:

Model

class iot2mqtt.dev.Protocol(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration for different communication protocols used by IoT devices.

This enum class represents various communication protocols that IoT devices can use to interact with each other and with central systems. Each protocol is represented as a string constant.

DEFAULT

Default protocol, used as a fallback.

Type:

str

ESPSOMFY

The protocol used by ESPHome Somfy RTS devices.

Type:

str

HOMIE

The Homie IoT convention for MQTT.

Type:

str

RING

The protocol used by Ring devices.

Type:

str

SHELLY

The protocol used by Shelly smart home devices.

Type:

str

TASMOTA

The protocol used by Tasmota firmware for ESP8266/ESP32 boards.

Type:

str

Z2M

The Zigbee2MQTT protocol for Zigbee devices.

Type:

str

Z2T

The Zigbee2Tasmota protocol for Zigbee devices using Tasmota firmware.

Type:

str

DEFAULT = 'default'
ESPSOMFY = 'ESPSomfy'
HOMIE = 'Homie'
RING = 'Ring'
SHELLY = 'Shelly'
TASMOTA = 'Tasmota'
Z2M = 'Zigbee2MQTT'
Z2T = 'Zigbee2Tasmota'

iot2mqtt.encoder module

This module provides functionality for encoding and validating device states, managing timers, and handling power state changes for various device models.

Classes

  • Encoder: Transforms and validates device states.

  • EncoderRegistry: Manages encoders for different device models.

Functions

  • encode: Encodes the state of a device model into a dictionary format using the appropriate encoder.

class iot2mqtt.encoder.Encoder(*, settable_fields: List[str], gettable_fields: List[str], field_aliases: Dict[str, str] | None = None, field_converters: Dict[str, Callable[[Any], Any]] | None = None)[source]

Bases: BaseModel

Encoder class for transforming and validating device states.

field_aliases: Dict[str, str] | None
field_converters: Dict[str, Callable[[Any], Any]] | None
gettable_fields: List[str]
settable_fields: List[str]
transform(state: DeviceState) Dict[source]

Transforms the given device state into an encoded dictionary.

Parameters:

state (abstract.DeviceState) – The current state of the device to be encoded.

Returns:

The encoded state of the device as a dictionary.

Return type:

Dict

class iot2mqtt.encoder.EncoderRegistry(models: Model, settable_fields: List[str], gettable_fields: List[str], field_aliases: Dict[str, str] | None = None, field_converters: Dict[str, Callable[[Any], Any]] | None = None)[source]

Bases: object

A registry for managing encoders for different device models.

This class allows for the creation and retrieval of encoders that are responsible for transforming and validating device states for various device models. Encoders are stored in a registry and can be accessed using the device model as the key.

_registry

A dictionary that maps device models to their corresponding encoders.

Type:

Dict[dev.Model, Encoder]

Parameters:
  • models (dev.Model) – The device models for which the encoder is being created.

  • settable_fields (List[str]) – List of fields that can be set for the device model.

  • gettable_fields (List[str]) – List of fields that can be retrieved for the device model.

  • field_aliases (Optional[Dict[str, str]], optional) – Optional dictionary of field aliases. Defaults to None.

  • field_converters (Optional[Dict[str, Callable[[Any], Any]]], optional) – Optional dictionary of field converters. Defaults to None.

Raises:

ValidationError – If there is an error creating the encoder for the given models.

static get_encoder(model: Model) Encoder | None[source]

Retrieves the encoder for the given device model.

Parameters:

model (dev.Model) – The device model for which the encoder is being retrieved.

Returns:

The encoder for the given device model, or None if no encoder is found.

Return type:

Optional[Encoder]

iot2mqtt.encoder.encode(model: Model, state: DeviceState) Dict[str, Any][source]

Encode the state of a device model into a dictionary format using the appropriate encoder.

Parameters:
  • model (dev.Model) – The device model for which the state needs to be encoded.

  • state (abstract.DeviceState) – The current state of the device to be encoded.

Returns:

The encoded state of the device as a dictionary. If no encoder is found, the state is returned as a dictionary without transformation.

Return type:

Dict[str, Any]

iot2mqtt.messenger module

This module defines the messaging system for the IoT framework, including message types, message structures, and the mechanisms for producing and dispatching messages.

Classes

  • MessageType: Enumeration for IoT message types.

  • Item: Represents a raw data item in the IoT system.

  • Message: Represents a message in the IoT system.

  • QueueManager: Base class for managing input and output queues.

  • Producer: Thread-safe message producer that puts messages onto an output queue.

  • Dispatcher: Thread-safe message dispatcher that processes messages from an input queue based on specified conditional handlers.

Functions

  • is_type_discovery: Checks if a message is of type discovery.

  • is_type_availability: Checks if a message is of type availability.

  • is_type_state: Checks if a message is of type state.

Examples

Here is an example of how to access incoming MQTT messages and process them using a dispatcher according to their message type:

import time
import iot2mqtt as i2m

TARGET = "localhost"

def main():
    _client = i2m.mqtthelper.ClientHelper(
        i2m.mqtthelper.MQTTContext(hostname=TARGET), i2m.mqtthelper.SecurityContext()
    )
    _client.start()
    _refined_queue = i2m.central.get_refined_data_queue(_client)

    i2m.messenger.Dispatcher(
        input_queue=_refined_queue,
        output_queue=None,
        conditional_handlers=[
            (i2m.messenger.is_type_availability,
            lambda msg: print(f"Availability: {msg.device_id} {msg.refined}")),
        ],
    )


if __name__ == "__main__":
    main()
    for _ in range(10):
        # Wait for 10 seconds before exiting
        print(".", end="", flush=True)
        time.sleep(1)

# Display :
# Availability: SWITCH_CAVE is_online=True
# Availability: SWITCH_PLUG is_online=False
class iot2mqtt.messenger.Dispatcher(input_queue: Queue, output_queue: Queue | None, conditional_handlers: List[Tuple[Callable[[Message], bool], Callable[[Message], Message | None]]], default_handler: Callable[[Message], Message | None] | None = None, name: str | None = None)[source]

Bases: QueueManager

Dispatcher is a thread-safe message dispatcher that processes messages from an input queue based on specified conditional handlers and optionally forwards the processed messages to an output queue.

Parameters:
  • input_queue (queue.Queue) – The queue from which messages are consumed.

  • output_queue (Optional[queue.Queue]) – The queue to which processed messages are forwarded.

  • conditional_handlers (ConditionalProcessingList) – A list of tuples where each tuple contains a condition function and a handler function.

  • default_handler (Optional[Handler]) – A default handler function to process messages that do not match any condition. Defaults to None.

  • name (Optional[str]) – An optional name for the dispatcher. If not provided, a default name is generated.

STOP

Sentinel value used to signal the dispatcher to stop processing messages.

Type:

str

STOP = 'STOP'
force_stop() None[source]

Forcefully stops the dispatcher by setting the stop event.

This method signals the dispatcher to stop processing messages by setting the internal stop event. It is typically used to gracefully shut down the dispatcher when it is no longer needed. Once the stop event is set, the dispatcher will complete processing any current message and then exit its run loop.

Logs a debug message indicating that the dispatcher has been forcefully stopped.

Returns:

None

stop_loop() None[source]

Process all pending messages and stop the loop.

class iot2mqtt.messenger.Item(*, data: Dict | str | int | List[Dict])[source]

Bases: BaseModel

Represents a raw data item in the IoT system.

data

The data associated with the item. It can be a dictionary, a string, or a list of dictionaries.

Type:

Union[Dict, str, List[Dict]]

data: Dict | str | int | List[Dict]
class iot2mqtt.messenger.Message(*, protocol: Protocol, model: Model | None, device_id: str, message_type: MessageType, raw_item: Item, id: UUID = None, refined: Annotated[Item | None, SerializeAsAny()] = None)[source]

Bases: BaseModel

Represents a message in the IoT system.

protocol

The communication protocol used by the device.

Type:

dev.Protocol

model

The model of the device, if available.

Type:

Optional[dev.Model]

device_id

The id of the device.

Type:

str

message_type

The type of the message (e.g., discovery, availability, state).

Type:

MessageType

raw_item

The raw data item associated with the message.

Type:

Item

id

A unique identifier for the message, generated by default.

Type:

UUID

refined

An optional refined version of the raw item.

Type:

Optional[Item]

device_id: str
id: UUID
message_type: MessageType
model: Model | None
protocol: Protocol
raw_item: Item
refined: Annotated[Item | None, SerializeAsAny()]
class iot2mqtt.messenger.MessageType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration for IOT message types.

AVAIL = 'availability'
DISCO = 'discovery'
STATE = 'state'
class iot2mqtt.messenger.Producer(output_queue: Queue)[source]

Bases: QueueManager

Producer is a thread-safe message producer that puts messages onto an output queue.

This class extends the QueueManager to provide functionality for producing messages and placing them onto an output queue for further processing by consumers.

Parameters:

output_queue (queue.Queue) – The queue to which produced messages are forwarded.

put(message: Message | str) None[source]

Puts a message onto the output queue.

Parameters:

message (Union[Message, str]) – The message to be placed onto the output queue.

Returns:

None

class iot2mqtt.messenger.QueueManager(input_queue: Queue, output_queue: Queue)[source]

Bases: object

QueueManager is a base class that implements a thread-safe message queue manager.

This class provides the basic functionality for managing input and output queues that can be used by derived classes to implement specific message processing logic.

Parameters:
  • input_queue (queue.Queue) – The queue from which messages are consumed.

  • output_queue (queue.Queue) – The queue to which processed messages are forwarded.

iot2mqtt.messenger.is_type_availability(msg: Message) bool[source]
iot2mqtt.messenger.is_type_discovery(msg: Message) bool[source]
iot2mqtt.messenger.is_type_state(msg: Message) bool[source]

iot2mqtt.mqtthelper module

This module provides helper classes and functions for managing MQTT client operations using the paho.mqtt.client library. It includes context and security configurations, as well as client management for connecting, disconnecting, and handling MQTT events.

class iot2mqtt.mqtthelper.ClientHelper(context: MQTTContext, security_ctxt: SecurityContext)[source]

Bases: Client

ClientHelper is a helper class for managing MQTT client operations.

This class extends the paho.mqtt.client.Client class and provides additional functionality for handling MQTT connections, disconnections, and event callbacks.

This constructor sets up the MQTT client with the provided context and security settings. It also initializes the connection and disconnection handlers.

Parameters:
  • context (MQTTContext) – The context containing MQTT connection parameters such as hostname, port

  • security_ctxt (SecurityContext) – The security context containing TLS and authentication settings.

connect_handler_add(handler: Callable) None[source]

Adds a handler to be called when the client successfully connects to the MQTT broker.

This method allows you to register a callback function that will be invoked whenever the client establishes a connection to the MQTT broker.

disconnect(*argc, **argv) MQTTErrorCode[source]

Disconnects the client from the MQTT broker.

This method overrides the default disconnect method to add logging functionality. It logs the result of the disconnection request and then returns the result code.

Parameters:
  • *argc – Variable length argument list passed to the superclass disconnect method.

  • **argv – Arbitrary keyword arguments passed to the superclass disconnect method.

Returns:

The result code of the disconnection request.

Return type:

mqtt.MQTTErrorCode

disconnect_handler_add(handler: Callable) None[source]

Adds a handler to be called when the client disconnects from the MQTT broker.

This method allows you to register a callback function that will be invoked whenever the client disconnects from the MQTT broker.

Returns:

None

loop_forever(*argv, **kwargs) MQTTErrorCode[source]

Start a new thread to run the network loop.

start() MQTTErrorCode[source]

Starts the client MQTT network loop.

Connects the client if not already connected, and starts the network loop.

Returns:

The result of calling client.loop_start().

Return type:

mqtt.MQTTErrorCode

Raises:

RuntimeError – If loop_start fails or connection is refused.

stop() MQTTErrorCode[source]

Stops the client MQTT connection.

Stops the async loop if it was not already running and calls disconnect() to close the client connection.

Returns:

The result of calling disconnect().

Return type:

mqtt.MQTTErrorCode

Raises:

RuntimeError – If loop_stop fails.

class iot2mqtt.mqtthelper.MQTTClientDeprecated(*args, **kwargs)[source]

Bases: ClientHelper

This class provides additional helper methods for MQTT operations. Could evolve to a more generic class in the future.

default_message_callback_add(callback: Callable) None[source]

Adds a callback function to the list of default message callbacks.

Parameters:

callback (Callable) – The callback function to be added.

Returns:

None

publish_and_wait(topic: str, payload: str, timeout: float, **kwargs) MQTTMessageInfo[source]

Publish a message on a topic and wait for it to be published.

Parameters:
  • topic (str) – The topic on which to publish the message.

  • payload (str) – The message payload.

  • timeout (float) – The maximum time to wait for the message to be published.

  • **kwargs – Additional keyword arguments to pass to the publish method.

Returns:

Information about the published message.

Return type:

mqtt.MQTTMessageInfo

Raises:

ValueError – If any of the parameters are of incorrect type.

subscribe_handler_add(handler: Callable)[source]

Adds a subscribe handler.

Subscribe handlers are called when a subscription is received from the MQTT broker.

Parameters:

handler (Callable) – The callback function to handle the subscribe event.

Returns:

None

class iot2mqtt.mqtthelper.MQTTContext(hostname: str = '127.0.0.1', client_id: str = '', port: int = 1883, keepalive: int = 60, clean_start: bool = False)[source]

Bases: object

MQTTContext holds the configuration for the MQTT client.

hostname

The hostname of the MQTT broker. Defaults to “127.0.0.1”.

Type:

str

client_id

The client ID to use for the MQTT connection. Defaults to an empty string.

Type:

str

port

The port number to connect to the MQTT broker. Defaults to 1883.

Type:

int

keepalive

The keepalive interval in seconds for the MQTT connection. Defaults to 60.

Type:

int

clean_start

Indicates whether to start with a clean session. Defaults to False.

Type:

bool

connected

Indicates whether the client is currently connected. Initialized to False.

Type:

bool

started

Indicates whether the client has started. Initialized to False.

Type:

bool

loop_forever_used

Indicates whether the loop_forever method has been used. Initialized to False.

Type:

bool

__post_init__()[source]

Validates the parameters after initialization.

clean_start: bool = False
client_id: str = ''
hostname: str = '127.0.0.1'
keepalive: int = 60
port: int = 1883
class iot2mqtt.mqtthelper.SecurityContext(tls: bool = False, user_name: str | None = None, user_pwd: str | None = None)[source]

Bases: object

SecurityContext holds the security-related configuration for the MQTT client.

tls

Indicates whether TLS should be used for the connection. Defaults to False.

Type:

bool

user_name

The username for MQTT authentication. Defaults to None.

Type:

Optional[str]

user_pwd

The password for MQTT authentication. Defaults to None.

Type:

Optional[str]

tls: bool = False
user_name: str | None = None
user_pwd: str | None = None

iot2mqtt.processor module

This module provides various classes and functions for processing IoT messages within the iot2mqtt framework. It includes abstract base classes, utility functions, and concrete implementations for handling different types of messages and device protocols.

class iot2mqtt.processor.AvailabilityNormalizer[source]

Bases: Processor

A processor that normalizes the availability status of devices based on their protocol and raw message data.

The AvailabilityNormalizer class is responsible for interpreting raw availability data from different protocols (e.g., TASMOTA, Z2M) and converting it into a standardized Availability object indicating whetherthe device is online or offline.

Constants:
  • ONLINE (abstract.Availability): Represents an online availability status.

  • OFFLINE (abstract.Availability): Represents an offline availability status.

OFFLINE = Availability(is_online=False)
ONLINE = Availability(is_online=True)
process(message: Message) Message | None[source]

Processes a message to normalize its availability status based on the device protocol and raw data.

Parameters:

message (messenger.Message) – The message containing raw availability data to be normalized.

Returns:

The message with refined availability data, or None if the message available.

Return type:

Optional[messenger.Message]

Raises:
  • DecodingException – If the message type is not available, the protocol is

  • not supported, or the raw data format is incorrect.

class iot2mqtt.processor.DeviceDirectory[source]

Bases: object

A directory for managing devices.

The DeviceDirectory class provides methods to update, retrieve, and list devices. It maintains an internal dictionary to store device information.

static get_device(device_id: str) Device | None[source]

Retrieves a device from the directory by its name.

Parameters:

device_id (str) – The id of the device to retrieve.

Returns:

The device object if found, otherwise None.

Return type:

Optional[dev.Device]

static get_device_ids() List[Device][source]

Retrieves a list of all device ids in the directory.

Returns:

A list of all device ids in the directory.

Return type:

List[dev.Device]

static get_devices() List[Device][source]

Retrieves a list of all devices in the directory.

Returns:

A list of all device objects in the directory.

Return type:

List[dev.Device]

update_devices(devices: List[Device]) None[source]

Updates the device directory with a list of devices.

Parameters:

devices (List[dev.Device]) – A list of devices to be added or updated in the directory.

class iot2mqtt.processor.Discoverer[source]

Bases: Processor

A processor that discovers and registers devices based on incoming discovery messages.

The Discoverer class processes discovery messages from different protocols (e.g., Z2M, TASMOTA) to update the device directory with new devices and their details.

directory = <iot2mqtt.processor.DeviceDirectory object>
process(message: Message) Message | None[source]

Processes a discovery message to update the device directory and refine the message.

Parameters:

message (messenger.Message) – The discovery message to be processed.

Returns:

The refined message with discovered devices, or the original message if the protocol is unknown.

Return type:

Optional[messenger.Message]

Raises:

DecodingException – If not a discovery message.

class iot2mqtt.processor.ESPSomfyConfig(*, path: str = None, device: ESPSomfyDevice)[source]

Bases: BaseModel

device: ESPSomfyDevice
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'device': FieldInfo(annotation=ESPSomfyDevice, required=True), 'path': FieldInfo(annotation=str, required=False, default=None, alias='~', alias_priority=2)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

path: str
class iot2mqtt.processor.ESPSomfyDevice(*, via_device: str = None, model: str)[source]

Bases: BaseModel

address: str
model: str
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'address': FieldInfo(annotation=str, required=False, default=None, alias='via_device', alias_priority=2), 'model': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class iot2mqtt.processor.ESPSomfyDiscovery(*, config: ESPSomfyConfig)[source]

Bases: BaseModel

A dictionary representing the discovery message configuration of an ESPSomfy device.

config: ESPSomfyConfig
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'config': FieldInfo(annotation=ESPSomfyConfig, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class iot2mqtt.processor.MessageLogger[source]

Bases: Processor

A processor that logs messages for debugging purposes.

The MessageLogger class processes messages by logging them using the configured logger.

process(message: Message) None[source]

Logs the given message for debugging purposes.

Parameters:

message (messenger.Message) – The message to be logged.

Returns:

None

class iot2mqtt.processor.MessageWritter(file_name: str)[source]

Bases: Processor

A processor that writes messages to a file.

The MessageWritter class processes messages by writing them to a specified file in JSON format. It ensures that the file is properly opened and closed.

process(message: Message) Message[source]

Initializes the MessageWritter with the specified file name.

Parameters:

file_name (str) – The name of the file where messages will be written.

Returns:

None

class iot2mqtt.processor.ModelResolver[source]

Bases: Processor

A processor that resolves the model of a device based on its name.

The ModelResolver class processes messages to determine the model of the device by looking it up in the device directory. If the model is unknown, it logs a warning.

process(message: Message) Message | None[source]

Resolves the model of the device in the given message.

Parameters:

message (messenger.Message) – The message containing the device name.

Returns:

The message with the resolved model, or the original message if the model is unknown.

Return type:

Optional[messenger.Message]

Raises:

DecodingException – If a discovery message is received.

class iot2mqtt.processor.Processor[source]

Bases: object

An abstract base class for processing messages.

The Processor class defines the interface for processing messages. Subclasses must implement the process method. It also provides static methods for no-op and pass-through message handling.

static no_op(message: Message) None[source]

Default no-op function when message callback is not provided.

static pass_through(message: Message) Message[source]

Copy of the message without any processing.

abstract process(message: Message) Message | None[source]
class iot2mqtt.processor.StateNormalizer[source]

Bases: Processor

A processor that normalizes the state of various devices based on their model and protocol.

The StateNormalizer class is responsible for refining raw message data into structured device state representations. It supports different device models and protocols.

process(message: Message) Message | None[source]

Processes a message to normalize its state based on the device model and protocol.

Parameters:

message (messenger.Message) – The message containing raw data to be normalized.

Returns:

The message with refined state data, or None if no refinement is possible.

Return type:

Optional[messenger.Message]

Raises:

DecodingException – If the message model is not supported, the raw data format is incorrect, or an error occurs during the refinement process.

class iot2mqtt.processor.StateNormalizerFactory(initial_registry: Dict[Model, Type[DeviceState]] | None = None)[source]

Bases: object

Factory class for managing state normalizers for different device models.

This class provides methods to register and retrieve state normalizers, which are responsible for refining raw message data into structured device state representations. It maintains an internal registry to store the mapping between device models and their corresponding state normalizers.

classmethod get(model: Model) Type[DeviceState] | None[source]

Retrieve the target abstract type for a given model.

Parameters:

model (dev.Model) – The device model for which to retrieve the state normalizer.

Returns:

The state normalizer class for the given model, or None if the model is not found in the registry.

Return type:

Optional[Type[abstract.DeviceState]]

classmethod register(model: Model, abstract_type: Type[DeviceState]) None[source]

Register a target abstract type for the given model.

Parameters:
  • model (dev.Model) – The device model to register.

  • abstract_type (Type[abstract.DeviceState]) – The state normalizer class to associate with the given model.

class iot2mqtt.processor.TasmotaDiscovery(*, hn: str = None, md: str = None, t: str = None)[source]

Bases: BaseModel

A dictionary representing the discovery message configuration of an Tasmota device.

address: str
device_id: str
model: str
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'address': FieldInfo(annotation=str, required=False, default=None, alias='hn', alias_priority=2), 'device_id': FieldInfo(annotation=str, required=False, default=None, alias='t', alias_priority=2), 'model': FieldInfo(annotation=str, required=False, default=None, alias='md', alias_priority=2)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

iot2mqtt.processor.is_button_action_expected(msg: Message, device_ids: str, action: ButtonAction) bool[source]

Checks if the given message contains a button action for the specified device.

Parameters:
  • msg (messenger.Message) – The message to check for a button action.

  • device_ids (str) – The device name, a comma-separated list of device ids or * for all to check for button press.

  • action (dev.ButtonAction) – The button action to check for.

Returns:

True if the message contains the specified button action, False otherwise.

Return type:

bool

iot2mqtt.processor.is_message_typing(msg: Message, expected_type: Type[DeviceState]) bool[source]

Checks if the given message is of the expected type.

iot2mqtt.processor.is_motion_detected(msg: Message, device_ids: str) bool[source]

Checks if motion is detected in the given message for the specified device.

Parameters:
  • msg (messenger.Message) – The message to check for motion detection.

  • device_ids (str) – The device name, a comma-separated list of device ids or * for all to check for motion.

Returns:

True if motion is detected, False otherwise.

Return type:

bool

iot2mqtt.processor.is_switch_power_expected(msg: Message, device_ids: str | None, is_on: bool) bool[source]

Checks if the power status of the switch is as expected.

Parameters:
  • msg (messenger.Message) – The message containing the switch state.

  • device_ids (str) – The device name, a comma-separated list of device ids or * for all to check for switch state.

  • is_on (bool) – The expected power status of the switch.

Returns:

True if the power status of the switch is as expected, False otherwise.

Return type:

bool

iot2mqtt.setup module

This module defines the default configuration values for the iot2mqtt application. It includes the initialization of supported device models and the setup of state normalizers and encoder registries.

class iot2mqtt.setup.Models(*args: Any, **kwargs: Any)[source]

Bases: object

Default configuration values for the iot2mqtt application.

CTP_R01 = Model(tag='CTP-R01')
E3 = Model(tag='E3')
HM_ALARM_BUTTON = Model(tag='HM1RC-2-E')
MIFLORA = Model(tag='Miflora')
NEO_ALARM = Model(tag='NAS-AB02B2')
RING_CAMERA = Model(tag='RingCamera')
SHELLY_PLUGS = Model(tag='Shelly Plug S')
SHELLY_UNI = Model(tag='Shelly Uni')
SN_AIRSENSOR = Model(tag='SNZB-02')
SN_BUTTON = Model(tag='SNZB-01')
SN_MINI = Model(tag='ZBMINI-L')
SN_MINI_L2 = Model(tag='ZBMINIL2')
SN_MOTION = Model(tag='SNZB-03')
SN_SMART_PLUG = Model(tag='S26R2ZB')
SN_ZBBRIDGE = Model(tag='Sonoff ZbBridge')
SOMFY_SHADES = Model(tag='ESPSomfy-RTS MQTT')
SRTS_A01 = Model(tag='SRTS-A01')
TUYA_SOIL = Model(tag='TS0601_soil')

iot2mqtt.topics module

This module provides various classes and functions for managing MQTT topics and configurations used in the scrutinizer module.

It includes classes for managing topic configurations based on protocol and message type, as well as functions for retrieving topic configurations and generating MQTT topics.

class iot2mqtt.topics.CommandTopicManager(*args: Any, **kwargs: Any)[source]

Bases: TopicManager

Manages command topics for different protocols.

This class provides methods to register and retrieve command base topics for various protocols. It acts as a registry for command topic configurations, allowing easy access to the command base topic for a given protocol.

configure_topic_registry() None[source]

Configure the topic registry.

get_command_base_topic(protocol: Protocol) str[source]

Get the command base topic for a given protocol.

register(protocol: Protocol, command_topic_base: str) None[source]

Register a command topic base for a given protocol.

Parameters:
  • protocol (dev.Protocol) – The protocol for which the command topic base is being registered.

  • command_topic_base (str) – The base topic for commands of the given protocol.

Raises:

ValueError – If the protocol is already registered.

class iot2mqtt.topics.InfoTopicManager(*args: Any, **kwargs: Any)[source]

Bases: TopicManager

Manages topic configurations for different protocols and message types.

This class provides methods to register, retrieve, and resolve MQTT topics based on protocol and message type. It acts as a registry for topic configurations, allowing easy access to the topic base, topic extension, and device name offset.

configure_topic_registry() None[source]

Configure the topic registry.

get_all_topics_to_subscribe() List[str][source]

Get a list of all topics to subscribe to.

get_sub_topic(protocol: Protocol, message_type: MessageType, topic: str) str[source]

Get the sub-topic from the given topic based on the protocol and message type.

get_topic_to_subscribe(protocol: Protocol, message_type: MessageType) str[source]

Get the topic to subscribe to based on the protocol and message type.

register(protocol: Protocol, message_type: MessageType, info_topic_base: str, info_topic_extension: str) None[source]

Register a topic configuration.

resolve_wildcards(protocol: Protocol, message_type: MessageType, topic: str, position: int = 0) str[source]

Resolve wildcards in the topic based on the protocol and message type.

class iot2mqtt.topics.TopicManager(*args: Any, **kwargs: Any)[source]

Bases: object

A registry for managing topic configurations based on protocol and message type.

This class provides a centralized way to store and retrieve topic configurations for different combinations of protocol and message type. It acts as a registry for topic configurations, allowing easy access to the topic base, topic extension, and device name offset for a given protocol and message type.

Module contents

This is the initialization module for the iot2mqtt package.