Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## Unreleased

### Added

* Add official MAC-derived UDSK generation and verification helpers.

### Changed

* Prepare the writable session using the app DSK/UDSK flow instead of writing a random UDSK value.

## [1.3.3] - 2026-03-24

### Added
Expand Down
6 changes: 3 additions & 3 deletions ember_mug/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ class MugCharacteristic(IntEnum):
FIRMWARE = 12
# [Unique ID]-[serial number] (Read)
MUG_ID = 13
# DSK - Unique ID used for auth in app (Read)
# DSK - Mug-side auth/session value read by the app (Read)
DSK = 14
# UDSK - Used for auth in app (Read/Write)
# UDSK - App-generated auth/session value derived from the BLE MAC (Read/Write)
UDSK = 15
# int/temp lock - Address (Read/Write)
CONTROL_REGISTER_ADDRESS = 16
Expand Down Expand Up @@ -220,7 +220,7 @@ class PushEvent(IntEnum):
LiquidState.COLD_NO_TEMP_CONTROL: "Cold (No control)",
LiquidState.COOLING: "Cooling",
LiquidState.HEATING: "Heating",
LiquidState.TARGET_TEMPERATURE: "Perfect",
LiquidState.TARGET_TEMPERATURE: "Ready",
LiquidState.WARM_NO_TEMP_CONTROL: "Warm (No control)",
}

