# Source code for hooqu.analyzers.runners.analysis_runner

from collections import defaultdict
from dataclasses import dataclass, field
from itertools import accumulate
from typing import Dict, List, Mapping, Optional, Sequence, Set, cast

import pandas as pd
from more_itertools import partition

from hooqu.analyzers import Analyzer, ScanShareableAnalyzer
from hooqu.analyzers.analyzer import AggDefinition
from hooqu.analyzers.preconditions import find_first_failing
from hooqu.metrics import Metric


@dataclass(frozen=True, eq=True)
class AnalyzerContext:
    """Immutable holder of the metric computed for each analyzer.

    Contexts can be merged with ``+``; see ``__add__``.
    """

    # analyzer -> metric produced for it (success or failure metric)
    metric_map: Mapping[Analyzer, Metric] = field(default_factory=dict)

    def all_metrics(self) -> List[Metric]:
        """Return every metric in this context, in ``metric_map`` iteration order."""
        return list(self.metric_map.values())

    def __add__(self, other: "AnalyzerContext") -> "AnalyzerContext":
        """Return a new context holding the metrics of both operands.

        When the same analyzer appears in both, the metric from ``other`` wins.
        """
        return AnalyzerContext({**self.metric_map, **other.metric_map})

    def metric(self, analyzer: Analyzer) -> Optional[Metric]:
        """Return the metric stored for ``analyzer``, or ``None`` if there is none."""
        return self.metric_map.get(analyzer, None)

    @classmethod
    def success_metrics_as_dataframe(
        cls,
        analyzer_context: "AnalyzerContext",
        for_analyzers: Optional[Sequence[Analyzer]] = None,
    ) -> pd.DataFrame:
        """Return the successful metrics of ``analyzer_context`` as a DataFrame.

        Originally implemented in getSimplifiedOutputForSelectedAnalyzers.

        Parameters
        ----------
        analyzer_context:
            context whose metrics are exported
        for_analyzers:
            if given and non-empty, only metrics of these analyzers are kept;
            otherwise metrics of all analyzers are considered

        Returns
        -------
        One row per flattened successful metric (built from ``Metric.asdict``),
        sorted by the ``entity`` column in descending order. When no metric
        qualifies, an empty DataFrame is returned instead of failing on the
        missing ``entity`` column.
        """
        wanted = for_analyzers or []

        # Keep only the requested analyzers whose metric computation succeeded
        successful = {
            analyzer: metric
            for analyzer, metric in analyzer_context.metric_map.items()
            if (not wanted or analyzer in wanted) and metric.value.isSuccess
        }

        # TODO: rename metric (replace simple name with description)
        flattened: List[Metric] = [
            flat for metric in successful.values() for flat in metric.flatten()
        ]
        if not flattened:
            # An empty frame has no "entity" column, so sort_values would raise.
            return pd.DataFrame()

        df = pd.DataFrame(metric.asdict() for metric in flattened)
        return df.sort_values(by="entity", ascending=False, ignore_index=True)
def do_analysis_run(
    data,
    analyzers: Sequence[Analyzer],
    aggregate_with=None,  # unused for now
    save_state_with=None,  # unused for now
    metric_repository_options=None,  # it will be a dict or something similar
) -> AnalyzerContext:
    """
    Compute the metrics from the analyzers configured in the analysis

    Parameters
    ----------
    data:
        data on which to operate
    analyzers:
        the analyzers to run
    aggregate_with:
        (not implemented) load existing states for the configured analyzers
        and aggregate them (optional)
    save_state_with:
        (not implemented) persist resulting states for the configured analyzers
        (optional)
    metric_repository_options:
        (not implemented) options related to the MetricsRepository

    Returns
    -------
    An AnalyzerContext holding the requested metrics per analyzer, ordered like
    ``analyzers``. Analyzers whose preconditions fail are not executed; they
    get a failure metric describing the first failing precondition.
    """
    if not analyzers:
        return AnalyzerContext()

    # TODO:
    # If we already calculated some metric and they are in the metric repo
    # we will take it from the metric repo instead.
    # relevant in the case of multiple checks on the same datasource_
    # also do some additional checks here
    # for for now they are the same
    analyzers_to_run = analyzers

    passed_analyzers = [
        an
        for an in analyzers_to_run
        if find_first_failing(data, an.preconditions()) is None
    ]

    # Create the failure metrics from the precondition violations
    failed_analyzers = set(analyzers_to_run) - set(passed_analyzers)
    precondition_failures = compute_precondition_failure_metrics(failed_analyzers, data)

    # Originally the idea is be able to run all the analysis on a single scan
    # assuming that internally pandas would do something like that
    # however apparently there is no big gain from running all aggregations at once
    # so for now we run the aggregation sequentially.
    # TODO: Deal with gruping analyzers (if necessary)
    # Only analyzers whose preconditions hold are computed.
    metrics = run_analyzers_sequentially(data, passed_analyzers)

    combined = (metrics + precondition_failures).metric_map
    # Every analyzer is either passed (metric from the run) or failed (metric
    # from its precondition failure), so each lookup succeeds; rebuilding
    # in input order keeps all_metrics() ordered like ``analyzers``.
    return AnalyzerContext({an: combined[an] for an in analyzers_to_run})
def run_non_scanning_analyzers(data, analyzers: Sequence[Analyzer]):
    """
    Compute each analyzer on ``data`` by calling its own ``calculate``.

    Analyzers are processed in order. Exceptions are not caught here: the
    first one raised propagates to the caller.

    Returns
    -------
    An AnalyzerContext mapping every analyzer to its computed metric.
    """
    return AnalyzerContext(
        {analyzer: analyzer.calculate(data) for analyzer in analyzers}
    )
