diff --git a/README.md b/README.md index 1750e56..492956d 100644 --- a/README.md +++ b/README.md @@ -71,13 +71,35 @@ d.long_click(0.5, 0.4) d.swipe(0.5, 0.8, 0.5, 0.4, speed=2000) d.input_text(0.5, 0.5, "adbcdfg") + +# Device touch gersture +d.gesture.start(630, 984, interval=.5).move(0.2, 0.4).pause(interval=1).move(0.5, 0.6).action() +d.gesture.start(0.77, 0.49).action() # click + + # App Element -d(text="showToast").info +d(id="swiper").exists() +d(type="Button", text="tab_recrod").exists() +d(text="tab_recrod", isAfter=True).exists() +d(text="tab_recrod").click_if_exists() +d(type="Button", index=3).click() +d(text="tab_recrod").double_click() +d(text="tab_recrod").long_click() + +component: ComponentData = d(type="ListItem", index=1).find_component() +d(type="ListItem").drag_to(component) + +d(text="tab_recrod").input_text("abc") +d(text="tab_recrod").clear_text() +d(text="tab_recrod").pinch_in() +d(text="tab_recrod").pinch_out() + +d(text="tab_recrod").info # { # "id": "", # "key": "", # "type": "Button", -# "text": "showToast", +# "text": "tab_recrod", # "description": "", # "isSelected": False, # "isChecked": False, @@ -99,21 +121,6 @@ d(text="showToast").info # } # } -d(id="swiper").exists() -d(type="Button", text="tab_recrod").exists() -d(text="tab_recrod", isAfter=True).exists() -d(text="tab_recrod").click_if_exists() -d(type="Button", index=3).click() -d(text="tab_recrod").double_click() -d(text="tab_recrod").long_click() - -component: ComponentData = d(type="ListItem", index=1).find_component() -d(type="ListItem").drag_to(component) - -d(text="tab_recrod").input_text("abc") -d(text="tab_recrod").clear_text() -d(text="tab_recrod").pinch_in() -d(text="tab_recrod").pinch_out() # Dump hierarchy d.dump_hierarchy() @@ -121,7 +128,7 @@ d.dump_hierarchy() # Toast Watcher d.toast_watcher.start() d(type="Button", text="tab_recrod").click() # 触发toast的操作 -toast = d.toast_watcher.get() +toast = d.toast_watcher.get_toast() ``` diff --git a/agent.so b/agent.so new file mode 100644 index 0000000..8298f2c Binary files /dev/null and b/agent.so differ diff --git a/docs/DEVELOP.md b/docs/DEVELOP.md index 96a90b3..338635e 100644 --- a/docs/DEVELOP.md +++ b/docs/DEVELOP.md @@ -189,6 +189,46 @@ {"result":{"bundleName":"com.samples.test.uitest","text":"testMessage","type":"Toast"}} ``` +### PointerMatrix.create +**send** +``` +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.create","this":null,"args":[1,104],"message_type":"hypium"},"request_id":"20240906204116056319"} +``` +**recv** +``` +{"result":"PointerMatrix#0"} +``` + +### PointerMatrix.setPoint +**send** +``` +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.setPoint","this":"PointerMatrix#0","args":[0,0,{"x":65536630,"y":984}],"message_type":"hypium"},"request_id":"20240906204116061416"} + +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.setPoint","this":"PointerMatrix#0","args":[0,1,{"x":3277430,"y":984}],"message_type":"hypium"},"request_id":"20240906204116069343"} + +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.setPoint","this":"PointerMatrix#0","args":[0,2,{"x":3277393,"y":994}],"message_type":"hypium"},"request_id":"20240906204116072723"} + +... + +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.setPoint","this":"PointerMatrix#0","args":[0,102,{"x":2622070,"y":1632}],"message_type":"hypium"},"request_id":"20240906204116359992"} + +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"PointerMatrix.setPoint","this":"PointerMatrix#0","args":[0,103,{"x":633,"y":1632}],"message_type":"hypium"},"request_id":"20240906204116363228"} +``` +**recv** +``` +{"result":null} +``` + +### injectMultiPointerAction +**send** +``` +{"module":"com.ohos.devicetest.hypiumApiHelper","method":"callHypiumApi","params":{"api":"Driver.injectMultiPointerAction","this":"Driver#0","args":["PointerMatrix#0",2000],"message_type":"hypium"},"request_id":"20240906204116366578"} +``` +**recv** +``` +{"result":true} +``` + ## Component ### Component.getId diff --git a/hmdriver2/_gesture.py b/hmdriver2/_gesture.py new file mode 100644 index 0000000..25cf4ee --- /dev/null +++ b/hmdriver2/_gesture.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- + +import math +from typing import List, Union +from . import logger +from .utils import delay +from .proto import HypiumResponse, Point +from .exception import InjectGestureError + + +class _Gesture: + SAMPLE_TIME_MIN = 10 + SAMPLE_TIME_NORMAL = 50 + SAMPLE_TIME_MAX = 100 + + def __init__(self, driver: "Driver", sampling_ms=50): # type: ignore + """ + Initialize a gesture object. + + Args: + driver (Driver): The driver object to interact with. + sampling_ms (int): Sampling time for gesture operation points in milliseconds. Default is 50. + """ + self.driver = driver + self.steps: List[GestureStep] = [] + self.sampling_ms = self._validate_sampling_time(sampling_ms) + + def _validate_sampling_time(self, sampling_time: int) -> int: + """ + Validate the input sampling time. + + Args: + sampling_time (int): The given sampling time. + + Returns: + int: Valid sampling time within allowed range. + """ + if _Gesture.SAMPLE_TIME_MIN <= sampling_time <= _Gesture.SAMPLE_TIME_MAX: + return sampling_time + return _Gesture.SAMPLE_TIME_NORMAL + + def _release(self): + self.steps = [] + + def start(self, x: Union[int, float], y: Union[int, float], interval: float = 0.5) -> '_Gesture': + """ + Start gesture operation. + + Args: + x: oordinate as a percentage or absolute value. + y: coordinate as a percentage or absolute value. + interval (float, optional): Duration to hold at start position in seconds. Default is 0.5. + + Returns: + Gesture: Self instance to allow method chaining. + """ + self._ensure_can_start() + self._add_step(x, y, "start", interval) + return self + + def move(self, x: Union[int, float], y: Union[int, float], interval: float = 0.5) -> '_Gesture': + """ + Move to specified position. + + Args: + x: coordinate as a percentage or absolute value. + y: coordinate as a percentage or absolute value. + interval (float, optional): Duration of move in seconds. Default is 0.5. + + Returns: + Gesture: Self instance to allow method chaining. + """ + self._ensure_started() + self._add_step(x, y, "move", interval) + return self + + def pause(self, interval: float = 1) -> '_Gesture': + """ + Pause at current position for specified duration. + + Args: + interval (float, optional): Duration to pause in seconds. Default is 1. + + Returns: + Gesture: Self instance to allow method chaining. + """ + self._ensure_started() + pos = self.steps[-1].pos + self.steps.append(GestureStep(pos, "pause", interval)) + return self + + @delay + def action(self): + """ + Execute the gesture action. + """ + logger.info(f">>>Gesture steps: {self.steps}") + total_points = self._calculate_total_points() + + pointer_matrix = self._create_pointer_matrix(total_points) + self._generate_points(pointer_matrix, total_points) + + self._inject_pointer_actions(pointer_matrix) + + self._release() + + def _create_pointer_matrix(self, total_points: int): + """ + Create a pointer matrix for the gesture. + + Args: + total_points (int): Total number of points. + + Returns: + PointerMatrix: Pointer matrix object. + """ + fingers = 1 + api = "PointerMatrix.create" + data: HypiumResponse = self.driver._client.invoke(api, this=None, args=[fingers, total_points]) + return data.result + + def _inject_pointer_actions(self, pointer_matrix): + """ + Inject pointer actions into the driver. + + Args: + pointer_matrix (PointerMatrix): Pointer matrix to inject. + """ + api = "Driver.injectMultiPointerAction" + self.driver._client.invoke(api, this=self.driver._this_driver, args=[pointer_matrix, 2000]) + + def _add_step(self, x: int, y: int, step_type: str, interval: float): + """ + Add a step to the gesture. + + Args: + x (int): x-coordinate of the point. + y (int): y-coordinate of the point. + step_type (str): Type of step ("start", "move", or "pause"). + interval (float): Interval duration in seconds. + """ + point: Point = self.driver._to_abs_pos(x, y) + step = GestureStep(point.to_tuple(), step_type, interval) + self.steps.append(step) + + def _ensure_can_start(self): + """ + Ensure that the gesture can start. + """ + if self.steps: + raise InjectGestureError("Can't start gesture twice") + + def _ensure_started(self): + """ + Ensure that the gesture has started. + """ + if not self.steps: + raise InjectGestureError("Please call gesture.start first") + + def _generate_points(self, pointer_matrix, total_points): + """ + Generate points for the pointer matrix. + + Args: + pointer_matrix (PointerMatrix): Pointer matrix to populate. + total_points (int): Total points to generate. + """ + + def set_point(point_index: int, point: Point, interval: int = None): + """ + Set a point in the pointer matrix. + + Args: + point_index (int): Index of the point. + point (Point): The point object. + interval (int, optional): Interval duration. + """ + if interval is not None: + point.x += 65536 * interval + api = "PointerMatrix.setPoint" + self.driver._client.invoke(api, this=pointer_matrix, args=[0, point_index, point.to_dict()]) + + point_index = 0 + + for index, step in enumerate(self.steps): + if step.type == "start": + point_index = self._generate_start_point(step, point_index, set_point) + elif step.type == "move": + point_index = self._generate_move_points(index, step, point_index, set_point) + elif step.type == "pause": + point_index = self._generate_pause_points(step, point_index, set_point) + + step = self.steps[-1] + while point_index < total_points: + set_point(point_index, Point(*step.pos)) + point_index += 1 + + def _generate_start_point(self, step, point_index, set_point): + """ + Generate start points. + + Args: + step (GestureStep): Gesture step. + point_index (int): Current point index. + set_point (function): Function to set the point in pointer matrix. + + Returns: + int: Updated point index. + """ + set_point(point_index, Point(*step.pos), step.interval) + point_index += 1 + pos = step.pos[0], step.pos[1] + set_point(point_index, Point(*pos)) + return point_index + 1 + + def _generate_move_points(self, index, step, point_index, set_point): + """ + Generate move points. + + Args: + index (int): Step index. + step (GestureStep): Gesture step. + point_index (int): Current point index. + set_point (function): Function to set the point in pointer matrix. + + Returns: + int: Updated point index. + """ + last_step = self.steps[index - 1] + offset_x = step.pos[0] - last_step.pos[0] + offset_y = step.pos[1] - last_step.pos[1] + distance = int(math.sqrt(offset_x ** 2 + offset_y ** 2)) + interval_ms = step.interval + cur_steps = self._calculate_move_step_points(distance, interval_ms) + + step_x = int(offset_x / cur_steps) + step_y = int(offset_y / cur_steps) + + set_point(point_index - 1, Point(*last_step.pos), self.sampling_ms) + x, y = last_step.pos[0], last_step.pos[1] + for _ in range(cur_steps): + x += step_x + y += step_y + set_point(point_index, Point(x, y), self.sampling_ms) + point_index += 1 + return point_index + + def _generate_pause_points(self, step, point_index, set_point): + """ + Generate pause points. + + Args: + step (GestureStep): Gesture step. + point_index (int): Current point index. + set_point (function): Function to set the point in pointer matrix. + + Returns: + int: Updated point index. + """ + points = int(step.interval / self.sampling_ms) + for _ in range(points): + set_point(point_index, Point(*step.pos), int(step.interval / self.sampling_ms)) + point_index += 1 + pos = step.pos[0] + 3, step.pos[1] + set_point(point_index, Point(*pos)) + return point_index + 1 + + def _calculate_total_points(self) -> int: + """ + Calculate the total number of points needed for the gesture. + + Returns: + int: Total points. + """ + total_points = 0 + for index, step in enumerate(self.steps): + if step.type == "start": + total_points += 2 + elif step.type == "move": + total_points += self._calculate_move_step_points( + *self._calculate_move_distance(step, index)) + elif step.type == "pause": + points = int(step.interval / self.sampling_ms) + total_points += points + 1 + return total_points + + def _calculate_move_distance(self, step, index): + """ + Calculate move distance and interval. + + Args: + step (GestureStep): Gesture step. + index (int): Step index. + + Returns: + tuple: Tuple (distance, interval_ms). + """ + last_step = self.steps[index - 1] + offset_x = step.pos[0] - last_step.pos[0] + offset_y = step.pos[1] - last_step.pos[1] + distance = int(math.sqrt(offset_x ** 2 + offset_y ** 2)) + interval_ms = step.interval + return distance, interval_ms + + def _calculate_move_step_points(self, distance: int, interval_ms: float) -> int: + """ + Calculate the number of move step points based on distance and time. + + Args: + distance (int): Distance to move. + interval_ms (float): Move duration in milliseconds. + + Returns: + int: Number of move step points. + """ + if interval_ms < self.sampling_ms or distance < 1: + return 1 + nums = interval_ms / self.sampling_ms + return distance if nums > distance else int(nums) + + +class GestureStep: + """Class to store each step of a gesture, not to be used directly, use via Gesture class""" + + def __init__(self, pos: tuple, step_type: str, interval: float): + """ + Initialize a gesture step. + + Args: + pos (tuple): Tuple containing x and y coordinates. + step_type (str): Type of step ("start", "move", "pause"). + interval (float): Interval duration in seconds. + """ + self.pos = pos[0], pos[1] + self.interval = int(interval * 1000) + self.type = step_type + + def __repr__(self): + return f"GestureStep(pos=({self.pos[0]}, {self.pos[1]}), type='{self.type}', interval={self.interval})" + + def __str__(self): + return self.__repr__() \ No newline at end of file diff --git a/hmdriver2/_toast.py b/hmdriver2/_toast.py deleted file mode 100644 index 7684490..0000000 --- a/hmdriver2/_toast.py +++ /dev/null @@ -1,35 +0,0 @@ -# -*- coding: utf-8 -*- - -from .proto import HypiumResponse - - -class ToastWatcher: - def __init__(self, driver: "Driver"): # type: ignore - self.driver = driver - - def start(self) -> bool: - """ - Initiates the observer to listen for a UI toast event - - Returns: - bool: True if the observer starts successfully, else False. - """ - api = "Driver.uiEventObserverOnce" - resp: HypiumResponse = self.driver._invoke(api, args=["toastShow"]) - return resp.result - - def get(self, timeout: int = 3) -> str: - """ - Read the latest toast message content from the recent period. - - Args: - timeout (int): The maximum time to wait for a toast to appear if there are no matching toasts within the given time. - - Returns: - str: The content of the latest toast message. - """ - api = "Driver.getRecentUiEvent" - resp: HypiumResponse = self.driver._invoke(api, args=[timeout]) - if resp.result: - return resp.result.get("text") - return None \ No newline at end of file diff --git a/hmdriver2/_uiobject.py b/hmdriver2/_uiobject.py index 288b9d9..5ecbf99 100644 --- a/hmdriver2/_uiobject.py +++ b/hmdriver2/_uiobject.py @@ -5,6 +5,7 @@ from typing import List, Union from . import logger +from .utils import delay from ._client import HMClient from .exception import ElementNotFoundError from .proto import DriverData, ComponentData, ByData, HypiumResponse, Point, Rect, ElementInfo @@ -67,18 +68,18 @@ def __len__(self): return self.count def exists(self, retries: int = 2, wait_time=1) -> bool: - component = self.find_component(retries, wait_time) - return True if component else False + obj = self.find_component(retries, wait_time) + return True if obj else False def __set_component(self, component: ComponentData): self._component = component - def find_component(self, retries: int = 1, wait_time=1) -> Union[ComponentData, None]: + def find_component(self, retries: int = 1, wait_time=1) -> 'UiObject': for attempt in range(retries): components = self.__find_components() if components and self._index < len(components): self.__set_component(components[self._index]) - return self._component + return self if attempt < retries: time.sleep(wait_time) @@ -209,32 +210,41 @@ def info(self) -> ElementInfo: bounds=self.bounds, boundsCenter=self.boundsCenter) + @delay def click(self): return self.__operate("Component.click") + @delay def click_if_exists(self): try: return self.__operate("Component.click") except ElementNotFoundError: pass + @delay def double_click(self): return self.__operate("Component.doubleClick") + @delay def long_click(self): return self.__operate("Component.longClick") + @delay def drag_to(self, component: ComponentData): return self.__operate("Component.dragTo", [component.value]) + @delay def input_text(self, text: str): return self.__operate("Component.inputText", [text]) + @delay def clear_text(self): return self.__operate("Component.clearText") + @delay def pinch_in(self, scale: float = 0.5): return self.__operate("Component.pinchIn", [scale]) + @delay def pinch_out(self, scale: float = 2): return self.__operate("Component.pinchOut", [scale]) diff --git a/hmdriver2/driver.py b/hmdriver2/driver.py index fe9f0fb..5f52f64 100644 --- a/hmdriver2/driver.py +++ b/hmdriver2/driver.py @@ -10,11 +10,12 @@ except ImportError: from cached_property import cached_property +from .utils import delay from ._client import HMClient from ._uiobject import UiObject from .hdc import list_devices -from ._toast import ToastWatcher from .exception import DeviceNotFoundError +from ._gesture import _Gesture from .proto import HypiumResponse, KeyCode, Point, DisplayRotation, DeviceInfo @@ -54,7 +55,6 @@ def _invoke(self, api: str, args: List = []) -> HypiumResponse: return self._client.invoke(api, this=self._this_driver, args=args) def start_app(self, package_name: str, page_name: str = "MainAbility"): - self.unlock() self.hdc.start_app(package_name, page_name) def force_start_app(self, package_name: str, page_name: str = "MainAbility"): @@ -84,12 +84,35 @@ def list_apps(self) -> List: def has_app(self, package_name: str) -> bool: return self.hdc.has_app(package_name) + @cached_property + def toast_watcher(self): + + obj = self + + class _Watcher: + def start(self) -> bool: + api = "Driver.uiEventObserverOnce" + resp: HypiumResponse = obj._invoke(api, args=["toastShow"]) + return resp.result + + def get_toast(self, timeout: int = 3) -> str: + api = "Driver.getRecentUiEvent" + resp: HypiumResponse = obj._invoke(api, args=[timeout]) + if resp.result: + return resp.result.get("text") + return None + + return _Watcher() + + @delay def go_back(self): self.hdc.send_key(KeyCode.BACK) + @delay def go_home(self): self.hdc.send_key(KeyCode.HOME) + @delay def press_key(self, key_code: Union[KeyCode, int]): self.hdc.send_key(key_code) @@ -100,11 +123,11 @@ def screen_off(self): self.hdc.wakeup() self.press_key(KeyCode.POWER) + @delay def unlock(self): self.screen_on() w, h = self.display_size - self.hdc.swipe(0.5 * w, 0.8 * h, 0.5 * w, 0.2 * h) - time.sleep(.5) + self.hdc.swipe(0.5 * w, 0.8 * h, 0.5 * w, 0.2 * h, speed=600) @cached_property def display_size(self) -> Tuple[int, int]: @@ -139,10 +162,7 @@ def device_info(self) -> DeviceInfo: displayRotation=self.display_rotation ) - @cached_property - def toast_watcher(self): - return ToastWatcher(self) - + @delay def open_url(self, url: str): self.hdc.shell(f"aa start -U {url}") @@ -208,6 +228,7 @@ def _to_abs_pos(self, x: Union[int, float], y: Union[int, float]) -> Point: y = int(h * y) return Point(x, y) + @delay def click(self, x: Union[int, float], y: Union[int, float]): # self.hdc.tap(point.x, point.y) @@ -215,16 +236,19 @@ def click(self, x: Union[int, float], y: Union[int, float]): api = "Driver.click" self._invoke(api, args=[point.x, point.y]) + @delay def double_click(self, x: Union[int, float], y: Union[int, float]): point = self._to_abs_pos(x, y) api = "Driver.doubleClick" self._invoke(api, args=[point.x, point.y]) + @delay def long_click(self, x: Union[int, float], y: Union[int, float]): point = self._to_abs_pos(x, y) api = "Driver.longClick" self._invoke(api, args=[point.x, point.y]) + @delay def swipe(self, x1, y1, x2, y2, speed=1000): """ Perform a swipe action on the device screen. @@ -241,10 +265,15 @@ def swipe(self, x1, y1, x2, y2, speed=1000): self.hdc.swipe(point1.x, point1.y, point2.x, point2.y, speed=speed) + @delay def input_text(self, x, y, text: str): point = self._to_abs_pos(x, y) self.hdc.input_text(point.x, point.y, text) + @cached_property + def gesture(self): + return _Gesture(self) + def dump_hierarchy(self) -> Dict: """ Dump the UI hierarchy of the device screen. diff --git a/hmdriver2/exception.py b/hmdriver2/exception.py index 3e0ae08..d925bf1 100644 --- a/hmdriver2/exception.py +++ b/hmdriver2/exception.py @@ -22,3 +22,7 @@ class HdcError(Exception): class InvokeHypiumError(Exception): pass + + +class InjectGestureError(Exception): + pass diff --git a/hmdriver2/hdc.py b/hmdriver2/hdc.py index 5fe5fc6..5dba2a9 100644 --- a/hmdriver2/hdc.py +++ b/hmdriver2/hdc.py @@ -4,33 +4,14 @@ import json import uuid import shlex -import socket import re import subprocess from typing import Union, List, Dict from . import logger +from .utils import FreePort from .proto import CommandResult, KeyCode - - -class _FreePort: - def __init__(self): - self._start = 10000 - self._end = 20000 - self._now = self._start - 1 - - def get(self) -> int: - while True: - self._now += 1 - if self._now > self._end: - self._now = self._start - if not self.is_port_in_use(self._now): - return self._now - - @staticmethod - def is_port_in_use(port: int) -> bool: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - return s.connect_ex(('localhost', port)) == 0 +from .exception import HdcError def _execute_command(cmdargs: Union[str, List[str]]) -> CommandResult: @@ -67,16 +48,16 @@ def __init__(self, serial: str) -> None: self.serial = serial def forward_port(self, rport: int) -> int: - lport: int = _FreePort().get() + lport: int = FreePort().get() result = _execute_command(f"hdc -t {self.serial} fport tcp:{lport} tcp:{rport}") if result.exit_code != 0: - raise RuntimeError("HDC forward port error", result.output) + raise HdcError("HDC forward port error", result.output) return lport def rm_forward(self, lport: int, rport: int) -> int: result = _execute_command(f"hdc -t {self.serial} fport rm tcp:{lport} tcp:{rport}") if result.exit_code != 0: - raise RuntimeError("HDC rm forward error", result.output) + raise HdcError("HDC rm forward error", result.output) return lport def list_fport(self) -> List: @@ -85,38 +66,38 @@ def list_fport(self) -> List: """ result = _execute_command(f"hdc -t {self.serial} fport ls") if result.exit_code != 0: - raise RuntimeError("HDC forward list error", result.output) + raise HdcError("HDC forward list error", result.output) pattern = re.compile(r"tcp:\d+ tcp:\d+") return pattern.findall(result.output) def send_file(self, lpath: str, rpath: str): result = _execute_command(f"hdc -t {self.serial} file send {lpath} {rpath}") if result.exit_code != 0: - raise RuntimeError("HDC send file error", result.output) + raise HdcError("HDC send file error", result.output) return result def recv_file(self, rpath: str, lpath: str): result = _execute_command(f"hdc -t {self.serial} file recv {rpath} {lpath}") if result.exit_code != 0: - raise RuntimeError("HDC receive file error", result.output) + raise HdcError("HDC receive file error", result.output) return result def shell(self, cmd: str, error_raise=True) -> CommandResult: result = _execute_command(f"hdc -t {self.serial} shell {cmd}") if result.error and error_raise: - raise RuntimeError("HDC shell error", f"{cmd}\n{result.output}\n{result.error}") + raise HdcError("HDC shell error", f"{cmd}\n{result.output}\n{result.error}") return result def uninstall(self, bundlename: str): result = _execute_command(f"hdc -t {self.serial} uninstall {bundlename}") if result.exit_code != 0: - raise RuntimeError("HDC uninstall error", result.output) + raise HdcError("HDC uninstall error", result.output) return result def install(self, apkpath: str): result = _execute_command(f"hdc -t {self.serial} install {apkpath}") if result.exit_code != 0: - raise RuntimeError("HDC install error", result.output) + raise HdcError("HDC install error", result.output) return result def list_apps(self) -> List[str]: @@ -185,7 +166,7 @@ def send_key(self, key_code: Union[KeyCode, int]) -> None: MAX = 3200 if key_code > MAX: - raise ValueError("Invalid HDC keycode") + raise HdcError("Invalid HDC keycode") self.shell(f"uitest uiInput keyEvent {key_code}") diff --git a/hmdriver2/proto.py b/hmdriver2/proto.py index 518f735..2cafd87 100644 --- a/hmdriver2/proto.py +++ b/hmdriver2/proto.py @@ -2,7 +2,7 @@ import json from enum import Enum -from typing import Union, List +from typing import Union, List, Tuple from dataclasses import dataclass, asdict @@ -27,6 +27,15 @@ def from_value(cls, value): raise ValueError(f"No matching DisplayRotation for value: {value}") +class AppState: + INIT = 0 # 初始化状态,应用正在初始化 + READY = 1 # 就绪状态,应用已初始化完毕 + FOREGROUND = 2 # 前台状态,应用位于前台 + FOCUS = 3 # 获焦状态。(预留状态,当前暂不支持) + BACKGROUND = 4 # 后台状态,应用位于后台 + EXIT = 5 # 退出状态,应用已退出 + + @dataclass class DeviceInfo: productName: str @@ -74,6 +83,9 @@ class Rect: top: int bottom: int + def get_center(self) -> Tuple[int, int]: + return int((self.left + self.right) / 2), int((self.top + self.bottom) / 2) + @dataclass class Point: diff --git a/hmdriver2/utils.py b/hmdriver2/utils.py new file mode 100644 index 0000000..223aae2 --- /dev/null +++ b/hmdriver2/utils.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- + + +import time +import socket +from functools import wraps + + +def delay(func): + """ + After each UI operation, it is necessary to wait for a while to ensure the stability of the UI, + so as not to affect the next UI operation. + """ + DELAY_TIME = 0.4 + + @wraps(func) + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + time.sleep(DELAY_TIME) + return result + return wrapper + + +class FreePort: + def __init__(self): + self._start = 10000 + self._end = 20000 + self._now = self._start - 1 + + def get(self) -> int: + while True: + self._now += 1 + if self._now > self._end: + self._now = self._start + if not self.is_port_in_use(self._now): + return self._now + + @staticmethod + def is_port_in_use(port: int) -> bool: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(('localhost', port)) == 0 \ No newline at end of file diff --git a/tests/test_driver.py b/tests/test_driver.py index 6a155f5..6fe5976 100644 --- a/tests/test_driver.py +++ b/tests/test_driver.py @@ -28,11 +28,12 @@ def test_device_info(d): def test_force_start_app(d): - d.force_start_app("com.kuaishou.hmapp", "EntryAbility") + d.unlock() + d.force_start_app("com.samples.test.uitest", "EntryAbility") def test_clear_app(d): - d.clear_app("com.kuaishou.hmapp") + d.clear_app("com.samples.test.uitest") def test_install_app(d): @@ -150,10 +151,19 @@ def test_dump_hierarchy(d): def test_toast(d): - d.unlock() d.force_start_app("com.samples.test.uitest", "EntryAbility") d.toast_watcher.start() d(type="Button", text="showToast").click() - toast = d.toast_watcher.get() + toast = d.toast_watcher.get_toast() print(f"toast: {toast}") - assert toast == "testMessage" \ No newline at end of file + assert toast == "testMessage" + + +def test_gesture(d): + d(id="drag").click() + d.gesture.start(630, 984, interval=1).move(0.2, 0.4, interval=.5).pause(interval=1).move(0.5, 0.6, interval=.5).pause(interval=1).action() + d.go_back() + + +def test_gesture_click(d): + d.gesture.start(0.77, 0.49).action() \ No newline at end of file diff --git a/tests/test_element.py b/tests/test_element.py index 4a4cfda..be1ce65 100644 --- a/tests/test_element.py +++ b/tests/test_element.py @@ -71,6 +71,7 @@ def test_info(d): def test_click(d): d(text="showToast1").click_if_exists() + d(text="showToast").find_component().click() d(type="Button", index=3).click() d.click(0.5, 0.2)