Validation Rules¶
Validation rules are the heart of Invariant's semantic enforcement. They evaluate query plans against catalog metadata and produce issues when problems are detected.
How Validation Works¶
QueryPlan + CatalogSnapshot
↓
[Rule 1] → Issues
[Rule 2] → Issues
[Rule 3] → Issues
↓
Aggregate Issues
↓
Compute Overall Status
↓
ValidationResult
The Validator runs all configured rules and combines their results:
from invariant.validation.domain.services import Validator, IndicatorAggregationRule
validator = Validator(rules=[
    IndicatorAggregationRule(),
    # Add more rules...
])
result = validator.validate(plan, catalog_snapshot)
Built-in Rules¶
IndicatorAggregationRule¶
Location: invariant.validation.domain.services
Purpose: Blocks naive aggregation of indicators (percentages, rates, means).
Forbidden aggregations: SUM, AVG, MEAN
Logic:
1. For each metric in the query
2. If variable role is INDICATOR
3. And aggregation is in forbidden set
4. Check if an IndicatorDefinition exists
5. If no definition or NOT_AGGREGATABLE → BLOCK
6. If ALLOW_LIST and aggregation not in list → BLOCK
7. If RECOMPUTE → ALLOW (can aggregate via recomputation)
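The steps above can be read as a single decision function. The following is a minimal sketch, not Invariant's implementation: `Policy`, `IndicatorDefinition`, and `is_blocked` are hypothetical stand-ins, and the role is modelled as a plain string.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

# Aggregations the rule treats as naive for indicators (from the list above).
FORBIDDEN = frozenset({"SUM", "AVG", "MEAN"})


class Policy(Enum):
    """Hypothetical aggregation policies, named after the steps above."""
    NOT_AGGREGATABLE = "NOT_AGGREGATABLE"
    ALLOW_LIST = "ALLOW_LIST"
    RECOMPUTE = "RECOMPUTE"


@dataclass(frozen=True)
class IndicatorDefinition:
    """Hypothetical stand-in for the catalog's indicator definition."""
    policy: Policy
    allowed: frozenset = frozenset()  # only consulted for ALLOW_LIST


def is_blocked(role: str, aggregation: str,
               definition: Optional[IndicatorDefinition]) -> bool:
    # Steps 2-3: only indicators with a forbidden aggregation are examined.
    if role != "INDICATOR" or aggregation not in FORBIDDEN:
        return False
    # Steps 4-5: no definition, or an explicit NOT_AGGREGATABLE, blocks.
    if definition is None or definition.policy is Policy.NOT_AGGREGATABLE:
        return True
    # Step 6: an allow list blocks anything it does not name.
    if definition.policy is Policy.ALLOW_LIST:
        return aggregation not in definition.allowed
    # Step 7: RECOMPUTE allows the aggregation.
    return False
```

Under these assumptions, `is_blocked("INDICATOR", "SUM", None)` is true, while the same request against a definition with the `RECOMPUTE` policy is not blocked.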
Issue Code: INDICATOR_AGG_NOT_ALLOWED
Example:
# Blocked query
MetricRequest(variable="unemployment_rate", aggregation="SUM")

# Issue produced:
Issue(
    code="INDICATOR_AGG_NOT_ALLOWED",
    severity=Severity.BLOCK,
    message="Cannot SUM indicator 'unemployment_rate' because it is a derived value. "
            "Indicators require recomputation, not naive aggregation.",
    remediations=[
        Remediation(
            action="DEFINE_INDICATOR",
            label="Define numerator/denominator so the system can recompute safely",
        ),
        Remediation(
            action="CHANGE_AGG",
            label="Use NONE (display as-is) or a safe aggregation",
        ),
    ],
)
Severity Levels¶
Each rule produces issues with a severity level:
| Level | Meaning | Query Status |
|---|---|---|
| ALLOW | No problem | Execute |
| WARN | Caution but allowed | Execute with disclaimers |
| REQUIRE_ACK | Must acknowledge risk | Execute after acknowledgment |
| BLOCK | Cannot proceed | Reject |
The overall query status is the highest severity among all issues.
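Taking the highest severity amounts to a `max` over an ordering of the levels. Here is a minimal sketch, assuming the order ALLOW < WARN < REQUIRE_ACK < BLOCK and modelling the levels with a local `IntEnum` rather than Invariant's own `Severity` type. `overall_status` is a hypothetical helper, and returning ALLOW when there are no issues is an assumption.

```python
from enum import IntEnum
from typing import Iterable


class Severity(IntEnum):
    """Local stand-in; the integer values only encode the assumed ordering."""
    ALLOW = 0
    WARN = 1
    REQUIRE_ACK = 2
    BLOCK = 3


def overall_status(severities: Iterable[Severity]) -> Severity:
    # With no issues at all, fall back to ALLOW (assumed default).
    return max(severities, default=Severity.ALLOW)
```

For example, a query with one WARN issue and one BLOCK issue resolves to BLOCK, so a single blocking issue rejects the query however many milder issues accompany it.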
Writing Custom Rules¶
Rules implement the Rule protocol:
from invariant.validation.domain.services import Rule, CatalogSnapshot
from invariant.query.application.planning.query_plan import QueryPlan
# Remediation and Severity are assumed to live alongside Issue
from invariant.validation.domain.value_objects import Issue, Remediation, Severity


class MyCustomRule(Rule):
    """Enforces some domain constraint."""

    def evaluate(self, plan: QueryPlan, catalog: CatalogSnapshot) -> list[Issue]:
        issues = []
        for op in plan.operations:
            dp = catalog.data_products.get(op.data_product_id)
            if dp is None:
                # Data product missing from the snapshot: nothing to check
                continue
            # Your validation logic here
            if self._is_violation(op, dp):
                issues.append(self._create_issue(op, dp))
        return issues

    def _is_violation(self, op, dp) -> bool:
        # Check for rule violation
        ...

    def _create_issue(self, op, dp) -> Issue:
        return Issue(
            code="MY_RULE_VIOLATION",
            severity=Severity.BLOCK,
            message="Description of what went wrong",
            details={"variable": ..., "reason": ...},
            remediations=[
                Remediation(action="FIX_IT", label="How to fix this"),
            ],
        )
Rule Guidelines¶
- Return empty list if no issues — Don't return None
- Check data product exists — It may be missing from snapshot
- Use specific issue codes — Codes should be unique and searchable
- Provide remediations — Tell users how to fix the problem
- Include relevant details — Variable names, requested values, etc.
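The first guideline matters as soon as results from several rules are combined. The sketch below assumes the combining step extends one list with each rule's result, which is a guess at a typical aggregation loop and not Invariant's `Validator`. The three rule classes and `collect_issues` are hypothetical, and issues are reduced to plain strings.

```python
from typing import Any, Iterable, List


class CleanRule:
    """Finds nothing and reports that with an empty list."""
    def evaluate(self, plan: Any, catalog: Any) -> List[str]:
        return []


class FlaggingRule:
    """Always reports one issue (a plain string here, for brevity)."""
    def evaluate(self, plan: Any, catalog: Any) -> List[str]:
        return ["SOME_ISSUE"]


class BrokenRule:
    """Violates the guideline by returning None."""
    def evaluate(self, plan: Any, catalog: Any):
        return None


def collect_issues(rules: Iterable[Any], plan: Any, catalog: Any) -> List[str]:
    issues: List[str] = []
    for rule in rules:
        # list.extend needs an iterable, so a None result raises TypeError.
        issues.extend(rule.evaluate(plan, catalog))
    return issues
```

With `CleanRule` and `FlaggingRule` the loop works without special cases. Adding `BrokenRule` makes it fail with a `TypeError`, which is why an empty list is the safer contract.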
Deployment Profiles¶
Configure different rule sets for different contexts:
Minimal Profile (Prototyping)¶
Fast, permissive. Good for exploration and development.
Standard Profile (Production)¶
standard_validator = Validator(rules=[
    GrainValidationRule(),
    IndicatorAggregationRule(),
    ComparabilityRule(),
    UniverseMatchRule(),
])
Enforces core semantic constraints. Suitable for public dashboards.
Research Profile (Maximum Rigor)¶
research_validator = Validator(rules=[
    GrainValidationRule(),
    IndicatorAggregationRule(),
    ComparabilityRule(),
    UniverseMatchRule(),
    RequireUniverseRule(),          # Universe must be explicit
    RequireComparabilityAckRule(),  # Must acknowledge partial comparability
    RequireMethodologyMatchRule(),  # Methods must match exactly
])
Strictest validation. Suitable for academic/research use.
Planned Rules¶
These rules are defined in scope but not yet implemented. Deployment profiles that list them describe the intended configuration, not one that runs today:
UniverseMatchRule¶
Checks that combined datasets have compatible universes.
| Situation | Severity |
|---|---|
| Same universe | ALLOW |
| Compatible universes (subset) | WARN |
| Incompatible universes | BLOCK |
ComparabilityRule¶
Assesses overall dataset comparability.
| Situation | Severity |
|---|---|
| Full comparability | ALLOW |
| Partial (with crosswalk) | WARN |
| None | BLOCK |
ReferenceSystemVersionRule¶
Checks alignment of reference system versions.
| Situation | Severity |
|---|---|
| Same version | ALLOW |
| Different version, crosswalk exists | WARN |
| Different version, no crosswalk | BLOCK |
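The table above reduces to a small decision function. Because the rule is not implemented yet, this is only a sketch: it assumes versions compare as plain strings and that crosswalk availability is known up front. `reference_version_severity` and the local `Severity` enum are hypothetical.

```python
from enum import Enum


class Severity(Enum):
    """Local stand-in covering only the levels used in the table."""
    ALLOW = "ALLOW"
    WARN = "WARN"
    BLOCK = "BLOCK"


def reference_version_severity(left_version: str,
                               right_version: str,
                               crosswalk_exists: bool) -> Severity:
    # Same version: nothing to reconcile.
    if left_version == right_version:
        return Severity.ALLOW
    # Different versions: a crosswalk downgrades the problem to a warning.
    return Severity.WARN if crosswalk_exists else Severity.BLOCK
```

The other planned rules with three-row tables follow the same shape: one condition for the clean case, then a second condition that separates WARN from BLOCK.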
GrainValidationRule¶
Validates that aggregation requests are compatible with data grain.
Issue Structure¶
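from dataclasses import dataclass, field
from typing import Any

@dataclass
class Issue:
    code: str           # Unique identifier
    severity: Severity  # ALLOW, WARN, REQUIRE_ACK, BLOCK
    message: str        # Human-readable explanation
    # Defaults below are assumed, since examples in this guide omit details
    details: dict[str, Any] = field(default_factory=dict)           # Machine-readable context
    remediations: list[Remediation] = field(default_factory=list)   # Suggested fixes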
Issue Codes¶
Issue codes should be:
- Unique: One code per rule/situation
- Stable: Don't change codes between versions
- Searchable: Users can Google them
- Descriptive: Hint at the problem
Examples:
- INDICATOR_AGG_NOT_ALLOWED
- UNIVERSE_MISMATCH
- CROSSWALK_REQUIRED
- REFERENCE_VERSION_MISMATCH
Remediations¶
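from dataclasses import dataclass, field

@dataclass
class Remediation:
    action: str  # Machine-readable action type
    label: str   # Human-readable description
    # Default is assumed, since examples in this guide omit required_fields
    required_fields: list[str] = field(default_factory=list)  # Fields needed to apply this remediation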
Action types:
- DEFINE_INDICATOR — Add indicator definition
- CHANGE_AGG — Use different aggregation
- ADD_CROSSWALK — Provide crosswalk mapping
- ACKNOWLEDGE — Accept and proceed
- REWRITE_PLAN — Use modified query
Configuring Severity¶
For some rules, severity can be configured:
class ConfigurableRule(Rule):
    def __init__(self, severity: Severity = Severity.BLOCK):
        self.severity = severity

    def evaluate(self, plan, catalog) -> list[Issue]:
        if self._is_violation(plan, catalog):
            return [Issue(
                code="...",
                severity=self.severity,  # Use configured severity
                # ... (remaining fields elided)
            )]
        return []

# Use as WARN instead of BLOCK
relaxed_rule = ConfigurableRule(severity=Severity.WARN)
This allows deployments to choose their strictness level.
Testing Rules¶
from invariant.validation.domain.services import IndicatorAggregationRule
# Severity is assumed to live alongside Issue
from invariant.validation.domain.value_objects import Severity

# The make_plan_* and make_catalog_* helpers are test fixtures not shown here.


def test_indicator_aggregation_rule_blocks_sum():
    # Setup
    rule = IndicatorAggregationRule()
    plan = make_plan_with_indicator_sum()
    catalog = make_catalog_with_indicator()
    # Execute
    issues = rule.evaluate(plan, catalog)
    # Assert
    assert len(issues) == 1
    assert issues[0].code == "INDICATOR_AGG_NOT_ALLOWED"
    assert issues[0].severity == Severity.BLOCK


def test_indicator_aggregation_rule_allows_recompute():
    # Setup
    rule = IndicatorAggregationRule()
    plan = make_plan_with_indicator_sum()
    catalog = make_catalog_with_recomputable_indicator()
    # Execute
    issues = rule.evaluate(plan, catalog)
    # Assert
    assert len(issues) == 0  # RECOMPUTE policy allows aggregation
See tests/unit/domain/services/test_validator.py for examples.