def compute_precondition_failure_metrics(
    failed_analyzers: Set[Analyzer], data
) -> AnalyzerContext:
    """
    Build one failure metric per analyzer whose preconditions do not hold.

    Parameters
    ----------
    failed_analyzers:
        analyzers already known to have at least one failing precondition
    data:
        data the preconditions are checked against

    Returns
    -------
    An AnalyzerContext mapping each analyzer to the failure metric built from
    its first failing precondition.

    Raises
    ------
    AssertionError
        If an analyzer in ``failed_analyzers`` has no failing precondition.
    """

    def first_exception_to_failure_metric(analyzer):
        first_exception = find_first_failing(data, analyzer.preconditions())
        # ``None`` means "no failing precondition" (the same test the runner
        # uses to decide an analyzer passed); don't rely on truthiness.
        if first_exception is None:
            raise AssertionError(
                "At least one exception should be found in a failing analyzer"
            )
        return analyzer.to_failure_metric(first_exception)

    failures = {a: first_exception_to_failure_metric(a) for a in failed_analyzers}
    return AnalyzerContext(failures)
def run_analyzers_sequentially(
    data, analyzers: Sequence[Analyzer], aggregate_with=None, save_state_with=None
) -> AnalyzerContext:
    """
    Compute every analyzer on its own, one after the other.

    Apparently from the initial tests I made there is not a lot of gain from
    running all the aggregations at once.

    Any ``Exception`` raised by an analyzer's ``calculate`` is converted into
    that analyzer's failure metric instead of aborting the whole run.
    ``aggregate_with`` and ``save_state_with`` are accepted but not used.
    """
    if len(analyzers) == 0:
        return AnalyzerContext()

    def metric_for(analyzer):
        # Isolate failures per analyzer so one bad analyzer doesn't stop the rest
        try:
            return analyzer.calculate(data)
        except Exception as error:
            return analyzer.to_failure_metric(error)

    return AnalyzerContext({analyzer: metric_for(analyzer) for analyzer in analyzers})
def run_scanning_analyzers(
    data, analyzers: Sequence[Analyzer], aggregate_with=None, save_state_with=None
) -> AnalyzerContext:
    """
    Compute all scan-shareable analyzers with a single merged ``data.agg`` call.

    Parameters
    ----------
    data:
        data on which to operate; it must support ``.agg`` with a
        column -> aggregations mapping (presumably a pandas DataFrame)
    analyzers:
        analyzers to compute. Only ``ScanShareableAnalyzer`` instances are
        handled for now; the others are not computed and are absent from the
        result (see TODO below).
    aggregate_with, save_state_with:
        unused for now

    Returns
    -------
    An AnalyzerContext with one metric per shareable analyzer. If building or
    running the merged aggregation raises, every shareable analyzer gets a
    failure metric built from that exception.
    """

    def merge_aggregations(aggregations_list: List[AggDefinition]):
        # Union the per-column aggregation sets of all analyzers so each column
        # is aggregated once with everything any analyzer needs.
        ma = defaultdict(set)  # type: ignore
        for ags in aggregations_list:
            for k in ags:
                ma[k] = ma[k] | ags[k]
        return dict(ma)

    # partition yields (items failing the predicate, items passing it)
    _others, shareable = partition(
        lambda a: isinstance(a, ScanShareableAnalyzer), analyzers
    )
    shareable_list: List[ScanShareableAnalyzer] = cast(
        List[ScanShareableAnalyzer], list(shareable)
    )
    # TODO: Run not shareable analyzers (``_others``); they are currently dropped.

    if not shareable_list:
        return AnalyzerContext()

    # Compute aggregation functions of shareable analyzers in a single pass over
    # the data. On Pandas this does not make a lot of sense.
    metrics_by_analyzer: Dict[Analyzer, Metric] = {}
    try:
        # Ask each analyzer for its aggregations once; needed for both the
        # merge and the offsets below.
        per_analyzer = [a._aggregation_functions() for a in shareable_list]
        # This is a dict with column -> aggregations
        merged_aggregations = merge_aggregations(per_analyzer)

        # Offsets let analyzers pick their results from the row: analyzer i
        # starts after the entries of analyzers 0..i-1.
        # FIXME: from now on internally the analyzers will use the function name
        # so the offset is not used (at least for the pandas implementation)
        offsets = list(accumulate([0] + [len(ags) for ags in per_analyzer[:-1]]))

        results = data.agg(merged_aggregations)
        for an, offset in zip(shareable_list, offsets):
            metrics_by_analyzer[an] = _success_or_failure_metric_from(
                an, results, offset
            )
    except Exception as e:
        # Only the analyzers handled here fail; the success path never reports
        # non-shareable analyzers either.
        metrics_by_analyzer = {a: a.to_failure_metric(e) for a in shareable_list}

    return AnalyzerContext(metrics_by_analyzer)
# Originally implemented in AnalysisRunner.scala
def _success_or_failure_metric_from(
    analyzer: ScanShareableAnalyzer, aggregation_result, offset: int
) -> Metric:
    """
    Turn an aggregation result into ``analyzer``'s metric.

    Any ``Exception`` raised by ``metric_from_aggregation_result`` becomes a
    failure metric via ``analyzer.to_failure_metric`` instead of propagating.
    """
    try:
        metric = analyzer.metric_from_aggregation_result(aggregation_result, offset)
    except Exception as error:
        metric = analyzer.to_failure_metric(error)
    return metric