diff --git a/CHANGELOG.md b/CHANGELOG.md index 4112ffd..deed5ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## Unreleased + +### Added + +* Add official MAC-derived UDSK generation and verification helpers. + +### Changed + +* Prepare the writable session using the app DSK/UDSK flow instead of writing a random UDSK value. + ## [1.3.3] - 2026-03-24 ### Added diff --git a/ember_mug/consts.py b/ember_mug/consts.py index d8e78c8..3d7feeb 100644 --- a/ember_mug/consts.py +++ b/ember_mug/consts.py @@ -116,9 +116,9 @@ class MugCharacteristic(IntEnum): FIRMWARE = 12 # [Unique ID]-[serial number] (Read) MUG_ID = 13 - # DSK - Unique ID used for auth in app (Read) + # DSK - Mug-side auth/session value read by the app (Read) DSK = 14 - # UDSK - Used for auth in app (Read/Write) + # UDSK - App-generated auth/session value derived from the BLE MAC (Read/Write) UDSK = 15 # int/temp lock - Address (Read/Write) CONTROL_REGISTER_ADDRESS = 16 @@ -220,7 +220,7 @@ class PushEvent(IntEnum): LiquidState.COLD_NO_TEMP_CONTROL: "Cold (No control)", LiquidState.COOLING: "Cooling", LiquidState.HEATING: "Heating", - LiquidState.TARGET_TEMPERATURE: "Perfect", + LiquidState.TARGET_TEMPERATURE: "Ready", LiquidState.WARM_NO_TEMP_CONTROL: "Warm (No control)", } diff --git a/ember_mug/mug.py b/ember_mug/mug.py index 445c498..b24d385 100644 --- a/ember_mug/mug.py +++ b/ember_mug/mug.py @@ -5,7 +5,6 @@ import asyncio import contextlib import logging -import os from datetime import UTC, datetime from enum import StrEnum from functools import cached_property @@ -36,8 +35,10 @@ decode_byte_string, discover_services, encode_byte_string, + generate_udsk, get_model_info_from_advertiser_data, temp_from_bytes, + verify_udsk, ) if TYPE_CHECKING: @@ -104,6 +105,7 @@ def __init__( self._queued_updates: set[str] = set() self._latest_events: dict[int, float] = {} self._client_kwargs: dict[str, str] = {} + self._session_prepared = False logger.debug("New mug connection initialized.") self.set_client_options(**kwargs) @@ -132,7 +134,7 @@ def model_name(self) -> str | None: @property def can_write(self) -> bool: """Check if the mug can support write operations.""" - return self.data.udsk is not None + return self._session_prepared def _convert_to_device_unit(self, value: float) -> float: """Convert user value to the unit the device expects.""" @@ -175,20 +177,96 @@ async def _ensure_connection(self) -> None: except (TimeoutError, BleakError) as error: logger.debug("%s: Failed to connect to the mug: %s", self.device, error) raise error - # Attempt to pair for good measure - try: - await client.pair() - except (BleakError, EOFError): - pass - except NotImplementedError: - # workaround for Home Assistant ESPHome Proxy backend which does not allow pairing. - logger.warning( - "Pairing not implemented. " - "If your mug is still in pairing mode (blinking blue) tap the button on the bottom to exit.", - ) self._client = client + await self._prepare_session() await self.subscribe() + async def _read_without_lock(self, characteristic: MugCharacteristic) -> bytearray: + """Read a characteristic during connection setup without re-entering locks.""" + data = await self._client.read_gatt_char(characteristic.uuid) + logger.debug("Read attribute '%s' with value '%s'", characteristic, data) + return data + + async def _write_without_lock( + self, + characteristic: MugCharacteristic, + data: bytes | bytearray, + *, + response: bool | None = None, + ) -> None: + """Write a characteristic during connection setup without re-entering locks.""" + kwargs = {} if response is None else {"response": response} + await self._client.write_gatt_char(characteristic.uuid, bytearray(data), **kwargs) + logger.debug("Wrote '%s' to attribute '%s'", data, characteristic) + + async def _prepare_session(self) -> bool: + """Prepare the official Ember DSK/UDSK session when possible.""" + self._session_prepared = False + paired = False + try: + dsk = await self._read_without_lock(MugCharacteristic.DSK) + except BleakError as error: + logger.debug("Unable to read DSK before pairing: %s", error) + if not await self._pair_for_session(): + return False + paired = True + try: + dsk = await self._read_without_lock(MugCharacteristic.DSK) + except BleakError as retry_error: + logger.debug("Unable to read DSK after pairing: %s", retry_error) + return False + + self.data.dsk = decode_byte_string(dsk) + + try: + udsk = generate_udsk(self.device.address) + except ValueError as error: + logger.warning("Unable to prepare writable session: %s", error) + return False + + try: + await self._write_without_lock(MugCharacteristic.UDSK, udsk, response=True) + readback = await self._read_without_lock(MugCharacteristic.UDSK) + except BleakError as error: + logger.debug("Unable to write or read UDSK: %s", error) + if paired or not await self._pair_for_session(): + return False + try: + await self._write_without_lock(MugCharacteristic.UDSK, udsk, response=True) + readback = await self._read_without_lock(MugCharacteristic.UDSK) + except BleakError as retry_error: + logger.debug("Unable to write or read UDSK after pairing: %s", retry_error) + return False + + if not verify_udsk(self.device.address, readback): + logger.warning( + "Generated UDSK verification failed for %s: expected %s got %s", + self.device.address, + udsk.hex(), + bytes(readback).hex(), + ) + return False + + self.data.udsk = decode_byte_string(readback) + self._session_prepared = True + return True + + async def _pair_for_session(self) -> bool: + """Pair as part of session preparation.""" + try: + await self._client.pair() + except (BleakError, EOFError) as error: + logger.debug("Pairing failed while preparing session: %s", error) + return False + except NotImplementedError: + # Home Assistant ESPHome Proxy backend does not allow pairing. + logger.warning( + "Pairing not implemented. " + "If your mug is still in pairing mode (blinking blue) tap the button on the bottom to exit.", + ) + return False + return True + async def _read(self, characteristic: MugCharacteristic) -> bytearray: """Help read characteristic from Mug.""" self._check_operation_lock() @@ -219,6 +297,7 @@ async def disconnect(self, expected: bool = True) -> None: await self._client.disconnect() self._client = None # type: ignore[assignment] self._expected_disconnect = False + self._session_prepared = False def _disconnect_callback(self, client: BleakClient) -> None: """Disconnect from device.""" @@ -278,9 +357,7 @@ async def make_writable(self) -> bool: return True try: await self._ensure_connection() - # Attempt to write a random string - await self.set_udsk(os.urandom(14).hex()) - return True + return self.can_write except BleakError as e: logger.debug("Failed to make device writable: %s", e) return False @@ -384,6 +461,11 @@ async def set_udsk(self, udsk: str) -> None: await self._write(MugCharacteristic.UDSK, bytearray(encode_byte_string(udsk))) self.data.udsk = udsk + async def set_udsk_raw(self, udsk: bytes | bytearray) -> None: + """Write a raw UDSK value.""" + await self._write(MugCharacteristic.UDSK, bytearray(udsk)) + self.data.udsk = decode_byte_string(udsk) + async def get_dsk(self) -> str: """Get mug dsk from gatt.""" try: diff --git a/ember_mug/utils.py b/ember_mug/utils.py index 13517c7..1f97b98 100644 --- a/ember_mug/utils.py +++ b/ember_mug/utils.py @@ -4,6 +4,7 @@ import base64 import contextlib +import hashlib import logging import re from typing import TYPE_CHECKING, Any @@ -19,6 +20,9 @@ logger = logging.getLogger(__name__) +_COMPACT_MAC_REGEX = re.compile(r"^[0-9A-Fa-f]{12}$") +_SEPARATED_MAC_REGEX = re.compile(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$") + def decode_byte_string(data: bytes | bytearray) -> str: """Convert bytes to text as Ember expects.""" @@ -36,6 +40,45 @@ def encode_byte_string(data: str) -> bytes: return re.sub(b"[\r\n]", b"", base64.encodebytes(data.encode())) +def mac_to_bytes(mac: str | bytes) -> bytes: + """Convert a BLE MAC address to six raw bytes.""" + if isinstance(mac, bytes): + if len(mac) != 6: + msg = "MAC address bytes must be exactly 6 bytes" + raise ValueError(msg) + return mac + + compact_mac = mac.replace(":", "").replace("-", "") + if not _COMPACT_MAC_REGEX.fullmatch(compact_mac) or ( + not _SEPARATED_MAC_REGEX.fullmatch(mac) and not _COMPACT_MAC_REGEX.fullmatch(mac) + ): + msg = f"Expected a six-byte BLE MAC address, got {mac!r}" + raise ValueError(msg) + return bytes.fromhex(compact_mac) + + +def generate_udsk(mac: str | bytes) -> bytes: + """Generate the Ember app UDSK from a six-byte BLE MAC address.""" + mac_bytes = mac_to_bytes(mac) + mixed = bytes( + [ + mac_bytes[0], + mac_bytes[0] ^ mac_bytes[1], + mac_bytes[1] ^ mac_bytes[2], + mac_bytes[2] ^ mac_bytes[3], + mac_bytes[3] ^ mac_bytes[4], + mac_bytes[4] ^ mac_bytes[5], + mac_bytes[5], + ], + ) + return hashlib.sha256(mixed).digest()[19:32] + mixed + + +def verify_udsk(mac: str | bytes, udsk: bytes | bytearray) -> bool: + """Verify a UDSK value matches the BLE MAC-derived Ember UDSK.""" + return bytes(udsk) == generate_udsk(mac) + + def bytes_to_little_int(data: bytearray | bytes) -> int: """Convert bytes to little int.""" return int.from_bytes(data, byteorder="little", signed=False) diff --git a/tests/test_connection.py b/tests/test_connection.py index 80a20fa..5fcc174 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -43,11 +43,16 @@ async def test_adapter_without_bluez(ble_device: BLEDevice): EmberMug(ble_device, ModelInfo(), adapter="hci0") +EXPECTED_TEST_UDSK = bytes.fromhex("61a973d8efb8d0e569a59923793204931b3643cb") + + @patch("ember_mug.mug.EmberMug.subscribe") +@patch("ember_mug.mug.EmberMug._prepare_session") @patch("ember_mug.mug.establish_connection") async def test_connect( - mug_subscribe: Mock, mock_establish_connection: Mock, + mug_prepare_session: Mock, + mug_subscribe: Mock, ember_mug: MockMug, ) -> None: # Already connected @@ -56,6 +61,7 @@ async def test_connect( async with ember_mug.connection(): pass mug_subscribe.assert_not_called() + mug_prepare_session.assert_not_called() mock_establish_connection.assert_not_called() # Not connected @@ -65,6 +71,7 @@ async def test_connect( pass mock_establish_connection.assert_called() + mug_prepare_session.assert_called() mug_subscribe.assert_called() assert ember_mug._client is not None mock_disconnect.assert_called() @@ -88,21 +95,17 @@ async def test_connect_error( @patch("ember_mug.mug.logger") @patch("ember_mug.mug.establish_connection") -async def test_pairing_exceptions_esphome( +async def test_prepare_session_pairing_exceptions_esphome( mock_establish_connection: Mock, mock_logger: Mock, ember_mug: MockMug, ) -> None: ember_mug._client.is_connected = False mock_client = AsyncMock() - mock_client.connect.side_effect = BleakError + mock_client.read_gatt_char.side_effect = BleakError mock_client.pair.side_effect = NotImplementedError mock_establish_connection.return_value = mock_client - with patch.multiple( - ember_mug, - update_initial=AsyncMock(), - subscribe=AsyncMock(), - ): + with patch.object(ember_mug, "subscribe", AsyncMock()): await ember_mug._ensure_connection() mock_establish_connection.assert_called_once() @@ -113,21 +116,73 @@ async def test_pairing_exceptions_esphome( @patch("ember_mug.mug.establish_connection") -async def test_pairing_exceptions( +async def test_prepare_session_pairing_exceptions( mock_establish_connection: Mock, ember_mug: MockMug, ) -> None: mock_client = AsyncMock() + mock_client.read_gatt_char.side_effect = BleakError mock_client.pair.side_effect = BleakError mock_establish_connection.return_value = mock_client - with patch.multiple( - ember_mug, - update_initial=AsyncMock(), - subscribe=AsyncMock(), - ): + with patch.object(ember_mug, "subscribe", AsyncMock()): await ember_mug._ensure_connection() +async def test_prepare_session_success(ember_mug: MockMug) -> None: + ember_mug._client.read_gatt_char = AsyncMock(side_effect=[b"dsk", EXPECTED_TEST_UDSK]) + ember_mug._client.write_gatt_char = AsyncMock() + + assert await ember_mug._prepare_session() is True + assert ember_mug.can_write is True + assert ember_mug.data.dsk == "ZHNr" + assert ember_mug.data.udsk == "Yalz2O+40OVppZkjeTIEkxs2Q8s=" + ember_mug._client.pair.assert_not_called() + ember_mug._client.write_gatt_char.assert_called_once_with( + MugCharacteristic.UDSK.uuid, + bytearray(EXPECTED_TEST_UDSK), + response=True, + ) + + +async def test_prepare_session_pairs_after_dsk_failure(ember_mug: MockMug) -> None: + ember_mug._client.read_gatt_char = AsyncMock(side_effect=[BleakError("auth"), b"dsk", EXPECTED_TEST_UDSK]) + ember_mug._client.pair = AsyncMock() + ember_mug._client.write_gatt_char = AsyncMock() + + assert await ember_mug._prepare_session() is True + ember_mug._client.pair.assert_called_once() + assert ember_mug.can_write is True + + +async def test_prepare_session_pairs_after_udsk_write_failure(ember_mug: MockMug) -> None: + ember_mug._client.read_gatt_char = AsyncMock(side_effect=[b"dsk", EXPECTED_TEST_UDSK]) + ember_mug._client.pair = AsyncMock() + ember_mug._client.write_gatt_char = AsyncMock(side_effect=[BleakError("auth"), None]) + + assert await ember_mug._prepare_session() is True + ember_mug._client.pair.assert_called_once() + assert ember_mug._client.write_gatt_char.call_count == 2 + assert ember_mug.can_write is True + + +async def test_prepare_session_invalid_address(ember_mug: MockMug) -> None: + ember_mug.device = BLEDevice(address="00000000-0000-0000-0000-000000000000", name="Ember Ceramic Mug", details={}) + ember_mug._client.read_gatt_char = AsyncMock(return_value=b"dsk") + ember_mug._client.write_gatt_char = AsyncMock() + + assert await ember_mug._prepare_session() is False + assert ember_mug.can_write is False + ember_mug._client.write_gatt_char.assert_not_called() + + +async def test_prepare_session_udsk_mismatch(ember_mug: MockMug) -> None: + ember_mug._client.read_gatt_char = AsyncMock(side_effect=[b"dsk", b"wrong"]) + ember_mug._client.write_gatt_char = AsyncMock() + + assert await ember_mug._prepare_session() is False + assert ember_mug.can_write is False + + async def test_disconnect(ember_mug: MockMug) -> None: mock_client = AsyncMock() ember_mug._client = mock_client @@ -221,10 +276,10 @@ def test_ble_event_callback(ember_mug: MockMug) -> None: def test_can_write(ember_mug: MockMug) -> None: - ember_mug.data.udsk = "non-empty" + ember_mug._session_prepared = True assert ember_mug.can_write is True - ember_mug.data.udsk = None + ember_mug._session_prepared = False assert ember_mug.can_write is False @@ -261,22 +316,22 @@ async def test_get_mug_led_colour(ember_mug: MockMug) -> None: ember_mug._client.read_gatt_char.assert_called_once_with(MugCharacteristic.LED.uuid) -@patch("os.urandom", return_value=b"\x98\x92\x02\x10\xbd\x94\xb9\xf2\xe15\x9b6\x82\xa0") -async def test_make_writable(mock_urandom, ember_mug: MockMug) -> None: +async def test_make_writable(ember_mug: MockMug) -> None: mock_ensure_connection = AsyncMock() - ember_mug._client.write_gatt_char = AsyncMock() with patch.object(ember_mug, "_ensure_connection", mock_ensure_connection): - ember_mug.data.udsk = "non-empty" + ember_mug._session_prepared = True await ember_mug.make_writable() - mock_urandom.assert_not_called() + mock_ensure_connection.assert_not_called() - ember_mug.data.udsk = None + ember_mug._session_prepared = False await ember_mug.make_writable() - mock_urandom.assert_called_once() - ember_mug._client.write_gatt_char.assert_called_once_with( - MugCharacteristic.UDSK.uuid, - bytearray(b"OTg5MjAyMTBiZDk0YjlmMmUxMzU5YjM2ODJhMA=="), - ) + mock_ensure_connection.assert_called_once() + + +async def test_make_writable_returns_false_without_prepared_session(ember_mug: MockMug) -> None: + with patch.object(ember_mug, "_ensure_connection", AsyncMock()): + ember_mug._session_prepared = False + assert await ember_mug.make_writable() is False async def test_pair(ember_mug: MockMug) -> None: @@ -467,6 +522,18 @@ async def test_set_mug_udsk(ember_mug: MockMug) -> None: ) +async def test_set_mug_udsk_raw(ember_mug: MockMug) -> None: + mock_ensure_connection = AsyncMock() + ember_mug._client.write_gatt_char = AsyncMock() + with patch.object(ember_mug, "_ensure_connection", mock_ensure_connection): + await ember_mug.set_udsk_raw(EXPECTED_TEST_UDSK) + mock_ensure_connection.assert_called_once() + ember_mug._client.write_gatt_char.assert_called_once_with( + MugCharacteristic.UDSK.uuid, + bytearray(EXPECTED_TEST_UDSK), + ) + + async def test_get_mug_dsk(ember_mug: MockMug) -> None: with patch.object(ember_mug, "_ensure_connection", AsyncMock()): ember_mug._client.read_gatt_char = AsyncMock(return_value=b"abcd12345") diff --git a/tests/test_consts.py b/tests/test_consts.py index 4d371b0..147a60a 100644 --- a/tests/test_consts.py +++ b/tests/test_consts.py @@ -27,7 +27,7 @@ def test_liquid_state() -> None: assert LiquidState(3).label == "Cold (No control)" assert LiquidState(4).label == "Cooling" assert LiquidState(5).label == "Heating" - assert LiquidState(6).label == "Perfect" + assert LiquidState(6).label == "Ready" assert LiquidState(7).label == "Warm (No control)" diff --git a/tests/test_utils.py b/tests/test_utils.py index f52b53a..0a3cda4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -14,12 +14,15 @@ decode_byte_string, discover_services, encode_byte_string, + generate_udsk, get_colour_from_int, get_model_from_id_and_gen, get_model_from_single_int_and_services, get_model_info_from_advertiser_data, guess_model_from_name, + mac_to_bytes, temp_from_bytes, + verify_udsk, ) from tests.conftest import ( TEST_MUG_ADVERTISEMENT, @@ -58,6 +61,35 @@ def test_encode_byte_string() -> None: assert encode_byte_string("abcd12345") == b"YWJjZDEyMzQ1" +def test_mac_to_bytes() -> None: + expected = bytes.fromhex("f5fc43378da6") + assert mac_to_bytes("F5:FC:43:37:8D:A6") == expected + assert mac_to_bytes("F5-FC-43-37-8D-A6") == expected + assert mac_to_bytes("f5fc43378da6") == expected + assert mac_to_bytes(expected) == expected + + +@pytest.mark.parametrize( + "mac", + [ + "00000000-0000-0000-0000-000000000000", + "f5fc43378da", + b"\x01\x02\x03", + ], +) +def test_mac_to_bytes_errors(mac: str | bytes) -> None: + with pytest.raises(ValueError, match=r"MAC address|Expected a six-byte BLE MAC"): + mac_to_bytes(mac) + + +def test_generate_udsk() -> None: + expected = bytes.fromhex("2bb2f0c09cd191e8d124484e05f509bf74ba2ba6") + assert generate_udsk("F5:FC:43:37:8D:A6") == expected + assert generate_udsk(bytes.fromhex("f5fc43378da6")) == expected + assert verify_udsk("F5:FC:43:37:8D:A6", expected) is True + assert verify_udsk("F5:FC:43:37:8D:A6", b"wrong") is False + + @pytest.mark.parametrize( ("colour_id", "expected_colour"), [