Source code for anadroid.results_analysis.LogAnalyzer

import os
import re

from logcatparser.logCatParser import LogCatParser

from anadroid.results_analysis.AbstractAnalyzer import AbstractAnalyzer
from anadroid.utils.Utils import execute_shell_command, loge
from manafa.utils.Logger import log


class LogAnalyzer(AbstractAnalyzer):
    """Analyze Android logs produced during profiling sessions.

    Implements the AbstractAnalyzer interface on top of logcatparser: logcat
    files are parsed and a summary ("resume") of the produced logs is saved,
    so test executions can be characterized and validated against filters.
    """

    # Where each supported filter's value lives inside the dict returned by
    # ``LogCatParser.get_parser_resume()``: filter name -> (section, key).
    _RESUME_KEYS = {
        "fatal_errors": ('stats', 'fatal'),
        "ANR": ('known_errors', 'ANR'),
        "Exceptions": ('known_errors', 'Exception'),
    }

    def __init__(self, profiler):
        """Create the analyzer and its logcat parser.

        Args:
            profiler: forwarded unchanged to ``AbstractAnalyzer.__init__``.
        """
        # Assigned before the base constructor runs, as in the original code.
        self.supported_filters = {"fatal_errors", "ANR", "Exceptions"}
        super().__init__(profiler)
        self.logcatparser = LogCatParser(log_format="threadtime")

    def setup(self, **kwargs):
        """Delegate to the base class setup.

        ``kwargs`` are accepted for interface compatibility and are not
        forwarded to the base implementation.
        """
        super().setup()

    def fetch_log_files(self, dir_path, test_id=""):
        """Return the paths of the logcat files found directly in ``dir_path``.

        Args:
            dir_path: directory to scan (not recursive).
            test_id: when given, only files whose name contains
                ``f'{test_id}.logcat'`` are returned; the default matches
                every file whose name contains ``.logcat``.

        Returns:
            A list of paths (``dir_path`` joined with each matching name);
            empty when ``dir_path`` is missing or not a directory.
        """
        # TODO: fetch the test file name format from the cfg file.
        if not dir_path or not os.path.isdir(dir_path):
            loge(f"log directory not found: {dir_path}")
            return []
        wanted = f'{test_id}.logcat'
        return [os.path.join(dir_path, name) for name in os.listdir(dir_path) if wanted in name]

    @staticmethod
    def _test_id_from_filename(log_file):
        """Extract the test id from a log file name such as ``test_<id>.logcat``.

        The id is the second ``_``-separated token of the base name, cut at
        its first ``.``. Names without an underscore fall back to the first
        token instead of raising ``IndexError``.
        """
        tokens = os.path.basename(log_file).split("_")
        token = tokens[1] if len(tokens) > 1 else tokens[0]
        return token.split(".")[0]

    def analyze_tests(self, app=None, results_dir=None, **kwargs):
        """Analyze the logcat files of a results directory.

        Args:
            app: object exposing ``curr_local_dir``; used as the directory to
                scan when ``results_dir`` is None.
            results_dir: directory to scan; takes precedence over ``app``.
            **kwargs: optional ``test_id``. When present, only log files
                matching that id are analyzed and the id names the output
                file; otherwise the id is derived from each log file name.
        """
        if results_dir is not None:
            target_dir = results_dir
        elif app is not None:
            target_dir = app.curr_local_dir
        else:
            # Nothing to scan: os.listdir(None) would silently list the CWD.
            loge("analyze_tests requires either an app or a results_dir")
            return

        has_explicit_id = 'test_id' in kwargs
        # Restrict the scan to the requested test; otherwise every log in the
        # directory would be written to the same output file.
        if has_explicit_id:
            log_files = self.fetch_log_files(target_dir, kwargs['test_id'])
        else:
            log_files = self.fetch_log_files(target_dir)

        for log_file in log_files:
            test_id = kwargs['test_id'] if has_explicit_id else self._test_id_from_filename(log_file)
            self.analyze_test(app, log_file, output_filename=f'test_{test_id}_logresume.json')
            # Fresh parser per file so each summary only covers its own log.
            # NOTE(review): placement inside the loop was reconstructed from a
            # whitespace-collapsed source; confirm against the repository.
            self.clean()

    def analyze_test(self, app, log_file, output_filename=None, **kwargs):
        """Parse one logcat file and save its summary next to it.

        Args:
            app: unused here; kept for interface compatibility.
            log_file: path of the logcat file to parse.
            output_filename: name of the results file, created in the same
                directory as ``log_file``. When None, defaults to
                ``test_<id>_logresume.json`` with the id taken from
                ``log_file``.
        """
        if output_filename is None:
            # os.path.join(dir, None) raises TypeError; derive a usable name.
            output_filename = f'test_{self._test_id_from_filename(log_file)}_logresume.json'
        self.logcatparser.parse_file(log_file)
        output_path = os.path.join(os.path.dirname(log_file), output_filename)
        self.logcatparser.save_results(output_path)

    def clean(self):
        """Run the base class cleanup and replace the parser with a new one."""
        super().clean()
        self.logcatparser = LogCatParser(log_format="threadtime")

    def show_results(self, app_list):
        """Do nothing for now.

        TODO: show the result for each test of each analyzed app.
        """

    def validate_test(self, app, test_id, **kwargs):
        """Parse a test's log file and check it against the validation filters.

        Args:
            app: unused here; kept for interface compatibility.
            test_id: unused here; kept for interface compatibility.
            **kwargs: ``log_filename`` is the path of the log file to parse.

        Returns:
            The result of ``validate_filters()`` after parsing.
        """
        # NOTE(review): 'batata' is a placeholder path left by the original
        # author (marked "todo"); callers should always pass log_filename.
        log_file = kwargs.get('log_filename', 'batata')
        self.logcatparser.parse_file(log_file)
        return self.validate_filters()

    def validate_filters(self):
        """Apply every configured validation filter to the parsed log.

        Returns:
            False as soon as a filter name is not in ``supported_filters`` or
            a filter rejects its value (both cases are logged); True when all
            filters pass.
        """
        for filter_name, filters in self.validation_filters.filters.items():
            if filter_name not in self.supported_filters:
                log(f"unsupported filter {filter_name}")
                return False
            for filt in filters:
                val_for_filter = self.get_val_for_filter(filter_name)
                if not filt.apply_filter(val_for_filter):
                    log(f"filter {filter_name} failed. value: {val_for_filter}")
                    return False
        return True

    def get_val_for_filter(self, filter_name, add_data=None):
        """Return the value a filter named ``filter_name`` is checked against.

        Filters listed in ``_RESUME_KEYS`` read their value from the parser
        summary; any other name is delegated to the base class, and an error
        is logged when that lookup yields None.

        Args:
            filter_name: name of the filter.
            add_data: forwarded to the base class lookup.
        """
        resume_path = self._RESUME_KEYS.get(filter_name)
        if resume_path is not None:
            section, key = resume_path
            return self.logcatparser.get_parser_resume()[section][key]
        val = super().get_val_for_filter(filter_name, add_data)
        if val is None:
            loge(f"unsupported value ({val}) for {filter_name} ({self.__class__})")
        return val