Source code for hooqu.checks

# coding: utf-8

from dataclasses import dataclass, field
from enum import Enum, IntEnum
from typing import Any, Callable, List, Optional, Sequence, Set, Tuple, Union, cast

import numpy as np

from hooqu.analyzers import Analyzer
from hooqu.analyzers.runners import AnalyzerContext
from hooqu.constraints import (
    AnalysisBasedConstraint,
    Constraint,
    ConstraintDecorator,
    ConstraintResult,
    completeness_constraint,
    compliance_constraint,
    max_constraint,
    mean_constraint,
    min_constraint,
    quantile_constraint,
    size_constraint,
    standard_deviation_constraint,
    sum_constraint,
    uniqueness_constraint,
)
from hooqu.constraints.constraint import ConstraintStatus


class CheckLevel(Enum):
    """
    Severity assigned to a check.

    The level decides which non-success status a check reports when one of
    its constraints fails.
    """

    WARNING = 0
    ERROR = 1
class CheckStatus(IntEnum):
    """
    Outcome of evaluating a check.

    Being an ``IntEnum``, the members compare as integers, so a larger value
    denotes a more severe outcome (``SUCCESS < WARNING < ERROR``).
    """

    SUCCESS = 0
    WARNING = 1
    ERROR = 2
@dataclass(frozen=True, eq=True)
class CheckResult:
    """
    Immutable record of the outcome of evaluating a check.

    Attributes
    ----------
    check:
        The check that was evaluated.
    status:
        Overall status of the check.
    constraint_results:
        Results of the individual constraints; defaults to an empty tuple.
    """

    check: Any
    status: CheckStatus
    constraint_results: Sequence[ConstraintResult] = field(default_factory=tuple)
def is_one(value: Union[float, int]) -> bool:
    """
    Tell whether ``value`` equals one.

    Used as the default assertion for ratio-style metrics, where a value of
    1 means that every row complies.
    """
    return value == 1
