From 4d7bceaded947bd63d737de180064679ad4c77b8 Mon Sep 17 00:00:00 2001 From: codematrixer Date: Mon, 9 Sep 2024 15:12:27 +0800 Subject: [PATCH] feat: screen recorder --- hmdriver2/_gesture.py | 3 +- hmdriver2/_screenrecord.py | 118 +++++++++++++++++++++++++++++++++++++ hmdriver2/driver.py | 16 +++-- hmdriver2/exception.py | 4 ++ 4 files changed, 135 insertions(+), 6 deletions(-) create mode 100644 hmdriver2/_screenrecord.py diff --git a/hmdriver2/_gesture.py b/hmdriver2/_gesture.py index 25cf4ee..29383c9 100644 --- a/hmdriver2/_gesture.py +++ b/hmdriver2/_gesture.py @@ -4,6 +4,7 @@ from typing import List, Union from . import logger from .utils import delay +from .driver import Driver from .proto import HypiumResponse, Point from .exception import InjectGestureError @@ -13,7 +14,7 @@ class _Gesture: SAMPLE_TIME_NORMAL = 50 SAMPLE_TIME_MAX = 100 - def __init__(self, driver: "Driver", sampling_ms=50): # type: ignore + def __init__(self, driver: Driver, sampling_ms=50): """ Initialize a gesture object. diff --git a/hmdriver2/_screenrecord.py b/hmdriver2/_screenrecord.py new file mode 100644 index 0000000..f71c17d --- /dev/null +++ b/hmdriver2/_screenrecord.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- + +import socket +import json +import time +import os +import typing +import subprocess +import atexit +import hashlib +import threading +import numpy as np +from queue import Queue +from datetime import datetime + +import cv2 + +from . import logger +from .driver import Driver +from .exception import ScreenRecordError + + +# Developing + + +class _ScreenRecord: + def __init__(self, driver: Driver): + self._client = driver._client + self.video_path = None + + self.jpeg_queue = Queue() + self.threads: typing.List[threading.Thread] = [] + self.stop_event = threading.Event() + + def _send_message(self, api: str, args: list): + _msg = { + "module": "com.ohos.devicetest.hypiumApiHelper", + "method": "Captures", + "params": { + "api": api, + "args": args + }, + "request_id": datetime.now().strftime("%Y%m%d%H%M%S%f") + } + self._client._send_msg(_msg) + + def start(self, video_path: str): + + self.video_path = video_path + + self._send_message("startCaptureScreen", []) + + reply: str = self._client._recv_msg(1024, decode=True) + if "true" in reply: + record_th = threading.Thread(target=self._record_worker) + writer_th = threading.Thread(target=self._video_writer) + record_th.start() + writer_th.start() + self.threads.extend([record_th, writer_th]) + else: + raise ScreenRecordError("Failed to start device screen capture.") + + def _record_worker(self): + """Capture screen frames and save current frames.""" + + # JPEG start and end markers. + start_flag = b'\xff\xd8' + end_flag = b'\xff\xd9' + buffer = bytearray() + while not self.stop_event.is_set(): + try: + buffer += self._client._recv_msg(4096 * 1024) + except Exception as e: + print(f"Error receiving data: {e}") + break + + start_idx = buffer.find(start_flag) + end_idx = buffer.find(end_flag) + while start_idx != -1 and end_idx != -1 and end_idx > start_idx: + # Extract one JPEG image + jpeg_image: bytearray = buffer[start_idx:end_idx + 2] + self.jpeg_queue.put(jpeg_image) + + buffer = buffer[end_idx + 2:] + + # Search for the next JPEG image in the buffer + start_idx = buffer.find(start_flag) + end_idx = buffer.find(end_flag) + + def _video_writer(self): + """Write frames to video file.""" + cv2_instance = None + while not self.stop_event.is_set(): + if not self.jpeg_queue.empty(): + jpeg_image = self.jpeg_queue.get(timeout=0.1) + img = cv2.imdecode(np.frombuffer(jpeg_image, np.uint8), cv2.IMREAD_COLOR) + if cv2_instance is None: + height, width = img.shape[:2] + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + cv2_instance = cv2.VideoWriter(self.video_path, fourcc, 10, (width, height)) + + cv2_instance.write(img) + + if cv2_instance: + cv2_instance.release() + + def stop(self) -> str: + try: + self.stop_event.set() + for t in self.threads: + t.join() + + self._send_message("stopCaptureScreen", []) + self._client._recv_msg(1024, decode=True) + except Exception as e: + logger.error(f"An error occurred: {e}") + + return self.video_path diff --git a/hmdriver2/driver.py b/hmdriver2/driver.py index 5f52f64..e259b5f 100644 --- a/hmdriver2/driver.py +++ b/hmdriver2/driver.py @@ -15,7 +15,6 @@ from ._uiobject import UiObject from .hdc import list_devices from .exception import DeviceNotFoundError -from ._gesture import _Gesture from .proto import HypiumResponse, KeyCode, Point, DisplayRotation, DeviceInfo @@ -270,10 +269,6 @@ def input_text(self, x, y, text: str): point = self._to_abs_pos(x, y) self.hdc.input_text(point.x, point.y, text) - @cached_property - def gesture(self): - return _Gesture(self) - def dump_hierarchy(self) -> Dict: """ Dump the UI hierarchy of the device screen. @@ -282,3 +277,14 @@ def dump_hierarchy(self) -> Dict: Dict: The dumped UI hierarchy as a dictionary. """ return self.hdc.dump_hierarchy() + + @cached_property + def gesture(self): + from ._gesture import _Gesture + return _Gesture(self) + + # @cached_property + # def screenrecord(self): + # # FIXME + # from ._screenrecord import _ScreenRecord + # return _ScreenRecord(self) diff --git a/hmdriver2/exception.py b/hmdriver2/exception.py index d925bf1..03e35ec 100644 --- a/hmdriver2/exception.py +++ b/hmdriver2/exception.py @@ -26,3 +26,7 @@ class InvokeHypiumError(Exception): class InjectGestureError(Exception): pass + + +class ScreenRecordError(Exception): + pass