Source code for manafa.parsing.batteryStats.BatteryStatsParser

""" This module contains ENUMs that store state values of batterystats events.

BatteryStatsConstants contains constants associated with batterystats events and respective meaning.
"""

import os
import re, json
# sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))

from manafa.parsing.powerProfile.PowerProfile import PowerProfile
from manafa.utils.Utils import get_resources_dir
from manafa.utils.dateUtils import convertBatStatTimeToTimeStamp, batStatResetTimeToTimeStamp
import copy
from manafa.utils.Logger import log, LogSeverity

DEFAULT_JSON_PATH = os.path.join(get_resources_dir(), "batteryStats", "BatteryStatus.json")



[docs]def safe_division(a, b): """function to safely perform division between numbers to avoid division by 0. Args: a (float): base. a (float): quote. Returns: int: the result of the division. """ z = 1 if b == 0 else b return a / z
[docs]class BatteryEvent(object): """Class to store information information parsed from the lines of batterystats history events. This class stores information of one or more lines (when consecutive lines have the same timestamp) of batterystats history events. Attributes: time (float): Stores timestamp of event (s.ms). updates (dict): stores component updates (components referred in power_profile.xml) registered in time timestamp. current (dict): stores instant current being consumed by each component. concurrentUpdates (dict); stores information regarding other updates that are not directly related to energy consumption. """ def __init__(self, time=0.0, vals={}): """Init Battery event with parsed information. Args: time (float): timestamp of event (s.ms). vals (dict): events registered in batterystats history line(s). """ self.time = time self.updates = {} self.currents = {} self.concurrentUpdates = {"tmpwhitelist": [], "job": [], "sync": [], "top": [], "longwake": [], "fg": [], "proc": [], "screenwake": [], "pkgactive": [], "user": [], "userfg": [], "wake_lock_in": [], "alarm": []} self.add_events(vals) def __str__(self): return "time:%f vals = %s , concs= %s " % (self.time, str(self.updates), str(self.concurrentUpdates)) def __repr__(self): return str(self)
[docs] def is_concurrent(self, state): """checks if is a concurrent update (aka event not related to hardware/sensor state change). Args: state (str): timestamp of event (s.ms). Returns: bool: True if successful, False otherwise. """ # return re.match(r"(\+|\-)?(tmpwhitelist|job|sync|top|longwake|fg|proc)", state) return state in self.concurrentUpdates
[docs] def get_current_of_batStatEvent(self): """gets total current being consumed by the current state of hardware usage. Args: state (str): timestamp of event (s.ms). Returns: float: total current being consumed. """ currs = {} total = 0 for v, x in self.currents.items(): try: z = float(x) currs[v] = z / 1000 total += z except ValueError: currs[v] = 0 continue return total / 1000, currs
[docs] def get_voltage_value(self): """gets current voltage value (last value recorded in history lines). Returns: float: total current value. """ return float(self.updates["volt"]) / 1000 if "volt" in self.updates else 0
[docs] def add_events(self, new_events): """Adds a set of events to the current state. Args: new_events (dict): set of events. """ for ev in new_events.keys(): # print("->"+ev + "--") if ev.startswith("-"): conc_update_state = ev.replace("-", "", 1) if self.is_concurrent(conc_update_state): self.concurrentUpdates[conc_update_state] = [n for n in self.concurrentUpdates[conc_update_state] if (n["val"] != new_events[ev]["val"] and n["val2"] != new_events[ev]["val2"])] else: self.updates.pop(conc_update_state, None) else: ev_def = ev.replace("+", "", 1) if self.is_concurrent(ev_def): self.concurrentUpdates[ev_def].append(new_events[ev]) else: self.updates[ev_def] = new_events[ev]
[docs]class BatteryStatsParser(object): """Class that handles parsing of batterystats history events This class parses and batterystats history events from files parsed using parseFile function. It starts by load information contained in the device power profile file and also the current state values known by the profiler, stored in definitions attribute. Attributes: events (:obj:`list` of :obj:`BatteryEvent`): stores BatteryEvents by order of occurence. definitions (dict): stores definitions ok known states support by batterystats and manafa. powerProfile (:obj:`PowerProfile`): stores information parsed from power profile file. android_version (int): Android release version. start_time (int): initial timestamp inferred from first line of batstats history. timezone (str): current device timezone. """ def __init__(self, powerProfile=None, timezone="EST", def_file=DEFAULT_JSON_PATH, android_version=10): self.events = [] self.definitions = self.load_definition_file(def_file) self.powerProfile = PowerProfile(powerProfile) if powerProfile is not None else {} self.android_version = android_version self.start_time = 0 self.timezone = timezone # adb shell date +"%Z %z"
[docs] @staticmethod def load_definition_file(def_file): """loads definitions attribute from json file Args: def_file (str): filepath. """ with open(def_file, "r") as dff: return json.load(dff)
[docs] def get_definition_val(self, key, val=""): """return value of component from key and opt val. Args: key(str): key that identifies component. val(str): optional sub key. Returns: int: value. """ res = re.sub(r"\+|\-", "", key) if res in self.definitions["monoval"]: return 0 if "-" in key else 1 elif res in self.definitions["trival"]: return key elif res in self.definitions["numerical"]: return val elif res in self.definitions["nominal"]: return self.definitions["nominal"][str(key)][val] return None
[docs] def is_trival(self, key): """check if is a component with more than 2 values associated. Args: key: component key. Returns: bool: result of check. """ return re.sub(r"\+|\-", "", key) in self.definitions["trival"]
[docs] def parse_states(self, states): """Parses states from batstats history line. Args: states: string containing the state updates. Returns: dict: parsed events """ accum = False accumulator = "" latest_state = None events = {} for state in re.sub(r"^ ", "", states).split(" "): # print("->" +state) if accum: accumulator += state if state.count('\"') % 2 != 0: accum = not accum accumulator = "" continue if "=" in state: if state.count('\"') % 2 != 0: accum = True accumulator += state continue else: key = state.split("=")[0] val = state.split("=")[1] state = self.get_definition_val(key, val) if self.is_trival(key): # print("%s - %s -%s" %(key,val.split(":")[0],val.split(":")[1])) # return key,val.split(":")[0],val.split(":")[1] events[key] = {"val": val.split(":")[0], "val2": "".join(val.split(":")[1:])} else: # print("%s = %s" %(key,state)) events[key] = state elif state != "" and state != " ": st = self.get_definition_val(state) # print("%s - %s" %(state,st)) # print("val "+str(self.getDefinitionVal(state))) events[state] = st return events
[docs] def parse_history(self, lines_list): """Parse history events from list of lines read from file. Parses history events and stores them in the respective attribute fields. Args: lines_list(:obj:`list` of :obj:`str`): list of lines. """ for i, line in enumerate(lines_list): if re.match(r"^Battery History \([0-9]", line): # header, ignore continue if re.match(r"^Per-PID Stats", line) or len(line) == 0: return elif re.match(r"^\s*([^\s]+) (\(\d+\)) (\d+)(.*)?$", line): x = re.match(r"^\s*([^\s]+) (\(\d+\)) (\d+)(.*)?$", line) time = convertBatStatTimeToTimeStamp(x.groups()[0], timezone=self.timezone) time += self.start_time # print(time) events = self.parse_states(x.groups()[3]) self.add_update(time, events) elif re.match(r"^\s*0 (\(\d+\)) (.*)?$", line): x = re.match(r"^\s*0 (\(\d+\)) (.*)?$", line) if "RESET:TIME" in x.groups()[1]: self.start_time = batStatResetTimeToTimeStamp((x.groups()[1]).replace("RESET:TIME: ", ""), self.timezone) # print(epochToDate(self.start_time)) else: # TODO Handle DcpuStats and DpstStats # print(line) log("Unrecognized patter in line of batstats file", LogSeverity.WARNING)
[docs] def add_update(self, time, bat_events): """Adds new event updates to current state. Args: time(int): timestamp of new events. bat_events(:obj:`list` of :obj:`dict`): """ if len(self.events) == 0: bt = BatteryEvent(time, bat_events) self.estimate_current_consumption(bt) self.events.append(bt) else: last_added = self.events[-1] if last_added.time == time: self.events[-1].add_events(bat_events) else: # TODO try to replace with shallow copy bt = copy.deepcopy(self.events[-1]) bt.time = time bt.add_events(bat_events) self.estimate_current_consumption(bt) self.events.append(bt)
[docs] def estimate_current_consumption(self, bt_event): """estimates current power being consumed by event. Args: bt_event(str): key of the event. """ power = {} for p, v in self.powerProfile.components.items(): st = self.determinate_component_current(bt_event, p, v) power[p] = st # print("%s %s" %(p , str(st))) bt_event.currents = power
[docs] def get_closest_pair(self, time): """returns closest events of a given timestamp time. Args: time(float): timestamp. Returns: int, int: index of the closest events. """ lasti = 0 for i, x in enumerate(self.events): if x.time > time: return lasti, i lasti = i return lasti, lasti
[docs] def get_events_in_between(self, start_time, end_time): """get batstat events occured between start_time and end_time. Args: start_time(int): start timestamp. end_time(int): end timestamp. Returns: dict: events occurred. """ metrics = {} if (end_time - start_time) <= 0: return metrics c_beg_bef, c_beg_aft = self.get_closest_pair(start_time) # {'health: [(event_state,start,end, pctage_duration)], ..} prev_time = self.events[c_beg_aft].time if len(self.events) > 0 else start_time fst_time = prev_time for ev in self.events[c_beg_aft:]: if ev.time > end_time: break for kup, upval in ev.updates.items(): metrics[kup] = [] if kup not in metrics else metrics[kup] contains_update_val = metrics[kup][-1][0] == upval if len( metrics[kup]) > 0 else None # next(filter(lambda x: x[0] == upval, metrics[kup]), None) if not contains_update_val: if len(metrics[kup]) > 0: # update previous state duration_pctage = 100 * ((ev.time - metrics[kup][-1][1]) / (end_time - start_time)) metrics[kup][-1] = ( metrics[kup][-1][0], metrics[kup][-1][1], ev.time, duration_pctage) # last_time? init_time = start_time if fst_time == ev.time else ev.time duration_pctage = 100 * ((end_time - init_time) / (end_time - start_time)) metrics[kup].append((upval, init_time, end_time, duration_pctage)) # concs for cup, cupval in ev.concurrentUpdates.items(): metrics[cup] = [] if cup not in metrics else metrics[cup] # detect pushes for c_i, cval in enumerate(cupval): # is_in_metrics = metrics[cup][-1][0]['val'] == cval['val'] and metrics[cup][-1][0]['val2'] == cval['val2'] if len(metrics[cup]) > 0 else False is_in_metrics = next(filter( lambda x: x[0]['val'] == cval['val'] and x[0]['val2'] == cval['val2'] and x[2] == end_time, metrics[cup]), None) if not is_in_metrics: # new state, add init_time = start_time if prev_time == ev.time else ev.time duration_pctage = 100 * ((end_time - init_time) / (end_time - start_time)) metrics[cup].append((cval, init_time, end_time, duration_pctage)) # detect pops for i, val_of_metrics in enumerate(metrics[cup]): this_ev_contains_val = next(filter( lambda t: t['val'] == val_of_metrics[0]['val'] and t['val2'] == val_of_metrics[0]['val2'], cupval), None) if not this_ev_contains_val: # was popped, update already_popped = val_of_metrics[2] != end_time if not already_popped: prev_start_time = val_of_metrics[1] duration_pctage = 100 * ((ev.time - prev_start_time) / (end_time - start_time)) metrics[cup][i] = (val_of_metrics[0], prev_start_time, ev.time, duration_pctage) prev_time = ev.time return metrics
[docs] def get_CPU_samples_in_between(self, start_time, end_time): """returns cpu states recorded between start and end time. Args: start_time(int): start timestamp. end_time(int): end timestamp. Returns: dict: states occurred. """ l = [] last_ev = self.events[0] if len(self.events) > 0 else None last_time = start_time for x in self.events: if x.time > start_time and x.time < end_time: delta = x.time - last_time state = last_ev.currents["cpu"] voltage = last_ev.get_voltage_value() # float(last_ev.updates["volt"]) pair = (delta, state, voltage) l.append(pair) last_time = x.time last_ev = x last_delta = end_time - last_time last_state = last_ev.currents["cpu"] last_voltage = last_ev.get_voltage_value() # float(last_ev.updates["volt"]) last_pair = (last_delta, last_state, last_voltage) l.append(last_pair) return l
[docs] def determinate_component_current(self, bt_event, comp_name, possible_states): """calculates current being consumed during bt_event by component identified by comp_name. Args: bt_event: event. comp_name: component name. possible_states: possible states of the component. Returns: float: current being consumed. """ current = 0.0 curravg = 0 avg_ct = 0 # screen if comp_name == "screen" and "screen" in bt_event.updates: on_current = possible_states["on"] brightness_level = bt_event.updates["brightness"] if "brightness" in bt_event.updates else 1 relative_full_current = (brightness_level * possible_states["full"] / ( len(self.definitions["nominal"]["brightness"]) - 1)) current += on_current + relative_full_current elif comp_name == "ambient" and "screen_doze" in bt_event.updates: # power profile might have a defined value for ambient/doze screen consumpri doze_current = possible_states["on"] current += doze_current # camera/flashlight elif comp_name == "camera": if "camera" in bt_event.updates: # usually available as avg consumption (Intended as a rough estimate for an application running a preview and capturing approximately 10 full-resolution pictures per minute.) cam_current = possible_states["avg"] current += cam_current if "flashlight" in bt_event.updates: flash_curr = possible_states["flashlight"] current += flash_curr # dsp elif comp_name == "dsp": if "video" in bt_event.updates: video_curr = possible_states["video"] if comp_name == "dsp" else possible_states current += video_curr if "audio" in bt_event.updates: audio_curr = possible_states["audio"] if comp_name == "dsp" else possible_states current += audio_curr # audio elif comp_name == "video" and "video" in bt_event.updates: video_curr = possible_states current += video_curr # video elif comp_name == "audio" and "audio" in bt_event.updates: audio_curr = possible_states current += audio_curr # wifi elif comp_name == "wifi" and "wifi_running" in bt_event.updates: on_current = possible_states["on"] if "on" in possible_states else 0 on_current += possible_states["controller"]["idle"] if ( "controller" in possible_states and "idle" in possible_states["controller"]) else 0 current += on_current if "wifi_scan" in bt_event.updates: if "scan" in possible_states: current += possible_states["scan"] if "controller" in possible_states: curravg = 0 avg_ct = 0 if "tx" in possible_states["controller"]: curravg += possible_states["controller"]["tx"] avg_ct += 1 if "rx" in possible_states["controller"]: curravg += possible_states["controller"]["rx"] avg_ct += 1 current += safe_division(curravg, avg_ct) elif "wifi_radio" in bt_event.updates: current += possible_states["active"] if "active" in possible_states else 0 if "controller" in possible_states: curravg = 0 avg_ct = 0 if "tx" in possible_states["controller"]: curravg += possible_states["controller"]["tx"] avg_ct += 1 if "rx" in possible_states["controller"]: curravg += possible_states["controller"]["rx"] avg_ct += 1 current += safe_division(curravg, avg_ct) # gps elif comp_name == "gps": if "signalqualitybased" in possible_states: # and "gps_signal_quality" in bt_event.updates: # considerate gps signal quality if "gps_signal_quality" in bt_event.updates: val = 1 if bt_event.updates["gps_signal_quality"] == "good" else 0 current += possible_states["signalqualitybased"][val] if "on" in possible_states and "gps" in bt_event.updates: current += possible_states["on"] # bluetooth elif comp_name == "bluetooth": if self.android_version < 7 or "controller" not in possible_states: # account blue on and active vals if "ble_scan" in bt_event.updates: current += possible_states["active"] if "active" in possible_states else 0 current += possible_states["on"] if "on" in possible_states else 0 elif "bluetooth" in bt_event.updates: current += possible_states["on"] if "on" in possible_states else 0 elif "controller" in possible_states: current += possible_states["controller"]["idle"] if ("idle" in possible_states["controller"]) else 0 if "ble_scan" in bt_event.updates: if "tx" in possible_states["controller"]: curravg += possible_states["controller"]["tx"] avg_ct += 1 if "rx" in possible_states["controller"]: curravg += possible_states["controller"]["rx"] avg_ct += 1 current += safe_division(curravg, avg_ct) # radio = modem elif comp_name == "radio": # radio.on if "phone_scanning" in bt_event.updates: # radio.scanning current += possible_states["scanning"] if "scanning" in possible_states else 0 elif "mobile_radio" in bt_event.updates: on_vals = list(possible_states["on"]) if "on" in possible_states else [] signal_stren = bt_event.updates[ "phone_signal_strength"] if "phone_signal_strength" in bt_event.updates else 0 if signal_stren >= len(on_vals) and len(on_vals) > 0: current += on_vals[-1] elif len(on_vals) > 0: current += on_vals[signal_stren] # radio.active == mobile_radio - transmiting elif comp_name == "modem": # same as radio if "phone_scanning" in bt_event.updates: curravg = 0 avg_ct = 0 if "tx" in possible_states["controller"]: on_vals = possible_states["controller"]["tx"] if "tx" in possible_states["controller"] else [] signal_stren = bt_event.updates[ "phone_signal_strength"] if "phone_signal_strength" in bt_event.updates else 0 if signal_stren > len(on_vals) and len(on_vals) > 0: curravg += on_vals[-1] elif len(on_vals) > 0: curravg += on_vals[signal_stren] avg_ct += 1 if "rx" in possible_states["controller"]: curravg += possible_states["controller"]["rx"] avg_ct += 1 current += safe_division(curravg, avg_ct) elif "mobile_radio" in bt_event.updates: current += possible_states["idle"] if "idle" in possible_states else 0 # cpu elif comp_name == "cpu": # retrieve just the component state # if phone has multiple cpu_clusters and that info is present in power profile file # only for devices with heterogeneous CPU architectures. # print(possible_states) # has_multiple_cpu_clusters= "clusters" in possible_states and "cores" in possible_states["clusters"] and len(possible_states["clusters"]["cores"])>1 # if has_multiple_cpu_clusters: # cores_per_cluster = possible_states["clusters"]["cores"] # calculate energy per cluster # if areInMinCPUFreq(bt_event.updates["cpufreq"] , possible_states): # assume is just awake # current += possible_states["awake"] # else: # calculate active state # current += 0 # possible_states["awake"] if "awake" in possible_states else 0 # print("TODO Calculate power according to freq") # else: if "running" in bt_event.updates: # is active or just awake # if "awake" in possible_states and areInMinCPUFreq(bt_event.updates["cpufreq"] , possible_states): # if areInMinCPUFreq(bt_event.updates["cpufreq"] , possible_states): # assume is just awake # current += possible_states["awake"] # else: # calculate active state # current += 0 # possible_states["awake"] if "awake" in possible_states else 0 # print("TODO Calculate power according to freq") current = "active" else: # cpu in idle state # current+= possible_states["idle"] if "idle" in possible_states else 0 current = "idle" return current
[docs] def parse_file(self, filepath): """parses events ands stores event from file with output from dumpsys batterystats command. Args: filepath: output filepath. """ with open(filepath, 'r') as ff: lines = ff.read().splitlines() self.parse_history(lines)
# if __name__ == '__main__': # if len(sys.argv)>1: # pp = "samples/profiles/power_profile.xml" # pp = "samples/profiles/power_profile_pixel3a_grapheneos.xml" # x = BatteryStatsParser(powerProfile=pp,timezone="WET") # x.parseFile(sys.argv[1])