Source code for hooqu.verification_suite

# coding: utf-8

import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple

from hooqu.analyzers import Analyzer
from hooqu.analyzers.runners import AnalyzerContext
from hooqu.analyzers.runners.analysis_runner import do_analysis_run
from hooqu.checks import Check, CheckResult, CheckStatus
from hooqu.dataframe import DataFrameLike
from hooqu.metrics import Metric

logger = logging.getLogger(__name__)


@dataclass
class VerificationResult:
    """
    Outcome of a verification run.

    Attributes
    ----------
    status:
        Overall status of the run.
    check_results:
        The result obtained for each check, keyed by the check itself.
    metrics:
        The metric associated with each analyzer, keyed by the analyzer.
    """

    status: CheckStatus
    check_results: Mapping[Check, CheckResult]
    metrics: Mapping[Analyzer, Metric]
# Helper for the fluent API
class VerificationRunBuilder:
    """
    Fluent builder that collects checks for one dataset and then runs them.

    Every ``add_*`` method returns the builder itself so calls can be
    chained; ``run`` executes the collected checks.
    """

    def __init__(self, data):
        """
        Create a builder bound to ``data`` with no checks registered.

        Parameters
        ----------
        data:
            The data that the checks will be verified against.
        """
        self.data = data
        self._checks: List[Check] = []
        self._required_analyzers: Optional[Tuple[Analyzer, ...]] = None

    def run(self) -> VerificationResult:
        """
        Execute the collected checks on the bound data.

        The work is delegated to ``do_verification_run`` of a freshly
        created ``VerificationSuite``.

        Returns
        --------
        returns
            The verification result produced by the suite.
        """
        suite = VerificationSuite()
        return suite.do_verification_run(
            self.data,
            checks=self._checks,
            required_analyzers=self._required_analyzers,
            aggregate_with=None,
            save_states_with=None,
            metric_repository_options=None,
            file_output_options=None,
        )

    def add_check(self, check: Check) -> "VerificationRunBuilder":
        """
        Register one check for the run.

        Parameters
        ----------
        check:
            The check object to execute during the run.

        Returns
        --------
        returns
            This builder, to allow call chaining.
        """
        self._checks.append(check)
        return self

    def add_checks(self, checks: Sequence[Check]) -> "VerificationRunBuilder":
        """
        Register several checks for the run.

        Parameters
        ----------
        checks:
            The check objects to execute during the run, in order.

        Returns
        --------
        returns
            This builder, to allow call chaining.
        """
        self._checks.extend(checks)
        return self
class VerificationSuite:
    """
    Runs checks against tabular data and reports the outcome.

    Checks can be registered directly on the suite (``add_check`` /
    ``add_checks`` followed by ``run``), or on the fluent builder returned
    by ``on_data``.
    """

    def __init__(self):
        """Create a suite with no checks registered."""
        self._checks: List[Check] = []
        self._required_analyzers: Optional[Tuple[Analyzer, ...]] = None

    def add_check(self, check: Check) -> "VerificationSuite":
        """
        Add a single check to the run.

        Parameters
        ----------
        check:
             A check object to be executed during the run

        Returns
        --------
        returns
            This suite, to allow call chaining
        """
        self._checks.append(check)
        return self

    def add_checks(self, checks: Sequence[Check]) -> "VerificationSuite":
        """
        Add multiple checks to the run.

        Parameters
        ----------
        checks:
             A sequence of check objects to be executed during the run

        Returns
        --------
        returns
            This suite, to allow call chaining
        """
        self._checks.extend(checks)
        return self

    def run(self, data: DataFrameLike) -> VerificationResult:
        """
        Runs all check groups and returns the verification result.
        Verification result includes all the metrics computed during the run.

        Parameters
        ----------
        data:
             tabular data on which the checks should be verified

        Returns
        --------
        returns
            The result of ``do_verification_run`` for the checks registered
            on this suite
        """
        return self.do_verification_run(
            data,
            self._checks,
            self._required_analyzers,
            None,
            None,
            None,
            None,
        )

    def on_data(self, data):
        """
        Start a fluent verification run on ``data``.

        Parameters
        ----------
        data:
             tabular data on which the checks should be verified

        Returns
        --------
        returns
            A new ``VerificationRunBuilder`` bound to ``data``. Only ``data``
            is handed to the builder; checks registered on this suite are
            not passed along.
        """
        return VerificationRunBuilder(data)

    def do_verification_run(
        self,
        data,
        checks: Sequence[Check],
        required_analyzers: Optional[Tuple[Analyzer, ...]] = None,
        aggregate_with: Any = None,  # FIXME
        save_states_with: Any = None,  # FIXME
        # TODO: maybe change this for kwargs
        metric_repository_options: Optional[Dict[str, Any]] = None,
        file_output_options: Optional[Dict[str, Any]] = None,
    ) -> VerificationResult:
        """
        Runs all check groups and returns the verification result.
        Verification result includes all the metrics computed during the run.

        Parameters
        ----------
        data:
             tabular data on which the checks should be verified
        checks:
            A sequence of check objects to be executed
        required_analyzers:
            Can be used to enforce the calculation of some metrics
            regardless of whether there are constraints on them (optional).
            Any sequence of analyzers is accepted, not only a tuple.
        aggregate_with: not implemented
            loader from which we retrieve initial states to aggregate
            (optional)
        save_states_with: not implemented
            persist resulting states for the configured analyzers (optional)
        metric_repository_options:
            Options related to the MetricsRepository (currently unused)
        file_output_options:
            Options related to file output (currently unused)

        Returns
        --------
        returns
            Result for every check including the overall status, detailed
            status for each constraint and all metrics produced
        """
        # Normalise to a tuple so that ``None`` and non-tuple sequences
        # (e.g. a list) can be concatenated with the analyzers below.
        forced_analyzers = (
            () if required_analyzers is None else tuple(required_analyzers)
        )
        analyzers = forced_analyzers + tuple(
            analyzer
            for check in checks
            for analyzer in check.required_analyzers()
        )

        # The analysis result is passed to ``evaluate``, which is annotated
        # to take an AnalyzerContext.
        analysis_result = do_analysis_run(data, analyzers)

        verification_result = self.evaluate(checks, analysis_result)

        # TODO: Save or append results on the metric repository
        # TODO: Save JSON output to the filesystem if necessary
        return verification_result

    def evaluate(
        self,
        checks: Sequence[Check],
        analysis_context: AnalyzerContext,
    ) -> VerificationResult:
        """
        Evaluate every check against an analysis context.

        Parameters
        ----------
        checks:
            The checks to evaluate
        analysis_context:
            The context the checks are evaluated against; its ``metric_map``
            is returned as the metrics of the result

        Returns
        --------
        returns
            A ``VerificationResult`` whose status is the maximum of the
            individual check statuses, or ``CheckStatus.SUCCESS`` when there
            are no checks.
        """
        check_results = {
            check: check.evaluate(analysis_context) for check in checks
        }

        # NOTE(review): taking ``max`` presumably relies on CheckStatus
        # members being ordered by severity -- confirm against its definition.
        verification_status = max(
            (result.status for result in check_results.values()),
            default=CheckStatus.SUCCESS,
        )

        return VerificationResult(
            verification_status, check_results, analysis_context.metric_map
        )