diff --git a/pymobiledevice3/__main__.py b/pymobiledevice3/__main__.py index ee2efb78..ec2ad515 100644 --- a/pymobiledevice3/__main__.py +++ b/pymobiledevice3/__main__.py @@ -6,10 +6,12 @@ import coloredlogs from pymobiledevice3.exceptions import AccessDeniedError, ConnectionFailedToUsbmuxdError, DeprecationError, \ - DeveloperModeError, DeveloperModeIsNotEnabledError, DeviceHasPasscodeSetError, DeviceNotFoundError, InternalError, \ - InvalidServiceError, MessageNotSupportedError, MissingValueError, NoDeviceConnectedError, NoDeviceSelectedError, \ - NotEnoughDiskSpaceError, NotPairedError, PairingDialogResponsePendingError, PasswordRequiredError, \ - RSDRequiredError, SetProhibitedError, TunneldConnectionError, UserDeniedPairingError + DeveloperModeError, DeveloperModeIsNotEnabledError, DeviceHasPasscodeSetError, DeviceNotFoundError, \ + FeatureNotSupportedError, InternalError, InvalidServiceError, MessageNotSupportedError, MissingValueError, \ + NoDeviceConnectedError, NoDeviceSelectedError, NotEnoughDiskSpaceError, NotPairedError, OSNotSupportedError, \ + PairingDialogResponsePendingError, PasswordRequiredError, RSDRequiredError, SetProhibitedError, \ + TunneldConnectionError, UserDeniedPairingError +from pymobiledevice3.osu.os_utils import get_os_utils coloredlogs.install(level=logging.INFO) @@ -134,10 +136,7 @@ def main() -> None: except PasswordRequiredError: logger.error('Device is password protected. Please unlock and retry') except AccessDeniedError: - if sys.platform == 'win32': - logger.error('This command requires admin privileges. Consider retrying with "run-as administrator".') - else: - logger.error('This command requires root privileges. Consider retrying with "sudo".') + logger.error(get_os_utils().access_denied_error) except BrokenPipeError: traceback.print_exc() except TunneldConnectionError: @@ -150,9 +149,18 @@ def main() -> None: logger.error('Not enough disk space') except RSDRequiredError: logger.error('The requested operation requires an RSD instance. For more information see:\n' - 'https://github.com/doronz88/pymobiledevice3?tab=readme-ov-file#working-with-developer-tools-ios--170') + 'https://github.com/doronz88/pymobiledevice3?tab=readme-ov-file#working-with-developer-tools-ios' + '--170') except DeprecationError: logger.error('failed to query MobileGestalt, MobileGestalt deprecated (iOS >= 17.4).') + except OSNotSupportedError as e: + logger.error( + f'Unsupported OS - {e.os_name}. To add support, consider contributing at ' + f'https://github.com/doronz88/pymobiledevice3.') + except FeatureNotSupportedError as e: + logger.error( + f'Missing implementation of `{e.feature}` on `{e.os_name}`. To add support, consider contributing at ' + f'https://github.com/doronz88/pymobiledevice3.') if __name__ == '__main__': diff --git a/pymobiledevice3/bonjour.py b/pymobiledevice3/bonjour.py index ce708160..ebc2b361 100644 --- a/pymobiledevice3/bonjour.py +++ b/pymobiledevice3/bonjour.py @@ -1,6 +1,5 @@ import asyncio import dataclasses -import sys from socket import AF_INET, AF_INET6, inet_ntop from typing import List, Mapping, Optional @@ -8,10 +7,13 @@ from zeroconf import IPVersion, ServiceListener, ServiceStateChange, Zeroconf from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf +from pymobiledevice3.osu.os_utils import get_os_utils + REMOTEPAIRING_SERVICE_NAMES = ['_remotepairing._tcp.local.'] MOBDEV2_SERVICE_NAMES = ['_apple-mobdev2._tcp.local.'] REMOTED_SERVICE_NAMES = ['_remoted._tcp.local.'] -DEFAULT_BONJOUR_TIMEOUT = 1 if sys.platform != 'win32' else 2 # On Windows, it takes longer to get the addresses +OSUTILS = get_os_utils() +DEFAULT_BONJOUR_TIMEOUT = OSUTILS.bonjour_timeout @dataclasses.dataclass @@ -64,23 +66,6 @@ class BonjourQuery: listener: BonjourListener -def get_ipv6_ips() -> List[str]: - ips = [] - if sys.platform == 'win32': - # TODO: verify on windows - ips = [f'{adapter.ips[0].ip[0]}%{adapter.ips[0].ip[2]}' for adapter in get_adapters() if adapter.ips[0].is_IPv6] - else: - for adapter in get_adapters(): - for ip in adapter.ips: - if not ip.is_IPv6: - continue - if ip.ip[0] in ('::1', 'fe80::1'): - # skip localhost - continue - ips.append(f'{ip.ip[0]}%{adapter.nice_name}') - return ips - - def query_bonjour(service_names: List[str], ip: str) -> BonjourQuery: aiozc = AsyncZeroconf(interfaces=[ip]) listener = BonjourListener(ip) @@ -106,7 +91,7 @@ async def browse(service_names: List[str], ips: List[str], timeout: float = DEFA async def browse_ipv6(service_names: List[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: - return await browse(service_names, get_ipv6_ips(), timeout=timeout) + return await browse(service_names, OSUTILS.get_ipv6_ips(), timeout=timeout) async def browse_ipv4(service_names: List[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: diff --git a/pymobiledevice3/cli/cli_common.py b/pymobiledevice3/cli/cli_common.py index a3bc6bc4..5c588449 100644 --- a/pymobiledevice3/cli/cli_common.py +++ b/pymobiledevice3/cli/cli_common.py @@ -2,7 +2,6 @@ import json import logging import os -import sys import uuid from functools import wraps from typing import Callable, List, Mapping, Optional, Tuple @@ -18,12 +17,15 @@ from pymobiledevice3.exceptions import AccessDeniedError, DeviceNotFoundError, NoDeviceConnectedError, \ NoDeviceSelectedError from pymobiledevice3.lockdown import LockdownClient, create_using_usbmux +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService from pymobiledevice3.tunneld import get_tunneld_devices from pymobiledevice3.usbmux import select_devices_by_connection_type USBMUX_OPTION_HELP = 'usbmuxd listener address (in the form of either /path/to/unix/socket OR HOST:PORT' COLORED_OUTPUT = True +UDID_ENV_VAR = 'PYMOBILEDEVICE3_UDID' +OSUTILS = get_os_utils() class RSDOption(Option): @@ -108,42 +110,10 @@ def get_last_used_terminal_formatting(buf: str) -> str: return '\x1b' + buf.rsplit('\x1b', 1)[1].split('m')[0] + 'm' -def wait_return() -> None: - if sys.platform != 'win32': - import signal - print("Press Ctrl+C to send a SIGINT or use 'kill' command to send a SIGTERM") - signal.sigwait([signal.SIGINT, signal.SIGTERM]) - else: - input('Press ENTER to exit>') - - -UDID_ENV_VAR = 'PYMOBILEDEVICE3_UDID' - - -def is_admin_user() -> bool: - """ Check if the current OS user is an Administrator or root. - - See: https://github.com/Preston-Landers/pyuac/blob/master/pyuac/admin.py - - :return: True if the current user is an 'Administrator', otherwise False. - """ - if os.name == 'nt': - import win32security - - try: - admin_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid, None) - return win32security.CheckTokenMembership(None, admin_sid) - except Exception: - return False - else: - # Check for root on Posix - return os.geteuid() == 0 - - def sudo_required(func): @wraps(func) def wrapper(*args, **kwargs): - if not is_admin_user(): + if not OSUTILS.is_admin: raise AccessDeniedError() else: func(*args, **kwargs) diff --git a/pymobiledevice3/cli/developer.py b/pymobiledevice3/cli/developer.py index 21a1759c..c589d4f9 100644 --- a/pymobiledevice3/cli/developer.py +++ b/pymobiledevice3/cli/developer.py @@ -19,11 +19,12 @@ import pymobiledevice3 from pymobiledevice3.cli.cli_common import BASED_INT, Command, RSDCommand, default_json_encoder, print_json, \ - user_requested_colored_output, wait_return + user_requested_colored_output from pymobiledevice3.exceptions import DeviceAlreadyInUseError, DvtDirListError, ExtractingStackshotError, \ RSDRequiredError, UnrecognizedSelectorError from pymobiledevice3.lockdown import LockdownClient from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.remote.core_device.app_service import AppServiceService from pymobiledevice3.remote.core_device.device_info import DeviceInfoService from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService @@ -52,6 +53,7 @@ from pymobiledevice3.services.simulate_location import DtSimulateLocation from pymobiledevice3.tcp_forwarder import LockdownTcpForwarder +OSUTILS = get_os_utils() BSC_SUBCLASS = 0x40c BSC_CLASS = 0x4 VFS_AND_TRACES_SET = {0x03010000, 0x07ff0000} @@ -813,7 +815,7 @@ def accessibility_settings_set(service_provider: LockdownClient, setting, value) """ service = AccessibilityAudit(service_provider) service.set_setting(setting, eval(value)) - wait_return() + OSUTILS.wait_return() @accessibility.command('shell', cls=Command) @@ -890,7 +892,7 @@ def condition_set(service_provider: LockdownClient, profile_identifier): """ set a specific condition """ with DvtSecureSocketProxyService(lockdown=service_provider) as dvt: ConditionInducer(dvt).set(profile_identifier) - wait_return() + OSUTILS.wait_return() @developer.command(cls=Command) @@ -963,7 +965,7 @@ def check_in(service_provider: LockdownClient, hostname, force): with DtDeviceArbitration(service_provider) as device_arbitration: try: device_arbitration.check_in(hostname, force=force) - wait_return() + OSUTILS.wait_return() except DeviceAlreadyInUseError as e: logger.error(e.message) @@ -1009,7 +1011,7 @@ def dvt_simulate_location_set(service_provider: LockdownClient, latitude, longit """ with DvtSecureSocketProxyService(service_provider) as dvt: LocationSimulation(dvt).set(latitude, longitude) - wait_return() + OSUTILS.wait_return() @dvt_simulate_location.command('play', cls=Command) @@ -1019,7 +1021,7 @@ def dvt_simulate_location_play(service_provider: LockdownClient, filename: str, """ play a .gpx file """ with DvtSecureSocketProxyService(service_provider) as dvt: LocationSimulation(dvt).play_gpx_file(filename, disable_sleep=disable_sleep) - wait_return() + OSUTILS.wait_return() @developer.group() diff --git a/pymobiledevice3/cli/webinspector.py b/pymobiledevice3/cli/webinspector.py index 6d3e4a7f..9624ae9f 100644 --- a/pymobiledevice3/cli/webinspector.py +++ b/pymobiledevice3/cli/webinspector.py @@ -21,11 +21,12 @@ from pygments import formatters, highlight, lexers from pygments.styles import get_style_by_name -from pymobiledevice3.cli.cli_common import Command, wait_return +from pymobiledevice3.cli.cli_common import Command from pymobiledevice3.common import get_home_folder from pymobiledevice3.exceptions import InspectorEvaluateError, LaunchingApplicationError, \ RemoteAutomationNotEnabledError, WebInspectorNotEnabledError, WirError from pymobiledevice3.lockdown import LockdownClient, create_using_usbmux +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.services.web_protocol.cdp_server import app from pymobiledevice3.services.web_protocol.driver import By, Cookie, WebDriver from pymobiledevice3.services.web_protocol.inspector_session import InspectorSession @@ -59,6 +60,7 @@ 'switch', 'synchronized', 'this', 'throw', 'throws', 'transient', 'true', 'try', 'typeof', 'var', 'void', 'volatile', 'while', 'with', 'yield', ] +OSUTILS = get_os_utils() logger = logging.getLogger(__name__) @@ -153,7 +155,7 @@ def launch(service_provider: LockdownClient, url, timeout): driver.start_session() print('Getting URL') driver.get(url) - wait_return() + OSUTILS.wait_return() session.stop_session() inspector.close() diff --git a/pymobiledevice3/exceptions.py b/pymobiledevice3/exceptions.py index 1a1fd4ad..20957b19 100644 --- a/pymobiledevice3/exceptions.py +++ b/pymobiledevice3/exceptions.py @@ -13,6 +13,7 @@ 'NoDeviceSelectedError', 'MessageNotSupportedError', 'InvalidServiceError', 'InspectorEvaluateError', 'LaunchingApplicationError', 'BadCommandError', 'BadDevError', 'ConnectionFailedError', 'CoreDeviceError', 'AccessDeniedError', 'RSDRequiredError', 'SysdiagnoseTimeoutError', 'GetProhibitedError', + 'FeatureNotSupportedError', 'OSNotSupportedError' ] from typing import List, Optional @@ -363,3 +364,22 @@ class RSDRequiredError(PyMobileDevice3Exception): class SysdiagnoseTimeoutError(PyMobileDevice3Exception, TimeoutError): """ Timeout collecting new sysdiagnose archive """ pass + + +class SupportError(PyMobileDevice3Exception): + def __init__(self, os_name): + self.os_name = os_name + super().__init__() + + +class OSNotSupportedError(SupportError): + """ Operating system is not supported. """ + pass + + +class FeatureNotSupportedError(SupportError): + """ Feature has not been implemented for OS. """ + + def __init__(self, os_name, feature): + super().__init__(os_name) + self.feature = feature diff --git a/pymobiledevice3/osu/__init__.py b/pymobiledevice3/osu/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pymobiledevice3/osu/os_utils.py b/pymobiledevice3/osu/os_utils.py new file mode 100644 index 00000000..897deaaa --- /dev/null +++ b/pymobiledevice3/osu/os_utils.py @@ -0,0 +1,81 @@ +import inspect +import socket +import sys +from datetime import datetime +from pathlib import Path +from typing import List, Tuple + +from pymobiledevice3.exceptions import FeatureNotSupportedError, OSNotSupportedError + +DEFAULT_AFTER_IDLE_SEC = 3 +DEFAULT_INTERVAL_SEC = 3 +DEFAULT_MAX_FAILS = 3 + + +class OsUtils: + _instance = None + _os_name = None + + @classmethod + def create(cls) -> 'OsUtils': + if cls._instance is None: + cls._os_name = sys.platform + if cls._os_name == 'win32': + from pymobiledevice3.osu.win_util import Win32 + cls._instance = Win32() + elif cls._os_name == 'darwin': + from pymobiledevice3.osu.posix_util import Darwin + cls._instance = Darwin() + elif cls._os_name == 'linux': + from pymobiledevice3.osu.posix_util import Linux + cls._instance = Linux() + elif cls._os_name == 'cygwin': + from pymobiledevice3.osu.posix_util import Cygwin + cls._instance = Cygwin() + else: + raise OSNotSupportedError(cls._os_name) + return cls._instance + + @property + def is_admin(self) -> bool: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + @property + def usbmux_address(self) -> Tuple[str, int]: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + @property + def bonjour_timeout(self) -> int: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + @property + def loopback_header(self) -> bytes: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + @property + def access_denied_error(self) -> str: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + @property + def pair_record_path(self) -> Path: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + def get_ipv6_ips(self) -> List[str]: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + def set_keepalive(self, sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, + interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + def parse_timestamp(self, time_stamp) -> datetime: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + def chown_to_non_sudo_if_needed(self, path: Path) -> None: + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + def wait_return(self): + raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) + + +def get_os_utils() -> OsUtils: + return OsUtils.create() diff --git a/pymobiledevice3/osu/posix_util.py b/pymobiledevice3/osu/posix_util.py new file mode 100644 index 00000000..815b82ca --- /dev/null +++ b/pymobiledevice3/osu/posix_util.py @@ -0,0 +1,90 @@ +import datetime +import os +import signal +import socket +import struct +from pathlib import Path +from typing import List, Tuple + +from ifaddr import get_adapters + +from pymobiledevice3.osu.os_utils import DEFAULT_AFTER_IDLE_SEC, DEFAULT_INTERVAL_SEC, DEFAULT_MAX_FAILS, OsUtils +from pymobiledevice3.usbmux import MuxConnection + +_DARWIN_TCP_KEEPALIVE = 0x10 +_DARWIN_TCP_KEEPINTVL = 0x101 +_DARWIN_TCP_KEEPCNT = 0x102 + + +class Posix(OsUtils): + @property + def is_admin(self) -> bool: + return os.geteuid() == 0 + + @property + def usbmux_address(self) -> Tuple[str, int]: + return MuxConnection.USBMUXD_PIPE, socket.AF_UNIX + + @property + def bonjour_timeout(self) -> int: + return 1 + + @property + def access_denied_error(self) -> str: + return 'This command requires root privileges. Consider retrying with "sudo".' + + def get_ipv6_ips(self) -> List[str]: + return [f'{adapter.ips[0].ip[0]}%{adapter.nice_name}' for adapter in get_adapters() if + adapter.ips[0].is_IPv6] + + def chown_to_non_sudo_if_needed(self, path: Path) -> None: + if os.getenv('SUDO_UID') is None: + return + os.chown(path, int(os.getenv('SUDO_UID')), int(os.getenv('SUDO_GID'))) + + def parse_timestamp(self, time_stamp) -> datetime: + return datetime.datetime.fromtimestamp(time_stamp) + + def wait_return(self): + print("Press Ctrl+C to send a SIGINT or use 'kill' command to send a SIGTERM") + signal.sigwait([signal.SIGINT, signal.SIGTERM]) + + +class Darwin(Posix): + @property + def pair_record_path(self) -> Path: + return Path('/var/db/lockdown/') + + @property + def loopback_header(self) -> bytes: + return struct.pack('>I', socket.AF_INET6) + + def set_keepalive(self, sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, + interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPALIVE, after_idle_sec) + sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPINTVL, interval_sec) + sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPCNT, max_fails) + + +class Linux(Posix): + @property + def pair_record_path(self) -> Path: + return Path('/var/lib/lockdown/') + + @property + def loopback_header(self) -> bytes: + return b'\x00\x00\x86\xdd' + + def set_keepalive(self, sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, + interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails) + + +class Cygwin(Posix): + @property + def usbmux_address(self) -> Tuple[str, int]: + return MuxConnection.ITUNES_HOST, socket.AF_INET diff --git a/pymobiledevice3/osu/win_util.py b/pymobiledevice3/osu/win_util.py new file mode 100644 index 00000000..024acae9 --- /dev/null +++ b/pymobiledevice3/osu/win_util.py @@ -0,0 +1,62 @@ +import datetime +import os +import socket +from pathlib import Path +from typing import List, Tuple + +import win32security +from ifaddr import get_adapters + +from pymobiledevice3.osu.os_utils import DEFAULT_AFTER_IDLE_SEC, DEFAULT_INTERVAL_SEC, OsUtils +from pymobiledevice3.usbmux import MuxConnection + + +class Win32(OsUtils): + @property + def is_admin(self) -> bool: + """ Check if the current OS user is an Administrator or root. + See: https://github.com/Preston-Landers/pyuac/blob/master/pyuac/admin.py + :return: True if the current user is an 'Administrator', otherwise False. + """ + try: + admin_sid = win32security.CreateWellKnownSid(win32security.WinBuiltinAdministratorsSid, None) + return win32security.CheckTokenMembership(None, admin_sid) + except Exception: + return False + + @property + def usbmux_address(self) -> Tuple[str, int]: + return MuxConnection.ITUNES_HOST, socket.AF_INET + + @property + def bonjour_timeout(self) -> int: + return 2 + + @property + def loopback_header(self) -> bytes: + return b'\x00\x00\x86\xdd' + + @property + def access_denied_error(self) -> str: + return 'This command requires admin privileges. Consider retrying with "run-as administrator".' + + @property + def pair_record_path(self) -> Path: + return Path(os.environ.get('ALLUSERSPROFILE', ''), 'Apple', 'Lockdown') + + def get_ipv6_ips(self) -> List[str]: + return [f'{adapter.ips[0].ip[0]}%{adapter.ips[0].ip[2]}' for adapter in get_adapters() if + adapter.ips[0].is_IPv6] + + def set_keepalive(self, sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, + interval_sec: int = DEFAULT_INTERVAL_SEC, **kwargs) -> None: + sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec * 1000, interval_sec * 1000)) + + def parse_timestamp(self, time_stamp) -> datetime: + return datetime.datetime.fromtimestamp(time_stamp / 1000) + + def chown_to_non_sudo_if_needed(self, path: Path) -> None: + return + + def wait_return(self): + input('Press ENTER to exit>') diff --git a/pymobiledevice3/pair_records.py b/pymobiledevice3/pair_records.py index 6e3d0cdd..7cb248df 100644 --- a/pymobiledevice3/pair_records.py +++ b/pymobiledevice3/pair_records.py @@ -1,8 +1,6 @@ import logging -import os import platform import plistlib -import sys import uuid from contextlib import suppress from pathlib import Path @@ -11,17 +9,11 @@ from pymobiledevice3 import usbmux from pymobiledevice3.common import get_home_folder from pymobiledevice3.exceptions import MuxException, NotPairedError +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.usbmux import PlistMuxConnection -from pymobiledevice3.utils import chown_to_non_sudo_if_needed - -PAIR_RECORDS_PATH = { - 'win32': Path(os.environ.get('ALLUSERSPROFILE', ''), 'Apple', 'Lockdown'), - 'darwin': Path('/var/db/lockdown/'), - 'linux': Path('/var/lib/lockdown/'), -} logger = logging.getLogger(__name__) - +OSUTILS = get_os_utils() PAIRING_RECORD_EXT = 'plist' @@ -32,8 +24,7 @@ def generate_host_id(hostname: str = None) -> str: def get_itunes_pairing_record(identifier: str) -> Optional[Mapping]: - platform_type = 'linux' if not sys.platform.startswith('linux') else sys.platform - filename = PAIR_RECORDS_PATH[platform_type] / f'{identifier}.plist' + filename = OSUTILS.pair_record_path / f'{identifier}.plist' try: with open(filename, 'rb') as f: pair_record = plistlib.load(f) @@ -82,7 +73,7 @@ def create_pairing_records_cache_folder(pairing_records_cache_folder: Path = Non pairing_records_cache_folder = get_home_folder() else: pairing_records_cache_folder.mkdir(parents=True, exist_ok=True) - chown_to_non_sudo_if_needed(pairing_records_cache_folder) + OSUTILS.chown_to_non_sudo_if_needed(pairing_records_cache_folder) return pairing_records_cache_folder diff --git a/pymobiledevice3/remote/tunnel_service.py b/pymobiledevice3/remote/tunnel_service.py index 536a7991..93a31eba 100644 --- a/pymobiledevice3/remote/tunnel_service.py +++ b/pymobiledevice3/remote/tunnel_service.py @@ -15,7 +15,7 @@ from collections import namedtuple from contextlib import asynccontextmanager, suppress from pathlib import Path -from socket import AF_INET6, create_connection +from socket import create_connection from ssl import VerifyMode from typing import AsyncGenerator, List, Mapping, Optional, TextIO, cast @@ -45,6 +45,7 @@ from srptools.constants import PRIME_3072, PRIME_3072_GEN from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.services.lockdown_service import LockdownService try: @@ -63,13 +64,10 @@ from pymobiledevice3.remote.utils import get_rsds, resume_remoted_if_required, stop_remoted_if_required from pymobiledevice3.remote.xpc_message import XpcInt64Type, XpcUInt64Type from pymobiledevice3.service_connection import ServiceConnection -from pymobiledevice3.utils import asyncio_print_traceback, chown_to_non_sudo_if_needed, set_keepalive - -if sys.platform == 'darwin': - LOOKBACK_HEADER = struct.pack('>I', AF_INET6) -else: - LOOKBACK_HEADER = b'\x00\x00\x86\xdd' +from pymobiledevice3.utils import asyncio_print_traceback +OSUTIL = get_os_utils() +LOOPBACK_HEADER = OSUTIL.loopback_header logger = logging.getLogger(__name__) IPV6_HEADER_SIZE = 40 @@ -153,14 +151,14 @@ async def wait_closed(self) -> None: @asyncio_print_traceback async def tun_read_task(self) -> None: - read_size = self.tun.mtu + len(LOOKBACK_HEADER) + read_size = self.tun.mtu + len(LOOPBACK_HEADER) try: if sys.platform != 'win32': async with aiofiles.open(self.tun.fileno(), 'rb', opener=lambda path, flags: path, buffering=0) as f: while True: packet = await f.read(read_size) - assert packet.startswith(LOOKBACK_HEADER) - packet = packet[len(LOOKBACK_HEADER):] + assert packet.startswith(LOOPBACK_HEADER) + packet = packet[len(LOOPBACK_HEADER):] await self.send_packet_to_device(packet) else: while True: @@ -240,7 +238,7 @@ def quic_event_received(self, event: QuicEvent) -> None: elif isinstance(event, StreamDataReceived): self._queue.put_nowait(json.loads(CDTunnelPacket.parse(event.data).body)) elif isinstance(event, DatagramFrameReceived): - self.tun.write(LOOKBACK_HEADER + event.data) + self.tun.write(LOOPBACK_HEADER + event.data) @staticmethod def _encode_cdtunnel_packet(data: Mapping) -> bytes: @@ -267,7 +265,7 @@ async def sock_read_task(self) -> None: ipv6_header = await self._reader.readexactly(IPV6_HEADER_SIZE) ipv6_length = struct.unpack('>H', ipv6_header[4:6])[0] ipv6_body = await self._reader.readexactly(ipv6_length) - self.tun.write(LOOKBACK_HEADER + ipv6_header + ipv6_body) + self.tun.write(LOOPBACK_HEADER + ipv6_header + ipv6_body) except (OSError, asyncio.exceptions.IncompleteReadError) as e: self._logger.warning(f'got {e.__class__.__name__} in {asyncio.current_task().get_name()}') await self.wait_closed() @@ -428,7 +426,7 @@ async def start_tcp_tunnel(self) -> AsyncGenerator[TunnelResult, None]: host = self.hostname port = parameters['port'] sock = create_connection((host, port)) - set_keepalive(sock) + OSUTIL.set_keepalive(sock) ctx = SSLPSKContext(ssl.PROTOCOL_TLSv1_2) ctx.psk = self.encryption_key ctx.set_ciphers('PSK') @@ -453,7 +451,7 @@ def save_pair_record(self) -> None: 'private_key': self.ed25519_private_key.private_bytes_raw(), 'remote_unlock_host_key': self.remote_unlock_host_key })) - chown_to_non_sudo_if_needed(self.pair_record_path) + OSUTIL.chown_to_non_sudo_if_needed(self.pair_record_path) @property def pair_record(self) -> Optional[Mapping]: diff --git a/pymobiledevice3/service_connection.py b/pymobiledevice3/service_connection.py index 7a40afec..ae3335a1 100755 --- a/pymobiledevice3/service_connection.py +++ b/pymobiledevice3/service_connection.py @@ -12,13 +12,13 @@ from pymobiledevice3.exceptions import ConnectionFailedError, ConnectionTerminatedError, NoDeviceConnectedError, \ PyMobileDevice3Exception +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.usbmux import MuxDevice, select_device -from pymobiledevice3.utils import set_keepalive DEFAULT_AFTER_IDLE_SEC = 3 DEFAULT_INTERVAL_SEC = 3 DEFAULT_MAX_FAILS = 3 - +OSUTIL = get_os_utils() SHELL_USAGE = """ # This shell allows you to communicate directly with every service layer behind the lockdownd daemon. @@ -79,7 +79,7 @@ def __init__(self, sock: socket.socket, mux_device: MuxDevice = None): def create_using_tcp(hostname: str, port: int, keep_alive: bool = True) -> 'ServiceConnection': sock = socket.create_connection((hostname, port)) if keep_alive: - set_keepalive(sock) + OSUTIL.set_keepalive(sock) return ServiceConnection(sock) @staticmethod diff --git a/pymobiledevice3/services/dvt/instruments/process_control.py b/pymobiledevice3/services/dvt/instruments/process_control.py index a1382d82..c9e15b90 100644 --- a/pymobiledevice3/services/dvt/instruments/process_control.py +++ b/pymobiledevice3/services/dvt/instruments/process_control.py @@ -1,11 +1,12 @@ import dataclasses -import datetime -import sys import typing +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService from pymobiledevice3.services.remote_server import MessageAux +OSUTIL = get_os_utils() + @dataclasses.dataclass class OutputReceivedEvent: @@ -16,10 +17,7 @@ class OutputReceivedEvent: @classmethod def create(cls, message) -> 'OutputReceivedEvent': try: - if sys.platform == 'win32': - date = datetime.datetime.fromtimestamp(message[2].value / 1000) - else: - date = datetime.datetime.fromtimestamp(message[2].value) + date = OSUTIL.parse_timestamp(message[2].value) except (ValueError, OSError): date = None diff --git a/pymobiledevice3/tunneld.py b/pymobiledevice3/tunneld.py index 713e75b6..43d82f33 100644 --- a/pymobiledevice3/tunneld.py +++ b/pymobiledevice3/tunneld.py @@ -4,7 +4,6 @@ import logging import os import signal -import sys import traceback from contextlib import asynccontextmanager, suppress from typing import Dict, List, Mapping, Optional, Tuple, Union @@ -14,7 +13,6 @@ import requests import uvicorn from fastapi import FastAPI -from ifaddr import get_adapters from packaging.version import Version from pymobiledevice3 import usbmux @@ -22,6 +20,7 @@ from pymobiledevice3.exceptions import ConnectionFailedError, ConnectionFailedToUsbmuxdError, GetProhibitedError, \ InvalidServiceError, MuxException, PairingError, TunneldConnectionError from pymobiledevice3.lockdown import create_using_usbmux +from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.remote.common import TunnelProtocol from pymobiledevice3.remote.module_imports import start_tunnel from pymobiledevice3.remote.remote_service_discovery import RSD_PORT, RemoteServiceDiscoveryService @@ -41,6 +40,7 @@ REMOTEPAIRING_INTERVAL = 5 USBMUX_INTERVAL = 2 +OSUTILS = get_os_utils() @dataclasses.dataclass @@ -80,13 +80,7 @@ def tunnel_exists_for_udid(self, udid: str) -> bool: async def monitor_usb_task(self) -> None: previous_ips = [] while True: - if sys.platform == 'win32': - current_ips = [f'{adapter.ips[0].ip[0]}%{adapter.ips[0].ip[2]}' for adapter in get_adapters() if - adapter.ips[0].is_IPv6] - else: - current_ips = [f'{adapter.ips[0].ip[0]}%{adapter.nice_name}' for adapter in get_adapters() if - adapter.ips[0].is_IPv6] - + current_ips = OSUTILS.get_ipv6_ips() added = [ip for ip in current_ips if ip not in previous_ips] removed = [ip for ip in previous_ips if ip not in current_ips] diff --git a/pymobiledevice3/usbmux.py b/pymobiledevice3/usbmux.py index 7c9fd93c..363055ac 100644 --- a/pymobiledevice3/usbmux.py +++ b/pymobiledevice3/usbmux.py @@ -1,7 +1,6 @@ import abc import plistlib import socket -import sys import time from dataclasses import dataclass from typing import List, Mapping, Optional @@ -11,6 +10,7 @@ from pymobiledevice3.exceptions import BadCommandError, BadDevError, ConnectionFailedError, \ ConnectionFailedToUsbmuxdError, MuxException, MuxVersionError, NotPairedError +from pymobiledevice3.osu.os_utils import get_os_utils usbmuxd_version = Enum(Int32ul, BINARY=0, @@ -163,12 +163,7 @@ def create_usbmux_socket(usbmux_address: Optional[str] = None) -> SafeStreamSock address = usbmux_address family = socket.AF_UNIX else: - if sys.platform in ['win32', 'cygwin']: - address = MuxConnection.ITUNES_HOST - family = socket.AF_INET - else: - address = MuxConnection.USBMUXD_PIPE - family = socket.AF_UNIX + address, family = get_os_utils().usbmux_address return SafeStreamSocket(address, family) except ConnectionRefusedError: raise ConnectionFailedToUsbmuxdError() diff --git a/pymobiledevice3/utils.py b/pymobiledevice3/utils.py index 701e3ccf..43726c78 100644 --- a/pymobiledevice3/utils.py +++ b/pymobiledevice3/utils.py @@ -1,32 +1,10 @@ import asyncio -import os -import platform -import socket -import sys import traceback from functools import wraps -from pathlib import Path from typing import Callable from construct import Int8ul, Int16ul, Int32ul, Int64ul, Select -if sys.platform != 'win32': - from os import chown - -DEFAULT_AFTER_IDLE_SEC = 3 -DEFAULT_INTERVAL_SEC = 3 -DEFAULT_MAX_FAILS = 3 - -_DARWIN_TCP_KEEPALIVE = 0x10 -_DARWIN_TCP_KEEPINTVL = 0x101 -_DARWIN_TCP_KEEPCNT = 0x102 - - -def chown_to_non_sudo_if_needed(path: Path) -> None: - if os.getenv('SUDO_UID') is None or sys.platform == 'win32': - return - chown(path, int(os.getenv('SUDO_UID')), int(os.getenv('SUDO_GID'))) - def plist_access_path(d, path: tuple, type_=None, required=False): for component in path: @@ -70,45 +48,3 @@ async def wrapper(*args, **kwargs): raise return wrapper - - -def set_keepalive(sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, - interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: - """ - set keep-alive parameters on a given socket - - :param sock: socket to operate on - :param after_idle_sec: idle time used when SO_KEEPALIVE is enabled - :param interval_sec: interval between keepalives - :param max_fails: number of keepalives before close - - """ - plat = platform.system() - if plat == 'Linux': - return _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails) - if plat == 'Darwin': - return _set_keepalive_darwin(sock, after_idle_sec, interval_sec, max_fails) - if plat == 'Windows': - return _set_keepalive_win(sock, after_idle_sec, interval_sec) - raise RuntimeError(f'Unsupported platform {plat}') - - -def _set_keepalive_linux(sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, - interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec) - sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails) - - -def _set_keepalive_darwin(sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, - interval_sec: int = DEFAULT_INTERVAL_SEC, max_fails: int = DEFAULT_MAX_FAILS) -> None: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPALIVE, after_idle_sec) - sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPINTVL, interval_sec) - sock.setsockopt(socket.IPPROTO_TCP, _DARWIN_TCP_KEEPCNT, max_fails) - - -def _set_keepalive_win(sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, - interval_sec: int = DEFAULT_INTERVAL_SEC) -> None: - sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec * 1000, interval_sec * 1000)) diff --git a/pyproject.toml b/pyproject.toml index 42dd2575..983a6b91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ dynamic = ["dependencies", "version"] [project.optional-dependencies] -test = ["pytest"] +test = ["pytest", "pytest-asyncio"] [project.urls] "Homepage" = "https://github.com/doronz88/pymobiledevice3"