Source code for anadroid.device.Device

import json
import os
import re
import threading
import time
from com.dtmilano.android.viewclient import ViewClient
from anadroid.application.AndroidProject import BUILD_TYPE
from anadroid.application.Application import App
from anadroid.device.AbstractDevice import AbstractDevice, ADB_CONN
import difflib

from anadroid.device.DeviceState import DeviceState, DEVICE_STATE_ENFORCE
from anadroid.utils.Utils import execute_shell_command, get_resources_dir, logi, loge, logs, logw

DEFAULT_TCP_PORT = 5555
DEVICE_STATE_TEST_FILENAME = "device_state_on_test.json"
DEVICE_STATE_IDLE_FILENAME = "device_state_on_idle.json"
CONFIG_DIR = os.path.join(get_resources_dir(), 'config')
CONFIG_TEST_FILE = os.path.join(CONFIG_DIR, DEVICE_STATE_TEST_FILENAME) \
    if not os.path.exists(DEVICE_STATE_TEST_FILENAME) else DEVICE_STATE_TEST_FILENAME
CONFIG_IDLE_FILE = os.path.join(CONFIG_DIR, DEVICE_STATE_IDLE_FILENAME) if\
    not os.path.exists(DEVICE_STATE_IDLE_FILENAME) else DEVICE_STATE_IDLE_FILENAME


