diff --git a/.gitignore b/.gitignore index b9a04d5..098d5fe 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ __pycache__ .coverage SolixBLE.egg-info docs/_build -.venv/ \ No newline at end of file +.venv/ + +scripts/logs/ +scripts/data/ +scripts/tools/ diff --git a/SolixBLE/__init__.py b/SolixBLE/__init__.py index fd7e1b2..1cee83f 100644 --- a/SolixBLE/__init__.py +++ b/SolixBLE/__init__.py @@ -17,8 +17,10 @@ PrimeCharger160w, PrimeCharger250w, Solarbank2, + Solarbank2Prime, Solarbank3, ) +from .devices.solarbank2 import Solarbank2Common from .prime_device import PrimeDevice from .states import ( ChargingStatus, @@ -33,6 +35,7 @@ __all__ = [ "SolixBLEDevice", + "Solarbank2Common", "PrimeDevice", "C300", "C300DC", @@ -42,6 +45,7 @@ "F2000", "F3800", "Solarbank2", + "Solarbank2Prime", "Solarbank3", "PrimeCharger160w", "PrimeCharger250w", diff --git a/SolixBLE/device.py b/SolixBLE/device.py index a1b7da1..0973935 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -626,7 +626,7 @@ async def _process_notification( match cmd.hex(): # Telemetry messages - case "c402" | "4300" | "c405": + case "c402" | "4300" | "c405" | "c840": _LOGGER.debug("Received telemetry message!") return await self._process_telemetry_packet(payload, cmd) diff --git a/SolixBLE/devices/__init__.py b/SolixBLE/devices/__init__.py index 030b7ea..3286d91 100644 --- a/SolixBLE/devices/__init__.py +++ b/SolixBLE/devices/__init__.py @@ -14,7 +14,7 @@ from .generic import Generic from .prime_charger_160w import PrimeCharger160w from .prime_charger_250w import PrimeCharger250w -from .solarbank2 import Solarbank2 +from .solarbank2 import Solarbank2, Solarbank2Prime from .solarbank3 import Solarbank3 __all__ = [ @@ -26,6 +26,7 @@ "F2000", "F3800", "Solarbank2", + "Solarbank2Prime", "Solarbank3", "PrimeCharger160w", "PrimeCharger250w", diff --git a/SolixBLE/devices/solarbank2.py b/SolixBLE/devices/solarbank2.py index 6ea504a..06cb634 100644 --- a/SolixBLE/devices/solarbank2.py +++ b/SolixBLE/devices/solarbank2.py @@ -1,24 +1,63 @@ """Solarbank 2 power station model. +Two variants live in this module: + +* :class:`Solarbank2` - uses the legacy base ``SolixBLEDevice`` handshake + (00xx/08xx negotiation, AES-CBC for session traffic). Does not require + a cloud-tied user-id, so it can connect to any SB2 that accepts the + base handshake. + +* :class:`Solarbank2Prime` - uses the Anker-Prime-style handshake + (40xx/48xx negotiation across 8 stages, AES-GCM for session traffic). + Requires an Anker user-id. + +Both variants share telemetry property accessors, the 0x405e schedule write +builder, and the wall-clock-timestamp ``_send_command`` override via the +:class:`Solarbank2Common` base. + .. moduleauthor:: Harvey Lelliott (flip-dots) """ +import logging +import os +import time from enum import Enum +from bleak.backends.device import BLEDevice +from Crypto.Cipher import AES +from cryptography.hazmat.primitives.asymmetric.ec import ( + ECDH, + SECP256R1, + EllipticCurvePublicKey, + generate_private_key, +) +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + from ..const import ( DEFAULT_METADATA_BOOL, DEFAULT_METADATA_FLOAT, DEFAULT_METADATA_STRING, + UUID_COMMAND, ) from ..device import SolixBLEDevice +from ..prime_device import ( + AAD, + NEGOTIATION_KEY, + NEGOTIATION_NONCE, + NEGOTIATION_PATTERN, + TELEMETRY_PATTERN, + PrimeDevice, +) from ..states import GridStatus, LightMode, SBPowerCutoff, SBUsageMode, TemperatureUnit +_LOGGER = logging.getLogger(__name__) + class MaxLoadSB2(Enum): """ Maximum output power of the Solarbank 2 in watts. - + Only specific values are allowed. """ @@ -38,24 +77,308 @@ class MaxLoadSB2(Enum): W1000 = 1000 -class Solarbank2(SolixBLEDevice): +#: One of the command codes for setting a schedule on an SB2. +CMD_SB2_SET_SCHEDULE = "405e" + +#: Command code for setting the AC output power limit (max load). +CMD_SB2_SET_MAX_LOAD = "4080" + +#: Command code for setting the reserved-power composite (discharge cutoff, +#: low-power-input parameter, and charge cutoff written together). +CMD_SB2_SET_RESERVED_POWER = "4067" + +#: Command code for turning the status light on or off. +CMD_SB2_SET_LIGHT = "4068" + +#: TLV header introducing a 4-byte LE timestamp (tag ``a1``, length ``04``). +#: Recurs across handshake stage plaintexts and post-handshake payloads. +TLV_TIMESTAMP_HEADER = "a104" + +#: TLV for an empty ``a2`` field (tag ``a2``, length ``00``, no value). +#: Appears as a fixed marker in several handshake plaintexts. +TLV_A2_EMPTY = "a200" + +#: Environment variable for the Anker user ID required by Solarbank2Prime. +#: Value should be the ASCII-hex string (40 hex chars, no dashes). +#: The device validates this token against its +#: account-bound whitelist during the Prime-style handshake. +ENV_SB2_ANKER_USER_ID = "SB2_ANKER_USER_ID" + + +class Solarbank2Common(SolixBLEDevice): """ - SolarBank 2 Power Station. + Shared base for both SB2 handshake variants. - Use this class to connect and monitor a Solarbank 2 power station. - This model is also known as the A17C1. + Holds everything that does not depend on the negotiation/crypto variant: - .. note:: - It should be possible to add more sensors. I think devices with lots of - telemetry values split them up into multiple messages but I have not - played around with this yet. That and I am being a bit conservative with - these initial implementations, if you want more sensors and are willing - to help with testing feel free to raise a GitHub issue. + * Telemetry property accessors (parse fields out of ``self._data`` populated + by the parent's ``_process_telemetry``). + * ``set_schedule`` / ``_build_set_schedule_payload`` for the 0x405e write. + * ``_send_command`` override that uses a real wall-clock unix timestamp + in the ``fe 05 03 `` trailer (the SolixBLEDevice base would use + ``BASE_TIMESTAMP + elapsed_seconds`` which is fine for Prime but the SB2 + validates the timestamp against a sane recent-time window). + The concrete subclasses :class:`Solarbank2` (legacy CBC) and + :class:`Solarbank2Prime` (Prime-style GCM) pick the handshake/crypto + behavior by their second base class. """ _EXPECTED_TELEMETRY_LENGTH: int = 253 + # Post-handshake command path + + async def _send_command(self, cmd: bytes, payload: bytes) -> None: + """Send a post-handshake command with a wall-clock ``fe 05 03 `` trailer. + + Overrides the base/Prime schemes that derive the timestamp from a + hardcoded ``BASE_TIMESTAMP`` constant. The SB2 expects a current Unix + timestamp. + + Uses ``self._encrypt_payload`` and ``self._build_packet`` polymorphically: + for :class:`Solarbank2` (legacy) ``_encrypt_payload`` resolves to the + SolixBLEDevice CBC variant; for :class:`Solarbank2Prime` it resolves + to the PrimeDevice GCM variant. + + :param cmd: 2-byte command identifier. + :param payload: Plaintext payload (without the ``fe 05 03 `` trailer). + :raises ConnectionError: If not connected/negotiated to device. + """ + if not self.negotiated: + raise ConnectionError("Not connected to device") + + ts = int(time.time()).to_bytes(4, "little") + full_payload = payload + bytes.fromhex("fe0503") + ts + + encrypted = self._encrypt_payload(full_payload) + packet = self._build_packet( + pattern=bytes.fromhex(TELEMETRY_PATTERN), + cmd=cmd, + payload=encrypted, + ) + _LOGGER.debug(f"SB2 _send_command cmd={cmd.hex()} packet={packet.hex()}") + await self._client.write_gatt_char(UUID_COMMAND, packet) + + # 0x405e set-schedule + + @staticmethod + def _build_set_schedule_payload(power_w: int) -> bytes: + """Build the plaintext payload for cmd 0x405e (set schedule). + + Produces a uniform 7-day schedule (Mon-Sun, all identical) with the + same time range (00:00-24:00) and the requested output power. + + The caller's session timestamp is appended automatically by + ``_send_command`` as the ``fe 05 03 <4-byte LE>`` trailer, so this + function returns the payload *without* that trailer. + + :param power_w: Output wattage (0 = charge-only). + """ + if not (0 <= power_w <= 800): + raise ValueError(f"power_w must be 0-800 W, got {power_w}") + if power_w % 10 != 0: + _LOGGER.warning( + f"Power_w={power_w} is not a multiple of 10! This seems to work " + "and the device accepts it, but the Anker app only ever uses " + "10 W increments. Use with caution!" + ) + + # 8-byte schedule struct (byte layout shared with SB1, but the trailing + # 2 bytes are an unknown constant on SB2 - SB1 calls that slot SOC but + # on SB2 we've only ever seen 0x0050 LE regardless of user settings). + # bytes [0:2] start_min u16 LE + # bytes [2:4] end_min u16 LE + # bytes [4:6] power_W u16 LE + # bytes [6:8] unknown constant, always 0x0050 LE in captures + # 00:00-24:00 -> start=0, end=1440. + sched_struct = ( + (0).to_bytes(2, "little") + + (1440).to_bytes(2, "little") + + power_w.to_bytes(2, "little") + + bytes.fromhex("5000") + ) + + pt = bytearray() + # Header + pt += bytes.fromhex("a10121") + pt += bytes.fromhex("a2020101") + + # 7 fully-symmetric day blocks (Mon-Sun). Each day uses 4 tags starting + # at base = a3 + 4*day: + # aX 02 01 01 enable/include flag for this day (always 1) + # aX+1 09 04 <8B struct> the schedule struct itself + # aX+2 02 01 00 per-day flag (always 0 in captures) + # aX+3 01 04 1-byte trailer (always value 0x04) + for day in range(7): + base = 0xa3 + 4 * day + pt += bytes([base]) + bytes.fromhex("020101") + pt += bytes([base + 1]) + bytes.fromhex("0904") + sched_struct + pt += bytes([base + 2]) + bytes.fromhex("020100") + pt += bytes([base + 3]) + bytes.fromhex("0104") + + # `fd` trailer: 4 fresh random bytes generated per write. The Anker app + # uses a different value every time (confirmed by a 3-write + # capture); reusing a value the device has already seen in this session + # leaves the device in an "abnormal state" where the schedule storage + # gets updated but the inverter target doesn't change, and only a + # subsequent write with a fresh value clears it. + pt += bytes.fromhex("fd0503") + os.urandom(4) + + return bytes(pt) + + async def set_schedule(self, power_w: int) -> None: + """Set a uniform 7-day charge/discharge schedule on the SB2. + + Sends cmd 405e with a payload that configures every day of the week + identically: output `power_w` Watts from 00:00 to 24:00. + + :param power_w: Output wattage (0 = charge-only). + :raises ConnectionError: If not connected/negotiated to the device. + :raises ValueError: For out-of-range power_w. + """ + payload = self._build_set_schedule_payload(power_w) + await self._send_command( + cmd=bytes.fromhex(CMD_SB2_SET_SCHEDULE), payload=payload + ) + + ############################## + # WARNING # + # Functions below are # + # NOT E2E Tested on Device!! # + ############################## + + @staticmethod + def _build_set_light_payload(light_on: bool) -> bytes: + """Build the plaintext payload for cmd 0x4068 (status light on/off). + + Plaintext layout (without the ``fe 05 03 `` trailer added by + ``_send_command``):: + + a1 01 21 command marker + a2 02 01 00 constant 0x00 flag (purpose unknown) + a3 02 01 0 = ON, 1 = OFF (the "light_off_switch" bit) + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + """ + return bytes.fromhex(f"a10121a2020100a30201{0 if light_on else 1:02x}") + + async def set_light_switch(self, light_on: bool) -> None: + """Turn the SB2 status light on or off. + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + + :param light_on: ``True`` to turn the light on, ``False`` to turn it off. + :raises ConnectionError: If not connected/negotiated to the device. + """ + payload = self._build_set_light_payload(light_on) + await self._send_command( + cmd=bytes.fromhex(CMD_SB2_SET_LIGHT), payload=payload + ) + + # Mapping for 3rd field reserved power command + _RESERVED_POWER_A3_MAP: dict[int, int] = {5: 4, 10: 5} + + @staticmethod + def _build_set_reserved_power_payload(level: SBPowerCutoff) -> bytes: + """Build the plaintext payload for cmd 0x4067 (reserved power). + + The Anker app's "Reserved power" writes + three independent fields. This is reflected in the telemetry: + discharge cutoff (b4), low-power-input (b5), and charge cutoff (b6). + a2 and a4 both equal the reserved-power %; a3 seems to follow an + uncharacterized mapping. + + Plaintext layout (without the ``fe 05 03 `` trailer):: + + a1 01 21 + a2 02 01 -> c405 telemetry b4 (discharge cutoff) + a3 02 01 -> c405 telemetry b5 (low-power-input parameter) + a4 02 01 -> c405 telemetry b6 (charge cutoff) + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + """ + if level is SBPowerCutoff.UNKNOWN: + raise ValueError("SBPowerCutoff.UNKNOWN is not a valid setter input") + pct = level.value + a3_value = Solarbank2Common._RESERVED_POWER_A3_MAP.get(pct) + if a3_value is None: + raise ValueError( + f"Reserved-power value {pct}% has no captured a3 mapping." + ) + return bytes.fromhex( + f"a10121a20201{pct:02x}a30201{a3_value:02x}a40201{pct:02x}" + ) + + async def set_reserved_power(self, level: SBPowerCutoff) -> None: + """Set the battery reserved-power percentage. + + Mirrors the Anker app's "Reserved power" toggle. Writes three + telemetry fields (output_cutoff_data, lowpower_input_data, + input_cutoff_data). + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + + :param level: Desired reserved-power level + (:class:`SBPowerCutoff.P5` or :class:`SBPowerCutoff.P10`). + :raises ConnectionError: If not connected/negotiated to the device. + :raises ValueError: If ``level`` is ``UNKNOWN`` or has no captured + a3 mapping. + """ + payload = self._build_set_reserved_power_payload(level) + await self._send_command( + cmd=bytes.fromhex(CMD_SB2_SET_RESERVED_POWER), payload=payload + ) + + @staticmethod + def _build_set_max_load_payload(load: MaxLoadSB2) -> bytes: + """Build the plaintext payload for cmd 0x4080 (AC power limit). + + Plaintext layout (without the ``fe 05 03 `` trailer):: + + a1 01 21 + a2 03 02 output power limit + a3 03 02 00 00 constant zero (flags/secondary cap?) + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + """ + if load is MaxLoadSB2.UNKNOWN: + raise ValueError("MaxLoadSB2.UNKNOWN is not a valid setter input") + watts_le_hex = load.value.to_bytes(2, "little").hex() + return bytes.fromhex(f"a10121a20302{watts_le_hex}a303020000") + + async def set_max_load(self, load: MaxLoadSB2) -> None: + """Set the AC output power limit (max load) in watts. + + .. warning:: + This function has been added using telemetry captures. It has not + been tested! + + :param load: One of the discrete limits in :class:`MaxLoadSB2`. + :raises ConnectionError: If not connected/negotiated to the device. + :raises ValueError: If ``load`` is ``MaxLoadSB2.UNKNOWN``. + """ + payload = self._build_set_max_load_payload(load) + await self._send_command( + cmd=bytes.fromhex(CMD_SB2_SET_MAX_LOAD), payload=payload + ) + + ############################## + # END UNTESTED # + # FUNCTIONs # + ############################## + + # Telemetry property accessors + @property def serial_number(self) -> str: """Device serial number. @@ -381,7 +704,7 @@ def input_cutoff_data(self) -> SBPowerCutoff: def max_load(self) -> MaxLoadSB2: """ Maximum output power in watts. - + Maximum legal value depends on country of operation. :returns: Maximum load as a MaxLoadSB2 enum value. @@ -445,3 +768,289 @@ def battery_heating(self) -> bool | None: else DEFAULT_METADATA_BOOL ) + +class Solarbank2(Solarbank2Common): + """ + SolarBank 2 power station with the legacy handshake. + + Uses the SolixBLEDevice 00xx/08xx negotiation and AES-CBC for session + traffic. Does not require an Anker cloud user-id. + + This is the recommended default for end users - no cloud-side + user-id capture needed. Use :class:`Solarbank2Prime` only if you need + to mimic the Anker app's exact handshake (e.g. for protocol research). + + .. note:: + A pristine, never-paired SB2 may still require a one-time pairing + through the Anker app before any BLE client (including SolixBLE) + can connect. + """ + + async def _initiate_negotiations(self) -> None: + """Start the legacy base SolixBLEDevice handshake.""" + _LOGGER.info("SB2: starting legacy base handshake (00xx/08xx, AES-CBC)") + await super()._initiate_negotiations() + + +class Solarbank2Prime(Solarbank2Common, PrimeDevice): + """ + SolarBank 2 power station with the Anker-Prime-style handshake. + + Uses 40xx/48xx negotiation across 8 stages and AES-GCM for session + traffic. Requires an Anker user-id. SB2 firmware whitelists user-ids + and rejects unknown values with RX 4827 = ``09 a1 02 b4 00``. + + Differences from Anker Prime power stations: + + * Per-session random ECDH private key (Prime uses a hardcoded one). + * Different stage-0 through stage-4 plaintexts (live timestamps, extra + TLVs). + * Stage-6 sets the timezone (TX 4022); stage-7 re-sends the user-id + (TX 4027) session-encrypted. + * Post-stage-5 payloads use the ``fe 05 03 `` trailer rather than + Prime's ``fe 04 ``. + + .. note:: + The Anker app sends TX 4001 (stage 0) and TX 4040 (stage 8) twice + each. We send each once because it seems to work. + """ + + # Per-session state for the SB2 handshake + + def __init__( + self, + ble_device: BLEDevice, + anker_user_id: bytes | str | None = None, + ) -> None: + """Initialize SB2 device with per-instance ECDH key and Anker user ID. + + :param ble_device: The discovered BLE device handle. + :param anker_user_id: Cloud-registered Anker user ID for the user's + Anker account. If ``None``, reads from the + ``SB2_ANKER_USER_ID`` environment variable. If neither is set, + raises ``ValueError``. + :raises ValueError: If no Anker user ID is available via either + source. + """ + super().__init__(ble_device) + # Per-session random ECDH private key (regenerated on each connect via + # ``_initiate_negotiations``). SB2 does NOT use Prime's hardcoded key. + self._ecdh_private_key = generate_private_key(SECP256R1()) + # Negotiation-stage timestamp baked into stage-0..4 TX plaintexts as the + # 4-byte LE int after the a1 tag. Same value across all of stages 0-4. + self._neg_ts_bytes: bytes | None = None + # Anker user ID resolution: explicit arg > env var > error. + # The SB2 firmware whitelists Anker user ID. + if anker_user_id is None: + env_val = os.environ.get(ENV_SB2_ANKER_USER_ID) + if not env_val: + raise ValueError( + "Solarbank2Prime requires the Anker cloud-side userId. " + "Either pass it to the constructor " + f"or set the {ENV_SB2_ANKER_USER_ID} environment variable. " + "Alternatively, use the Solarbank2 (legacy CBC) class - " + "it does not require an Anker user ID." + ) + anker_user_id = env_val + if isinstance(anker_user_id, str): + anker_user_id = anker_user_id.encode("ascii") + self._anker_user_id: bytes = anker_user_id + + # Encryption helpers + + def _encrypt_with_static_key(self, plaintext: bytes) -> bytes: + """AES-GCM encrypt with the static negotiation key (stages 0-4).""" + cipher = AES.new( + bytes.fromhex(NEGOTIATION_KEY), + AES.MODE_GCM, + nonce=bytes.fromhex(NEGOTIATION_NONCE), + ) + cipher.update(bytes.fromhex(AAD)) + ciphertext, tag = cipher.encrypt_and_digest(plaintext) + return ciphertext + tag + + def _build_static_packet(self, cmd_hex: str, plaintext: bytes) -> bytes: + """Build a stage-0..4 packet: encrypt with static key, then frame.""" + return self._build_packet( + pattern=bytes.fromhex(NEGOTIATION_PATTERN), + cmd=bytes.fromhex(cmd_hex), + payload=self._encrypt_with_static_key(plaintext), + ) + + def _build_session_packet(self, cmd_hex: str, plaintext: bytes) -> bytes: + """Build a stage-5+ packet: encrypt with session key, then frame.""" + return self._build_packet( + pattern=bytes.fromhex(NEGOTIATION_PATTERN), + cmd=bytes.fromhex(cmd_hex), + payload=self._encrypt_payload(plaintext), + ) + + # Handshake + + async def _initiate_negotiations(self) -> None: + """SB2 stage 0: send TX 4001 with ``a1 04 a2 00``.""" + _LOGGER.info( + "SB2: starting Prime-style handshake (40xx/48xx, AES-GCM, with Anker user ID)" + ) + # Fresh ECDH key + negotiation timestamp on every (re-)connect. + self._ecdh_private_key = generate_private_key(SECP256R1()) + self._neg_ts_bytes = int(time.time()).to_bytes(4, "little") + + plaintext = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + self._neg_ts_bytes + + bytes.fromhex(TLV_A2_EMPTY) + ) + packet = self._build_static_packet("4001", plaintext) + _LOGGER.debug(f"SB2 stage 0 TX 4001 packet: {packet.hex()}") + await self._client.write_gatt_char(UUID_COMMAND, packet) + + async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: + """SB2 handshake state machine (overrides PrimeDevice's variant).""" + + # Decrypt for logging; _decrypt_payload picks static vs session key + # automatically based on whether _shared_secret has been set. + try: + decrypted = self._decrypt_payload(payload) + _LOGGER.debug( + f"SB2 stage cmd={cmd.hex()} decrypted plaintext={decrypted.hex()}" + ) + except ValueError: + _LOGGER.exception(f"SB2 failed to decrypt stage cmd={cmd.hex()}") + decrypted = b"" + + match cmd.hex(): + + # Stage 1 - RX 4801 -> TX 4003 (a1 04 a2 00 a3 01 20 a4 02 00 f0) + case "4801": + pt = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + self._neg_ts_bytes + + bytes.fromhex(TLV_A2_EMPTY) + + bytes.fromhex("a30120") + + bytes.fromhex("a40200f0") + ) + packet = self._build_static_packet("4003", pt) + _LOGGER.debug(f"SB2 stage 2 TX 4003 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 2 - RX 4803 -> TX 4029 (a1 04 a2 ) + case "4803": + pt = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + self._neg_ts_bytes + + bytes.fromhex("a2") + + bytes([len(self._anker_user_id)]) + + self._anker_user_id + ) + packet = self._build_static_packet("4029", pt) + _LOGGER.debug(f"SB2 stage 3 TX 4029 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 3 - RX 4829 -> TX 4005 + case "4829": + pt = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + self._neg_ts_bytes + + bytes.fromhex(TLV_A2_EMPTY) + + bytes.fromhex("a30120") + + bytes.fromhex("a40200f0") + + bytes.fromhex("a50140") + + bytes.fromhex("a60102") + ) + packet = self._build_static_packet("4005", pt) + _LOGGER.debug(f"SB2 stage 4 TX 4005 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 4 - RX 4805 -> TX 4021 (phone ECDH pubkey, raw X||Y) + case "4805": + # strip 0x04 prefix -> 64 B X||Y + phone_pub = self._ecdh_private_key.public_key().public_bytes( + Encoding.X962, PublicFormat.UncompressedPoint + ) + phone_pub_xy = phone_pub[1:] + pt = bytes.fromhex("a140") + phone_pub_xy + packet = self._build_static_packet("4021", pt) + _LOGGER.debug(f"SB2 stage 5 TX 4021 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 5 - RX 4821 -> derive shared secret, then TX 4022 (timezone) + case "4821": + # Parse plaintext: > + parameters = self._parse_payload(decrypted[1:]) + device_pub_xy = parameters["a1"] + device_pubkey = EllipticCurvePublicKey.from_encoded_point( + SECP256R1(), bytes.fromhex("04") + device_pub_xy + ) + self._shared_secret = self._ecdh_private_key.exchange( + ECDH(), device_pubkey + ) + self._negotiation_timestamp = time.time() + _LOGGER.debug( + f"SB2 ECDH shared secret derived: {self._shared_secret.hex()}" + ) + + # TX 4022 - set timezone. Plaintext layout: + # a1 04 a2 00 a3 04 + # a5 + # The tz_offset is signed LE seconds; CEST in capture was -7200 + # (= -2h). We hardcode that for now - proper localtime detection + # is a TODO. + tz_str = b"CET-1CEST,M3.5.0,M10.5.0/3" + tz_offset = (-7200).to_bytes(4, "little", signed=True) + pt = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + int(time.time()).to_bytes(4, "little") + + bytes.fromhex(TLV_A2_EMPTY) + + bytes.fromhex("a304") + + tz_offset + + bytes([0xa5, len(tz_str)]) + + tz_str + ) + packet = self._build_session_packet("4022", pt) + _LOGGER.debug(f"SB2 stage 6 TX 4022 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 6 - RX 4822 -> TX 4027 (re-send user-id, session-encrypted) + case "4822": + pt = ( + bytes.fromhex(TLV_TIMESTAMP_HEADER) + + int(time.time()).to_bytes(4, "little") + + bytes.fromhex("a2") + + bytes([len(self._anker_user_id)]) + + self._anker_user_id + ) + packet = self._build_session_packet("4027", pt) + _LOGGER.debug(f"SB2 stage 7 TX 4027 packet: {packet.hex()}") + return await self._client.write_gatt_char(UUID_COMMAND, packet) + + # Stage 7 - RX 4827 -> TX 4040 (start telemetry stream) + case "4827": + # _send_command handles the fe0503 timestamp trailer + 03000f + # pattern. + _LOGGER.debug("SB2 stage 8 - sending TX 4040 to start telemetry") + return await self._send_command( + cmd=bytes.fromhex("4040"), payload=bytes.fromhex("a10121") + ) + + case _: + _LOGGER.warning( + f"SB2 unexpected negotiation cmd: {cmd.hex()} " + f"plaintext={decrypted.hex()}" + ) + + # Telemetry + + async def _process_telemetry_packet( + self, payload: bytes, cmd: bytes = None + ) -> None: + """Handle SB2 multi-fragment telemetry (cmd c405 + c840, pattern 03010f). + + PrimeDevice's variant assumes single-packet telemetry, but SB2 splits + large telemetry across multiple ``03010f`` packets with a per-packet + ```` nibble byte after CMD. Delegate to the base + SolixBLEDevice implementation, which already implements reassembly. + After reassembly it calls ``self._decrypt_payload``, which on this + class is PrimeDevice's AES-GCM variant. + """ + return await SolixBLEDevice._process_telemetry_packet(self, payload, cmd) diff --git a/docs/source/app_decoding.rst b/docs/source/app_decoding.rst index a5261d2..963d922 100644 --- a/docs/source/app_decoding.rst +++ b/docs/source/app_decoding.rst @@ -230,6 +230,35 @@ Below are example outputs of the patched app with the Frida script. Data (Hex): ff095c0003000140210ac647f6ccbed11bae9f95d175e31b768e6fb309f82e4d8776e1999923b2fc7b34ecd8c19dc1923cd3e1370ab601eb2454eebe3f0df91572b04f8c2fddb802cc5e8ac304fe7f9c34f41794528e1fc69e84171a +When Java Cipher hooks aren't enough +------------------------------------ + +The hooks in ``frida.js`` intercept Java-side ``javax.crypto.Cipher`` +calls, which is sufficient for devices whose Anker-app integration +performs BLE crypto in Java. Other devices (notably the +:doc:`Solarbank 2 `) instead perform their per-session BLE +AES inside the **Dart AOT** code in ``libapp.so`` using the +``pointycastle`` library. The Java Cipher hooks never fire for that +traffic, so the session key cannot be recovered through them. + +For these devices the session key and IV are installed via Dart calls +such as ``BleInfoHelper::setAesKey(mac, key)`` and ``setAesIV(mac, iv)`` +shortly after the ECDH stage completes. Hooking them requires: + +1. Extracting the Dart AOT symbol table from ``libapp.so`` to locate the + target methods. Use `apktool`` for unpacking the APK in + the first place. Afterwards `blutter `_ + reverses Flutter/Dart AOT snapshots and produces a method table with + offsets and class-id (cid) information that is then implemented in the + Frida script. +2. A Frida script that attaches at those offsets via + ``Module.findBaseAddress("libapp.so").add()`` and decodes + the Dart ``List`` argument according to its cid. These scripts + will probably only work for a certain app and ABI version. + You will need to update the adresses for every app change. The sample + SB2 capture script ``scripts/frida_3.js`` in the repository works + for Anker App 3.18.0 + Scripts ------- diff --git a/docs/source/index.rst b/docs/source/index.rst index b2ab9f3..27d3386 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -115,11 +115,11 @@ Consumed energy ✅ ❌ Error codes ✅ ❌ Max load ✅ ❌ Usage mode ✅ ❌ -Presets ❌ ❌ +Power Schedule control ✅ ❌ Light mode ✅ ❌ PV limitations ❌ ❌ PV panel power ✅ ❌ -AC limitations ❌ ❌ +AC limitations ✅ ❌ Software version ✅ ❌ Software version controller ✅ ❌ Software version expansion ✅ ❌ diff --git a/docs/source/new_devices.rst b/docs/source/new_devices.rst index 91a9f4e..add4843 100644 --- a/docs/source/new_devices.rst +++ b/docs/source/new_devices.rst @@ -45,6 +45,13 @@ section for more information on how to capture these commands and the :doc:`C300 :doc:`800 `, and :doc:`C1000 ` source code for examples of how to implement the control functions. +.. note:: + Some Anker devices accept more than one BLE handshake protocol, and a new device + class may be needed per protocol. The :doc:`Solarbank 2 ` is an example: + its firmware accepts both the legacy base-Solix handshake and an + Anker-Prime-style handshake (8-stage), + exposed as ``Solarbank2`` and ``Solarbank2Prime`` respectively. + Example telemetry data ---------------------- diff --git a/docs/source/solarbank2.rst b/docs/source/solarbank2.rst index a2f617e..d6e37bc 100644 --- a/docs/source/solarbank2.rst +++ b/docs/source/solarbank2.rst @@ -1,9 +1,23 @@ Solarbank 2 =========== -.. autoclass:: SolixBLE.Solarbank2 - :members: +.. autoclass:: SolixBLE.Solarbank2Common + :members: :inherited-members: connect, disconnect, add_callback, remove_callback, connected, available, address, name, supports_telemetry, last_update :special-members: __init__ :member-order: groupwise :no-index: + + +.. autoclass:: SolixBLE.Solarbank2 + :members: + :special-members: __init__ + :member-order: groupwise + :no-index: + + +.. autoclass:: SolixBLE.Solarbank2Prime + :members: + :special-members: __init__ + :member-order: groupwise + :no-index: diff --git a/scripts/frida_sb2_dart.js b/scripts/frida_sb2_dart.js new file mode 100644 index 0000000..679ed45 --- /dev/null +++ b/scripts/frida_sb2_dart.js @@ -0,0 +1,548 @@ +/* + * Script name : frida_sb2_dart.js + * Description : Frida script to analyze BLE traffic of the + * Anker app's Dart AOT code in libapp.so. + * Author : jul1an-s + * Date : 19/05/26 + * + * License : MIT + * Revision : 1.0.0 + * + * App version : This probably only works with Anker app version 3.18.0 + * + * For addtional app versions you need to analyze the app with apktool and blutter + * + * A one-shot AES module scan runs at load + on first BLE RX as a sanity check + * that libcrypto + libapp are present. + * + * libapp.so offsets are APK-version-specific. Use apktool and blutter + * to determine the correct addresses for your version. + * + * Header layout, BLE hook style, and anti-tamper pattern adapted from frida.js + * by Harvey Lelliott (@flip-dots), MIT-licensed. + * + * Known issue: plaintext parsing is not implemented. Use the captured + * session key + IV to decrypt the encrypted payloads externally. +*/ + +// Utilities + +function log(msg) { + console.log(msg); +} + +function toHex(byteArray) { + if (!byteArray) return "null"; + try { + var result = ""; + for (var i = 0; i < byteArray.length; i++) { + result += ('0' + (byteArray[i] & 0xFF).toString(16)).slice(-2); + } + return result; + } catch (e) { + return "Error parsing byte array"; + } +} + +function hexdumpNative(ptr, len) { + if (ptr.isNull()) return "null"; + try { + var bytes = ptr.readByteArray(len); + if (!bytes) return "null"; + var arr = new Uint8Array(bytes); + var hex = ""; + for (var i = 0; i < arr.length; i++) { + hex += ('0' + arr[i].toString(16)).slice(-2); + } + return hex; + } catch (e) { + return "read error: " + e; + } +} + +// Diagnostic tracking - structured tags ([HOOK-ATTACHED], [HOOK-FIRED], +// [HOOK-FIRE-SUMMARY]) for grepping a session log. + +var attachedHooks = []; +var hookFireCounts = {}; +var hookFiredFirst = {}; + +function noteAttached(name) { + attachedHooks.push(name); + log("[HOOK-ATTACHED] " + name); +} + +function noteFired(name) { + hookFireCounts[name] = (hookFireCounts[name] || 0) + 1; + if (!hookFiredFirst[name]) { + hookFiredFirst[name] = true; + log("[HOOK-FIRED] first fire: " + name); + } +} + +// Periodic firing summary - every 15s. Quiet if nothing changed. +var lastSummarySnapshot = {}; +setInterval(function() { + var keys = Object.keys(hookFireCounts); + if (keys.length === 0) { + log("[HOOK-FIRE-SUMMARY] " + attachedHooks.length + " hooks attached, 0 fired so far"); + return; + } + var changed = false; + var snapshot = {}; + for (var i = 0; i < keys.length; i++) { + snapshot[keys[i]] = hookFireCounts[keys[i]]; + if (lastSummarySnapshot[keys[i]] !== hookFireCounts[keys[i]]) changed = true; + } + if (changed) { + log("[HOOK-FIRE-SUMMARY] " + JSON.stringify(snapshot)); + lastSummarySnapshot = snapshot; + } +}, 15000); + +// AES library identification - scan loaded modules for the AES S-box, the +// ARMv8 AESE/AESD instruction signature, and the ChaCha20 constant. Runs +// once at script load and again on the first BLE RX (to catch libs that +// load on demand). +var aesScanDone = false; +var prevModuleSet = null; +var rescanDone = false; +function maybeRescanCrypto() { + if (rescanDone) return; + rescanDone = true; + log("\n[+] --- AES MODULE RESCAN (BLE session detected) ---"); + try { + var current = {}; + Process.enumerateModules().forEach(function(m) { current[m.name] = m.base.toString(); }); + if (prevModuleSet) { + var newMods = []; + Object.keys(current).forEach(function(name) { + if (!prevModuleSet[name]) newMods.push(name); + }); + if (newMods.length === 0) { + log("[AES-RESCAN] no new modules loaded since startup scan"); + } else { + log("[AES-RESCAN] " + newMods.length + " new modules loaded since startup:"); + newMods.forEach(function(n) { log("[AES-RESCAN] " + n); }); + } + } + // Re-arm and re-run the AES/ChaCha scan + aesScanDone = false; + scanModulesForAES(); + } catch (e) { + log("[AES-RESCAN] error: " + e + (e.stack ? "\n stack:\n" + e.stack : "")); + } +} + +function scanModulesForAES() { + // Snapshot module set on first run so the rescan can diff against it. + if (prevModuleSet === null) { + try { + prevModuleSet = {}; + Process.enumerateModules().forEach(function(m) { prevModuleSet[m.name] = m.base.toString(); }); + } catch (e) {} + } + if (aesScanDone) return; + aesScanDone = true; + + var SBOX_PATTERN = "63 7c 77 7b f2 6b 6f c5 30 01 67 2b fe d7 ab 76"; + var AES_INSTR_PATTERN = "28 4e"; + // ChaCha20 initial state constant "expand 32-byte k" + var CHACHA20_CONST = "65 78 70 61 6e 64 20 33 32 2d 62 79 74 65 20 6b"; + + log("\n[+] --- AES MODULE SCAN ---"); + var modules; + try { + modules = Process.enumerateModules(); + } catch (e) { + log("[AES-SCAN] enumerateModules failed: " + e); + return; + } + log("[AES-SCAN] " + modules.length + " loaded modules"); + + var hits = []; + modules.forEach(function(mod) { + try { + var sbox = 0, instr = 0, chacha = 0; + var ranges; + try { ranges = mod.enumerateRanges('r--'); } catch (e) { return; } + ranges.forEach(function(r) { + try { sbox += Memory.scanSync(r.base, r.size, SBOX_PATTERN).length; } catch (e) {} + try { instr += Memory.scanSync(r.base, r.size, AES_INSTR_PATTERN).length; } catch (e) {} + try { chacha += Memory.scanSync(r.base, r.size, CHACHA20_CONST).length; } catch (e) {} + }); + if (sbox > 0 || instr > 20 || chacha > 0) { + hits.push({ name: mod.name, base: mod.base.toString(), size: mod.size, + sbox: sbox, instr: instr, chacha: chacha }); + } + } catch (e) {} + }); + + if (hits.length === 0) { + log("[AES-SCAN] NO libraries contain crypto code (suspicious - at minimum libcrypto.so should hit)"); + return; + } + // Sort by combined evidence (SBOX heavily weighted; ChaCha20 const is also a strong signal) + hits.sort(function(a, b) { + return (b.sbox * 1000 + b.chacha * 1000 + b.instr) - (a.sbox * 1000 + a.chacha * 1000 + a.instr); + }); + log("[AES-SCAN] " + hits.length + " libraries contain crypto code (sorted by evidence):"); + hits.forEach(function(h) { + log("[AES-SCAN] " + h.name + " base=" + h.base + " size=" + h.size + + " SBOX=" + h.sbox + " AES_instr=" + h.instr + " ChaCha20=" + h.chacha); + }); +} + + +// Hook installers + +var installed = { antitamper: false, ble: false, dartCrypto: false }; + +function installAntiTamper() { + if (installed.antitamper) return; + Java.perform(function() { + try { + var System = Java.use('java.lang.System'); + var AndroidProcess = Java.use('android.os.Process'); + System.exit.implementation = function(code) { + noteFired("System.exit"); + log("[!] Intercepted exit (" + code + ")"); + }; + AndroidProcess.killProcess.implementation = function(pid) { + noteFired("Process.killProcess"); + log("[!] Intercepted killProcess for PID: " + pid); + }; + installed.antitamper = true; + noteAttached("anti-tamper (System.exit, Process.killProcess)"); + // Anti-tamper-safe to scan loaded modules for AES code. + scanModulesForAES(); + } catch (e) { + log("[-] anti-tamper hook error: " + e); + } + }); +} + +function installBleHooks() { + if (installed.ble) return; + Java.perform(function() { + try { + var BluetoothGatt = Java.use('android.bluetooth.BluetoothGatt'); + var BluetoothGattCharacteristic = Java.use('android.bluetooth.BluetoothGattCharacteristic'); + + BluetoothGatt.writeCharacteristic.overloads.forEach(function(overload) { + overload.implementation = function() { + noteFired("BluetoothGatt.writeCharacteristic"); + var char = arguments[0]; + var data = (arguments.length >= 2) ? arguments[1] : char.getValue(); + var uuid = char.getUuid().toString(); + // Filter to the Anker service characteristic UUIDs (per past captures: contains "8c850") + if (uuid.indexOf("8c850") >= 0) { + log("\n[BLE WRITE] UUID: " + uuid); + log("Data (Hex): " + toHex(data)); + } + return overload.apply(this, arguments); + }; + }); + + BluetoothGattCharacteristic.setValue.overloads.forEach(function(overload) { + overload.implementation = function() { + noteFired("BluetoothGattCharacteristic.setValue"); + var uuid = this.getUuid().toString(); + var value = arguments[0]; + if (uuid.indexOf("8c850") >= 0 && value !== null && typeof value === 'object') { + log("\n[BLE NOTIFY] UUID: " + uuid); + log("Data (Hex): " + toHex(value)); + // First BLE RX - rescan modules for late-loaded crypto libs. + maybeRescanCrypto(); + } + return overload.apply(this, arguments); + }; + }); + + installed.ble = true; + noteAttached("android.bluetooth.BluetoothGatt.writeCharacteristic + BluetoothGattCharacteristic.setValue (filter 8c850)"); + } catch (e) { + log("[-] BLE hook error: " + e); + } + }); +} + + +// Dart AOT hooks (libapp.so via blutter offsets, APK-version-specific) + +var dartHeapBase = null; +var DART_CID_POS = 12; +var DART_CID_MASK = 0xfffff; +// CIDs from blutter_frida.js class table (Dart 3.4.4, this libapp.so): +var DART_CID_ARRAY = 89; // immutable List +var DART_CID_GROWABLE_LIST = 91; // mutable List +var DART_CID_STRING = 93; +var DART_CID_TWO_BYTE_STRING = 94; +var DART_CID_UINT8LIST = 115; // _Uint8List +var DART_CID_NULL = 170; + +function dartInit(context) { if (dartHeapBase === null) dartHeapBase = context.x28.shl(32); } +function dartDecompress(tptr) { return dartHeapBase.add(tptr.toInt32()); } +function dartIsHeap(tptr) { return (tptr.toInt32() & 1) === 1; } +function dartGetCid(ptr) { return (ptr.readU32() >>> DART_CID_POS) & DART_CID_MASK; } + +// Read up to 'maxBytes' bytes from a List/GrowableList of int. Each element +// is a Smi stored as a 4-byte compressed pointer (Dart 3.4 ARM64 AOT); raw +// 32-bit value has low bit=0 and value = raw >> 1. +function readListInt(ptr, len, maxBytes) { + var n = Math.min(len, maxBytes); + var hex = ""; + try { + for (var i = 0; i < n; i++) { + var raw = ptr.add(16 + i * 4).readU32(); + var b = (raw >>> 1) & 0xff; + hex += ('0' + b.toString(16)).slice(-2); + } + if (len > maxBytes) hex += "..."; + return hex; + } catch (e) { + return "(readListInt err: " + e + ")"; + } +} + +function dartReadObject(tptr) { + try { + if (!dartIsHeap(tptr)) return { type: "Smi", value: tptr.toInt32() >> 1 }; + var ptr = dartDecompress(tptr).sub(1); + var cid = dartGetCid(ptr); + if (cid === DART_CID_NULL) return { type: "Null" }; + if (cid === DART_CID_STRING) { + var len = ptr.add(8).readU32() >> 1; + return { type: "String", value: ptr.add(16).readUtf8String(len) }; + } + if (cid === DART_CID_TWO_BYTE_STRING) { + var len = ptr.add(8).readU32() >> 1; + return { type: "TwoByteString", value: ptr.add(16).readUtf16String(len) }; + } + if (cid === DART_CID_UINT8LIST) { + var len = ptr.add(20).readU32() >> 1; + var data = ptr.add(24); + var hex = ""; + for (var i = 0; i < Math.min(len, 512); i++) { + hex += ('0' + data.add(i).readU8().toString(16)).slice(-2); + } + if (len > 512) hex += "..."; + return { type: "Uint8List", length: len, value: hex }; + } + if (cid === DART_CID_GROWABLE_LIST || cid === DART_CID_ARRAY) { + // length stored at offset 12 (per blutter lenOffset) + var len = ptr.add(12).readU32() >> 1; + if (cid === DART_CID_ARRAY) { + // _List (cid 89): elements stored inline at ptr+16. Decoder works. + return { type: "List", length: len, value: readListInt(ptr, len, 512) }; + } + // _GrowableList (cid 91): offset 16 holds a tagged pointer to the + // backing Array. Follow it if we can; if the indirection fails do + // NOT fall back to reading ptr+16 directly — that reads pointer + // and header bytes as if they were Smi-encoded data and emits + // adjacent-memory garbage that pollutes the log. + try { + var dataPtr = ptr.add(16).readU32(); + if (dataPtr & 1) { + var backing = dartDecompress(ptr.add(16).readU64()).sub(1); + return { type: "GrowableList", length: len, value: readListInt(backing, len, 512) }; + } + } catch (e) {} + return { type: "GrowableList", length: len, value: null }; + } + return { type: "cid:" + cid, value: null }; + } catch (e) { + return { type: "error", value: e.toString() }; + } +} + +function dartFormatObj(obj) { + if (obj.type === "String") return '"' + obj.value + '"'; + if (obj.type === "Uint8List") return "Uint8List(" + obj.length + "): " + obj.value; + if (obj.type === "Smi") return "int:" + obj.value; + if (obj.value !== null) return obj.type + "=" + obj.value; + return obj.type; +} + +// Dart function offsets in libapp.so, derived from blutter analysis. +// setAesKey/setAesIV signature is (bleMac, keyBytes) so the key/iv is in x3. +// AesHelper::aesGcm{Encrypt,Decrypt} are static, args in x2 + x3. +var DART_HOOK_SPECS = [ + // BLE session lifecycle + { name: "ZXBleConfer::startZXBleConfer", off: 0x1ecd430 }, + { name: "ZXBleConfer::sendConnectCommand", off: 0x1ecd958 }, + { name: "BleConfer::addBleConnectStatusListener", off: 0x3641d5c }, + { name: "SecureEcdhConfer::startConfer", off: 0x3644e7c }, + // Session key/IV storage + { name: "BleInfoHelper::setAesKey", off: 0x1d45ddc, decodeArg: 3 }, + { name: "BleInfoHelper::setAesIV", off: 0x1d45cf0, decodeArg: 3 }, + { name: "BleInfoHelper::getAesKey", off: 0x199fec0, decodeRet: true }, + { name: "BleInfoHelper::getAesIV", off: 0x199ff18, decodeRet: true }, + // GCM path used for session-encrypted SB2 commands (405e, 4022, 4027) + { name: "AesHelper::aesGcmEncrypt", off: 0x199ffc8, decodeArg: 2, decodeArg2: 3 }, + { name: "AesHelper::aesGcmDecrypt", off: 0x1ea6e30, decodeArg: 2, decodeArg2: 3 } +]; + +// Cache of last decoded retval per spec.name, used to suppress duplicate +// enter+leave logs (getAesKey/getAesIV fire many times back-to-back). +var lastDecodedRet = {}; + +function installDartCryptoHooks() { + if (installed.dartCrypto) return false; + var mod = Process.findModuleByName('libapp.so'); + if (!mod) return false; + var libapp = mod.base; + log("\n[+] --- LIBAPP.SO LOADED ---"); + log("Base address: " + libapp); + + // Decode a Dart tagged-pointer register; falls back to raw hex. + function tryDecodeArg(ctx, regName) { + var raw = ctx[regName]; + try { + dartInit(ctx); + var obj = dartReadObject(raw); + if (obj && obj.type === "Uint8List") { + return regName + "=Uint8List(" + obj.length + "B):" + obj.value; + } + if (obj && (obj.type === "GrowableList" || obj.type === "List")) { + if (obj.value === null) { + return regName + "=" + obj.type + "(" + obj.length + "B)"; + } + return regName + "=" + obj.type + "(" + obj.length + "B):" + obj.value; + } + if (obj && obj.type === "String") { + return regName + '=String:"' + obj.value + '"'; + } + if (obj && obj.type === "Smi") { + return regName + "=Smi:" + obj.value; + } + if (obj && obj.type === "Null") { + return regName + "=Null"; + } + // obj.type already starts with "cid:" for unknown types - don't double-prefix + return regName + "=" + raw + " (" + (obj ? obj.type : "?") + ")"; + } catch (e) { + return regName + "=" + raw; + } + } + + var count = 0; + DART_HOOK_SPECS.forEach(function(spec) { + try { + Interceptor.attach(libapp.add(spec.off), { + onEnter: function() { + noteFired("libapp.so:" + spec.name); + var c = this.context; + var parts = ["x1=" + c.x1]; + var decodedRegs = {}; + if (spec.decodeArg !== undefined) { + var reg = "x" + spec.decodeArg; + parts.push(tryDecodeArg(c, reg)); + decodedRegs[reg] = true; + } + if (spec.decodeArg2 !== undefined) { + var reg2 = "x" + spec.decodeArg2; + parts.push(tryDecodeArg(c, reg2)); + decodedRegs[reg2] = true; + } + ["x2","x3","x4"].forEach(function(r) { + if (!decodedRegs[r]) parts.push(r + "=" + c[r]); + }); + var line = "[DART-FN] " + spec.name + " " + parts.join(" "); + // Defer for decodeRet so onLeave can suppress unchanged pairs. + if (spec.decodeRet) { + this._enterLine = line; + } else { + log(line); + } + }, + onLeave: function(retval) { + if (!spec.decodeRet) return; + var retLine, cacheKey; + try { + var c = this.context; + dartInit(c); + var obj = dartReadObject(ptr(retval.toString())); + if (obj && obj.type === "Uint8List") { + cacheKey = "u8:" + obj.value; + retLine = "[DART-FN] " + spec.name + " → Uint8List(" + obj.length + "B): " + obj.value; + } else if (obj && (obj.type === "List" || obj.type === "GrowableList")) { + cacheKey = obj.type + ":" + (obj.value === null ? ":" + obj.length : obj.value); + if (obj.value === null) { + retLine = "[DART-FN] " + spec.name + " → " + obj.type + "(" + obj.length + "B)"; + } else { + retLine = "[DART-FN] " + spec.name + " → " + obj.type + "(" + obj.length + "B): " + obj.value; + } + } else if (obj && obj.type === "String") { + cacheKey = "s:" + obj.value; + retLine = "[DART-FN] " + spec.name + ' → String:"' + obj.value + '"'; + } else { + cacheKey = "raw:" + retval + ":" + (obj ? obj.type : "?"); + retLine = "[DART-FN] " + spec.name + " → " + retval + " (cid:" + (obj ? obj.type : "?") + ")"; + } + } catch (e) { + cacheKey = "err:" + retval + ":" + e; + retLine = "[DART-FN] " + spec.name + " → " + retval + " (decode err: " + e + ")"; + } + if (lastDecodedRet[spec.name] === cacheKey) return; + lastDecodedRet[spec.name] = cacheKey; + if (this._enterLine) log(this._enterLine); + log(retLine); + } + }); + noteAttached("libapp.so:" + spec.name + " @ 0x" + spec.off.toString(16)); + count++; + } catch (e) { + log("[-] failed to hook " + spec.name + ": " + e); + } + }); + + installed.dartCrypto = true; + log("[+] installed " + count + " Dart hooks in libapp.so"); + return count > 0; +} + +// Hook orchestration - ijiami delays DEX unpacking and lib loading, so we +// install framework hooks immediately and poll for libapp.so. + +setImmediate(function() { + installAntiTamper(); + installBleHooks(); +}); + +// Poll for libapp.so load; one-shot setTimeout can fire too early. +(function() { + var attempts = 0; + var maxAttempts = 60; + var poller = setInterval(function() { + attempts++; + if (!installed.dartCrypto) installDartCryptoHooks(); + if (installed.dartCrypto) { + clearInterval(poller); + log("[+] all native hooks installed"); + } else if (attempts >= maxAttempts) { + clearInterval(poller); + log("[*] native hook polling timeout. Missing: libapp.so Dart"); + } + }, 1000); +})(); + + +// RPC exports - call from a frida CLI session: +// await rpc.exports.summary() // hooks attached + fire counts +// await rpc.exports.aesscan() // re-run the module AES scan + +rpc.exports = { + summary: function() { + return { + script: "frida_sb2_dart.js", + attached: attachedHooks, + fired: hookFireCounts + }; + }, + aesscan: function() { + aesScanDone = false; + scanModulesForAES(); + return "see [AES-SCAN] lines in log"; + } +}; \ No newline at end of file diff --git a/tests/test_devices.py b/tests/test_devices.py index 084a35f..8020a3f 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -6,7 +6,9 @@ import asyncio import logging +import os from typing import Any +from unittest import mock import pytest @@ -26,7 +28,12 @@ SolixBLEDevice, TemperatureUnit, ) -from SolixBLE.devices.solarbank2 import MaxLoadSB2 +from SolixBLE.devices.solarbank2 import ( + ENV_SB2_ANKER_USER_ID, + MaxLoadSB2, + Solarbank2Prime, + Solarbank2Common, +) from SolixBLE.states import GridStatus, LightMode, SBPowerCutoff, SBUsageMode from tests.const import ( MOCK_BLE_DEVICE, @@ -1136,3 +1143,105 @@ async def test_bad_values( assert ( getattr(device, class_property) == expected_value ), f"Mismatch for property '{class_property}'!" + + +# Solarbank 2 set-schedule payload (0x405e) and Solarbank2Prime construction +# Synthetic user IDs - never the real Anker cloud userId from any capture. +_FAKE_SB2_USER_ID_STR = "0" * 40 +_FAKE_SB2_USER_ID_BYTES = b"f" * 40 +_FAKE_SB2_USER_ID_ENV = "1" * 40 + + +def test_sb2_build_set_schedule_payload_matches_expected_layout(): + """A 90 W uniform schedule produces the documented 7-day TLV layout. + + Asserts the per-day-symmetric reading: 7 identical day-blocks + (a3, a7, ab, af, b3, b7, bb) each with enable flag, 8-byte struct, + per-day flag, 1-byte trailer; then the ``fd 05 03 <4B>`` nonce. + """ + fixed_nonce = bytes.fromhex("00112233") + with mock.patch.object(os, "urandom", return_value=fixed_nonce): + result = Solarbank2Common._build_set_schedule_payload(90) + + # 8-byte struct for power=90: start=0, end=1440, power=90, const=80. + struct_hex = "0000a0055a005000" + + def day_block(base: int) -> str: + return ( + f"{base:02x}020101" + f"{base + 1:02x}0904{struct_hex}" + f"{base + 2:02x}020100" + f"{base + 3:02x}0104" + ) + + expected_hex = ( + "a10121" + "a2020101" + + "".join(day_block(0xa3 + 4 * d) for d in range(7)) + + "fd050300112233" + ) + assert result.hex() == expected_hex + + +@pytest.mark.parametrize("bad_power", [-1, 801, 1000]) +def test_sb2_build_set_schedule_payload_rejects_out_of_range(bad_power): + """Out-of-range wattage raises ValueError.""" + with pytest.raises(ValueError): + Solarbank2Common._build_set_schedule_payload(bad_power) + + +def test_sb2_build_set_schedule_payload_uses_fresh_nonce_per_call(): + """Each call generates a fresh fd nonce (prevents 'abnormal state'). + + The Anker app generates fresh random bytes per write; reusing a value + causes the schedule storage to update but the inverter target to stay + stuck at the previous value. + """ + a = Solarbank2Common._build_set_schedule_payload(100) + b = Solarbank2Common._build_set_schedule_payload(100) + assert a[:-4] == b[:-4] + # 1-in-2^32 false-fail rate is negligible. + assert a[-4:] != b[-4:] + + +def test_sb2_prime_user_id_explicit_str_encoded_as_ascii(): + dev = Solarbank2Prime(MOCK_BLE_DEVICE, anker_user_id=_FAKE_SB2_USER_ID_STR) + assert dev._anker_user_id == _FAKE_SB2_USER_ID_STR.encode("ascii") + + +def test_sb2_prime_user_id_explicit_bytes_kept_as_is(): + dev = Solarbank2Prime(MOCK_BLE_DEVICE, anker_user_id=_FAKE_SB2_USER_ID_BYTES) + assert dev._anker_user_id == _FAKE_SB2_USER_ID_BYTES + + +def test_sb2_prime_user_id_falls_back_to_env_var(monkeypatch): + monkeypatch.setenv(ENV_SB2_ANKER_USER_ID, _FAKE_SB2_USER_ID_ENV) + dev = Solarbank2Prime(MOCK_BLE_DEVICE) + assert dev._anker_user_id == _FAKE_SB2_USER_ID_ENV.encode("ascii") + + +def test_sb2_prime_user_id_missing_raises(monkeypatch): + monkeypatch.delenv(ENV_SB2_ANKER_USER_ID, raising=False) + with pytest.raises(ValueError, match="Anker"): + Solarbank2Prime(MOCK_BLE_DEVICE) + + +def test_sb2_prime_static_key_encrypt_round_trips(monkeypatch): + """``_encrypt_with_static_key`` output is decryptable using the same + Prime static GCM parameters. + + The GCM tag verifying on decrypt is what proves key/nonce/AAD are + wired correctly. ``_decrypt_payload`` falls back to ``NEGOTIATION_KEY`` + when ``_shared_secret`` is not yet set, mirroring the path used to + read RX 48xx handshake responses. + """ + monkeypatch.setenv(ENV_SB2_ANKER_USER_ID, _FAKE_SB2_USER_ID_ENV) + dev = Solarbank2Prime(MOCK_BLE_DEVICE) + assert dev._shared_secret is None # static-key path + + plaintext = bytes.fromhex("a104deadbeefa200") + encrypted = dev._encrypt_with_static_key(plaintext) + + # GCM output = ciphertext + 16-byte tag. + assert len(encrypted) == len(plaintext) + 16 + assert dev._decrypt_payload(encrypted) == plaintext