From 1cc0049d608171c30c05fb85d2fb69b036b46117 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 19 Mar 2026 22:05:50 -0600 Subject: [PATCH 1/2] Add minimal C200 DC connect and telemetry support --- SolixBLE/__init__.py | 2 + SolixBLE/device.py | 72 ++++++++++++++++++++++++++++++++---- SolixBLE/devices/__init__.py | 2 + SolixBLE/devices/c200dc.py | 29 +++++++++++++++ 4 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 SolixBLE/devices/c200dc.py diff --git a/SolixBLE/__init__.py b/SolixBLE/__init__.py index 7e30d6b..f43b0ab 100644 --- a/SolixBLE/__init__.py +++ b/SolixBLE/__init__.py @@ -7,6 +7,7 @@ from .device import SolixBLEDevice from .devices import ( C300, + C200DC, C300DC, C800, C1000, @@ -31,6 +32,7 @@ __all__ = [ "SolixBLEDevice", "C300", + "C200DC", "C300DC", "C800", "C1000", diff --git a/SolixBLE/device.py b/SolixBLE/device.py index a563ad9..756557e 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -75,6 +75,7 @@ def __init__(self, ble_device: BLEDevice) -> None: self._connection_attempts: int = 0 self._shared_key: bytes | None = None self._iv: bytes | None = None + self._unencrypted_session: bool = False def add_callback(self, function: Callable[[], None]) -> None: """Register a callback to be run on state updates. @@ -143,6 +144,10 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo _LOGGER.debug( f"Established initial connection to '{self.name}' on attempt {self._connection_attempts}!" ) + + # macOS can report service discovery as incomplete if we try to write + # immediately after connect. Ensure services are available first. + await self._ensure_services_discovered() try: _LOGGER.debug(f"Subscribing to notifications from device '{self.name}'!") await self._client.start_notify( @@ -170,11 +175,22 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo _LOGGER.debug( f"Sending negotiation initiation request to '{self.name}'..." ) - await self._client.write_gatt_char( - UUID_COMMAND, - bytes.fromhex(NEGOTIATION_COMMAND_0), - response=True, - ) + try: + await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_0), + response=True, + ) + except BleakError as exc: + if "Service Discovery has not been performed yet" in str( + exc + ): + _LOGGER.debug( + "Service discovery incomplete while negotiating, retrying..." + ) + await self._ensure_services_discovered() + continue + raise # Wait at this long to see if we get any response to # our initial request in stage 0. This weird layout @@ -249,11 +265,26 @@ def negotiated(self) -> bool: """ return ( self.connected - and self._shared_key is not None - and self._iv is not None + and ( + (self._shared_key is not None and self._iv is not None) + or self._unencrypted_session + ) and self._negotiation_timestamp is not None ) + async def _ensure_services_discovered(self) -> None: + """Ensure BLE services have been discovered when required by backend.""" + if self._client is None: + return + get_services = getattr(self._client, "get_services", None) + if callable(get_services): + try: + await get_services() + except Exception: + _LOGGER.debug( + "Service discovery check failed, continuing with existing state." + ) + @property def available(self) -> bool: """Connected to device and data is available. @@ -573,9 +604,20 @@ async def _process_notification( parameters = self._parse_payload(decrypted_payload) return await self._process_telemetry(cmd, parameters) + # C200 DC telemetry appears to be sent unencrypted on 0402. + case "0402": + _LOGGER.debug("Received plaintext telemetry message!") + parameters = self._parse_payload(payload) + return await self._process_telemetry(cmd, parameters) + # Unknown messages case _: _LOGGER.debug(f"Received unknown message of type: {cmd.hex()}") + if self._shared_key is None or self._iv is None: + _LOGGER.debug( + "Skipping decrypt attempt for unknown message because session key is unavailable." + ) + return try: # If the payload is one byte too short try putting the @@ -668,6 +710,18 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: parameters = self._parse_payload(payload) _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters)}") + # C200 DC appears to use an alternate handshake and does not + # provide a stage-5 public key (a1). For this path we accept + # the session as negotiated without setting AES key/IV. + if "a1" not in parameters: + _LOGGER.debug( + "Device did not provide a stage-5 public key. Assuming unencrypted session." + ) + self._unencrypted_session = True + if self._negotiation_timestamp is None: + self._negotiation_timestamp = time.time() + return + # Extract public key of device from payload device_public_key_bytes = bytes.fromhex("04") + parameters["a1"] _LOGGER.debug(f"Public key of device: {device_public_key_bytes.hex()}") @@ -702,6 +756,9 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: _LOGGER.debug( "Entered negotiation stage 6 (optional) due to response from device!" ) + if self._shared_key is None or self._iv is None: + _LOGGER.debug("Skipping stage 6 decrypt due to missing AES key/IV.") + return decrypted_payload = self._decrypt_payload(payload) parameters = self._parse_payload(decrypted_payload) _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters)}") @@ -974,6 +1031,7 @@ def _reset_session(self, reset_data: bool = True): self._telemetry_payload_large = None self._shared_key = None self._iv = None + self._unencrypted_session = False self._last_packet_timestamp = None self._negotiation_timestamp = None self._packet_futures: dict[bytes, list[asyncio.Future]] = {} diff --git a/SolixBLE/devices/__init__.py b/SolixBLE/devices/__init__.py index b68dd3a..eb27007 100644 --- a/SolixBLE/devices/__init__.py +++ b/SolixBLE/devices/__init__.py @@ -5,6 +5,7 @@ """ from .c300 import C300 +from .c200dc import C200DC from .c300dc import C300DC from .c800 import C800 from .c1000 import C1000 @@ -18,6 +19,7 @@ __all__ = [ "C300", + "C200DC", "C300DC", "C800", "C1000", diff --git a/SolixBLE/devices/c200dc.py b/SolixBLE/devices/c200dc.py new file mode 100644 index 0000000..c931f75 --- /dev/null +++ b/SolixBLE/devices/c200dc.py @@ -0,0 +1,29 @@ +"""C200 DC power station model. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +from ..const import DEFAULT_METADATA_BOOL +from .c300dc import C300DC + + +class C200DC(C300DC): + """ + C200 DC Power Station. + + The C200 DC telemetry layout is largely compatible with C300DC telemetry. + Some firmware variants do not expose all optional fields (e.g. f7). + """ + + _EXPECTED_TELEMETRY_LENGTH: int = 242 + + @property + def dc_12v_auto_on(self) -> bool: + """Configured DC Port Auto On. + + :returns: Status of the DC auto on mode or default bool value if unsupported. + """ + if self._data is None or "f7" not in self._data: + return DEFAULT_METADATA_BOOL + return bool(self._parse_int("f7", begin=1)) From 9b5b26a992857025ebd49b219a00e97d677133f2 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 19 Mar 2026 22:53:48 -0600 Subject: [PATCH 2/2] Trim C200 support patch to required runtime changes --- SolixBLE/device.py | 46 +++++----------------------------------------- 1 file changed, 5 insertions(+), 41 deletions(-) diff --git a/SolixBLE/device.py b/SolixBLE/device.py index 756557e..3ccbee5 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -144,10 +144,6 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo _LOGGER.debug( f"Established initial connection to '{self.name}' on attempt {self._connection_attempts}!" ) - - # macOS can report service discovery as incomplete if we try to write - # immediately after connect. Ensure services are available first. - await self._ensure_services_discovered() try: _LOGGER.debug(f"Subscribing to notifications from device '{self.name}'!") await self._client.start_notify( @@ -175,22 +171,11 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo _LOGGER.debug( f"Sending negotiation initiation request to '{self.name}'..." ) - try: - await self._client.write_gatt_char( - UUID_COMMAND, - bytes.fromhex(NEGOTIATION_COMMAND_0), - response=True, - ) - except BleakError as exc: - if "Service Discovery has not been performed yet" in str( - exc - ): - _LOGGER.debug( - "Service discovery incomplete while negotiating, retrying..." - ) - await self._ensure_services_discovered() - continue - raise + await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_0), + response=True, + ) # Wait at this long to see if we get any response to # our initial request in stage 0. This weird layout @@ -272,19 +257,6 @@ def negotiated(self) -> bool: and self._negotiation_timestamp is not None ) - async def _ensure_services_discovered(self) -> None: - """Ensure BLE services have been discovered when required by backend.""" - if self._client is None: - return - get_services = getattr(self._client, "get_services", None) - if callable(get_services): - try: - await get_services() - except Exception: - _LOGGER.debug( - "Service discovery check failed, continuing with existing state." - ) - @property def available(self) -> bool: """Connected to device and data is available. @@ -613,11 +585,6 @@ async def _process_notification( # Unknown messages case _: _LOGGER.debug(f"Received unknown message of type: {cmd.hex()}") - if self._shared_key is None or self._iv is None: - _LOGGER.debug( - "Skipping decrypt attempt for unknown message because session key is unavailable." - ) - return try: # If the payload is one byte too short try putting the @@ -756,9 +723,6 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: _LOGGER.debug( "Entered negotiation stage 6 (optional) due to response from device!" ) - if self._shared_key is None or self._iv is None: - _LOGGER.debug("Skipping stage 6 decrypt due to missing AES key/IV.") - return decrypted_payload = self._decrypt_payload(payload) parameters = self._parse_payload(decrypted_payload) _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters)}")