Expand Down
114 changes: 98 additions & 16 deletions ember_mug/mug.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import asyncio
import contextlib
import logging
import os
from datetime import UTC, datetime
from enum import StrEnum
from functools import cached_property
Expand Down Expand Up @@ -36,8 +35,10 @@
decode_byte_string,
discover_services,
encode_byte_string,
generate_udsk,
get_model_info_from_advertiser_data,
temp_from_bytes,
verify_udsk,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(
self._queued_updates: set[str] = set()
self._latest_events: dict[int, float] = {}
self._client_kwargs: dict[str, str] = {}
self._session_prepared = False

logger.debug("New mug connection initialized.")
self.set_client_options(**kwargs)
Expand Down Expand Up @@ -132,7 +134,7 @@ def model_name(self) -> str | None:
@property
def can_write(self) -> bool:
"""Check if the mug can support write operations."""
return self.data.udsk is not None
return self._session_prepared

def _convert_to_device_unit(self, value: float) -> float:
"""Convert user value to the unit the device expects."""
Expand Down Expand Up @@ -175,20 +177,96 @@ async def _ensure_connection(self) -> None:
except (TimeoutError, BleakError) as error:
logger.debug("%s: Failed to connect to the mug: %s", self.device, error)
raise error
# Attempt to pair for good measure
try:
await client.pair()
except (BleakError, EOFError):
pass
except NotImplementedError:
# workaround for Home Assistant ESPHome Proxy backend which does not allow pairing.
logger.warning(
"Pairing not implemented. "
"If your mug is still in pairing mode (blinking blue) tap the button on the bottom to exit.",
)
self._client = client
await self._prepare_session()
await self.subscribe()

async def _read_without_lock(self, characteristic: MugCharacteristic) -> bytearray:
"""Read a characteristic during connection setup without re-entering locks."""
data = await self._client.read_gatt_char(characteristic.uuid)
logger.debug("Read attribute '%s' with value '%s'", characteristic, data)
return data

async def _write_without_lock(
self,
characteristic: MugCharacteristic,
data: bytes | bytearray,
*,
response: bool | None = None,
) -> None:
"""Write a characteristic during connection setup without re-entering locks."""
kwargs = {} if response is None else {"response": response}
await self._client.write_gatt_char(characteristic.uuid, bytearray(data), **kwargs)
logger.debug("Wrote '%s' to attribute '%s'", data, characteristic)

async def _prepare_session(self) -> bool:
"""Prepare the official Ember DSK/UDSK session when possible."""
self._session_prepared = False
paired = False
try:
dsk = await self._read_without_lock(MugCharacteristic.DSK)
except BleakError as error:
logger.debug("Unable to read DSK before pairing: %s", error)
if not await self._pair_for_session():
return False
paired = True
try:
dsk = await self._read_without_lock(MugCharacteristic.DSK)
except BleakError as retry_error:
logger.debug("Unable to read DSK after pairing: %s", retry_error)
return False

self.data.dsk = decode_byte_string(dsk)

try:
udsk = generate_udsk(self.device.address)
except ValueError as error:
logger.warning("Unable to prepare writable session: %s", error)
return False

try:
await self._write_without_lock(MugCharacteristic.UDSK, udsk, response=True)
readback = await self._read_without_lock(MugCharacteristic.UDSK)
except BleakError as error:
logger.debug("Unable to write or read UDSK: %s", error)
if paired or not await self._pair_for_session():
return False
try:
await self._write_without_lock(MugCharacteristic.UDSK, udsk, response=True)
readback = await self._read_without_lock(MugCharacteristic.UDSK)
except BleakError as retry_error:
logger.debug("Unable to write or read UDSK after pairing: %s", retry_error)
return False

if not verify_udsk(self.device.address, readback):
logger.warning(
"Generated UDSK verification failed for %s: expected %s got %s",
self.device.address,
udsk.hex(),
bytes(readback).hex(),
)
return False

self.data.udsk = decode_byte_string(readback)
self._session_prepared = True
return True

async def _pair_for_session(self) -> bool:
"""Pair as part of session preparation."""
try:
await self._client.pair()
except (BleakError, EOFError) as error:
logger.debug("Pairing failed while preparing session: %s", error)
return False
except NotImplementedError:
# Home Assistant ESPHome Proxy backend does not allow pairing.
logger.warning(
"Pairing not implemented. "
"If your mug is still in pairing mode (blinking blue) tap the button on the bottom to exit.",
)
return False
return True

async def _read(self, characteristic: MugCharacteristic) -> bytearray:
"""Help read characteristic from Mug."""
self._check_operation_lock()
Expand Down Expand Up @@ -219,6 +297,7 @@ async def disconnect(self, expected: bool = True) -> None:
await self._client.disconnect()
self._client = None # type: ignore[assignment]
self._expected_disconnect = False
self._session_prepared = False

def _disconnect_callback(self, client: BleakClient) -> None:
"""Disconnect from device."""
Expand Down Expand Up @@ -278,9 +357,7 @@ async def make_writable(self) -> bool:
return True
try:
await self._ensure_connection()
# Attempt to write a random string
await self.set_udsk(os.urandom(14).hex())
return True
return self.can_write
except BleakError as e:
logger.debug("Failed to make device writable: %s", e)
return False
Expand Down Expand Up @@ -384,6 +461,11 @@ async def set_udsk(self, udsk: str) -> None:
await self._write(MugCharacteristic.UDSK, bytearray(encode_byte_string(udsk)))
self.data.udsk = udsk

async def set_udsk_raw(self, udsk: bytes | bytearray) -> None:
"""Write a raw UDSK value."""
await self._write(MugCharacteristic.UDSK, bytearray(udsk))
self.data.udsk = decode_byte_string(udsk)

async def get_dsk(self) -> str:
"""Get mug dsk from gatt."""
try:
Expand Down
43 changes: 43 additions & 0 deletions ember_mug/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import base64
import contextlib
import hashlib
import logging
import re
from typing import TYPE_CHECKING, Any
Expand All @@ -19,6 +20,9 @@

logger = logging.getLogger(__name__)

_COMPACT_MAC_REGEX = re.compile(r"^[0-9A-Fa-f]{12}$")
_SEPARATED_MAC_REGEX = re.compile(r"^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$")


def decode_byte_string(data: bytes | bytearray) -> str:
"""Convert bytes to text as Ember expects."""
Expand All @@ -36,6 +40,45 @@ def encode_byte_string(data: str) -> bytes:
return re.sub(b"[\r\n]", b"", base64.encodebytes(data.encode()))


def mac_to_bytes(mac: str | bytes) -> bytes:
"""Convert a BLE MAC address to six raw bytes."""
if isinstance(mac, bytes):
if len(mac) != 6:
msg = "MAC address bytes must be exactly 6 bytes"
raise ValueError(msg)
return mac

compact_mac = mac.replace(":", "").replace("-", "")
if not _COMPACT_MAC_REGEX.fullmatch(compact_mac) or (
not _SEPARATED_MAC_REGEX.fullmatch(mac) and not _COMPACT_MAC_REGEX.fullmatch(mac)
):
msg = f"Expected a six-byte BLE MAC address, got {mac!r}"
raise ValueError(msg)
return bytes.fromhex(compact_mac)


def generate_udsk(mac: str | bytes) -> bytes:
"""Generate the Ember app UDSK from a six-byte BLE MAC address."""
mac_bytes = mac_to_bytes(mac)
mixed = bytes(
[
mac_bytes[0],
mac_bytes[0] ^ mac_bytes[1],
mac_bytes[1] ^ mac_bytes[2],
mac_bytes[2] ^ mac_bytes[3],
mac_bytes[3] ^ mac_bytes[4],
mac_bytes[4] ^ mac_bytes[5],
mac_bytes[5],
],
)
return hashlib.sha256(mixed).digest()[19:32] + mixed


def verify_udsk(mac: str | bytes, udsk: bytes | bytearray) -> bool:
"""Verify a UDSK value matches the BLE MAC-derived Ember UDSK."""
return bytes(udsk) == generate_udsk(mac)


def bytes_to_little_int(data: bytearray | bytes) -> int:
"""Convert bytes to little int."""
return int.from_bytes(data, byteorder="little", signed=False)
Expand Down
Loading