# coding: utf-8
from dataclasses import dataclass, field
from enum import Enum, IntEnum
from typing import (
Any,
Callable,
List,
Optional,
Pattern,
Sequence,
Set,
Tuple,
Union,
cast,
)
import hooqu.patterns as patterns
import numpy as np
from hooqu.analyzers import Analyzer
from hooqu.analyzers.runners import AnalyzerContext
from hooqu.constraints import (
AnalysisBasedConstraint,
Constraint,
ConstraintDecorator,
ConstraintResult,
completeness_constraint,
compliance_constraint,
max_constraint,
mean_constraint,
min_constraint,
pattern_match_constraint,
quantile_constraint,
size_constraint,
standard_deviation_constraint,
sum_constraint,
uniqueness_constraint,
)
from hooqu.constraints.constraint import ConstraintStatus
class CheckLevel(Enum):
    """
    Severity attached to a check.

    The level decides which status a check reports when at least one of its
    constraints fails: ``WARNING`` yields a warning status and ``ERROR``
    yields an error status.
    """

    WARNING = 0
    ERROR = 1
class CheckStatus(IntEnum):
    """
    Outcome of evaluating a check.

    This is an ``IntEnum``, so statuses compare as integers and are ordered
    by increasing severity: ``SUCCESS < WARNING < ERROR``.
    """

    SUCCESS = 0
    WARNING = 1
    ERROR = 2
@dataclass(frozen=True, eq=True)
class CheckResult:
    """
    Immutable record of the outcome of evaluating a check.

    Attributes
    ----------
    check:
        The check that was evaluated.
    status:
        Overall status derived for the check.
    constraint_results:
        One result per evaluated constraint; empty by default.
    """

    check: Any
    status: CheckStatus
    constraint_results: Sequence[ConstraintResult] = field(default_factory=tuple)
def is_one(value: Union[float, int]) -> bool:
    """
    Return whether ``value`` equals 1.

    Used in this module as the default assertion for ratio-style metrics,
    where 1 means that every row complies.

    Parameters
    ----------
    value:
        Number to compare against 1 (exact equality, no tolerance).
    """
    return value == 1
