diff --git a/config.yaml b/config.yaml index 84c7bd11..307e6d7f 100644 --- a/config.yaml +++ b/config.yaml @@ -1,16 +1,15 @@ ---- config: # Configuration values to set up basic communication # Set your COM port e.g. COM3 for Windows, /dev/ttyACM0 for Linux... # Use AUTO for COM port auto-discovery (may not work on every setup) # COM_PORT: "/dev/ttyACM0" # COM_PORT: "COM3" - COM_PORT: "AUTO" + COM_PORT: AUTO # Theme to use (located in res/themes) # Use the name of the folder as value # Choose a theme made for your screen size (see DISPLAY_SIZE inside theme.yaml) - THEME: 3.5inchTheme2 + THEME: Gradient # Hardware sensors reading # Choose the appropriate method for reading your hardware sensors: @@ -18,14 +17,14 @@ config: # - LHM use LibreHardwareMonitor library to read hardware sensors (Windows only - NEEDS ADMIN RIGHTS) # - STUB / STATIC use random/static data instead of real hardware sensors # - AUTO use the best method based on your OS: Windows OS will use LHM, other OS will use Python libraries - HW_SENSORS: AUTO + HW_SENSORS: PYTHON # Network interfaces # Linux/MacOS interfaces are named "eth0", "wlan0", "wlp1s0", "enp2s0"... # For Windows use the interfaces pretty name: "Ethernet 2", "Wi-Fi", ... # Leave the fields empty if the card does not exist on your setup - ETH: "" # Ethernet Card - WLO: "" # Wi-Fi Card + ETH: Ethernet # Ethernet Card + WLO: '' # Wi-Fi Card # CPU fan # For Linux/MacOS platforms, the CPU fan is amongst all fan sensors gathered from the motherboard chipset @@ -41,14 +40,14 @@ config: # OpenWeatherMap API KEY. Can be obtained by creating a free account on https://home.openweathermap.org/users/sign_up. # You need to subscribe to the 3.0 OneCallAPI that has 1000 free daily calls - WEATHER_API_KEY: "" + WEATHER_API_KEY: '' # Location from which to display the weather. Use for example https://www.latlong.net/ to get latitude/longitude - WEATHER_LATITUDE: 45.75 - WEATHER_LONGITUDE: 4.85 + WEATHER_LATITUDE: '45.75' + WEATHER_LONGITUDE: '4.85' # Units used to display temperatures (metric - °C, imperial - °F, standard - °K) WEATHER_UNITS: metric # Language is used by the API. Find more here https://openweathermap.org/api/one-call-3#multi - WEATHER_LANGUAGE: en + WEATHER_LANGUAGE: de display: # Display revision: @@ -60,7 +59,7 @@ display: # - WEACT_B for WeAct Studio Display FS V1 0.96" # - SIMU for simulated display (image written in screencap.png). Width & height will be detected from the theme # To identify your smart screen: https://github.com/mathoudebine/turing-smart-screen-python/wiki/Hardware-revisions - REVISION: A + REVISION: C_USB # Display Brightness # Set this as the desired %, 0 being completely dark and 100 being max brightness diff --git a/configure.py b/configure.py index 53b85845..8a42e897 100755 --- a/configure.py +++ b/configure.py @@ -66,7 +66,7 @@ SIZE_3_5_INCH = "3.5\"" SIZE_5_INCH = "5\"" SIZE_8_8_INCH = "8.8\"" -SIZE_8_8_INCH_USB = "8.8\" (V1.1)" +SIZE_8_8_INCH_USB = "8.8\" (V1.1) or 9.2\"" SIZE_2_1_INCH = "2.1\"" # Only for retro compatibility SIZE_2_x_INCH = "2.1\" / 2.8\"" SIZE_0_96_INCH = "0.96\"" @@ -124,20 +124,20 @@ "sk": "Slovak", "sl": "Slovenian", "sp": "Spanish", "sv": "Swedish", "th": "Thai", "tr": "Turkish", "ua": "Ukrainian", "vi": "Vietnamese", "zu": "Zulu"} -MAIN_DIRECTORY = str(Path(__file__).parent.resolve()) + "/" -THEMES_DIR = MAIN_DIRECTORY + 'res/themes' +MAIN_DIRECTORY = Path(__file__).resolve().parent +THEMES_DIR = MAIN_DIRECTORY / "res/themes" -circular_mask = Image.open(MAIN_DIRECTORY + "res/backgrounds/circular-mask.png") + +circular_mask = Image.open(MAIN_DIRECTORY / "res/backgrounds/circular-mask.png") def get_theme_data(name: str): - dir = os.path.join(THEMES_DIR, name) + dir = THEMES_DIR / name + # checking if it is a directory - if os.path.isdir(dir): - # Check if a theme.yaml file exists - theme = os.path.join(dir, 'theme.yaml') - if os.path.isfile(theme): - # Get display size from theme.yaml - with open(theme, "rt", encoding='utf8') as stream: + if dir.is_dir(): + theme = dir / "theme.yaml" + if theme.is_file(): + with open(theme, "rt", encoding="utf8") as stream: theme_data, ind, bsi = ruamel.yaml.util.load_yaml_guess_indent(stream) return theme_data return None @@ -189,8 +189,7 @@ def __init__(self): self.window = Tk() self.window.title('Turing System Monitor configuration') self.window.geometry("820x580") - self.window.iconphoto(True, PhotoImage(file=MAIN_DIRECTORY + "res/icons/monitor-icon-17865/64.png")) - # When window gets focus again, reload theme preview in case it has been updated by theme editor + self.window.iconphoto(True,PhotoImage(file=str(MAIN_DIRECTORY / "res/icons/monitor-icon-17865/64.png"))) # When window gets focus again, reload theme preview in case it has been updated by theme editor self.window.bind("", self.on_theme_change) self.window.after(0, self.on_fan_speed_update) @@ -313,14 +312,17 @@ def run(self): def load_theme_preview(self): theme_data = get_theme_data(self.theme_cb.get()) + if theme_data and theme_data['display'].get("DISPLAY_SIZE", '3.5"') == SIZE_2_1_INCH: + theme_preview.paste(circular_mask, mask=circular_mask) + try: - theme_preview = Image.open(MAIN_DIRECTORY + "res/themes/" + self.theme_cb.get() + "/preview.png") + theme_preview = Image.open(MAIN_DIRECTORY / "res" / "themes" / self.theme_cb.get() / "preview.png") if theme_data['display'].get("DISPLAY_SIZE", '3.5"') == SIZE_2_1_INCH: # This is a circular screen: apply a circle mask over the preview theme_preview.paste(circular_mask, mask=circular_mask) except: - theme_preview = Image.open(MAIN_DIRECTORY + "res/docs/no-preview.png") + theme_preview = Image.open(MAIN_DIRECTORY / "res/docs/no-preview.png") finally: theme_preview.thumbnail((320, 480), Image.Resampling.LANCZOS) self.theme_preview_img = ImageTk.PhotoImage(theme_preview) @@ -338,7 +340,7 @@ def load_theme_preview(self): self.theme_author.place(x=10, y=self.theme_preview_img.height() + 15) def load_config_values(self): - with open(MAIN_DIRECTORY + "config.yaml", "rt", encoding='utf8') as stream: + with open(MAIN_DIRECTORY / "config.yaml", "rt", encoding='utf8') as stream: self.config, ind, bsi = ruamel.yaml.util.load_yaml_guess_indent(stream) # Check if theme is valid @@ -450,7 +452,7 @@ def save_config_values(self): self.config['display']['DISPLAY_REVERSE'] = [k for k, v in reverse_map.items() if v == self.orient_cb.get()][0] self.config['display']['BRIGHTNESS'] = int(self.brightness_slider.get()) - with open(MAIN_DIRECTORY + "config.yaml", "w", encoding='utf-8') as file: + with open(MAIN_DIRECTORY / "config.yaml", "w", encoding='utf-8') as file: ruamel.yaml.YAML().dump(self.config, file) def save_additional_config(self, ping: str, api_key: str, lat: str, long: str, unit: str, lang: str): @@ -461,7 +463,7 @@ def save_additional_config(self, ping: str, api_key: str, lat: str, long: str, u self.config['config']['WEATHER_UNITS'] = unit self.config['config']['WEATHER_LANGUAGE'] = lang - with open(MAIN_DIRECTORY + "config.yaml", "w", encoding='utf-8') as file: + with open(MAIN_DIRECTORY / "config.yaml", "w", encoding='utf-8') as file: ruamel.yaml.YAML().dump(self.config, file) def on_theme_change(self, e=None): @@ -471,25 +473,44 @@ def on_weatherping_click(self): self.more_config_window.show() def on_open_theme_folder_click(self): - path = f'"{MAIN_DIRECTORY}res/themes"' + #path = f'"{MAIN_DIRECTORY}res/themes"' + #if platform.system() == "Windows": + # os.startfile(path) + #elif platform.system() == "Darwin": + # subprocess.Popen(["open", path]) + #else: + # subprocess.Popen(["xdg-open", path]) + path = MAIN_DIRECTORY / "res/themes" + if platform.system() == "Windows": os.startfile(path) elif platform.system() == "Darwin": - subprocess.Popen(["open", path]) + subprocess.Popen(["open", str(path)]) else: - subprocess.Popen(["xdg-open", path]) + subprocess.Popen(["xdg-open", str(path)]) + def on_theme_editor_click(self): - subprocess.Popen( - f'"{MAIN_DIRECTORY}{glob.glob("theme-editor.*", root_dir=MAIN_DIRECTORY)[0]}" "{self.theme_cb.get()}"', - shell=True) + theme_editor = next(MAIN_DIRECTORY.glob("theme-editor.*")) + + if platform.system() == "Windows": + subprocess.Popen([str(theme_editor), self.theme_cb.get()], shell=True) + else: + subprocess.Popen([str(theme_editor), self.theme_cb.get()]) + def on_save_click(self): self.save_config_values() def on_saverun_click(self): self.save_config_values() - subprocess.Popen(f'"{MAIN_DIRECTORY}{glob.glob("main.*", root_dir=MAIN_DIRECTORY)[0]}"', shell=True) + main_file = next(MAIN_DIRECTORY.glob("main.*")) + + if platform.system() == "Windows": + subprocess.Popen([str(main_file)], shell=True) + else: + subprocess.Popen([str(main_file)]) + self.window.destroy() def on_brightness_change(self, e=None): diff --git a/library/lcd/lcd_comm_turing_usb.py b/library/lcd/lcd_comm_turing_usb.py index 6fee436f..5ccd6bf8 100644 --- a/library/lcd/lcd_comm_turing_usb.py +++ b/library/lcd/lcd_comm_turing_usb.py @@ -19,9 +19,11 @@ # along with this program. If not, see . import math +import os import platform import queue import struct +import shutil import subprocess import time from io import BytesIO @@ -37,11 +39,400 @@ from library.lcd.lcd_comm import Orientation, LcdComm VENDOR_ID = 0x1cbe -PRODUCT_ID = 0x0088 +PRODUCT_ID = [0x0088, 0x0092] # 8.8", 9.2" MAX_CHUNK_BYTES = 1024*1024 # Data sent to screen cannot exceed 1024MB or there will be a timeout +# Command IDs used by the vendor protocol (subset) +CMD_UPLOAD_JPEG = 101 +CMD_UPLOAD_PNG = 102 +CMD_GET_H264_CHUNK_SIZE = 17 +CMD_PLAY_H264_CHUNK = 121 +CMD_GET_STREAM_STATUS = 122 +CMD_STOP_STREAM = 123 + +# Default max payload for frame uploads (device/transport limit) +MAX_IMAGE_PAYLOAD_DEFAULT = MAX_CHUNK_BYTES + +def _resp_ok(resp: Optional[bytes]) -> bool: + if not resp: + return False + b1 = resp[1] if len(resp) > 1 else None + b8 = resp[8] if len(resp) > 8 else None + return (b1 == 0xC8) or (b8 == 0xC8) + +def send_jpeg(dev, jpeg_data: bytes): + img_size = len(jpeg_data) + cmd_packet = build_command_packet_header(CMD_UPLOAD_JPEG) + cmd_packet[8] = (img_size >> 24) & 0xFF + cmd_packet[9] = (img_size >> 16) & 0xFF + cmd_packet[10] = (img_size >> 8) & 0xFF + cmd_packet[11] = img_size & 0xFF + full_payload = encrypt_command_packet(cmd_packet) + jpeg_data + return write_to_device(dev, full_payload) + +def _encode_jpeg_under_limit( + image: Image.Image, + *, + max_bytes: int, + quality: int = 95, + subsampling: int = -1, +) -> bytes: + if subsampling not in (-1, 0, 1, 2): + raise ValueError("subsampling must be one of: -1, 0, 1, 2") + img = image + if img.mode not in ("RGB", "L"): + img = img.convert("RGB") + elif img.mode == "L": + img = img.convert("RGB") + + subs = (2, 1, 0) if subsampling == -1 else (subsampling,) + best = b"" + for sub in subs: + q = int(quality) + while q >= 1: + buf = BytesIO() + try: + img.save( + buf, + format="JPEG", + quality=q, + optimize=False, + progressive=False, + subsampling=sub, + ) + except TypeError: + img.save(buf, format="JPEG", quality=q, optimize=False, progressive=False) + data = buf.getvalue() + if not best or len(data) < len(best): + best = data + if len(data) <= max_bytes: + return data + q = q - 5 if q > 10 else q - 1 + + raise RuntimeError(f"Could not transcode JPEG under max_bytes: {len(best)} > {max_bytes}") + +def send_pil_image_auto( + dev, + image: Image.Image, + *, + max_bytes: int = MAX_IMAGE_PAYLOAD_DEFAULT, +) -> None: + # First try PNG (preferred) + png = _encode_png(image) + if len(png) <= max_bytes: + send_image(dev, png) + return + # Fallback to JPEG when over limit (default behavior) + jpg = _encode_jpeg_under_limit(image, max_bytes=max_bytes, quality=90, subsampling=-1) + send_jpeg(dev, jpg) + +# ---- MP4 parsing + Annex-B extraction (pure Python fallback) ---- +from dataclasses import dataclass +from typing import Iterable, Tuple, Set + +def _u32be(b: bytes, off: int = 0) -> int: + return int.from_bytes(b[off:off+4], "big", signed=False) + +def _u64be(b: bytes, off: int = 0) -> int: + return int.from_bytes(b[off:off+8], "big", signed=False) + +def _iter_mp4_boxes(data: bytes, start: int, end: int) -> Iterable[tuple[bytes, int, int]]: + i = start + while i + 8 <= end: + size = _u32be(data, i) + typ = data[i+4:i+8] + hdr = 8 + if size == 1: + if i + 16 > end: + break + size = _u64be(data, i+8) + hdr = 16 + elif size == 0: + size = end - i + if size < hdr: + break + j = i + int(size) + if j > end: + break + yield typ, i + hdr, j + i = j + +def _mp4_find_box(data: bytes, start: int, end: int, typ: bytes) -> Optional[tuple[int, int]]: + for t, ps, pe in _iter_mp4_boxes(data, start, end): + if t == typ: + return ps, pe + return None + +@dataclass +class _Mp4H264Track: + nal_len_size: int + sps_list: list[bytes] + pps_list: list[bytes] + sample_sizes: list[int] + chunk_offsets: list[int] + stsc: list[tuple[int, int, int]] # (first_chunk, samples_per_chunk, sample_desc_idx) + sync_samples: Optional[Set[int]] + +def _mp4_parse_avcc(avcc: bytes) -> tuple[int, list[bytes], list[bytes]]: + if len(avcc) < 7: + raise ValueError("avcC too small") + nal_len_size = (avcc[4] & 0x03) + 1 + num_sps = avcc[5] & 0x1F + off = 6 + sps_list: list[bytes] = [] + for _ in range(num_sps): + if off + 2 > len(avcc): + raise ValueError("avcC truncated (SPS length)") + n = int.from_bytes(avcc[off:off+2], "big") + off += 2 + if off + n > len(avcc): + raise ValueError("avcC truncated (SPS data)") + sps_list.append(avcc[off:off+n]) + off += n + if off + 1 > len(avcc): + raise ValueError("avcC truncated (PPS count)") + num_pps = avcc[off] + off += 1 + pps_list: list[bytes] = [] + for _ in range(num_pps): + if off + 2 > len(avcc): + raise ValueError("avcC truncated (PPS length)") + n = int.from_bytes(avcc[off:off+2], "big") + off += 2 + if off + n > len(avcc): + raise ValueError("avcC truncated (PPS data)") + pps_list.append(avcc[off:off+n]) + off += n + return nal_len_size, sps_list, pps_list + +def _mp4_load_moov(path: str) -> bytes: + with open(path, "rb") as f: + f.seek(0, os.SEEK_END) + file_size = f.tell() + f.seek(0, os.SEEK_SET) + while f.tell() + 8 <= file_size: + off0 = f.tell() + hdr = f.read(8) + if len(hdr) < 8: + break + size = _u32be(hdr, 0) + typ = hdr[4:8] + hdr_size = 8 + if size == 1: + ext = f.read(8) + if len(ext) < 8: + break + size = _u64be(ext, 0) + hdr_size = 16 + elif size == 0: + size = file_size - off0 + if size < hdr_size: + break + payload_size = int(size) - hdr_size + if typ == b"moov": + return f.read(payload_size) + f.seek(payload_size, os.SEEK_CUR) + raise ValueError("MP4: moov box not found") + +def _mp4_pick_h264_video_track(moov: bytes) -> _Mp4H264Track: + moov_start = 0 + moov_end = len(moov) + for t_trak, trak_ps, trak_pe in _iter_mp4_boxes(moov, moov_start, moov_end): + if t_trak != b"trak": + continue + mdia = _mp4_find_box(moov, trak_ps, trak_pe, b"mdia") + if mdia is None: + continue + mdia_ps, mdia_pe = mdia + hdlr = _mp4_find_box(moov, mdia_ps, mdia_pe, b"hdlr") + if hdlr is None: + continue + hdlr_ps, hdlr_pe = hdlr + hdlr_payload = moov[hdlr_ps:hdlr_pe] + if len(hdlr_payload) < 12 or hdlr_payload[8:12] != b"vide": + continue + + minf = _mp4_find_box(moov, mdia_ps, mdia_pe, b"minf") + if minf is None: + continue + stbl = _mp4_find_box(moov, minf[0], minf[1], b"stbl") + if stbl is None: + continue + stbl_ps, stbl_pe = stbl + + stsd = _mp4_find_box(moov, stbl_ps, stbl_pe, b"stsd") + stsz = _mp4_find_box(moov, stbl_ps, stbl_pe, b"stsz") + stsc = _mp4_find_box(moov, stbl_ps, stbl_pe, b"stsc") + stco = _mp4_find_box(moov, stbl_ps, stbl_pe, b"stco") + co64 = _mp4_find_box(moov, stbl_ps, stbl_pe, b"co64") + stss = _mp4_find_box(moov, stbl_ps, stbl_pe, b"stss") + if stsd is None or stsz is None or stsc is None or (stco is None and co64 is None): + continue + + stsd_payload = moov[stsd[0]:stsd[1]] + if len(stsd_payload) < 8: + continue + entry_count = _u32be(stsd_payload, 4) + off = 8 + found = False + nal_len_size = 4 + sps_list: list[bytes] = [] + pps_list: list[bytes] = [] + for _ in range(entry_count): + if off + 8 > len(stsd_payload): + break + ent_size = _u32be(stsd_payload, off) + fmt = stsd_payload[off+4:off+8] + ent_end = off + int(ent_size) + if ent_size < 8 or ent_end > len(stsd_payload): + break + if fmt in (b"avc1", b"avc3"): + child_start = off + 8 + 78 + if child_start < ent_end: + for t2, ps2, pe2 in _iter_mp4_boxes(stsd_payload, child_start, ent_end): + if t2 == b"avcC": + nal_len_size, sps_list, pps_list = _mp4_parse_avcc(stsd_payload[ps2:pe2]) + found = True + break + elif fmt in (b"hvc1", b"hev1"): + raise ValueError("MP4 contains HEVC/H.265; device expects H.264") + if found: + break + off = ent_end + if not found: + continue + + stsz_payload = moov[stsz[0]:stsz[1]] + if len(stsz_payload) < 12: + continue + fixed_size = _u32be(stsz_payload, 4) + sample_count = _u32be(stsz_payload, 8) + sample_sizes: list[int] = [] + if fixed_size: + sample_sizes = [int(fixed_size)] * int(sample_count) + else: + need = 12 + int(sample_count) * 4 + if len(stsz_payload) < need: + continue + off2 = 12 + for _ in range(int(sample_count)): + sample_sizes.append(int(_u32be(stsz_payload, off2))) + off2 += 4 + + if stco is not None: + stco_payload = moov[stco[0]:stco[1]] + if len(stco_payload) < 8: + continue + n = _u32be(stco_payload, 4) + need = 8 + int(n) * 4 + if len(stco_payload) < need: + continue + chunk_offsets = [int(_u32be(stco_payload, 8 + 4*i)) for i in range(int(n))] + else: + co64_payload = moov[co64[0]:co64[1]] # type: ignore[index] + if len(co64_payload) < 8: + continue + n = _u32be(co64_payload, 4) + need = 8 + int(n) * 8 + if len(co64_payload) < need: + continue + chunk_offsets = [int(_u64be(co64_payload, 8 + 8*i)) for i in range(int(n))] + + stsc_payload = moov[stsc[0]:stsc[1]] + if len(stsc_payload) < 8: + continue + n = _u32be(stsc_payload, 4) + need = 8 + int(n) * 12 + if len(stsc_payload) < need: + continue + stsc_entries: list[tuple[int, int, int]] = [] + off3 = 8 + for _ in range(int(n)): + first_chunk = int(_u32be(stsc_payload, off3)) + samples_per_chunk = int(_u32be(stsc_payload, off3+4)) + desc_idx = int(_u32be(stsc_payload, off3+8)) + stsc_entries.append((first_chunk, samples_per_chunk, desc_idx)) + off3 += 12 + stsc_entries.sort(key=lambda x: x[0]) + + sync_samples: Optional[Set[int]] = None + if stss is not None: + stss_payload = moov[stss[0]:stss[1]] + if len(stss_payload) >= 8: + n2 = _u32be(stss_payload, 4) + need = 8 + int(n2) * 4 + if len(stss_payload) >= need: + sync_samples = set(int(_u32be(stss_payload, 8 + 4*i)) for i in range(int(n2))) + + return _Mp4H264Track( + nal_len_size=int(nal_len_size), + sps_list=sps_list, + pps_list=pps_list, + sample_sizes=sample_sizes, + chunk_offsets=chunk_offsets, + stsc=stsc_entries, + sync_samples=sync_samples, + ) + + raise ValueError("MP4: no H.264 video track found") + +def _mp4_iter_sample_locations(track: _Mp4H264Track) -> Iterable[tuple[int, int, int]]: + sizes = track.sample_sizes + sample_idx0 = 0 + entries = track.stsc + entry_idx = 0 + if not sizes: + return + for chunk_idx1, chunk_off in enumerate(track.chunk_offsets, start=1): + while (entry_idx + 1) < len(entries) and chunk_idx1 >= entries[entry_idx + 1][0]: + entry_idx += 1 + samples_per_chunk = entries[entry_idx][1] + off = int(chunk_off) + for _ in range(samples_per_chunk): + if sample_idx0 >= len(sizes): + return + sz = int(sizes[sample_idx0]) + yield sample_idx0 + 1, off, sz + off += sz + sample_idx0 += 1 + +def _mp4_extract_h264_annexb(in_path: str, out_path: str, *, repeat_headers: bool = True) -> None: + moov = _mp4_load_moov(in_path) + track = _mp4_pick_h264_video_track(moov) + start_code = b"\x00\x00\x00\x01" + spspps = b"".join(start_code + s for s in track.sps_list) + b"".join(start_code + p for p in track.pps_list) + if not spspps: + raise ValueError("MP4: missing SPS/PPS in avcC") + + with open(in_path, "rb") as fin, open(out_path, "wb") as fout: + fout.write(spspps) + nls = int(track.nal_len_size) + if nls not in (1,2,3,4): + raise ValueError(f"MP4: unsupported NAL length size: {nls}") + sync = track.sync_samples + for sample_no, off, sz in _mp4_iter_sample_locations(track): + if repeat_headers and sync is not None and sample_no in sync: + fout.write(spspps) + fin.seek(off, os.SEEK_SET) + data = fin.read(sz) + if len(data) != sz: + raise ValueError("MP4: truncated sample read") + pos = 0 + end = len(data) + while pos + nls <= end: + nal_len = int.from_bytes(data[pos:pos+nls], "big") + pos += nls + if nal_len <= 0: + continue + if pos + nal_len > end: + raise ValueError("MP4: invalid NAL length in sample") + fout.write(start_code) + fout.write(data[pos:pos+nal_len]) + pos += nal_len + + def build_command_packet_header(a0: int) -> bytearray: packet = bytearray(500) @@ -71,9 +462,11 @@ def encrypt_command_packet(data: bytearray) -> bytearray: def find_usb_device(): - dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID) + for pid in PRODUCT_ID: + dev = usb.core.find(idVendor=VENDOR_ID, idProduct=pid) if dev is None: - raise ValueError('USB device not found') + raise ValueError(f'USB device not found') + try: dev.set_configuration() @@ -242,7 +635,7 @@ def delay(dev, rst): print("Sending Delay Command (ID 122)...") cmd_packet = build_command_packet_header(122) response = write_to_device(dev, encrypt_command_packet(cmd_packet)) - if response and response[8] > rst: + if response and len(response) > 8 and response[8] > rst: delay(dev, rst) @@ -252,65 +645,100 @@ def extract_h264_from_mp4(mp4_path: str): raise FileNotFoundError(f"Input file not found: {input_path}") output_path = input_path.with_suffix(".h264") - if output_path.exists(): print(f"{output_path.name} already exists. Skipping extraction.") return output_path - cmd = ["ffmpeg", "-y", # overwrite without asking - "-i", str(input_path), # input file - "-c:v", "copy", # copy video stream - "-bsf:v", "h264_mp4toannexb", # convert to Annex-B - "-an", # remove audio - "-f", "h264", # set output format - str(output_path) # output file - ] - - print(f"Extracting H.264 from {input_path.name}...") - subprocess.run(cmd, check=True) + # Prefer ffmpeg when available (fast + robust). Fall back to pure-Python MP4->Annex-B extraction. + ffmpeg = shutil.which("ffmpeg") + if ffmpeg: + cmd = [ + ffmpeg, + "-y", + "-i", + str(input_path), + "-c:v", + "copy", + "-bsf:v", + "h264_mp4toannexb", + "-an", + "-f", + "h264", + str(output_path), + ] + print(f"Extracting H.264 from {input_path.name} with ffmpeg...") + subprocess.run(cmd, check=True) + print(f"Done. Saved as {output_path.name}") + return output_path + + print(f"ffmpeg not found; extracting H.264 from {input_path.name} with built-in MP4 parser...") + _mp4_extract_h264_annexb(str(input_path), str(output_path), repeat_headers=True) print(f"Done. Saved as {output_path.name}") return output_path + def send_video(dev, video_path, loop=False): output_path = extract_h264_from_mp4(video_path) + write_to_device(dev, encrypt_command_packet(build_command_packet_header(111))) write_to_device(dev, encrypt_command_packet(build_command_packet_header(112))) write_to_device(dev, encrypt_command_packet(build_command_packet_header(13))) send_brightness_command(dev, 32) # 14 write_to_device(dev, encrypt_command_packet(build_command_packet_header(41))) - clear_image(dev) # 102, 3703 + clear_image(dev) # 102 send_frame_rate_command(dev, 25) # 15 - # send_image(dev, './102_25011_payload.png') #102, 25011 + + # Negotiate chunk size if supported + resp = write_to_device(dev, encrypt_command_packet(build_command_packet_header(CMD_GET_H264_CHUNK_SIZE))) + chunk_size = 202752 + try: + if resp and len(resp) >= 12: + negotiated = int.from_bytes(resp[8:12], byteorder="big", signed=False) + if 0 < negotiated <= 1024 * 1024: + chunk_size = negotiated + except Exception: + pass + print("Sending Send Video Command (ID 121)...") try: - while (True): - with open(output_path, 'rb') as f: + while True: + with open(output_path, "rb") as f: while True: - data = f.read(202752) - chunksize = len(data) + data = f.read(chunk_size) if not data: break - print(f" Chunk Size: {chunksize} bytes") - cmd_packet = build_command_packet_header(121) + chunksize = len(data) + is_last = f.tell() == os.path.getsize(output_path) + + cmd_packet = build_command_packet_header(CMD_PLAY_H264_CHUNK) cmd_packet[8] = (chunksize >> 24) & 0xFF cmd_packet[9] = (chunksize >> 16) & 0xFF cmd_packet[10] = (chunksize >> 8) & 0xFF cmd_packet[11] = chunksize & 0xFF + if is_last: + cmd_packet[12] = 1 full_payload = encrypt_command_packet(cmd_packet) + data response = write_to_device(dev, full_payload) - time.sleep(0.03) - if response is None or len(response) < 9 or response[8] <= 3: + + # Flow control (queue depth is usually reported in response[8] to cmd 122) + if response is None: delay(dev, 2) - print("Video sent successfully.") + else: + # Poll stream status when queue is high + st = write_to_device(dev, encrypt_command_packet(build_command_packet_header(CMD_GET_STREAM_STATUS))) + if st and len(st) > 8 and st[8] > 3: + delay(dev, 2) + + print("Video sent successfully.") if not loop: break except KeyboardInterrupt: print("\nLoop interrupted by user. Sending reset...") finally: - write_to_device(dev, encrypt_command_packet(build_command_packet_header(123))) + write_to_device(dev, encrypt_command_packet(build_command_packet_header(CMD_STOP_STREAM))) def _encode_png(image: Image.Image) -> bytes: @@ -452,24 +880,48 @@ def _write_file_command(dev, file_path: str) -> bool: logger.info("Writing remote file from: %s", file_path) try: + total_size = Path(file_path).stat().st_size + sent = 0 + chunk_index = 0 + + preferred_cap = min(1024 * 1024, MAX_CHUNK_BYTES) + with open(file_path, "rb") as fh: - chunk_index = 0 while True: - data_chunk = fh.read(202752) + data_chunk = fh.read(preferred_cap) if not data_chunk: break - chunk_size = len(data_chunk) chunk_index += 1 - logger.debug("Chunk %d size: %d bytes", chunk_index, chunk_size) + chunk_len = len(data_chunk) + sent += chunk_len + is_last = sent >= total_size + # [8..11]=chunk_capacity, [12..15]=chunk_len, [16]=last_flag, payload=chunk cmd_packet = build_command_packet_header(39) - cmd_packet[8] = (chunk_size >> 24) & 0xFF - cmd_packet[9] = (chunk_size >> 16) & 0xFF - cmd_packet[10] = (chunk_size >> 8) & 0xFF - cmd_packet[11] = chunk_size & 0xFF + cap = preferred_cap + cmd_packet[8] = (cap >> 24) & 0xFF + cmd_packet[9] = (cap >> 16) & 0xFF + cmd_packet[10] = (cap >> 8) & 0xFF + cmd_packet[11] = cap & 0xFF + cmd_packet[12] = (chunk_len >> 24) & 0xFF + cmd_packet[13] = (chunk_len >> 16) & 0xFF + cmd_packet[14] = (chunk_len >> 8) & 0xFF + cmd_packet[15] = chunk_len & 0xFF + if is_last: + cmd_packet[16] = 1 response = write_to_device(dev, encrypt_command_packet(cmd_packet) + data_chunk) + + # Fallback: legacy layout uses [8..11]=chunk_len only + if response is None or (not _resp_ok(response)): + legacy_packet = build_command_packet_header(39) + legacy_packet[8] = (chunk_len >> 24) & 0xFF + legacy_packet[9] = (chunk_len >> 16) & 0xFF + legacy_packet[10] = (chunk_len >> 8) & 0xFF + legacy_packet[11] = chunk_len & 0xFF + response = write_to_device(dev, encrypt_command_packet(legacy_packet) + data_chunk) + if response is None: logger.error("Write command failed at chunk %d", chunk_index) return False @@ -483,6 +935,7 @@ def _write_file_command(dev, file_path: str) -> bool: logger.error("Error writing file: %s", exc) return False + # This class is for Turing Smart Screen newer models (5.2" / 8" / 8.8" HW rev 1.x / 9.2") # These models are not detected as serial ports but as (Win)USB devices class LcdCommTuringUSB(LcdComm): @@ -566,6 +1019,5 @@ def DisplayPILImage(self, image: Image.Image, x: int = 0, y: int = 0, image_widt # print("new_size =", new_size/1024) - # Send PNG data - encoded = _encode_png(base_image) - send_image(self.dev, encoded) + # Send image data (auto JPEG fallback when payload exceeds device limit) + send_pil_image_auto(self.dev, base_image, max_bytes=MAX_IMAGE_PAYLOAD_DEFAULT) diff --git a/main.py b/main.py index 38676496..9872f533 100755 --- a/main.py +++ b/main.py @@ -68,7 +68,7 @@ # If pystray cannot be loaded do not stop the program, just ignore it. The tray icon will not be displayed. pass -MAIN_DIRECTORY = str(Path(__file__).parent.resolve()) + "/" +MAIN_DIRECTORY = Path(__file__).resolve().parent if __name__ == "__main__": @@ -110,17 +110,20 @@ def clean_stop(tray_icon=None): except: os._exit(0) - def on_signal_caught(signum, frame=None): logger.info("Caught signal %d, exiting" % signum) clean_stop() - def on_configure_tray(tray_icon, item): logger.info("Configure from tray icon") - subprocess.Popen(f'"{MAIN_DIRECTORY}{glob.glob("configure.*", root_dir=MAIN_DIRECTORY)[0]}"', shell=True) - clean_stop(tray_icon) + configure_file = next(MAIN_DIRECTORY.glob("configure.*")) + + if platform.system() == "Windows": + subprocess.Popen([str(configure_file)], shell=True) + else: + subprocess.Popen([str(configure_file)]) + clean_stop(tray_icon) def on_exit_tray(tray_icon, item): logger.info("Exit from tray icon") @@ -165,7 +168,7 @@ def on_win32_wm_event(hWnd, msg, wParam, lParam): tray_icon = pystray.Icon( name='Turing System Monitor', title='Turing System Monitor', - icon=Image.open(MAIN_DIRECTORY + "res/icons/monitor-icon-17865/64.png"), + icon=Image.open(MAIN_DIRECTORY / "res/icons/monitor-icon-17865/64.png"), menu=pystray.Menu( pystray.MenuItem( text='Configure', diff --git a/res/backgrounds/example_1920x480.png b/res/backgrounds/example_1920x480.png new file mode 100644 index 00000000..65e3cad4 Binary files /dev/null and b/res/backgrounds/example_1920x480.png differ diff --git a/simple-program.py b/simple-program.py index 287c92d1..e088f967 100755 --- a/simple-program.py +++ b/simple-program.py @@ -35,6 +35,7 @@ from library.lcd.lcd_comm_rev_b import LcdCommRevB from library.lcd.lcd_comm_rev_c import LcdCommRevC from library.lcd.lcd_comm_rev_d import LcdCommRevD +from library.lcd.lcd_comm_turing_usb import LcdCommTuringUSB from library.lcd.lcd_comm_weact_a import LcdCommWeActA from library.lcd.lcd_comm_weact_b import LcdCommWeActB from library.lcd.lcd_simulated import LcdSimulated @@ -52,7 +53,7 @@ # - D for Kipye Qiye Smart Display 3.5" # - SIMU for simulated display (image written in screencap.png) # To identify your smart screen: https://github.com/mathoudebine/turing-smart-screen-python/wiki/Hardware-revisions -REVISION = "A" +REVISION = "USB_C" # Display width & height in pixels for portrait orientation # /!\ Do not switch width/height here for landscape, use lcd_comm.SetOrientation below @@ -60,7 +61,7 @@ # 480x480 for 2.1" models # 480x800 for 5" models # 480x1920 for 8.8" models -WIDTH, HEIGHT = 320, 480 +WIDTH, HEIGHT = 480, 1920 assert WIDTH <= HEIGHT, "Indicate display width/height for PORTRAIT orientation: width <= height" @@ -101,6 +102,9 @@ def sighandler(signum, frame): elif REVISION == "WEACT_B": logger.info("Selected Hardware WeAct Studio Display FS V1 0.96\"") lcd_comm = LcdCommWeActB(com_port=COM_PORT, display_width=WIDTH, display_height=HEIGHT) + elif REVISION == "USB_C": + logger.info("Selected Hardware Revision USB C (Turing Smart Screen 8.8\" or 9.2\")") + lcd_comm = LcdCommTuringUSB(com_port=COM_PORT, display_width=WIDTH, display_height=HEIGHT) elif REVISION == "SIMU": logger.info("Selected Simulated LCD") lcd_comm = LcdSimulated(display_width=WIDTH, display_height=HEIGHT)