[docs]def background_installer(serial_nr): time.sleep(2) if has_to_click_to_install(serial_nr): click_to_install(serial_nr)
[docs]def has_to_click_to_install(serial_nr): vc = ViewClient(*ViewClient.connectToDeviceOrExit(serialno=serial_nr)) try: vc.findViewByIdOrRaise('com.google.securitycenter:id/name') except: try: vc.findViewByIdOrRaise('com.miui.securitycenter:id/name') except: return False return True
[docs]def click_to_install(serial_nr): vc = ViewClient(*ViewClient.connectToDeviceOrExit(serialno=serial_nr)) res = vc.findViewByIdOrRaise('android:id/button2') res.touch()
# assuming only 1 device connected
[docs]def set_device_conn(conn_type, device_id=None): """sets connection of type conn_type with a connected device or a device with id == device_id. Args: conn_type(ADB_CONN): connection type. device_id: device id. """ device_string = f"-s {device_id}" if device_id is not None else "" if conn_type == ADB_CONN.USB or conn_type == ADB_CONN.USB.value: result = execute_shell_command(f'adb {device_string} disconnect; adb {device_string} usb') if result.validate("No devices/emulators found"): logi("Device is now connected via USB") elif conn_type == ADB_CONN.WIFI or conn_type == ADB_CONN.WIFI.value: device = get_first_connected_device() if device.conn_type == ADB_CONN.WIFI: logi("device already connected via wifi") else: res = execute_shell_command(f"adb {device_string} shell ip -f inet addr show wlan0") if "more than one device" in res.errors: loge("more than one device connected. please specify device id or unplug the device") elif res.validate("Error accessing device network interface") and res.output == "": loge("no wifi connection detected on the device. Please connect the device to a wireless network") else: reg = re.search(r'[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+', (res.output)) if reg is None: raise Exception("Bad address read") ip_addr = reg.group() res = execute_shell_command(f"adb {device_string} tcpip {DEFAULT_TCP_PORT}; adb {device_string} connect {ip_addr}") if res.validate(f"error while connecting to {ip_addr} address") and f"connected to {ip_addr}" in res.output: logs(f"successfully connected to {ip_addr}") logw("Please unplug the device from the workstation in order to disable USB connection") else: raise Exception("Unknown device connection type (not USB or WIFI)")
[docs]def get_first_connected_device(conn_type=ADB_CONN.USB.value): """Retrieves the first connected device that it founds using adb. Retrieves the first device of the list retrieved by adb devices -l command. Args: conn_type: Returns: Device: first device found. """ result = execute_shell_command('adb devices -l | grep \"product\"') # #| cut -f1 -d\ ') conn_type = ADB_CONN.USB if "usb" in result.output.lower() else ADB_CONN.WIFI result.validate(DeviceNotFoundError("No devices/emulators found")) device_serial = result.output.split(" ")[0] if device_serial == "": raise DeviceNotFoundError(f"No devices/emulators connected.") return Device(device_serial, conn_type=conn_type)
[docs]class DeviceNotFoundError(Exception): pass
[docs]class Device(AbstractDevice): """Class that extends AbstractDevice to handle interaction with devices connected via ADB.""" def __init__(self, serial_nr, conn_type=ADB_CONN.USB): super(Device, self).__init__(serial_nr) self.conn_type = conn_type self.props = {} self.state = None self.installed_packages = set() self.__init_installed_packages() self.__init_props() self.device_state = DeviceState(self) self.device_state_test = self.load_device_state(CONFIG_TEST_FILE) self.device_state_idle = self.load_device_state(CONFIG_IDLE_FILE) self.installed_apks = []
[docs] def get_device_props(self): """returns props dict containing device properties that were obtained using getprop command. Returns: props(dict): dict with properties. """ return self.props
[docs] def execute_command(self, cmd, args=[], shell=False): return super().execute_command(cmd, args, shell)
[docs] def execute_root_command(self, cmd, args=[], shell=False): return super(Device, self).execute_root_command(cmd, args, shell)
[docs] def install_apks(self, andr_proj, build_type=BUILD_TYPE.RELEASE, accept_install=False, install_test_apks=False, retry=True): """install apks of type build_type of android project andr_proj. Args: andr_proj(AndroidProject): build_type(BUILD_TYPE): build type. install_test_apks(bool): if test apk installation has to be performed. Returns: installed_packages(set): set of installed packages. """ installed_packages = set() installed_app_list = set() apks_built = andr_proj.get_apks(build_type) if install_test_apks: apks_built += andr_proj.get_test_apks() for apk in apks_built: old_packs = self.installed_packages try: print(f"installing {apk}") self.install_apk(apk) except: if retry: self.install_apks(andr_proj, build_type, accept_install=True, retry=False, install_test_apks=install_test_apks) new_packs = self.list_installed_packages() diff_pkgs = list(filter(lambda x: x not in old_packs, new_packs)) if len(diff_pkgs) == 0: logw("package already installed or mocked device") the_pack = self.get_package_matching(andr_proj.pkg_name) if the_pack is None: # mocked device the_pack = andr_proj.pkg_name diff_pkgs = [the_pack] logi("APK installed " + apk) self.installed_apks.append(apk) app = App(self, andr_proj, andr_proj.pkg_name, apk_path=apk, local_res_dir=andr_proj.results_dir) installed_app_list.add(app) installed_pack = diff_pkgs[0] installed_packages.add(installed_pack) self.installed_packages.add(installed_pack) #return installed_packages return installed_app_list
[docs] def install_apk(self, apk_path, accept_install=True): """installs apk located in apk_path Args: apk_path(str): apk path. Returns: COMMAND_RESULT: result of installation command. """ if accept_install: unlocked = self.is_screen_unlocked() if not unlocked: self.unlock_screen() thread1 = threading.Thread(target=background_installer, args=[self.serial_nr]) thread1.start() logi("installing main APK(s)") res = super().execute_command("install -r %s" % apk_path, args=[], shell=False) res.validate(Exception("Unable to install package " + apk_path))
[docs] def unlock_screen(self, password=None): super(Device, self).unlock_screen(password)
[docs] def is_screen_dreaming(self): return super(Device, self).is_screen_dreaming()
[docs] def is_screen_unlocked(self): return super(Device, self).is_screen_unlocked()
[docs] def uninstall_pkg(self, pkg_name): super().execute_command("uninstall ", args=[pkg_name], shell=False).validate(Exception("Unable to uninstall package " + pkg_name))
[docs] def list_installed_packages(self): """returns list of device's installed packages. Returns: vals(:obj:`list` of :obj:`str`): list of packages. """ vals = [] res = self.execute_command("pm list packages", args=[], shell=True) res.validate(Exception("Error obtaining device packages")) for line in res.output.splitlines(): val = re.sub(r'package:', '', line).strip() vals.append(val) return vals
[docs] def get_min_sdk_version(self): """returns min sdk version of device. Gets ro.build.version.min_supported_target_sdk property. Returns: int: major min sdk version. """ return int(self.props["ro.build.version.min_supported_target_sdk"]) if "ro.build.version.min_supported_target_sdk" in self.props else int(self.props["ro.build.version.sdk"])
[docs] def get_device_sdk_version(self): """returns device sdk version. Gets ro.build.version.sdk. Returns: int: major min sdk version. """ return int(self.props["ro.build.version.sdk"]) if "ro.build.version.sdk" in self.props else 19
def __init_props(self): """fetch properties from device. Fills props attribute with properties coming from getprop command. """ res = super().execute_command("getprop", args=[], shell=True) res.validate(DeviceNotFoundError("There is no connected devices")) for line in res.output.splitlines(): vals= re.sub(r'\[|\]', '', line).split(":") if len(vals) > 1: self.props[vals[0]] = vals[1]
[docs] def get_prop(self, key): if key in self.props: return self.props[key] res = super().execute_command("getprop", args=[key], shell=True) res.validate(DeviceNotFoundError("There is no connected devices")) return res.output.strip()
def __init_installed_packages(self): """updates installed packages list. """ packs = self.list_installed_packages() self.installed_packages.update(packs)
[docs] def get_package_matching(self, pkg_aprox_name): """gets installed package more alike with a given pkg name. Args: pkg_aprox_name: name of the package. Returns: str: the most alike installed pkg name. """ candidates = difflib.get_close_matches(pkg_aprox_name, self.installed_packages, n=1) if len(candidates) > 0: return candidates[0] else: new_cands = difflib.get_close_matches(pkg_aprox_name, self.list_installed_packages(), n=1) return new_cands[0] if len(new_cands) > 0 else None
[docs] def lock_screen(self): super(Device, self).lock_screen()
[docs] def has_package_installed(self, pack_name): """checks if a given package is installed on device. Args: pack_name: package name. Returns: bool: True if installed, False otherwise. """ return pack_name in self.installed_packages
[docs] def contains_file(self, filepath): """checks if device has a file in a given filepath. Args: filepath: path of file to check. Returns: bool: True if file exists, False otherwise. """ res = self.execute_command("test -e ", args=[filepath], shell=True) return res.return_code == 0
[docs] def clear_logcat(self): """clear device logs.""" super().execute_command("logcat -c", args=[])\ .validate()
[docs] def dump_logcat_to_file(self, filename="logcat.out"): """dumps device logs to file with path filename. Args: filename: path or name of file. """ super().execute_command(f"logcat -d > {filename}", args=[]) \ .validate(Exception("Unable to dump logcat to file"))
[docs] def get_device_android_version(self): return super().get_device_android_version()
[docs] def is_rooted(self): """check if it is a rooted device. Returns: bool: True if is rooted, False otherwise. """ return super().execute_command("su -c 'echo hi'", shell=True).validate()
[docs] def load_device_state(self, filepath): """load device state from a file located in filepath Args: filepath: location of file with the state. Returns: json_def(dict): dict with state. """ json_def={} with open(filepath, 'r') as filehandle: json_def = json.load(filehandle) return json_def
[docs] def set_device_state(self, state_cfg=DEVICE_STATE_ENFORCE.TEST, perm_json=None): """enforces a given device state. Args: state_cfg(DEVICE_STATE_ENFORCE): type of state to enforce perm_json: set of states to enforce according to the permission to give in such state. """ vals = {} if state_cfg == DEVICE_STATE_ENFORCE.APP and perm_json is not None: for perm in perm_json: vals += self.device_state.get_states_from_permission(perm) elif state_cfg == DEVICE_STATE_ENFORCE.TEST: vals = self.device_state_test elif state_cfg == DEVICE_STATE_ENFORCE.IDLE: vals = self.device_state_idle for k, v in vals.items(): self.device_state.enforce_state(k, v)
[docs] def get_new_installed_pkgs(self): """if a new pkg was installed, retrieves a list of the new installed packages. Returns: new_pkgs(:obj:`list` of :obj:`str`): list of new packages. """ old_pkgs = self.installed_packages new_pkgs = self.list_installed_packages() has_no_prefix = lambda candidate, pklist: len(list(filter(lambda z: z in candidate and z != candidate, pklist))) == 0 return list(filter( lambda x: x not in old_pkgs and has_no_prefix(x, new_pkgs), new_pkgs))
[docs] def set_log_size(self, log_size_bytes=16384): """sets log size through persist.logd.size property. Args: log_size_bytes: size in bytes. """ super(Device, self).execute_root_command(f"shell setprop persist.logd.size {log_size_bytes}K", shell=True)
[docs] def get_device_model(self): return self.get_prop("ro.product.model")
[docs] def get_device_brand(self): return self.get_prop("ro.product.brand")
[docs] def get_device_ram(self): res = self.execute_command(" cat /proc/meminfo | grep 'MemTotal' | cut -f2 -d:", args=[], shell=True) res.validate("Unable to get device ram") return res.output.strip().replace(" ","")
[docs] def get_device_cores(self): res = self.execute_command(" cat /proc/cpuinfo | grep processor | wc -l", args=[], shell=True) res.validate("Unable to get device cores") return res.output.strip()
[docs] def get_device_max_cpu_freq(self): res = self.execute_command(" cat /proc/cpumaxfreq", args=[], shell=True) if res.validate("Unable to get device max cpu freq"): return res.output.strip().replace(" ", "") return "0"
[docs] def get_kernel_version(self): res = self.execute_command("cat /proc/version", args=[], shell=True) res.validate("Unable to get kernel version") return res.output.strip()
[docs] def get_device_specs(self): return { "device_serial_number": self.serial_nr, "device_model": self.get_device_model(), "device_brand": self.get_device_brand(), "device_ram": self.get_device_ram(), "device_cores": self.get_device_cores(), "device_max_cpu_freq": self.get_device_max_cpu_freq() }
[docs] def get_device_info(self): return { "state_device_id": self.serial_nr, "state_os_version": self.get_prop("ro.build.version"), "state_miui_version": self.get_prop("ro.miui.cust_variant"), "state_api_version": self.get_prop("ro.build.version.sdk"), "state_kernel_version": self.get_kernel_version()[:200], "state_operator": self.get_prop("gsm.sim.operator.alpha"), "state_operator_country": self.get_prop("gsm.operator.iso-country"), "state_nr_installed_apps": len(self.installed_packages), "state_current_lang": self.get_prop("persist.sys.locale") }
[docs] def save_device_specs(self, filepath): res = self.get_device_specs() with open(filepath, 'w') as jj: json.dump(res, jj)
[docs] def save_device_info(self, filepath): res = self.get_device_info() with open(filepath, 'w') as jj: json.dump(res, jj)
[docs] def save_device_state(self, filepath="state.json"): device_stt = self.device_state.get_device_state() if not os.path.exists(os.path.dirname(filepath)): os.mkdir(os.path.dirname(filepath)) with open(filepath, 'w') as jj: json.dump(device_stt, jj, indent=1)