@dataclass(frozen=True, eq=True)
class Check:
    """
    An immutable, described group of constraints that is evaluated together.

    Builder methods never modify the check they are called on; each one
    returns a new object carrying the additional constraint.

    Attributes
    ----------
    level:
        Severity reported when at least one constraint fails.
    description:
        Human readable description of the check.
    constraints:
        Constraints registered so far, in insertion order.
    """

    level: CheckLevel
    description: str
    constraints: Tuple[Constraint, ...] = field(default_factory=tuple)

    def add_constraint(self, constraint: Constraint) -> "Check":
        """
        Returns a new Check object with the given constraint added to the
        constraints list.

        Parameters
        ----------
        constraint:
            New constraint to be added
        """
        return Check(self.level, self.description, self.constraints + (constraint,))

    def _add_filterable_constraint(
        self, creation_func: Callable[[Optional[str]], Constraint]
    ) -> "CheckWithLastConstraintFilterable":
        """
        Adds a constraint that can subsequently be replaced
        with a filtered version.

        Parameters
        ----------
        creation_func:
            Factory that builds the constraint from an optional filter. It is
            called here with ``None`` (no filter) and kept on the returned
            object so the constraint can be rebuilt with a filter later.
        """
        unfiltered = creation_func(None)
        return CheckWithLastConstraintFilterable(
            self.level,
            self.description,
            self.constraints + (unfiltered,),
            creation_func,
        )

    def required_analyzers(self) -> Set[Analyzer]:
        """
        Collect the analyzers needed to evaluate this check.

        Decorated constraints are unwrapped to their inner constraint first;
        only analysis-based constraints contribute an analyzer.
        """
        unwrapped = (
            c.inner if isinstance(c, ConstraintDecorator) else c
            for c in self.constraints
        )
        return {c.analyzer for c in unwrapped if isinstance(c, AnalysisBasedConstraint)}

    def has_size(
        self, assertion: Callable[[int], bool], hint: Optional[str] = None
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that calculates the data frame size and runs the
        assertion on it.

        Parameters
        ----------
        assertion:
            A callable that receives an integer and returns a boolean.
            The callable will receive the value of the size (number of rows)
            and return a boolean based on whether it satisfies a condition,
            e.g. ``lambda sz: sz > 5``.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: size_constraint(assertion, filter_, hint)
        )

    def has_min(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the minimum of the column

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: min_constraint(column, assertion, filter_, hint)
        )

    def has_max(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the maximum of the column

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: max_constraint(column, assertion, filter_, hint)
        )

    def is_complete(
        self, column: str, hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on a column completion.

        The completeness value is asserted to be exactly 1.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: completeness_constraint(column, is_one, filter_, hint)
        )

    def has_completeness(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on a column completion

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: completeness_constraint(column, assertion, filter_, hint)
        )

    def has_mean(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the mean of the column.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: mean_constraint(column, assertion, filter_, hint)
        )

    def has_standard_deviation(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the standard deviation of the
        column.

        Note that, unlike the pandas default, this is the population
        statistic (``ddof=0``). NaNs are ignored when performing the
        calculation.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: standard_deviation_constraint(
                column, assertion, filter_, hint
            )
        )

    def has_sum(
        self,
        column: str,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the sum of the column.

        Parameters
        ----------
        column:
            Column to run the assertion on.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: sum_constraint(column, assertion, filter_, hint)
        )

    def has_quantile(
        self,
        column: str,
        q: float,
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on the quantile of the column.

        Note that the quantile calculation is done using the "nearest"
        interpolation, meaning that the closest value of the column
        ``column`` is returned

        Parameters
        ----------
        column:
            Column to run the assertion on.
        q:
            The q-th quantile to calculate which must be between 0 and 1
            inclusive.
        assertion:
            A callable that receives a float and returns a boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: quantile_constraint(column, q, assertion, filter_, hint)
        )

    def satisfies(
        self,
        column_condition: str,
        constraint_name: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that evaluates on the column_condition
        and executes the assertion.

        This is useful for complex or custom checks that are better described
        using a valid expression.

        Parameters
        ----------
        column_condition:
            The column expression to be evaluated. If using a Pandas
            data-frame this expression is evaluated with ``pandas.eval``.
        constraint_name:
            A name that summarizes the check being made. This name is being
            used to name the metrics for the analysis being done.
        assertion:
            Callable that receives a float input parameter and returns a
            boolean. Defaults to requiring a value of exactly 1.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: compliance_constraint(
                constraint_name, column_condition, assertion, filter_, hint
            )
        )

    def is_non_negative(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts that a column contains no negative
        values

        Parameters
        ----------
        column:
            Column to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # coalescing column to not count NULL values as non-compliant
        return self.satisfies(
            f"`{column}`.fillna(0) >= 0",
            f"{column} is non-negative",
            assertion,
            hint=hint,
        )

    def is_positive(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts that a column contains positive
        values.

        Parameters
        ----------
        column:
            Column to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # coalescing column to not count NULL values as non-compliant
        return self.satisfies(
            f"`{column}`.fillna(1.0) > 0",
            f"{column} is positive",
            assertion,
            hint=hint,
        )

    def is_contained_in(
        self,
        column: str,
        allowed_values: Sequence[Union[str, int]],
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Asserts that every non-null value in a column is contained in a set of
        predefined values. Only string and integer values are supported.

        Parameters
        ----------
        column:
            Column to run the assertion on
        allowed_values:
            Allowed values for the column
        assertion:
            Callable that receives a float input parameter and returns a
            boolean
        hint:
            A hint to provide additional context why a constraint could have
            failed

        Raises
        ------
        ValueError
            If ``allowed_values`` is empty or contains a value that is
            neither a string nor an integer.
        """
        values = list(allowed_values)
        if not values:
            raise ValueError("Empty list of allowed values used")

        # Validate every element, not just the first one, so that an
        # unsupported value cannot slip into the generated expression.
        for value in values:
            if not isinstance(value, (str, int, np.integer)):
                raise ValueError(
                    "The type of allowed values should be string or integer but got"
                    f" '{type(value)}'"
                )

        # The values are interpolated by ``repr`` into the expression below.
        # Newer numpy versions render scalars as e.g. ``np.int64(1)``, so
        # numpy integers are converted to plain ``int`` first.
        values = [int(v) if isinstance(v, np.integer) else v for v in values]

        predicate = f"`{column}`.isna() or `{column}`.isin({values})"
        return self.satisfies(
            predicate, f"{column} contained in {values}", assertion, hint
        )

    def is_contained_in_range(
        self,
        column: str,
        lower_bound: float,
        upper_bound: float,
        include_lower_bound: bool = True,
        include_upper_bound: bool = True,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Asserts that the non-null values in a numeric column fall into the
        predefined interval

        Parameters
        ----------
        column:
            Column to run the assertion on
        lower_bound:
            lower bound of the interval
        upper_bound:
            upper bound of the interval
        include_lower_bound:
            is a value equal to the lower bound allowed?
        include_upper_bound:
            is a value equal to the upper bound allowed?
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        lower_op = ">=" if include_lower_bound else ">"
        upper_op = "<=" if include_upper_bound else "<"
        # Null values count as compliant; everything else must lie in range.
        predicate = (
            f"`{column}`.isna() or "
            f"(`{column}` {lower_op} {lower_bound} "
            f" and `{column}` {upper_op} {upper_bound})"
        )
        return self.satisfies(
            predicate, f"{column} between {lower_bound} and {upper_bound}", hint=hint
        )

    def is_unique(
        self, column: str, hint: Optional[str] = None
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on a column uniqueness.

        The uniqueness value is asserted to be exactly 1.

        Parameters
        ----------
        column:
            Column to run the assertion on
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: uniqueness_constraint([column], is_one, filter_, hint)
        )

    def has_uniqueness(
        self,
        columns: Union[Sequence[str], str],
        assertion: Callable[[float], bool],
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Creates a constraint that asserts on uniqueness in a single or
        combined set of key columns.

        Parameters
        ----------
        columns:
            Column or columns to run the assertion on
        assertion:
            Callable that receives a float input parameter and returns a
            boolean. The input is the fraction of unique values in columns.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        # A single column name is treated as a one-element column list.
        if isinstance(columns, str):
            columns = [columns]
        return self._add_filterable_constraint(
            lambda filter_: uniqueness_constraint(
                columns, assertion, filter_, hint=hint
            )
        )

    def has_pattern(
        self,
        column: str,
        pattern: Union[str, Pattern],
        assertion: Callable[[float], bool] = is_one,
        name: Optional[str] = None,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Checks for pattern compliance. Given a column name and a regular
        expression, defines a Check on the average compliance of the
        column's values to the regular expression.

        Parameters
        ----------
        column:
            Name of the column that should be checked.
        pattern:
            The columns values will be checked for a match against this
            pattern.
        assertion:
            Callable that receives a float input parameter and returns a
            boolean. The input is presumably the fraction of values matching
            the pattern (the average compliance described above); verify
            against ``pattern_match_constraint``.
        name:
            Optional name forwarded to ``pattern_match_constraint``;
            presumably used to label the resulting constraint (verify there).
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self._add_filterable_constraint(
            lambda filter_: pattern_match_constraint(
                column, pattern, assertion, filter_, name=name, hint=hint
            )
        )

    def contains_credit_card_number(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Check to run against the compliance of a column against a credit card
        pattern.

        Parameters
        ----------
        column:
            Name of the column that should be checked.
        assertion:
            Callable that receives a float input parameter and returns a
            boolean; it is passed on unchanged to ``has_pattern``.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self.has_pattern(
            column,
            patterns.CREDITCARD,
            assertion=assertion,
            name=f"containsCreditCardNumber({column})",
            hint=hint,
        )

    def contains_email(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Check to run against the compliance of a column against an e-mail
        pattern.

        Parameters
        ----------
        column:
            Name of the column that should be checked.
        assertion:
            Callable that receives a float input parameter and returns a
            boolean; it is passed on unchanged to ``has_pattern``.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self.has_pattern(
            column,
            patterns.EMAIL,
            assertion=assertion,
            name=f"containsEmail({column})",
            hint=hint,
        )

    def contains_url(
        self,
        column: str,
        assertion: Callable[[float], bool] = is_one,
        hint: Optional[str] = None,
    ) -> "CheckWithLastConstraintFilterable":
        """
        Check to run against the compliance of a column against a URL
        pattern.

        Parameters
        ----------
        column:
            Name of the column that should be checked.
        assertion:
            Callable that receives a float input parameter and returns a
            boolean; it is passed on unchanged to ``has_pattern``.
        hint:
            A hint to provide additional context why a constraint could have
            failed
        """
        return self.has_pattern(
            column,
            patterns.URL,
            assertion=assertion,
            name=f"containsURL({column})",
            hint=hint,
        )

    def evaluate(self, context: AnalyzerContext) -> CheckResult:
        """
        Evaluate this check on computed metrics

        Parameters
        ----------
        context:
            result of the metrics computation

        Returns
        -------
        A ``CheckResult`` holding this check, the derived status and the
        individual constraint results. The status is ``SUCCESS`` unless a
        constraint failed, in which case it follows the check level.
        """
        results = [c.evaluate(context.metric_map) for c in self.constraints]
        has_failure = any(r.status == ConstraintStatus.FAILURE for r in results)

        status = CheckStatus.SUCCESS
        if has_failure:
            if self.level == CheckLevel.ERROR:
                status = CheckStatus.ERROR
            elif self.level == CheckLevel.WARNING:
                status = CheckStatus.WARNING

        return CheckResult(self, status, results)
class CheckWithLastConstraintFilterable(Check):
    """
    A check whose most recently added constraint can still be replaced by a
    filtered version of itself through :meth:`where`.
    """

    def __init__(
        self,
        level: CheckLevel,
        description: str,
        constraints: Tuple[Constraint, ...],
        create_replacement: Callable[[Optional[str]], Constraint],
    ):
        """
        Parameters
        ----------
        level:
            Severity reported when a constraint fails.
        description:
            Human readable description of the check.
        constraints:
            Constraints of the check; the last one is the replaceable one.
        create_replacement:
            Factory that rebuilds the last constraint for a given filter.
        """
        super().__init__(level, description, constraints)
        # Plain instance attribute, not a dataclass field of ``Check``.
        self.create_replacement = create_replacement

    def where(self, query: Optional[str]) -> Check:
        """
        Defines a filter to apply before evaluating the previous constraint

        Parameters
        ----------
        query:
            A Pandas `query` string to evaluate.

        Returns
        -------
        A plain ``Check`` in which the last constraint has been rebuilt with
        the given filter.
        """
        filtered = self.create_replacement(query)
        return Check(self.level, self.description, self.constraints[:-1] + (filtered,))

    @classmethod
    def apply(
        cls,
        level: CheckLevel,
        description: str,
        constraints: Tuple[Constraint, ...],
        create_replacement: Callable[[Optional[str]], Constraint],
    ) -> "CheckWithLastConstraintFilterable":
        """
        Alternate constructor taking the same arguments as ``__init__``.
        """
        return cls(level, description, constraints, create_replacement)