-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontroller.py
More file actions
1300 lines (1121 loc) · 52.2 KB
/
Copy pathcontroller.py
File metadata and controls
1300 lines (1121 loc) · 52.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Pump controller and outage manager.
PumpController: sends intensity commands to ReefBeat controllers
with per-device granularity and 3 SoC-based levels
OutageManager: orchestrates the 3-level failover response
"""
import time
import json
import os
import threading
from pathlib import Path
from typing import Optional, Dict, Any
from outage import PowerState
from hotspot import NetworkManager, NetworkMode
from notifier import Notifier
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
try:
import paho.mqtt.client as mqtt
except ImportError:
mqtt = None
# =============================================================================
# Intensity level resolver
# =============================================================================
class IntensityLevel:
    """
    Represents one intensity level (normal / eco / survival / test).

    Each level has:
      - soc_threshold: SoC threshold read from the config (default 100);
        how it is compared against the live SoC is decided by the caller.
      - global_intensity: default intensity for all pumps (default 100).
      - per_device: optional dict overriding intensity per pump key
        (key = ctrl["key"], unique per controllable pump;
        for RSRUN this is "<device_name>::pump_1" / "pump_2").

    Special value: 0 means "turn the pump OFF" at that level.
    """

    def __init__(self, name: str, cfg: dict):
        """
        Build a level from its config mapping.

        Args:
            name: level name, stored as-is.
            cfg: mapping that may contain "soc_threshold",
                "global_intensity" and "per_device".
        """
        self.name = name
        self.soc_threshold = cfg.get("soc_threshold", 100)
        self.global_intensity = cfg.get("global_intensity", 100)
        # The key may be present with a null value (e.g. an empty
        # `per_device:` entry); fall back to "no overrides" instead of
        # storing None and crashing later in get_intensity().
        self.per_device: Dict[str, int] = cfg.get("per_device") or {}

    def get_intensity(self, pump_key: str) -> int:
        """Return the override for `pump_key`, else the global intensity."""
        return self.per_device.get(pump_key, self.global_intensity)

    def __repr__(self):
        return (f"Level({self.name}: soc<{self.soc_threshold}%, "
                f"global={self.global_intensity}%, "
                f"overrides={self.per_device})")
class IntensityResolver:
    """
    Resolves the active intensity level based on SoC.

    Levels are kept sorted by soc_threshold, highest first. On battery the
    active level is the one with the lowest threshold that is still >= SoC;
    with the built-in fallback thresholds (100 / 60 / 30) that means:
        normal   (soc > 60)        -> global intensity 100
        eco      (30 < soc <= 60)  -> global intensity 50
        survival (soc <= 30)       -> global intensity 30
    On mains power, always returns the normal level.
    """

    def __init__(self, cfg: dict):
        """
        Build the level table from cfg["pump_control"].

        Reads "levels" (normal / eco / survival) and the optional
        "test_level", then prints the resulting configuration.
        """
        pump_cfg = cfg.get("pump_control", {})
        levels_cfg = pump_cfg.get("levels", {})
        self._levels = self._build_levels(levels_cfg)
        # Optional single TEST level. Its only purpose is to verify that
        # the service can actually drive the pumps: change their speed and
        # turn one off. It is NOT a SoC-resolved plan -- it is never
        # returned by resolve(); callers fetch it through `test_level`.
        test_cfg = pump_cfg.get("test_level")
        self._test_level = (
            IntensityLevel("test", test_cfg) if test_cfg else None
        )
        print("[LEVELS] Configured intensity levels:")
        for level in self._levels:
            print(f" {level}")
        if self._test_level:
            print(f"[LEVELS] Test level: {self._test_level}")

    @staticmethod
    def _build_levels(levels_cfg: dict) -> list:
        """
        Build the level list from a config dict, sorted by soc_threshold
        (highest first). Only the names normal / eco / survival are read;
        if none of them is configured, built-in defaults are used.
        """
        levels = [
            IntensityLevel(name, levels_cfg[name])
            for name in ("normal", "eco", "survival")
            if name in levels_cfg
        ]
        levels.sort(key=lambda lvl: lvl.soc_threshold, reverse=True)
        if not levels:
            # Fallback defaults
            levels = [
                IntensityLevel("normal", {
                    "soc_threshold": 100,
                    "global_intensity": 100}),
                IntensityLevel("eco", {
                    "soc_threshold": 60,
                    "global_intensity": 50}),
                IntensityLevel("survival", {
                    "soc_threshold": 30,
                    "global_intensity": 30}),
            ]
        return levels

    @property
    def has_test_level(self) -> bool:
        """True when a test level was configured."""
        return self._test_level is not None

    @property
    def test_level(self) -> Optional["IntensityLevel"]:
        """The configured test level, or None."""
        return self._test_level

    def resolve(self, soc: float, on_battery: bool) -> IntensityLevel:
        """
        Determine the active level based on SoC and power state.

        On mains the normal level is returned regardless of SoC. On
        battery, the last level (in descending-threshold order) whose
        threshold is >= soc wins; if soc is above every threshold the
        normal level is returned.
        """
        active = self.normal_level  # mains result and on-battery default
        if not on_battery:
            return active
        # Levels are sorted descending by threshold, so the last match is
        # the tightest band that still contains `soc`.
        for level in self._levels:
            if soc <= level.soc_threshold:
                active = level
        return active

    @property
    def normal_level(self) -> IntensityLevel:
        """
        The level named "normal".

        Looked up by name rather than by sort position so that a config
        whose "normal" threshold is not the highest cannot make mains
        power resolve to a reduced level. Falls back to the
        highest-threshold level when no level is called "normal".
        """
        for level in self._levels:
            if level.name == "normal":
                return level
        return self._levels[0]
# =============================================================================
# Pump controller
# =============================================================================
class PumpController:
"""
Controls pump intensity with per-pump granularity.
Each pump can have its own intensity based on the active level.
For multi-pump devices (RSRUN: return + skimmer share one box+IP),
each pump is tracked and addressed independently via its `pump_index`
("pump_1", "pump_2", ...).
Identification:
- Every controllable pump has a unique `key` (set by the wizard):
* single-pump devices (RSWAVE, RSLED): key == device name
* multi-pump RSRUN: key == "<name>::pump_1"
- `_device_intensities` and per_device overrides are indexed by key.
Intensity semantics:
- 0 means OFF (RSWAVE: stop; RSRUN: schedule_enabled=false on that pump)
- otherwise the value must respect the model's running range; the wizard
already validates this so the controller just forwards what it gets.
"""
def __init__(self, mqtt_client, cfg: dict):
    """
    Set up controller state from the service configuration.

    Args:
        mqtt_client: MQTT client object, stored as-is for later use.
        cfg: full configuration mapping; the "pump_control" and "mqtt"
            sections are read from it.
    """
    pump_section = cfg.get("pump_control", {})
    self._client = mqtt_client
    self._cfg = cfg
    self._pump_cfg = pump_section
    self._mqtt_cfg = cfg.get("mqtt", {})
    self._resolver = IntensityResolver(cfg)
    self._lock = threading.Lock()
    # Per-pump intensity keyed by the pump's unique key; every configured
    # pump starts at 100.
    self._device_intensities: Dict[str, int] = {}
    for pump in pump_section.get("controllers", []):
        self._device_intensities[pump["key"]] = 100
    # Name of the level currently in effect (for status reporting).
    self.active_level_name = "normal"
    # Functional command-test mode flag (toggled by set_test_plan).
    self._test_mode_active = False
    # Last health-check reachability per label, so a device going from
    # DOWN to reachable can have the current level re-applied.
    self._last_health: Dict[str, bool] = {}
    # Background restore-retry machinery: a worker thread, its stop
    # signal, and its tuning options ("restore_retry" config section).
    self._restore_retry_thread: Optional[threading.Thread] = None
    self._stop_restore_retry = threading.Event()
    self._restore_cfg = pump_section.get("restore_retry", {})
@property
def current_intensity(self) -> int:
    """Rounded mean of all tracked pump intensities (100 if none tracked)."""
    tracked = self._device_intensities
    if not tracked:
        return 100
    total = sum(tracked.values())
    return round(total / len(tracked))
def set_test_plan(self, enabled: bool) -> bool:
    """
    Switch the functional command-test mode on or off.

    The test mode exists to check the control path (change pump speed,
    turn a pump off) without a real outage and without waiting for the
    SoC to drop.

    Enabling: marks test mode active, then pushes the configured
    `test_level` intensity to every pump through `_api_set`.
    Disabling: clears the flag and delegates to `restore_normal()`.

    Returns:
        True when the mode actually changed; False when no `test_level`
        is configured (enable only) or the mode is already as requested.
    """
    if enabled and not self._resolver.has_test_level:
        print("[TEST] No 'test_level' configured — cannot enable")
        return False
    if self._test_mode_active == enabled:
        return False

    if not enabled:
        print("[TEST] Test mode OFF — restoring pumps to original config")
        self._test_mode_active = False
        # Same restore path as post-outage recovery.
        self.restore_normal()
        return True

    test_level = self._resolver.test_level
    print(f"[TEST] Applying test level: speed -> "
          f"{test_level.global_intensity}% (one or more pumps may be "
          "turned off per per_device)")
    with self._lock:
        self._test_mode_active = True
        self.active_level_name = "test"
        for pump in self._pump_cfg.get("controllers", []):
            pump_key = pump["key"]
            wanted = test_level.get_intensity(pump_key)
            pump_label = self._ctrl_label(pump)
            off_note = " (OFF)" if wanted == 0 else ""
            print(f" [PUMP] {pump_label}: -> {wanted}%"
                  + off_note)
            self._device_intensities[pump_key] = wanted
            self._api_set(pump, wanted)
    self._publish_pump_state(test_level, "test_mode_on")
    return True
@property
def test_plan_active(self) -> bool:
    """Whether the functional command-test mode is currently switched on."""
    is_active = self._test_mode_active
    return is_active
def reapply_current_level(self, only_keys: Optional[set] = None) -> int:
    """
    Re-send the intensity of the level currently in effect.

    Unlike apply_level, this never skips a pump because its recorded
    intensity already equals the target: every selected pump is
    commanded again. That covers pumps that were unreachable when the
    level was first applied.

    Args:
        only_keys: when given, only pumps whose key is in this set are
            commanded; otherwise all configured pumps are.

    Returns:
        Number of pumps commanded (0 while test mode is active).
    """
    if self._test_mode_active:
        return 0
    # Find the level object matching the active name; default to the
    # first (highest-threshold) level when the name is not found.
    known_levels = self._resolver._levels
    level = next(
        (lvl for lvl in known_levels if lvl.name == self.active_level_name),
        known_levels[0],
    )
    sent = 0
    with self._lock:
        for pump in self._pump_cfg.get("controllers", []):
            pump_key = pump["key"]
            if only_keys is not None and pump_key not in only_keys:
                continue
            wanted = level.get_intensity(pump_key)
            pump_label = self._ctrl_label(pump)
            print(f" [PUMP] {pump_label}: re-applying {wanted}% "
                  f"(level {level.name})")
            self._device_intensities[pump_key] = wanted
            self._api_set(pump, wanted)
            sent += 1
    return sent
def apply_level(self, soc: float, on_battery: bool, reason: str = ""):
    """
    Resolve the level for the given SoC / power state and push it.

    Only pumps whose recorded intensity differs from the new target are
    commanded. Nothing happens while the functional test mode is active,
    so a poll cycle cannot overwrite the test settings. A call with an
    unchanged level and an empty `reason` is also a no-op.

    Args:
        soc: battery state of charge, passed to the resolver.
        on_battery: True when running on battery.
        reason: free-text tag used in the log line and handed to
            `_publish_pump_state`.
    """
    if self._test_mode_active:
        return
    level = self._resolver.resolve(soc, on_battery)
    with self._lock:
        previous = self.active_level_name
        if reason == "" and level.name == previous:
            return  # nothing to do
        self.active_level_name = level.name
        # Collect (pump, old, new) for every pump whose target moved, and
        # record the new target straight away.
        pending = []
        for pump in self._pump_cfg.get("controllers", []):
            pump_key = pump["key"]
            wanted = level.get_intensity(pump_key)
            recorded = self._device_intensities.get(pump_key, -1)
            if wanted == recorded:
                continue
            pending.append((pump, recorded, wanted))
            self._device_intensities[pump_key] = wanted
        if previous == level.name and not pending:
            return
        if previous != level.name:
            print(f"[PUMPS] Level: {previous} -> {level.name} "
                  f"(SoC={soc:.0f}%, {reason})")
        for pump, before, after in pending:
            pump_label = self._ctrl_label(pump)
            print(f" [PUMP] {pump_label}: {before}% -> {after}%")
            self._api_set(pump, after)
    # Publish the per-pump state once the commands have been issued.
    self._publish_pump_state(level, reason)
def restore_normal(self):
    """
    Bring every pump back to its saved pre-override configuration.

    Bookkeeping first: the active level becomes "normal", the health
    tracking is cleared, and each pump's recorded intensity is set to the
    normal level's value; that state is then published. The devices
    themselves are restored from their on-disk snapshots via
    `_restore_pass()` -- not by pushing a flat intensity -- so the
    original schedule / wave program comes back as saved.

    One pass is attempted immediately; if any snapshot is still pending
    afterwards, the background retry thread is started.
    """
    normal = self._resolver.normal_level
    with self._lock:
        self.active_level_name = "normal"
        # Reachability tracking starts from scratch on the next outage.
        self._last_health = {}
        # Status / MQTT view: all pumps are logically back to normal.
        for pump in self._pump_cfg.get("controllers", []):
            pump_key = pump["key"]
            self._device_intensities[pump_key] = normal.get_intensity(
                pump_key)
    self._publish_pump_state(normal, "power_restored")
    # Immediate attempt; leftovers are handled by the retry thread.
    if self._restore_pass():
        self._start_restore_retry()
def _restore_pass(self) -> int:
    """
    Run one restore attempt over every pump that has a snapshot on disk.

    A pump without a loadable snapshot is skipped. For the others
    `_api_restore` is called and the snapshot is looked up again: if it
    is still there, the pump counts as not yet restored.

    Returns:
        Number of pumps whose snapshot is still present after this pass.
    """
    pending = 0
    for pump in self._pump_cfg.get("controllers", []):
        pump_key = pump["key"]
        if self._load_snapshot(pump_key) is None:
            # Nothing saved for this pump (or it was already restored).
            continue
        pump_label = self._ctrl_label(pump)
        print(f" [PUMP] {pump_label}: restoring original config")
        self._api_restore(pump)
        # A snapshot that survived the call means the restore failed.
        still_saved = self._load_snapshot(pump_key) is not None
        if still_saved:
            pending += 1
    return pending
def _start_restore_retry(self) -> None:
    """Start the background restore-retry thread unless one is running."""
    worker = self._restore_retry_thread
    if worker is not None and worker.is_alive():
        return  # a retry loop is already in progress
    # Reset the stop signal before launching a fresh daemon worker.
    self._stop_restore_retry.clear()
    worker = threading.Thread(
        target=self._restore_retry_loop, daemon=True
    )
    self._restore_retry_thread = worker
    worker.start()
def _restore_retry_loop(self) -> None:
    """
    Retry `_restore_pass()` on a timer.

    The loop ends when a pass leaves nothing pending, when the stop
    event is set, or after `max_attempts` passes. Tuning comes from the
    restore-retry config: "interval_s" (default 30.0) and
    "max_attempts" (default 40).
    """
    interval = float(self._restore_cfg.get("interval_s", 30.0))
    max_attempts = int(self._restore_cfg.get("max_attempts", 40))
    print(f"[RESTORE] Background retry started "
          f"(every {interval:.0f}s, up to {max_attempts} attempts)")
    attempt = 0
    while True:
        # wait() doubles as the sleep between passes; it returns True as
        # soon as a stop is requested.
        if self._stop_restore_retry.wait(timeout=interval):
            return
        attempt += 1
        remaining = self._restore_pass()
        if remaining == 0:
            print(f"[RESTORE] All pumps restored after {attempt} retry(ies)")
            return
        if attempt >= max_attempts:
            print(f"[RESTORE] Giving up after {attempt} attempts; "
                  f"{remaining} pump(s) still unrestored. Snapshots kept "
                  f"on disk -- run 'python3 restore_pumps.py' manually.")
            return
        print(f"[RESTORE] Attempt {attempt}: {remaining} pump(s) still "
              "unreachable, will retry")
def stop_restore_retry(self) -> None:
    """Signal the background restore-retry loop to stop (sets its event)."""
    stop_signal = self._stop_restore_retry
    stop_signal.set()
def reconcile_on_startup(self, on_battery: bool) -> None:
    """
    Deal with snapshots left on disk from a previous run.

    Meant for service startup, e.g. after a reboot in the middle of an
    outage:
      - no leftover snapshot: nothing to do;
      - leftovers and still on battery: keep them for a later restore;
      - leftovers and mains is back: restore the originals now.

    Args:
        on_battery: True when the system is currently on battery.
    """
    leftover = sum(
        1 for pump in self._pump_cfg.get("controllers", [])
        if self._load_snapshot(pump["key"]) is not None
    )
    if leftover == 0:
        return
    if not on_battery:
        print(f"[STARTUP] {leftover} stale snapshot(s) found and mains "
              "is back -- restoring originals")
        self.restore_normal()
        return
    print(f"[STARTUP] {leftover} stale snapshot(s) found, "
          "still on battery -- keeping them")
@staticmethod
def _ctrl_label(ctrl: dict) -> str:
    """
    Build the log label for a pump entry.

    Entries with a truthy "pump_index" yield "<name> / <sub>", where
    <sub> is "pump_name" when set and the pump index otherwise; all
    other entries yield just "<name>".
    """
    if not ctrl.get("pump_index"):
        return ctrl["name"]
    sub_label = ctrl.get("pump_name") or ctrl["pump_index"]
    return f"{ctrl['name']} / {sub_label}"
# -------------------------------------------------------------------------
# Snapshot persistence
# -------------------------------------------------------------------------
# Snapshots are kept on disk so that an unplanned reboot of the Pi during
# an outage does not destroy the original schedule/wave configuration.
# On startup, if a snapshot file exists, we know we crashed mid-outage
# and the device may still be running our reduced schedule -- the
# snapshot lets us push back the original config when the mains return.
@property
def _snapshot_base(self) -> Path:
    """
    Root directory for everything this controller stores on disk.

    Uses the "snapshot_dir" option of the pump_control section
    (self._pump_cfg -- not the config root) when it is set, and
    /var/lib/reefbeat-energy-backup otherwise.
    """
    override = self._pump_cfg.get("snapshot_dir")
    return Path(override) if override else Path(
        "/var/lib/reefbeat-energy-backup")
@property
def _snapshot_dir(self) -> Path:
    """
    Directory holding the pre-outage snapshots ("snapshots" under the
    snapshot base).

    A file here is treated by the restore logic as "restore still
    pending" for that pump.
    """
    return self._snapshot_base.joinpath("snapshots")
@property
def _reference_dir(self) -> Path:
    """
    Directory holding the periodic *reference* snapshots ("reference"
    under the snapshot base).

    References act as a fallback known-good config when no pre-outage
    snapshot could be taken.
    """
    return self._snapshot_base.joinpath("reference")
def _snapshot_path(self, key: str) -> Path:
    """
    Map a pump key to its pre-outage snapshot file.

    "/" becomes "_" and ":" becomes "-" so the key is usable as a file
    name; the result is "<snapshot dir>/<sanitised key>.json".
    """
    file_stem = key.translate(str.maketrans({"/": "_", ":": "-"}))
    return self._snapshot_dir / (file_stem + ".json")
def _reference_path(self, key: str) -> Path:
    """
    Map a pump key to its reference snapshot file.

    Same sanitising as for pre-outage snapshots ("/" -> "_", ":" -> "-"),
    rooted in the reference directory.
    """
    file_stem = key.translate(str.maketrans({"/": "_", ":": "-"}))
    return self._reference_dir / (file_stem + ".json")
def _save_snapshot(self, key: str, snapshot: Dict[str, Any]) -> None:
    """
    Persist a pre-outage snapshot atomically and durably.

    The JSON is written to "<name>.json.tmp", flushed and fsync'ed, and
    only then renamed over the final file. Without the fsync the rename
    can reach the disk before the data does, so a power loss could leave
    an empty or truncated snapshot behind -- exactly the situation these
    snapshots are meant to survive.

    Failures (OSError) are logged and swallowed: saving is best-effort.

    Args:
        key: unique pump key.
        snapshot: JSON-serialisable snapshot mapping.
    """
    try:
        self._snapshot_dir.mkdir(parents=True, exist_ok=True)
        path = self._snapshot_path(key)
        tmp = path.with_suffix(".json.tmp")
        with open(tmp, "w") as fh:
            fh.write(json.dumps(snapshot, indent=2))
            fh.flush()
            # Force the data to disk before the rename publishes it.
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except OSError as e:
        print(f" [SNAP] failed to save {key}: {e}")
def _load_snapshot(self, key: str) -> Optional[Dict[str, Any]]:
    """
    Load the pre-outage snapshot for `key`.

    Returns:
        The snapshot mapping, or None when the file is missing,
        unreadable, not valid JSON, or not a JSON object. Unusable files
        are reported on stdout.
    """
    path = self._snapshot_path(key)
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised for a file with garbage bytes.
        print(f" [SNAP] failed to load {key}: {e}")
        return None
    if not isinstance(data, dict):
        # Callers use dict methods on the result; anything else is a
        # damaged snapshot.
        print(f" [SNAP] failed to load {key}: not a JSON object")
        return None
    return data
def _drop_snapshot(self, key: str) -> None:
    """
    Remove the pre-outage snapshot file for `key` (no error if absent).

    A failed removal is logged rather than silently ignored: the restore
    logic treats a snapshot that is still on disk as "restore pending",
    so a file that cannot be deleted would otherwise cause repeated
    restore attempts with no visible reason.
    """
    try:
        self._snapshot_path(key).unlink(missing_ok=True)
    except OSError as e:
        print(f" [SNAP] failed to remove {key}: {e}")
def _save_reference(self, key: str, snapshot: Dict[str, Any]) -> None:
    """
    Persist a reference snapshot atomically and durably.

    Written to "<name>.json.tmp", flushed and fsync'ed, then renamed
    over the final file, so a power loss cannot replace a good reference
    with an empty or truncated one.

    Failures (OSError) are logged and swallowed: saving is best-effort.

    Args:
        key: unique pump key.
        snapshot: JSON-serialisable snapshot mapping.
    """
    try:
        self._reference_dir.mkdir(parents=True, exist_ok=True)
        path = self._reference_path(key)
        tmp = path.with_suffix(".json.tmp")
        with open(tmp, "w") as fh:
            fh.write(json.dumps(snapshot, indent=2))
            fh.flush()
            # Force the data to disk before the rename publishes it.
            os.fsync(fh.fileno())
        os.replace(tmp, path)
    except OSError as e:
        print(f" [REFERENCE] failed to save {key}: {e}")
def _load_reference(self, key: str) -> Optional[Dict[str, Any]]:
    """
    Load the reference snapshot for `key`.

    Returns:
        The snapshot mapping, or None when the file is missing,
        unreadable, not valid JSON, or not a JSON object. Unusable files
        are reported on stdout.
    """
    path = self._reference_path(key)
    if not path.exists():
        return None
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised for a file with garbage bytes.
        print(f" [REFERENCE] failed to load {key}: {e}")
        return None
    if not isinstance(data, dict):
        print(f" [REFERENCE] failed to load {key}: not a JSON object")
        return None
    return data
# -------------------------------------------------------------------------
# ReefBeat HTTP primitives
# -------------------------------------------------------------------------
# These mirror what the ha-reefbeat custom component does, but called
# directly so we never depend on Home Assistant being up during an outage.
def _http_get(self, ip: str, path: str) -> Optional[Any]:
    """
    GET http://<ip><path> and return the parsed JSON body.

    Returns:
        The decoded JSON value, or None when `requests` is not
        installed, the device cannot be reached, it answers with a
        non-OK status, or the body is not valid JSON. Each failure is
        reported on stdout.
    """
    if not REQUESTS_AVAILABLE:
        return None
    try:
        r = requests.get(f"http://{ip}{path}", timeout=3)
    except requests.exceptions.RequestException as e:
        print(f" [HTTP] GET {ip}{path} -> unreachable ({type(e).__name__})")
        return None
    if not r.ok:
        print(f" [HTTP] GET {ip}{path} -> {r.status_code}")
        return None
    try:
        return r.json()
    except ValueError as e:
        # Depending on the requests version, a non-JSON body raises either
        # requests' own JSONDecodeError or a plain ValueError; both are
        # ValueError subclasses. Report it as a bad body rather than as an
        # unreachable device, and never let it escape to the caller.
        print(f" [HTTP] GET {ip}{path} -> invalid JSON ({type(e).__name__})")
        return None
def _http_send(self, ip: str, path: str, payload: Any = "",
               method: str = "put") -> bool:
    """
    Send a PUT / POST / DELETE to http://<ip><path>.

    Bodies go out through requests' json= parameter. A payload that
    arrives as a non-empty string is first parsed as JSON when possible,
    so an already-serialised document is not encoded a second time (a
    double-encoded body is rejected by the firmware with "could not
    parse the received JSON"). A string that is not JSON is sent as-is.

    Args:
        ip: device address.
        path: request path, appended to the address.
        payload: body to send; "" or None on a POST means "no body".
            DELETE never carries a body.
        method: "put", "post" or "delete".

    Returns:
        True on an OK response; False on any other status, an unknown
        method, a transport error, or when `requests` is unavailable.
    """
    if not REQUESTS_AVAILABLE:
        return False
    url = f"http://{ip}{path}"
    body = payload
    if isinstance(body, str) and body != "":
        try:
            body = json.loads(body)
        except (ValueError, TypeError):
            # Not JSON: keep the raw string, the caller may want that.
            pass
    try:
        call_kwargs = {"timeout": 5}
        if method == "put":
            sender = requests.put
            call_kwargs["json"] = body
        elif method == "post":
            sender = requests.post
            # Actions such as /off take no body at all.
            if not (body == "" or body is None):
                call_kwargs["json"] = body
        elif method == "delete":
            sender = requests.delete
        else:
            print(f" [HTTP] unknown method: {method}")
            return False
        r = sender(url, **call_kwargs)
        if r.ok:
            return True
        # Append the start of the response body to the log line: it
        # usually says why the request was rejected.
        detail = ""
        try:
            reply_text = r.text.strip()
            if reply_text:
                detail = f" | {reply_text[:200]}"
        except Exception:
            pass
        print(f" [HTTP] {method.upper()} {url} -> {r.status_code}{detail}")
        return False
    except requests.exceptions.RequestException as e:
        print(f" [HTTP] {method.upper()} {url} -> unreachable "
              f"({type(e).__name__})")
        return False
def _device_off(self, ip: str) -> bool:
    """Switch the whole device OFF: body-less POST to /off. True on success."""
    succeeded = self._http_send(ip, "/off", payload="", method="post")
    return succeeded
def _device_on(self, ip: str) -> bool:
    """Switch the whole device back ON: DELETE on /off. True on success."""
    succeeded = self._http_send(ip, "/off", method="delete")
    return succeeded
# -------------------------------------------------------------------------
# RSRUN: per-pump schedule snapshot/override/restore
# -------------------------------------------------------------------------
def _rsrun_snapshot(self, ctrl: dict) -> Optional[Dict[str, Any]]:
    """
    Read one RSRUN pump's current settings for a later restore.

    Fetches /pump/settings and keeps, for the pump named by
    ctrl["pump_index"] ("pump_1" / "pump_2"), its schedule and its
    schedule_enabled flag (True when the device does not report it).

    Returns:
        The snapshot mapping, or None when the device did not answer
        with a dict or has no dict entry for that pump.
    """
    device_ip = ctrl["ip"]
    slot = ctrl["pump_index"]
    settings = self._http_get(device_ip, "/pump/settings")
    pump_data = settings.get(slot) if isinstance(settings, dict) else None
    if not isinstance(pump_data, dict):
        return None
    # Only what a restore needs: the schedule and the on/off flag.
    snapshot: Dict[str, Any] = {
        "type": "rsrun",
        "ip": device_ip,
        "pump_index": slot,
    }
    snapshot["schedule"] = pump_data.get("schedule")
    snapshot["schedule_enabled"] = pump_data.get("schedule_enabled", True)
    return snapshot
def _rsrun_apply_intensity(self, ctrl: dict, intensity: int) -> bool:
    """
    Replace one RSRUN pump's schedule with a single slot at `intensity`.

    The slot has the form {"st": 0, "pd": 0, "ti": <intensity>} and is
    PUT to /pump/settings under the pump's index with the schedule
    enabled. Returns True when the device accepted the request.
    """
    single_slot = {"st": 0, "pd": 0, "ti": intensity}
    pump_settings = {
        "schedule_enabled": True,
        "schedule": [single_slot],
    }
    device_ip = ctrl["ip"]
    body = {ctrl["pump_index"]: pump_settings}
    return self._http_send(device_ip, "/pump/settings", body, "put")
def _rsrun_restore(self, ctrl: dict, snapshot: Dict[str, Any]) -> bool:
    """
    PUT a saved RSRUN schedule back to /pump/settings.

    Returns False without contacting the device when the snapshot holds
    no schedule; otherwise returns whether the device accepted the PUT.
    """
    device_ip = ctrl["ip"]
    slot = ctrl["pump_index"]
    saved_schedule = snapshot.get("schedule")
    if saved_schedule is None:
        print(f" [SNAP] {self._ctrl_label(ctrl)}: no schedule "
              "in snapshot, skipping restore")
        return False
    enabled_flag = bool(snapshot.get("schedule_enabled", True))
    body = {
        slot: {
            "schedule_enabled": enabled_flag,
            "schedule": saved_schedule,
        }
    }
    return self._http_send(device_ip, "/pump/settings", body, "put")
# -------------------------------------------------------------------------
# RSWAVE: /auto snapshot/override/restore
# -------------------------------------------------------------------------
def _rswave_snapshot(self, ctrl: dict) -> Optional[Dict[str, Any]]:
    """
    Read an RSWAVE's complete /auto document for a later restore.

    Returns:
        {"type": "rswave", "ip": ..., "auto": <document>}, or None when
        the answer is not a dict containing "intervals".
    """
    device_ip = ctrl["ip"]
    auto_doc = self._http_get(device_ip, "/auto")
    usable = isinstance(auto_doc, dict) and "intervals" in auto_doc
    if not usable:
        return None
    return {"type": "rswave", "ip": device_ip, "auto": auto_doc}
def _rswave_apply_intensity(self, ctrl: dict, intensity: int) -> bool:
    """
    Replace an RSWAVE's program with one uniform interval at `intensity`.

    The interval uses wave type "un" (uniform, steady forward flow),
    starts at 00:00 and carries the "name" field and the numeric
    frt / rrt / pd / sn values the firmware insists on.

    The device expects this request sequence, all POSTs:
        /auto/init      {"uid": <op uid>}
        /auto           the new schedule body (no uid)
        /auto/complete  {"uid": <same op uid>}
        /auto/apply     {"uid": <same op uid>}

    Returns:
        True when every step succeeded; the sequence stops at the first
        failing step and returns False.
    """
    import uuid
    device_ip = ctrl["ip"]
    op_uid = str(uuid.uuid4())
    wave_uid = str(uuid.uuid4())
    # One interval covering the whole day.
    interval = {
        "wave_uid": wave_uid,
        "name": "Backup Mode",
        "type": "un",          # uniform: steady continuous flow
        "direction": "fw",
        "frt": 2,              # min 2, max 60
        "rrt": 2,              # min 2, max 60
        "fti": intensity,      # forward target intensity (min 10, max 100)
        "rti": intensity,      # reverse target intensity (min 10, max 100)
        "pd": 2,               # pulse duration (min 2, max 25)
        "sn": 3,               # sine (min 3, max 10)
        "sync": True,
        "st": 0,               # starts at 00:00
        "start": 0,
    }
    steps = (
        ("/auto/init", {"uid": op_uid}),
        ("/auto", {"intervals": [interval]}),
        ("/auto/complete", {"uid": op_uid}),
        ("/auto/apply", {"uid": op_uid}),
    )
    # all() stops at the first step that reports failure.
    return all(
        self._http_send(device_ip, endpoint, data, "post")
        for endpoint, data in steps
    )
def _rswave_restore(self, ctrl: dict, snapshot: Dict[str, Any]) -> bool:
    """
    Push a saved /auto program back to an RSWAVE.

    Everything happens inside one edit cycle, with the intervals sent
    one per request:
        POST /auto/init      {"uid": op_uid}
        POST /auto           {"intervals": [interval_0]}
        POST /auto           {"intervals": [interval_1]}
        ...                  (the device accumulates them)
        POST /auto/complete  {"uid": op_uid}
        POST /auto/apply     {"uid": op_uid}

    Why one interval per request: the older ESP8266-based ReefWave
    firmware has a small JSON parse buffer and rejects a single POST
    carrying 3+ intervals ("could not parse the received JSON"), while
    it stores and runs 5+ intervals fine. Small individual requests work
    there and on the newer ESP32 firmware alike, so one code path serves
    both.

    Returns:
        True when every request succeeded. False when the snapshot has
        no usable "auto" / "intervals" data (nothing is sent) or as soon
        as one request fails.
    """
    import uuid
    device_ip = ctrl["ip"]
    auto_doc = snapshot.get("auto")
    if not isinstance(auto_doc, dict):
        print(f" [SNAP] {self._ctrl_label(ctrl)}: invalid snapshot")
        return False
    saved_intervals = auto_doc.get("intervals")
    if not isinstance(saved_intervals, list) or not saved_intervals:
        print(f" [SNAP] {self._ctrl_label(ctrl)}: no intervals to restore")
        return False
    op_uid = str(uuid.uuid4())
    steps = [("/auto/init", {"uid": op_uid})]
    # One small request per interval.
    steps.extend(
        ("/auto", {"intervals": [interval]}) for interval in saved_intervals
    )
    steps.append(("/auto/complete", {"uid": op_uid}))
    steps.append(("/auto/apply", {"uid": op_uid}))
    for endpoint, data in steps:
        if not self._http_send(device_ip, endpoint, data, "post"):
            return False
    return True
# -------------------------------------------------------------------------
# Snapshot orchestration (capture once before first override, restore once)
# -------------------------------------------------------------------------
def _capture_config(self, ctrl: dict) -> Optional[Dict[str, Any]]:
"""
Read a device's current configuration into a snapshot dict.
Returns None if the device family is unknown or the read failed.
"""
hw = ctrl["hw_model"]
if hw.startswith("RSRUN"):
return self._rsrun_snapshot(ctrl)
if hw.startswith("RSWAVE"):
return self._rswave_snapshot(ctrl)
return None
def _ensure_snapshot(self, ctrl: dict) -> None:
"""
Capture the device's original configuration the first time we are
about to override it during an outage. Idempotent: if a snapshot
already exists on disk, don't overwrite it (we'd lose the original).
If the live read fails (device already unreachable when the outage
hits), we fall back to the most recent *reference* snapshot so we
still have something to restore later.
"""
key = ctrl["key"]
if self._load_snapshot(key) is not None:
return # already have one (e.g. survived a Pi reboot)
snap = self._capture_config(ctrl)
if snap is None:
# Live capture failed — fall back to the periodic reference.
ref = self._load_reference(key)
if ref is not None:
self._save_snapshot(key, ref)
print(f" [SNAP] {self._ctrl_label(ctrl)}: live capture "
"failed, using reference snapshot as fallback")
else:
print(f" [SNAP] {self._ctrl_label(ctrl)}: snapshot failed "
"and no reference available")
return
# Remember whether the device was ON or OFF at snapshot time.
# If it was already off (e.g. user toggle), we don't want to
# turn it back on at restore.
snap["was_off"] = self._is_device_off(ctrl["ip"])
self._save_snapshot(key, snap)
print(f" [SNAP] {self._ctrl_label(ctrl)}: original config saved")
def capture_reference_snapshots(self, force: bool = False) -> int:
    """
    Refresh the *reference* snapshot (safety net) of every configured pump.

    Intended for nominal mode only (mains power, pumps at normal
    intensity) so that a reduced configuration never becomes the
    reference. Unlike pre-outage snapshots, references are kept
    indefinitely and overwritten on each successful capture.

    Args:
        force: When True, skip the nominal-mode guard (used by manual
            tools).

    Returns:
        The number of devices whose configuration was captured and saved.
        0 when the guard blocked the run.
    """
    if not (force or self.active_level_name == "normal"):
        # A reduced level is active: capturing now would poison the
        # reference with the eco/critical config.
        return 0

    saved = 0
    for pump in self._pump_cfg.get("controllers", []):
        reference = self._capture_config(pump)
        if reference is not None:
            # Stamp the on/off state and capture time, then persist.
            reference.update(
                was_off=self._is_device_off(pump["ip"]),
                captured_at=time.time(),
            )
            self._save_reference(pump["key"], reference)
            saved += 1
        # else: device unreachable this round; the old reference is kept.

    if saved:
        print(f"[REFERENCE] Captured {saved} pump config(s) as reference")
    return saved
def _probe_device(self, ip: str, timeout_s: float = 2.0) -> Optional[str]:
    """
    Cheap reachability check for a single device via GET http://<ip>/mode.

    Failures are deliberately silent (the health_check caller does the
    logging) so per-request HTTP errors do not spam the log.

    Args:
        ip: Address of the device to probe.
        timeout_s: Request timeout in seconds.

    Returns:
        The "mode" value from the JSON reply as a string (e.g. 'auto',
        'manual', 'off'). "on" when the reply is a JSON object without a
        "mode" key or is not a JSON object at all. None when requests is
        unavailable, the HTTP status is not OK, or anything raises.
    """
    if REQUESTS_AVAILABLE:
        try:
            reply = requests.get(f"http://{ip}/mode", timeout=timeout_s)
            if not reply.ok:
                return None
            payload = reply.json()
            return (
                str(payload.get("mode", "on"))
                if isinstance(payload, dict)
                else "on"
            )
        except Exception:  # noqa: BLE001 — any failure = unreachable
            return None
    return None
def health_check(self, network_mode: str = "?",
on_battery: bool = False) -> Dict[str, bool]:
"""
Poll every configured device for reachability and log a single
summary line. Returns a {label: reachable} map.
This is the periodic "is everyone OK?" check. It also surfaces the
active network mode (client / rejoin / hotspot) so the log shows
WHERE we are reaching the devices from — useful to confirm that a
hotspot failover actually brought the pumps back.
"""
controllers = self._pump_cfg.get("controllers", [])
results: Dict[str, bool] = {}
# Shorter timeout on battery to avoid blocking the loop and wasting
# energy on slow retries; devices on hotspot answer fast or not at all.
timeout_s = 1.5 if on_battery else 2.5
ok_count = 0
details = []
seen_ips = {}
label_to_key = {}
for ctrl in controllers:
ip = ctrl.get("ip")
label = self._ctrl_label(ctrl)
label_to_key[label] = ctrl["key"]
if not ip:
results[label] = False
details.append(f"{label}=no-ip")
continue
# Cache per-IP probe: multi-pump RSRUN share one IP/controller.
if ip in seen_ips:
mode = seen_ips[ip]
else:
mode = self._probe_device(ip, timeout_s=timeout_s)
seen_ips[ip] = mode
reachable = mode is not None
results[label] = reachable
if reachable:
ok_count += 1
details.append(f"{label}={mode}")
else:
details.append(f"{label}=DOWN")
total = len(results)
icon = "✅" if ok_count == total and total > 0 else (
"⚠️" if ok_count > 0 else "❌")
ctx = "battery" if on_battery else "mains"
print(f"[HEALTH] {icon} {ok_count}/{total} devices reachable "
f"| net={network_mode} | {ctx} | "
+ ", ".join(details))