diff --git a/SolixBLE/__init__.py b/SolixBLE/__init__.py index 7e30d6b..f43b0ab 100644 --- a/SolixBLE/__init__.py +++ b/SolixBLE/__init__.py @@ -7,6 +7,7 @@ from .device import SolixBLEDevice from .devices import ( C300, + C200DC, C300DC, C800, C1000, @@ -31,6 +32,7 @@ __all__ = [ "SolixBLEDevice", "C300", + "C200DC", "C300DC", "C800", "C1000", diff --git a/SolixBLE/device.py b/SolixBLE/device.py index a563ad9..3ccbee5 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -75,6 +75,7 @@ def __init__(self, ble_device: BLEDevice) -> None: self._connection_attempts: int = 0 self._shared_key: bytes | None = None self._iv: bytes | None = None + self._unencrypted_session: bool = False def add_callback(self, function: Callable[[], None]) -> None: """Register a callback to be run on state updates. @@ -249,8 +250,10 @@ def negotiated(self) -> bool: """ return ( self.connected - and self._shared_key is not None - and self._iv is not None + and ( + (self._shared_key is not None and self._iv is not None) + or self._unencrypted_session + ) and self._negotiation_timestamp is not None ) @@ -573,6 +576,12 @@ async def _process_notification( parameters = self._parse_payload(decrypted_payload) return await self._process_telemetry(cmd, parameters) + # C200 DC telemetry appears to be sent unencrypted on 0402. + case "0402": + _LOGGER.debug("Received plaintext telemetry message!") + parameters = self._parse_payload(payload) + return await self._process_telemetry(cmd, parameters) + # Unknown messages case _: _LOGGER.debug(f"Received unknown message of type: {cmd.hex()}") @@ -668,6 +677,18 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: parameters = self._parse_payload(payload) _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters)}") + # C200 DC appears to use an alternate handshake and does not + # provide a stage-5 public key (a1). For this path we accept + # the session as negotiated without setting AES key/IV. + if "a1" not in parameters: + _LOGGER.debug( + "Device did not provide a stage-5 public key. Assuming unencrypted session." + ) + self._unencrypted_session = True + if self._negotiation_timestamp is None: + self._negotiation_timestamp = time.time() + return + # Extract public key of device from payload device_public_key_bytes = bytes.fromhex("04") + parameters["a1"] _LOGGER.debug(f"Public key of device: {device_public_key_bytes.hex()}") @@ -974,6 +995,7 @@ def _reset_session(self, reset_data: bool = True): self._telemetry_payload_large = None self._shared_key = None self._iv = None + self._unencrypted_session = False self._last_packet_timestamp = None self._negotiation_timestamp = None self._packet_futures: dict[bytes, list[asyncio.Future]] = {} diff --git a/SolixBLE/devices/__init__.py b/SolixBLE/devices/__init__.py index b68dd3a..eb27007 100644 --- a/SolixBLE/devices/__init__.py +++ b/SolixBLE/devices/__init__.py @@ -5,6 +5,7 @@ """ from .c300 import C300 +from .c200dc import C200DC from .c300dc import C300DC from .c800 import C800 from .c1000 import C1000 @@ -18,6 +19,7 @@ __all__ = [ "C300", + "C200DC", "C300DC", "C800", "C1000", diff --git a/SolixBLE/devices/c200dc.py b/SolixBLE/devices/c200dc.py new file mode 100644 index 0000000..c931f75 --- /dev/null +++ b/SolixBLE/devices/c200dc.py @@ -0,0 +1,29 @@ +"""C200 DC power station model. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +from ..const import DEFAULT_METADATA_BOOL +from .c300dc import C300DC + + +class C200DC(C300DC): + """ + C200 DC Power Station. + + The C200 DC telemetry layout is largely compatible with C300DC telemetry. + Some firmware variants do not expose all optional fields (e.g. f7). + """ + + _EXPECTED_TELEMETRY_LENGTH: int = 242 + + @property + def dc_12v_auto_on(self) -> bool: + """Configured DC Port Auto On. + + :returns: Status of the DC auto on mode or default bool value if unsupported. + """ + if self._data is None or "f7" not in self._data: + return DEFAULT_METADATA_BOOL + return bool(self._parse_int("f7", begin=1))