@dataclass(frozen=True, eq=True)
class Check:
    """
    An immutable, described group of constraints sharing one severity level.

    Every builder method (``has_size``, ``is_complete``, ...) leaves the
    current object untouched and returns a new check with one more
    constraint appended.

    Attributes
    ----------
    level:
        Severity that decides whether a failing constraint turns the check
        into a warning or an error.
    description:
        Free-text description of the check.
    constraints:
        Constraints registered so far, in insertion order.
    """

    level: CheckLevel
    description: str
    constraints: Tuple[Constraint, ...] = field(default_factory=tuple)

    def add_constraint(self, constraint: Constraint) -> "Check":
        """
        Returns a new Check object with the given constraint added to the
        constraints list.

        Parameters
        ----------
        constraint:
            New constraint to be added
        """
        return Check(self.level, self.description, self.constraints + (constraint,))

    def _add_filterable_constraint(
        self, creation_func: Callable[[Optional[str]], Constraint]
    ) -> "CheckWithLastConstraintFilterable":
        """
        Adds a constraint that can subsequently be replaced with a filtered
        version.

        Parameters
        ----------
        creation_func:
            Factory that receives an optional filter and returns the
            constraint. It is called here with ``None`` to build the
            unfiltered constraint and is handed to the returned object so the
            constraint can be rebuilt later with a filter.
        """
        constraint_without_filtering = creation_func(None)
        return CheckWithLastConstraintFilterable(
            self.level,
            self.description,
            self.constraints + (constraint_without_filtering,),
            creation_func,
        )

    def required_analyzers(self) -> Set[Analyzer]:
        """
        Returns the set of analyzers needed by the constraints of this check.

        Decorated constraints are unwrapped to their inner constraint first;
        only analysis-based constraints contribute an analyzer.
        """
        unwrapped = (
            c.inner if isinstance(c, ConstraintDecorator) else c
            for c in self.constraints
        )
        return {c.analyzer for c in unwrapped if isinstance(c, AnalysisBasedConstraint)}

    def has_size(
        self, assertion: Callable[[int], bool], hint: Optional[str] = None
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that calculates the data frame size and runs the
        assertion on it.

        Parameters
        ----------
        assertion:
            A callable that receives the size (number of rows) and returns a
            boolean telling whether it satisfies a condition, e.g.
            ``lambda sz: sz > 5``.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: size_constraint(assertion, filter_, hint)
        )

    def has_min(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the minimum of the column

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: min_constraint(column, assertion, filter_, hint)
        )

    def has_max(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the maximum of the column

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: max_constraint(column, assertion, filter_, hint)
        )

    def is_complete(
        self,
        column: str,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts that a column is complete, i.e.
        that its completeness equals 1.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: completeness_constraint(column, is_one, filter_, hint)
        )

    def has_completeness(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on a column completion

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: completeness_constraint(column, assertion, filter_, hint)
        )

    def has_mean(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the mean of the column.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: mean_constraint(column, assertion, filter_, hint)
        )

    def has_standard_deviation(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the standard deviation of the
        column.

        Note that, unlike the pandas default, this is the population standard
        deviation, i.e. zero delta degrees of freedom (``ddof=0``). NaNs are
        ignored when performing the calculation.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: standard_deviation_constraint(
                column, assertion, filter_, hint
            )
        )

    def has_sum(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the sum of the column.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: sum_constraint(column, assertion, filter_, hint)
        )

    def has_quantile(
        self,
        column: str,
        q: float,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the quantile of the column.

        Note that the quantile calculation is done using the "nearest"
        interpolation, meaning that the closest value of the column
        ``column`` is returned

        Parameters
        ----------
        column:
            Column to run the assertion on.
        q:
            The q-th quantile to calculate which must be between 0 and 1
            inclusive.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: quantile_constraint(column, q, assertion, filter_, hint)
        )

    def satisfies(
        self,
        column_condition: str,
        constraint_name: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a compliance constraint from a textual row-level condition.

        Parameters
        ----------
        column_condition:
            Condition expression handed to the compliance constraint.
        constraint_name:
            Name given to the resulting constraint.
        assertion:
            A callable that receives a float and returns a boolean. Defaults
            to requiring a value of exactly 1.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: compliance_constraint(
                constraint_name, column_condition, assertion, filter_, hint
            )
        )

    def is_non_negative(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts that a column contains no negative
        values

        Parameters
        ----------
        column:
            Column to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # coalescing column to not count NULL values as non-compliant
        return self.satisfies(
            f"`{column}`.fillna(0) >= 0",
            f"{column} is non-negative",
            assertion,
            hint=hint,
        )

    def is_positive(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts that a column contains positive
        values.

        Parameters
        ----------
        column:
            Column to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # coalescing column to not count NULL values as non-compliant
        return self.satisfies(
            f"`{column}`.fillna(1.0) > 0",
            f"{column} is positive",
            assertion,
            hint=hint,
        )

    def is_contained_in(
        self,
        column: str,
        allowed_values: Sequence[Union[str, int]],
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Asserts that every non-null value in a column is contained in a set
        of predefined values.

        Only string and integer values are supported.

        Parameters
        ----------
        column:
            Column to run the assertion on
        allowed_values:
            Allowed values for the column
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed

        Raises
        ------
        ValueError
            If ``allowed_values`` is empty or holds a value that is neither a
            string nor an integer.
        """
        # The values are rendered into the predicate through their repr, so
        # numpy integer scalars are turned into plain ints first: newer numpy
        # versions print them as e.g. ``np.int64(1)``, which is not a literal.
        allowed_values = [
            int(value) if isinstance(value, np.integer) else value
            for value in allowed_values
        ]

        if not allowed_values:
            raise ValueError("Empty list of allowed values used")

        # Check every element, not only the first one, so that the outcome
        # does not depend on the order of the values.
        for value in allowed_values:
            if not isinstance(value, (str, int)):
                raise ValueError(
                    "The type of allowed values should be string or integer but got"
                    f" '{type(value)}'"
                )

        predicate = f"`{column}`.isna() or `{column}`.isin({allowed_values})"
        return self.satisfies(
            predicate, f"{column} contained in {allowed_values}", assertion, hint
        )

    def is_contained_in_range(
        self,
        column: str,
        lower_bound: float,
        upper_bound: float,
        include_lower_bound: bool = True,
        include_upper_bound: bool = True,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Asserts that the non-null values in a numeric column fall into the
        predefined interval

        Parameters
        ----------
        column:
            Column to run the assertion on
        lower_bound:
            lower bound of the interval
        upper_bound:
            upper bound of the interval
        include_lower_bound:
            is a value equal to the lower bound allowed?
        include_upper_bound:
            is a value equal to the upper bound allowed?
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        left_operand = ">=" if include_lower_bound else ">"
        right_operand = "<=" if include_upper_bound else "<"

        predicate = (
            f"`{column}`.isna() or "
            f"(`{column}` {left_operand} {lower_bound} "
            f" and `{column}` {right_operand} {upper_bound})"
        )
        return self.satisfies(
            predicate, f"{column} between {lower_bound} and {upper_bound}", hint=hint
        )

    def is_unique(
        self, column: str, hint: Optional[str] = None
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on a column uniqueness.

        Parameters
        ----------
        column:
            Column to run the assertion on
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: uniqueness_constraint([column], is_one, filter_, hint)
        )

    def has_uniqueness(
        self,
        columns: Union[Sequence[str], str],
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on uniqueness in a single or
        combined set of key columns.

        Parameters
        ----------
        columns:
            Column or columns to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean. The input is the fraction of unique values in columns.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # A lone column name is wrapped so a list is always passed on.
        if isinstance(columns, str):
            columns = [columns]
        return self._add_filterable_constraint(
            lambda filter_: uniqueness_constraint(
                columns, assertion, filter_, hint=hint
            )
        )

    def evaluate(self, context: AnalyzerContext) -> CheckResult:
        """
        Evaluate this check on computed metrics

        The check succeeds when no constraint reports a failure. Otherwise
        the resulting status follows the level of the check: an error for
        ``CheckLevel.ERROR`` and a warning for ``CheckLevel.WARNING``.

        Parameters
        ----------
        context:
            result of the metrics computation
        """
        constraint_results = [c.evaluate(context.metric_map) for c in self.constraints]
        any_failures: bool = any(
            result.status == ConstraintStatus.FAILURE for result in constraint_results
        )

        check_status = CheckStatus.SUCCESS
        if any_failures and self.level == CheckLevel.ERROR:
            check_status = CheckStatus.ERROR
        elif any_failures and self.level == CheckLevel.WARNING:
            check_status = CheckStatus.WARNING

        return CheckResult(self, check_status, constraint_results)
class CheckWithLastConstraintFilterable(Check):
    """
    A check whose most recently added constraint can still be given a filter.

    Besides the regular check fields it keeps the factory that produced the
    last constraint, so that constraint can be rebuilt with a filter.
    """

    def __init__(
        self,
        level: CheckLevel,
        description: str,
        constraints: Tuple[Constraint, ...],
        create_replacement: Callable[[Optional[str]], Constraint],
    ):
        """
        Parameters
        ----------
        level:
            Severity level of the check.
        description:
            Description of the check.
        constraints:
            All constraints of the check, the filterable one being the last.
        create_replacement:
            Factory that builds the last constraint for a given filter.
        """
        super().__init__(level, description, constraints)
        self.create_replacement = create_replacement

    def where(self, query: Optional[str]) -> Check:
        """
        Defines a filter to apply before evaluating the previous constraint

        Parameters
        ----------
        query:
            A Pandas `query` string to evaluate.

        Returns
        -------
        A plain Check in which the last constraint is replaced by its
        filtered version.
        """
        filtered_constraint = self.create_replacement(query)
        kept_constraints = self.constraints[:-1]
        return Check(
            self.level, self.description, kept_constraints + (filtered_constraint,)
        )

    @classmethod
    def apply(
        cls,
        level: CheckLevel,
        description: str,
        constraints: Tuple[Constraint, ...],
        create_replacement: Callable[[Optional[str]], Constraint],
    ) -> "CheckWithLastConstraintFilterable":
        """
        Alternative constructor taking the same arguments as ``__init__``.
        """
        return CheckWithLastConstraintFilterable(
            level, description, constraints, create_replacement
        )