Source code for manafa.parsing.perfetto.perfettoParser



import re
from enum import Enum

from manafa.parsing.powerProfile.PowerProfile import PowerProfile

x="""import time
import subprocess
def executeShCommand(command):

    pipes = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    std_out, std_err = pipes.communicate()
    output = std_out.decode("utf-8").lower()
    if pipes.returncode != 0:
        err_msg = "%s. Code: %s" % (std_err.decode('utf-8').strip(), pipes.returncode)
        print("error executing command %s" % command)
        print(err_msg)
        return -1    
    elif len(std_err)==0:
        return output


def epochToDate(ts):
	return time.ctime(ts)
###"""

[docs]def interpolate(x1: float, x2: float, y1: float, y2: float, x: float): """Performs linear interpolation for x between (x1,y1) and (x2,y2) """ return ((y2 - y1) * x + x2 * y1 - x1 * y2) / (x2 - x1) if (x2-x1)>0 else y1
#print(val) #print("---") #return val
[docs]class CPU_STATE(Enum): SUSPEND = "suspend" IDLE = "idle" AWAKE = "awake" ACTIVE = "active"
[docs]class PerfettoCPUEvent(object): """Stores information regarding each cpu frequency in a given time. A perfetto cpufreq event information, corresponding to a line in an results output file in systrace format. Attributes: time: event_time. vals: frequency for each cpu of device. """ def __init__(self, time=0.0, values=[]): self.time=time self.vals=[] for x in values: self.vals.append(x) def __str__(self): return "time: %f vals = %s , " % (self.time, str(self.vals)) def __repr__(self): return str(self)
[docs] def init_all(self, default_len=8, val=0): """inits values for each cpu. Args: default_len: number of cores. val: default value. """ for x in range(0, default_len): if len(self.vals) > x: self.vals[x] = val else: self.vals.append(val)
[docs] def update(self, cpu_id,cpu_freq): """update/insert cpufreq val for each cpu id""" if len(self.vals)> cpu_id: self.vals[cpu_id]=cpu_freq else: for x in range(len(self.vals)-1, cpu_id): self.vals.append(cpu_freq)
[docs] def calculate_CPUs_current(self, state, profile): """given a power profile and a cpu state, returns the instantaneous current being consumed by all cpu cores in that state. Args: state: cpu state in CPU_STATE values profile: power profile class """ total = 0 if state not in ["idle", "suspend"]: for core_id, freq in enumerate(self.vals): bf, aft = profile.get_CPU_core_speed_pair(core_id, freq) lin_inter_val = interpolate(bf[0], aft[0], bf[1], aft[1], freq) total += lin_inter_val total = total / len(self.vals) else: total = profile.get_CPU_state_current(state) return total / 1000
[docs]class PerfettoCPUfreqParser(object): """Parses cpu frequency updates from a log file obtained with perfetto. Attributes: power_profile: current device power profile. start_time: lower timestamp bound to consider. timezone: device timezone. """ def __init__(self, power_profile=None, start_time=0.0, timezone="EST"): self.events = [] self.start_time = start_time self.power_profile = self.load_power_profile(power_profile) if power_profile is not None else {}
[docs] @staticmethod def load_power_profile(xml_profile): """Loads power profile from xml_profile filepath. Returns: object: power profile file. """ return PowerProfile(xml_profile)
[docs] def parse_file(self, filename): """parses filename. Args: filename: path of log file resultant of a profiling session with perfetto. """ with open(filename, 'r') as filehandle: lines = filehandle.read().splitlines() self.parse_history(lines)
[docs] def parse_history(self, lines): """parses event from lines. Args: lines: list of lines from file. """ for line in lines: if line.startswith("#"): continue z = re.match(r"^\s*([^\s]+)\-(\d+)\s*\(\s*(\d+|\-+)\) \[(\d+)\] (\d+|\.+) ([0-9]*\.[0-9]+|[0-9]+)\: (.*)?$",line) if z is not None: time = float(z.groups()[5]) time += self.start_time ev_pair = self.parse_event(z.groups()[6]) if ev_pair is not None: cpu_id = ev_pair[0] cpu_freq = ev_pair[1] self.add_event(time, cpu_id, cpu_freq) else: raise Exception("Error parsing file")
[docs] def add_event(self, time: float, cpu_id: int, cpu_freq: int): """add or update cpu freq event based on values passed as argument. Args: time: timestamp of event. cpu_id: id of cpu. cpu_freq: frequency value. """ if len(self.events) == 0: z = PerfettoCPUEvent(time) z.init_all(default_len=8, val=cpu_freq) self.events.append(z) else: last = self.events[-1] z = PerfettoCPUEvent(time, last.vals) z.update(cpu_id,cpu_freq) self.events.append(z)
[docs] def parse_event(self, ev_str): """ parse frequency and cpu id from string. Args: ev_str: string expecting to have the patttern. Returns: cpu_id(int): id of the cpu. cpu_freq(int): frequency value. """ mat = re.match(r'cpu_frequency: state=(\d+) cpu_id=(\d+)', ev_str) if mat: cpu_id = int(mat.groups()[1]) cpu_freq = int(mat.groups()[0]) return cpu_id, cpu_freq return None
[docs] def get_closest_pair(self, time): """return the closest indexes of samples to time. Args: time: reference time. Returns: lasti(int): before index. i(int): after index. """ lasti = 0 for i, x in enumerate(self.events): if x.time > time: return lasti, i lasti = i return lasti, lasti
#bootTime = float ( executeShCommand ("adb shell cat /proc/stat | grep btime | awk '{print $2}'").strip() ) #print(bootTime) #print(epochToDate(bootTime)) #x = PerfettoCPUfreqParser(bootTime) #x.parseFile("/Users/ruirua/repos/petra_like/results/perfetto/trace-1605638909.systrace") #print(x.events)