From 8479efacd42aa350263095218495c5abcad576c9 Mon Sep 17 00:00:00 2001 From: Anil Joshi Date: Fri, 20 Feb 2026 14:33:51 -0500 Subject: [PATCH] Add databricks-dqx skill for data quality extensions Add comprehensive skill for Databricks Labs DQX (Data Quality Extensions) framework covering quality rule definition, profiling, auto-generation, Lakeflow/DLT integration, streaming, and check functions reference. Includes SKILL.md (256 lines) with 7 progressive-disclosure reference files. Co-Authored-By: Claude Opus 4.6 --- README.md | 2 +- databricks-skills/README.md | 1 + .../databricks-dqx/1-installation-setup.md | 179 +++++++++ .../2-defining-quality-rules.md | 341 ++++++++++++++++++ .../databricks-dqx/3-applying-checks.md | 205 +++++++++++ .../4-profiler-auto-generation.md | 239 ++++++++++++ .../databricks-dqx/5-lakeflow-integration.md | 193 ++++++++++ .../databricks-dqx/6-streaming-metrics.md | 144 ++++++++ .../7-check-functions-reference.md | 269 ++++++++++++++ databricks-skills/databricks-dqx/SKILL.md | 256 +++++++++++++ 10 files changed, 1828 insertions(+), 1 deletion(-) create mode 100644 databricks-skills/databricks-dqx/1-installation-setup.md create mode 100644 databricks-skills/databricks-dqx/2-defining-quality-rules.md create mode 100644 databricks-skills/databricks-dqx/3-applying-checks.md create mode 100644 databricks-skills/databricks-dqx/4-profiler-auto-generation.md create mode 100644 databricks-skills/databricks-dqx/5-lakeflow-integration.md create mode 100644 databricks-skills/databricks-dqx/6-streaming-metrics.md create mode 100644 databricks-skills/databricks-dqx/7-check-functions-reference.md create mode 100644 databricks-skills/databricks-dqx/SKILL.md diff --git a/README.md b/README.md index 431aa152..5a0ba312 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,7 @@ Works with LangChain, OpenAI Agents SDK, or any Python framework. 
See [databrick |-----------|-------------| | [`databricks-tools-core/`](databricks-tools-core/) | Python library with high-level Databricks functions | | [`databricks-mcp-server/`](databricks-mcp-server/) | MCP server exposing 50+ tools for AI assistants | -| [`databricks-skills/`](databricks-skills/) | 19 markdown skills teaching Databricks patterns | +| [`databricks-skills/`](databricks-skills/) | 20 markdown skills teaching Databricks patterns | | [`databricks-builder-app/`](databricks-builder-app/) | Full-stack web app with Claude Code integration | --- diff --git a/databricks-skills/README.md b/databricks-skills/README.md index 3c0e62cf..8569d14f 100644 --- a/databricks-skills/README.md +++ b/databricks-skills/README.md @@ -54,6 +54,7 @@ cp -r ai-dev-kit/databricks-skills/databricks-agent-bricks .claude/skills/ - **databricks-unity-catalog** - System tables for lineage, audit, billing ### 🔧 Data Engineering +- **databricks-dqx** - DQX (Data Quality Extensions) for quality rules, profiling, and quarantine - **databricks-spark-declarative-pipelines** - SDP (formerly DLT) in SQL/Python - **databricks-jobs** - Multi-task workflows, triggers, schedules - **databricks-synthetic-data-generation** - Realistic test data with Faker diff --git a/databricks-skills/databricks-dqx/1-installation-setup.md b/databricks-skills/databricks-dqx/1-installation-setup.md new file mode 100644 index 00000000..d97eb803 --- /dev/null +++ b/databricks-skills/databricks-dqx/1-installation-setup.md @@ -0,0 +1,179 @@ +# Installation and Setup + +## Install as a Library (pip) + +### In a Databricks Notebook + +```python +%pip install databricks-labs-dqx +dbutils.library.restartPython() +``` + +### Pinned Version (recommended for production) + +```python +%pip install databricks-labs-dqx==0.13.0 +dbutils.library.restartPython() +``` + +### Optional Extras + +```bash +# AI-assisted rule generation (includes DSPy) +pip install 'databricks-labs-dqx[llm]' + +# PII detection (includes Microsoft 
Presidio)
pip install 'databricks-labs-dqx[pii]'

# Data Contract / ODCS support
pip install 'databricks-labs-dqx[datacontract]'

# Combined
pip install 'databricks-labs-dqx[datacontract,llm]'
```

### In Databricks Asset Bundles (DAB)

Add to your job's task libraries:

```yaml
resources:
  jobs:
    quality_check_job:
      tasks:
        - task_key: run_checks
          libraries:
            - pypi:
                package: databricks-labs-dqx==0.13.0
```

### Enterprise PyPI Mirror

```bash
PIP_INDEX_URL="https://your-company-pypi.internal" pip install databricks-labs-dqx
```

---

## Install as a Workspace Tool (Databricks CLI)

Prerequisites: Python 3.10+, Databricks CLI v0.241+

```bash
# Authenticate
databricks auth login --host <workspace-url>

# Install latest
databricks labs install dqx

# Install pinned version
databricks labs install dqx@v0.13.0

# Force global install (shared across users)
DQX_FORCE_INSTALL=global databricks labs install dqx

# Force user install (default)
DQX_FORCE_INSTALL=user databricks labs install dqx
```

### What Workspace Installation Deploys

- Python wheel file
- `config.yml` configuration file
- Profiler workflow (unscheduled)
- Quality checker workflow (unscheduled)
- End-to-end (e2e) workflow (unscheduled)
- Quality dashboard

### Installation Locations

| Mode | Path |
|------|------|
| **User (default)** | `/Users/<username>/.dqx` |
| **Global** | `/Applications/dqx` |
| **Custom** | Any workspace folder |

### CLI Lifecycle Commands

```bash
databricks labs install dqx      # Install
databricks labs upgrade dqx      # Upgrade to latest
databricks labs uninstall dqx    # Uninstall

# Open configuration
databricks labs dqx open-remote-config
databricks labs dqx open-remote-config --install-folder "/Workspace/my_folder"

# Open quality dashboard
databricks labs dqx open-dashboards
```

---

## Basic Setup in Code

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import 
DQEngine + +# Standard setup (uses default auth from notebook context or env) +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# With Databricks Connect (local development) +from databricks.connect import DatabricksSession +spark = DatabricksSession.builder.getOrCreate() +dq_engine = DQEngine(ws, spark) + +# With custom result column names +from databricks.labs.dqx.config import ExtraParams +extra_params = ExtraParams( + result_column_names={"errors": "dq_errors", "warnings": "dq_warnings"} +) +dq_engine = DQEngine(ws, extra_params=extra_params) +``` + +--- + +## Configuration File (config.yml) + +For workspace installations, DQX uses a `config.yml`: + +```yaml +serverless_clusters: true +profiler_max_parallelism: 4 + +llm_config: + model: + model_name: "databricks/databricks-claude-sonnet-4-5" + api_key: xxx # optional: secret_scope/secret_key + api_base: xxx # optional: secret_scope/secret_key + +extra_params: + result_column_names: + errors: dq_errors + warnings: dq_warnings + user_metadata: + team: data-engineering + +run_configs: + - name: default + checks_location: catalog.schema.checks_table + input_config: + format: delta + location: catalog.schema.input_table + is_streaming: false + output_config: + location: catalog.schema.output_table + mode: append + quarantine_config: + location: catalog.schema.quarantine_table + mode: append + profiler_config: + limit: 1000 + sample_fraction: 0.3 + summary_stats_file: profile_summary_stats.yml + filter: "status = 'active'" + llm_primary_key_detection: false + checks_user_requirements: "business rules description" +``` diff --git a/databricks-skills/databricks-dqx/2-defining-quality-rules.md b/databricks-skills/databricks-dqx/2-defining-quality-rules.md new file mode 100644 index 00000000..82e15085 --- /dev/null +++ b/databricks-skills/databricks-dqx/2-defining-quality-rules.md @@ -0,0 +1,341 @@ +# Defining Quality Rules + +DQX supports two approaches for defining quality rules: **programmatic** (DQRule classes) 
and **declarative** (YAML/dict metadata). Both produce identical behavior at runtime. + +## Core Concepts + +### Criticality Levels + +| Level | Behavior | +|-------|----------| +| `"error"` (default) | Failed rows go ONLY to quarantine DataFrame | +| `"warn"` | Failed rows appear in BOTH valid and quarantine DataFrames | + +### Rule Types + +| Class | Scope | Use Case | +|-------|-------|----------| +| `DQRowRule` | Per-row | Column-level validation (nulls, ranges, formats) | +| `DQDatasetRule` | Cross-row | Uniqueness, aggregations, foreign keys | +| `DQForEachColRule` | Multi-column | Apply same row-level check to multiple columns | + +--- + +## Method 1: DQRule Classes (Recommended) + +### Basic Rules + +```python +from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule +from databricks.labs.dqx import check_funcs + +checks = [ + # Null check + DQRowRule( + criticality="error", + check_func=check_funcs.is_not_null, + column="order_id", + ), + # Not null and not empty + DQRowRule( + criticality="warn", + check_func=check_funcs.is_not_null_and_not_empty, + column="customer_name", + ), + # Range check with keyword arguments + DQRowRule( + criticality="warn", + check_func=check_funcs.is_in_range, + column="amount", + check_func_kwargs={"min_limit": 0, "max_limit": 100000}, + ), + # List membership with positional arguments + DQRowRule( + criticality="error", + check_func=check_funcs.is_in_list, + column="status", + check_func_args=[["pending", "active", "completed", "cancelled"]], + ), + # Regex validation + DQRowRule( + name="email_format_check", + criticality="error", + check_func=check_funcs.regex_match, + column="email", + check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"}, + ), +] +``` + +### Conditional Rules (with filter) + +```python +DQRowRule( + name="premium_amount_check", + criticality="error", + filter="customer_tier = 'premium'", # Only apply to premium customers + 
check_func=check_funcs.is_not_less_than, + column="amount", + check_func_kwargs={"limit": 100}, +) +``` + +### Multi-Column Rules (DQForEachColRule) + +```python +# Apply same check to multiple columns at once +checks = [ + *DQForEachColRule( + columns=["col1", "col2", "col3"], + criticality="error", + check_func=check_funcs.is_not_null, + ).get_rules(), +] +``` + +### Dataset-Level Rules + +```python +# Uniqueness (single or composite key) +DQDatasetRule( + criticality="error", + check_func=check_funcs.is_unique, + columns=["order_id"], +) + +# Composite uniqueness +DQDatasetRule( + criticality="error", + check_func=check_funcs.is_unique, + columns=["customer_id", "order_date"], +) + +# Aggregation check +DQDatasetRule( + criticality="error", + check_func=check_funcs.is_aggr_not_greater_than, + column="amount", + check_func_kwargs={"aggr_type": "count", "group_by": ["customer_id"], "limit": 100}, +) + +# Foreign key validation (against a table) +DQDatasetRule( + criticality="error", + check_func=check_funcs.foreign_key, + columns=["customer_id"], + check_func_kwargs={ + "ref_columns": ["id"], + "ref_table": "catalog.schema.customers", + }, +) +``` + +### User Metadata + +Attach custom metadata to any rule for tracking/reporting: + +```python +DQRowRule( + name="completeness_check", + criticality="warn", + check_func=check_funcs.is_not_null_and_not_empty, + column="address", + user_metadata={ + "check_category": "completeness", + "data_steward": "team-data@company.com", + "sla": "99.5%", + }, +) +``` + +### Struct Fields and Map Elements + +```python +from pyspark.sql import functions as F + +# Struct field access +DQRowRule(check_func=check_funcs.is_not_null, column="address.zip_code") + +# Map element access +DQRowRule( + criticality="error", + check_func=check_funcs.is_not_null, + column=F.try_element_at("metadata", F.lit("source_system")), +) +``` + +--- + +## Method 2: YAML / Dictionary Metadata (Declarative) + +Same checks defined declaratively: + +```python 
+import yaml + +checks = yaml.safe_load(""" +- criticality: error + check: + function: is_not_null + arguments: + column: order_id + +- criticality: warn + check: + function: is_not_null_and_not_empty + arguments: + column: customer_name + +- criticality: warn + check: + function: is_in_range + arguments: + column: amount + min_limit: 0 + max_limit: 100000 + +- criticality: error + check: + function: is_in_list + arguments: + column: status + allowed: + - pending + - active + - completed + - cancelled + +- name: email_format_check + criticality: error + check: + function: regex_match + arguments: + column: email + regex: "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$" + +- criticality: error + check: + function: is_unique + arguments: + columns: + - order_id + +- criticality: error + check: + function: is_not_null + for_each_column: + - col1 + - col2 + - col3 + +- criticality: warn + filter: "customer_tier = 'premium'" + check: + function: is_not_less_than + arguments: + column: amount + limit: 100 + user_metadata: + check_category: business_rule + data_steward: team@company.com +""") +``` + +### Dictionary Structure + +| Field | Required | Description | +|-------|----------|-------------| +| `criticality` | No | `"error"` (default) or `"warn"` | +| `check.function` | Yes | Check function name (from `check_funcs`) | +| `check.arguments` | Yes | Arguments dict for the check function | +| `check.for_each_column` | No | Apply same check to listed columns | +| `name` | No | Auto-generated if not provided | +| `filter` | No | Spark SQL expression to filter rows | +| `user_metadata` | No | Key-value pairs added to results | + +--- + +## Storing and Loading Checks + +### Storage Backends + +```python +from databricks.labs.dqx.config import ( + FileChecksStorageConfig, # Local or workspace file + WorkspaceFileChecksStorageConfig, # Workspace file (absolute path) + TableChecksStorageConfig, # Delta table + VolumeFileChecksStorageConfig, # UC Volume + 
LakebaseChecksStorageConfig,       # Lakebase table
    InstallationChecksStorageConfig,   # Installation-managed
)
```

### Saving Checks

```python
from databricks.labs.dqx.config import FileChecksStorageConfig, TableChecksStorageConfig, VolumeFileChecksStorageConfig

# To local YAML file
dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="checks.yml"))

# To workspace file
from databricks.labs.dqx.config import WorkspaceFileChecksStorageConfig
dq_engine.save_checks(checks, config=WorkspaceFileChecksStorageConfig(location="/Shared/App1/checks.yml"))

# To Delta table (with run config name for multi-table support)
dq_engine.save_checks(checks, config=TableChecksStorageConfig(
    location="catalog.schema.checks_table",
    run_config_name="catalog.schema.orders",
    mode="overwrite",
))

# To UC Volume
dq_engine.save_checks(checks, config=VolumeFileChecksStorageConfig(
    location="/Volumes/catalog/schema/volume_name/checks.yml"
))
```

### Loading Checks

```python
# From local file
checks = dq_engine.load_checks(config=FileChecksStorageConfig(location="checks.yml"))

# From workspace file
checks = dq_engine.load_checks(config=WorkspaceFileChecksStorageConfig(location="/Shared/App1/checks.yml"))

# From Delta table
checks = dq_engine.load_checks(config=TableChecksStorageConfig(
    location="catalog.schema.checks_table",
    run_config_name="catalog.schema.orders",
))

# From UC Volume
checks = dq_engine.load_checks(config=VolumeFileChecksStorageConfig(
    location="/Volumes/catalog/schema/volume_name/checks.yml"
))

# From installation
from databricks.labs.dqx.config import InstallationChecksStorageConfig
checks = dq_engine.load_checks(config=InstallationChecksStorageConfig(
    run_config_name="catalog.schema.orders"
))
```

### Delta Table Schema for Checks

When storing checks in a Delta table:

```
name STRING, criticality STRING, check STRUCT<function STRING, arguments MAP<STRING, STRING>>, filter STRING, run_config_name STRING, 
user_metadata MAP +``` + +--- + +## CLI Commands for Rule Management + +```bash +# Validate checks configuration +databricks labs dqx validate-checks +databricks labs dqx validate-checks --run-config default +``` diff --git a/databricks-skills/databricks-dqx/3-applying-checks.md b/databricks-skills/databricks-dqx/3-applying-checks.md new file mode 100644 index 00000000..7f255387 --- /dev/null +++ b/databricks-skills/databricks-dqx/3-applying-checks.md @@ -0,0 +1,205 @@ +# Applying Quality Checks + +## DQEngine Setup + +```python +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.engine import DQEngine + +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# With Databricks Connect +from databricks.connect import DatabricksSession +spark = DatabricksSession.builder.getOrCreate() +dq_engine = DQEngine(ws, spark) + +# With custom result column names +from databricks.labs.dqx.config import ExtraParams +extra_params = ExtraParams( + result_column_names={"errors": "dq_errors", "warnings": "dq_warnings"} +) +dq_engine = DQEngine(ws, extra_params=extra_params) +``` + +--- + +## Applying Checks (DQRule Objects) + +### Option 1: Single DataFrame with Result Columns + +Returns the original DataFrame with `_error` and `_warning` columns appended: + +```python +checked_df = dq_engine.apply_checks(input_df, checks) +# checked_df has all original columns + _error + _warning +``` + +### Option 2: Split into Valid + Quarantine + +```python +valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks) +# valid_df: rows with no errors (may have warnings) +# quarantine_df: rows with errors +``` + +### Option 3: End-to-End (Read, Check, Write) + +```python +from databricks.labs.dqx.config import InputConfig, OutputConfig + +dq_engine.apply_checks_and_save_in_table( + checks=checks, + input_config=InputConfig(location="catalog.schema.input_table"), + output_config=OutputConfig(location="catalog.schema.valid_table", mode="overwrite"), + 
quarantine_config=OutputConfig(location="catalog.schema.quarantine_table", mode="overwrite"), +) +``` + +--- + +## Applying Checks (Metadata / Dict) + +Same three options, using `_by_metadata` variants: + +```python +# Option 1: Single DataFrame +checked_df = dq_engine.apply_checks_by_metadata(input_df, checks_metadata) + +# Option 2: Split +valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks_metadata) + +# Option 3: End-to-end +dq_engine.apply_checks_by_metadata_and_save_in_table( + checks=checks_metadata, + input_config=InputConfig(location="catalog.schema.input_table"), + output_config=OutputConfig(location="catalog.schema.valid_table", mode="overwrite"), + quarantine_config=OutputConfig(location="catalog.schema.quarantine_table", mode="overwrite"), +) +``` + +--- + +## Filtering Valid / Invalid Rows + +After `apply_checks()` returns a single DataFrame: + +```python +checked_df = dq_engine.apply_checks(input_df, checks) + +valid_df = dq_engine.get_valid(checked_df) # No errors +invalid_df = dq_engine.get_invalid(checked_df) # Has errors +``` + +--- + +## Saving Results + +```python +dq_engine.save_results_in_table( + output_df=valid_df, + quarantine_df=quarantine_df, + output_config=OutputConfig( + location="catalog.schema.valid_table", + mode="append", + options={"mergeSchema": "true"}, + ), + quarantine_config=OutputConfig( + location="catalog.schema.quarantine_table", + mode="append", + ), +) +``` + +--- + +## Multi-Table Processing + +### Explicit RunConfig List + +```python +from databricks.labs.dqx.config import RunConfig, InputConfig, OutputConfig + +dq_engine.apply_checks_and_save_in_tables(run_configs=[ + RunConfig( + name="orders", + input_config=InputConfig(location="catalog.schema.orders"), + output_config=OutputConfig(location="catalog.schema.orders_valid", mode="overwrite"), + quarantine_config=OutputConfig(location="catalog.schema.orders_quarantine", mode="overwrite"), + 
checks_location="catalog.schema.checks_table", + ), + RunConfig( + name="customers", + input_config=InputConfig(location="catalog.schema.customers"), + output_config=OutputConfig(location="catalog.schema.customers_valid", mode="overwrite"), + quarantine_config=OutputConfig(location="catalog.schema.customers_quarantine", mode="overwrite"), + checks_location="catalog.schema.checks_table", + ), +]) +``` + +### Wildcard Patterns + +Process all tables matching a pattern: + +```python +dq_engine.apply_checks_and_save_in_tables_for_patterns( + patterns=["catalog.schema.*"], + checks_location="catalog.schema.checks_table", + exclude_patterns=["*_dq_output", "*_dq_quarantine", "*_checks"], + output_table_suffix="_dq_output", + quarantine_table_suffix="_dq_quarantine", +) +``` + +--- + +## InputConfig and OutputConfig Reference + +### InputConfig + +```python +InputConfig( + location="catalog.schema.table", # Table or file path + format="delta", # delta, parquet, csv, json + is_streaming=False, # True for streaming reads + schema=None, # Optional schema override + options={}, # Reader options +) +``` + +### OutputConfig + +```python +OutputConfig( + location="catalog.schema.output", # Table or file path + format="delta", # delta, parquet, csv, json + mode="overwrite", # overwrite, append + options={}, # Writer options (e.g., mergeSchema) + trigger=None, # Streaming trigger (e.g., {"availableNow": True}) +) +``` + +--- + +## CLI Commands + +```bash +# Apply checks using workspace config +databricks labs dqx apply-checks --timeout-minutes 20 + +# Apply checks for specific run config +databricks labs dqx apply-checks --run-config default --timeout-minutes 20 + +# Apply checks with pattern matching +databricks labs dqx apply-checks \ + --run-config "default" \ + --patterns "catalog.schema1.*;catalog.schema2.*" \ + --exclude-patterns "*_dq_output;*_dq_quarantine" \ + --output-table-suffix "_dq_output" \ + --quarantine-table-suffix "_dq_quarantine" + +# End-to-end: profile + 
generate + apply +databricks labs dqx e2e --timeout-minutes 20 +databricks labs dqx e2e --run-config default --timeout-minutes 20 +``` diff --git a/databricks-skills/databricks-dqx/4-profiler-auto-generation.md b/databricks-skills/databricks-dqx/4-profiler-auto-generation.md new file mode 100644 index 00000000..c3c179a8 --- /dev/null +++ b/databricks-skills/databricks-dqx/4-profiler-auto-generation.md @@ -0,0 +1,239 @@ +# Profiler and Auto-Generation + +The DQX profiler analyzes datasets to generate quality rule candidates automatically. This is the recommended starting point for new datasets. + +## Profiling a DataFrame + +```python +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.sdk import WorkspaceClient + +ws = WorkspaceClient() +profiler = DQProfiler(ws) + +# Profile a DataFrame +input_df = spark.table("catalog.schema.my_table") +summary_stats, profiles = profiler.profile(input_df) +``` + +## Profiling a Table Directly + +```python +from databricks.labs.dqx.config import InputConfig + +# Profile specific columns +summary_stats, profiles = profiler.profile_table( + input_config=InputConfig(location="catalog.schema.my_table"), + columns=["col1", "col2", "col3"], +) +``` + +## Profiling Multiple Tables (Wildcard Patterns) + +```python +results = profiler.profile_tables_for_patterns( + patterns=["catalog.schema.*"], + exclude_patterns=["*_dq_output", "*_dq_quarantine"], +) + +generator = DQGenerator(ws) +for table_name, (summary_stats, profiles) in results.items(): + checks = generator.generate_dq_rules(profiles) + dq_engine.save_checks(checks, config=TableChecksStorageConfig( + location="catalog.schema.checks", + run_config_name=table_name, + mode="overwrite", + )) +``` + +--- + +## Profiler Options + +```python +custom_options = { + "sample_fraction": 0.3, # Sample 30% of data (default) + "sample_seed": None, # Random seed for reproducibility + "limit": 1000, # Max 
records to analyze (default) + "filter": None, # SQL filter expression (e.g., "status = 'active'") + "round": True, # Round min/max values + "max_in_count": 10, # Max items for is_in_list rule + "distinct_ratio": 0.05, # Generate is_in if distinct ratio < 5% + "max_null_ratio": 0.01, # Generate is_not_null if null ratio < 1% + "remove_outliers": True, # Enable outlier detection + "outlier_columns": [], # Specific columns (empty = all numeric) + "num_sigmas": 3, # Standard deviations for outlier detection + "trim_strings": True, # Trim whitespace + "max_empty_ratio": 0.01, # Generate is_not_null_or_empty if empty ratio < 1% + "llm_primary_key_detection": True, # AI-based primary key detection (requires [llm] extra) +} + +summary_stats, profiles = profiler.profile(input_df, options=custom_options) +``` + +--- + +## Generating Rules from Profiles + +```python +generator = DQGenerator(ws) + +# Default criticality: "error" +checks = generator.generate_dq_rules(profiles) + +# Custom default criticality +checks = generator.generate_dq_rules(profiles, default_criticality="warn") +``` + +### Profile Types Generated + +| Profile Type | Check Function | Applies To | +|-------------|----------------|------------| +| `is_not_null` | `is_not_null` | All types | +| `is_not_null_or_empty` | `is_not_null_and_not_empty` | StringType | +| `is_in` | `is_in_list` | String, Integer, Long | +| `min_max` | `is_in_range` | Numeric, Date, Timestamp | +| `is_unique` | `is_unique` | All (requires `[llm]` extra) | + +--- + +## AI-Assisted Rule Generation + +Requires the `[llm]` extra: + +```bash +pip install 'databricks-labs-dqx[llm]' +``` + +### Generate Rules from Natural Language + +```python +generator = DQGenerator(ws) +checks = generator.generate_dq_rules_ai_assisted( + user_input="Ensure all customer emails are valid, ages are between 18 and 120, and order_id is unique", + input_config=InputConfig(location="catalog.schema.customers"), +) +``` + +### AI-Assisted Primary Key Detection + 
+```python +profiler = DQProfiler(ws) +primary_keys = profiler.detect_primary_keys_with_llm( + input_config=InputConfig(location="catalog.schema.my_table"), + max_retries=3, +) +``` + +--- + +## Data Contract Rule Generation + +Requires the `[datacontract]` extra: + +```bash +pip install 'databricks-labs-dqx[datacontract]' +# Or with AI for text-based expectations +pip install 'databricks-labs-dqx[datacontract,llm]' +``` + +```python +generator = DQGenerator(ws) +checks = generator.generate_rules_from_contract( + contract_file="path/to/contract.yaml", + contract_format="odcs", + generate_predefined_rules=True, + process_text_rules=True, # Requires [llm] extra + default_criticality="error", +) +``` + +--- + +## DLT Expectation Generation + +Generate native DLT expectations from profiler results: + +```python +from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator + +dlt_generator = DQDltGenerator(ws) + +# SQL format +sql_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL") +# Output: CONSTRAINT user_id_is_null EXPECT (user_id is not null) + +# SQL with violation action +sql_with_drop = dlt_generator.generate_dlt_rules(profiles, language="SQL", action="drop") +# Output: CONSTRAINT user_id_is_null EXPECT (user_id is not null) ON VIOLATION DROP ROW + +# Python decorator format +python_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python") +# Output: @dlt.expect_all({"user_id_is_null": "user_id is not null", ...}) + +# Python dict format +dict_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict") +# Output: {"user_id_is_null": "user_id is not null", ...} +``` + +--- + +## CLI Commands for Profiling + +```bash +# Profile using workspace config +databricks labs dqx profile --timeout-minutes 20 + +# Profile specific run config +databricks labs dqx profile --run-config default --timeout-minutes 20 + +# Profile with patterns +databricks labs dqx profile \ + --run-config "default" \ + 
--patterns "catalog.schema1.*;catalog.schema2.*" \ + --exclude-patterns "*_output;*_quarantine" + +# End-to-end: profile + generate + apply +databricks labs dqx e2e --timeout-minutes 20 +``` + +--- + +## Complete Workflow Example + +```python +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.labs.dqx.config import ( + InputConfig, OutputConfig, FileChecksStorageConfig +) + +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# 1. Profile the dataset +profiler = DQProfiler(ws) +input_df = spark.table("catalog.schema.orders") +summary_stats, profiles = profiler.profile(input_df) + +# 2. Generate rule candidates +generator = DQGenerator(ws) +checks = generator.generate_dq_rules(profiles) + +# 3. Review: print generated checks +for check in checks: + print(check) + +# 4. Save checks for future use +dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="order_checks.yml")) + +# 5. Apply checks +valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks) + +# 6. Inspect results +print(f"Valid: {valid_df.count()}, Quarantine: {quarantine_df.count()}") +quarantine_df.select("order_id", "_error").show(truncate=False) +``` diff --git a/databricks-skills/databricks-dqx/5-lakeflow-integration.md b/databricks-skills/databricks-dqx/5-lakeflow-integration.md new file mode 100644 index 00000000..43f33ee6 --- /dev/null +++ b/databricks-skills/databricks-dqx/5-lakeflow-integration.md @@ -0,0 +1,193 @@ +# Lakeflow / DLT Integration + +DQX integrates with Lakeflow Declarative Pipelines (DLT) to apply quality checks within pipeline stages. + +**Important**: Install `databricks-labs-dqx` as a pipeline library before using DQX in DLT. 
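In notebook-sourced pipelines, one way to do this is a `%pip` magic in the first cell of the source notebook (a sketch; the version pin is an example, and the DAB and UI options are covered at the end of this file):

```python
# First cell of the pipeline's source notebook (pin the version you validated)
%pip install databricks-labs-dqx==0.13.0
```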
+ +--- + +## Pattern 1: Apply Checks with Quarantine + +Split data into valid (silver) and quarantine tables: + +```python +import dlt +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.rule import DQRowRule +from databricks.labs.dqx import check_funcs +from databricks.sdk import WorkspaceClient + +dq_engine = DQEngine(WorkspaceClient()) + +checks = [ + DQRowRule(criticality="error", check_func=check_funcs.is_not_null, column="order_id"), + DQRowRule(criticality="warn", check_func=check_funcs.is_in_range, column="amount", + check_func_kwargs={"min_limit": 0, "max_limit": 100000}), +] + +@dlt.view +def bronze_dq_check(): + """Apply quality checks to bronze data.""" + df = dlt.read_stream("bronze_orders") + return dq_engine.apply_checks(df, checks) + +@dlt.table +def silver_orders(): + """Valid rows only.""" + return dq_engine.get_valid(dlt.read_stream("bronze_dq_check")) + +@dlt.table +def quarantine_orders(): + """Invalid rows for investigation.""" + return dq_engine.get_invalid(dlt.read_stream("bronze_dq_check")) +``` + +### With Metadata-Based Checks + +```python +import yaml + +checks_metadata = yaml.safe_load(""" +- criticality: error + check: + function: is_not_null + arguments: + column: order_id +- criticality: warn + check: + function: is_in_range + arguments: + column: amount + min_limit: 0 + max_limit: 100000 +""") + +@dlt.view +def bronze_dq_check(): + df = dlt.read_stream("bronze_orders") + return dq_engine.apply_checks_by_metadata(df, checks_metadata) + +@dlt.table +def silver_orders(): + return dq_engine.get_valid(dlt.read_stream("bronze_dq_check")) + +@dlt.table +def quarantine_orders(): + return dq_engine.get_invalid(dlt.read_stream("bronze_dq_check")) +``` + +--- + +## Pattern 2: Report Only (No Quarantine) + +Keep all rows but add quality annotations: + +```python +@dlt.view +def bronze_dq_check(): + df = dlt.read_stream("bronze_orders") + return dq_engine.apply_checks(df, checks) + +@dlt.table +def silver_orders(): + 
"""All rows pass through with _error/_warning columns for monitoring.""" + return dlt.read_stream("bronze_dq_check") +``` + +--- + +## Pattern 3: Load Checks from External Storage + +Load checks from a Delta table or UC Volume instead of hardcoding: + +```python +from databricks.labs.dqx.config import TableChecksStorageConfig + +@dlt.view +def bronze_dq_check(): + # Load checks from Delta table at pipeline start + checks = dq_engine.load_checks(config=TableChecksStorageConfig( + location="catalog.schema.quality_checks", + run_config_name="bronze_orders", + )) + df = dlt.read_stream("bronze_orders") + return dq_engine.apply_checks_by_metadata(df, checks) +``` + +--- + +## Pattern 4: Generate DLT Native Expectations from Profiler + +Use the profiler to generate native DLT `EXPECT` constraints: + +```python +from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator + +dlt_generator = DQDltGenerator(ws) + +# SQL format (for SQL DLT pipelines) +sql_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL") +# CONSTRAINT user_id_is_null EXPECT (user_id is not null) + +# SQL with action +sql_with_action = dlt_generator.generate_dlt_rules(profiles, language="SQL", action="drop") +# CONSTRAINT user_id_is_null EXPECT (user_id is not null) ON VIOLATION DROP ROW + +# Python dict format (for Python DLT pipelines) +expectations_dict = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict") +# {"user_id_is_null": "user_id is not null", ...} + +# Use in DLT +@dlt.table +@dlt.expect_all(expectations_dict) +def silver_orders(): + return dlt.read_stream("bronze_orders") +``` + +--- + +## DQX vs Native DLT Expectations + +| Feature | DQX in DLT | Native DLT Expectations | +|---------|-----------|------------------------| +| **Quarantine routing** | Yes (split valid/invalid) | Drop or fail only | +| **Row-level error details** | Yes (_error/_warning columns) | No | +| **Dataset-level checks** | Yes (uniqueness, aggregation) | No | +| **External 
rule storage** | Yes (tables, volumes, files) | Inline only | +| **Profiler auto-generation** | Yes | No | +| **User metadata** | Yes | No | +| **Criticality levels** | Yes (error/warn) | Yes (expect/expect_all/expect_or_drop/expect_or_fail) | + +**Recommendation**: Use DQX when you need quarantine routing, detailed error information, or centralized rule management. Use native DLT expectations for simple pass/fail constraints. + +--- + +## Pipeline Library Configuration + +### In Pipeline Settings (UI or API) + +Add `databricks-labs-dqx` as a PyPI library in your pipeline configuration. + +### In Asset Bundles (databricks.yml) + +```yaml +resources: + pipelines: + my_pipeline: + libraries: + - notebook: + path: ./src/transformations/ + - pypi: + package: databricks-labs-dqx==0.13.0 +``` + +### Via MCP Tool + +When using `create_or_update_pipeline`, include in the libraries list: + +```python +libraries = [ + {"notebook": {"path": "/path/to/transformations/"}}, + {"pypi": {"package": "databricks-labs-dqx==0.13.0"}}, +] +``` diff --git a/databricks-skills/databricks-dqx/6-streaming-metrics.md b/databricks-skills/databricks-dqx/6-streaming-metrics.md new file mode 100644 index 00000000..cc3e08e8 --- /dev/null +++ b/databricks-skills/databricks-dqx/6-streaming-metrics.md @@ -0,0 +1,144 @@ +# Streaming Support and Quality Metrics + +## Streaming DataFrames + +All DQX `apply_checks` methods work with both batch and streaming DataFrames. No code changes needed for the check logic itself. 
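To see why, note that the check logic never touches the data source. A toy pure-Python analogue of that property (illustrative only, not the DQX API; the field name `_error` follows the defaults described in this skill), where the same checks run over a list (batch) or a generator (stream):

```python
def apply_checks(rows, checks):
    """Apply row checks to any iterable of rows (list = batch, generator = stream).

    Loosely mirrors the DQX contract: failing checks contribute a message
    to an appended "_error" field; rows that pass all checks get None.
    """
    for row in rows:
        messages = [msg for check in checks if (msg := check(row)) is not None]
        yield {**row, "_error": "; ".join(messages) or None}


def order_id_not_null(row):
    return None if row.get("order_id") is not None else "order_id is null"


batch = [{"order_id": 1}, {"order_id": None}]
stream = ({"order_id": i} for i in range(3))  # a generator, standing in for a stream

print([r["_error"] for r in apply_checks(batch, [order_id_not_null])])
# [None, 'order_id is null']
print(all(r["_error"] is None for r in apply_checks(stream, [order_id_not_null])))
# True
```

The same `checks` list drives both calls; only the input iterable differs, which is the essence of DQX's batch/streaming symmetry.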
+
+### End-to-End Streaming
+
+```python
+from databricks.labs.dqx.config import InputConfig, OutputConfig
+
+dq_engine.apply_checks_and_save_in_table(
+    checks=checks,
+    input_config=InputConfig(
+        location="catalog.schema.input_stream",
+        is_streaming=True,
+    ),
+    output_config=OutputConfig(
+        location="catalog.schema.valid_stream",
+        mode="append",
+        trigger={"availableNow": True},
+        options={
+            "checkpointLocation": "/Volumes/catalog/schema/vol/checkpoint_output",
+            "mergeSchema": "true",
+        },
+    ),
+    quarantine_config=OutputConfig(
+        location="catalog.schema.quarantine_stream",
+        mode="append",
+        trigger={"availableNow": True},
+        options={
+            "checkpointLocation": "/Volumes/catalog/schema/vol/checkpoint_quarantine",
+        },
+    ),
+)
+```
+
+### foreachBatch Pattern
+
+For fine-grained control over micro-batch processing:
+
+```python
+from unittest.mock import MagicMock
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk import WorkspaceClient
+
+def validate_and_write_micro_batch(batch_df, batch_id):
+    # Only Spark functionality is needed inside the micro-batch, so a
+    # mocked WorkspaceClient can stand in for a real client here.
+    mock_ws = MagicMock(spec=WorkspaceClient)
+    dq_engine = DQEngine(mock_ws)
+
+    # `checks` is a list of DQX rules, defined as in the earlier examples
+    valid_df, quarantine_df = dq_engine.apply_checks_and_split(batch_df, checks)
+
+    valid_df.write.format("delta").mode("append").saveAsTable("catalog.schema.valid")
+    quarantine_df.write.format("delta").mode("append").saveAsTable("catalog.schema.quarantine")
+
+input_stream = spark.readStream.table("catalog.schema.source")
+input_stream.writeStream.foreachBatch(validate_and_write_micro_batch).start()
+```
+
+---
+
+## Quality Metrics
+
+DQX captures quality metrics lazily (triggered after count/display/write actions).
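The lazy behavior can be pictured with a toy observer (pure Python; the class and method names below are illustrative, not the DQX API): metrics stay empty until an action consumes the observed rows.

```python
class ToyObserver:
    """Toy stand-in for lazy metric collection: counters are only
    populated once something (an "action") iterates the observed rows."""

    def __init__(self):
        self.metrics = {}

    def observe(self, rows):
        def observed():
            input_count = error_count = 0
            for row in rows:
                input_count += 1
                error_count += 1 if row.get("_error") else 0
                yield row
            self.metrics = {"input_count": input_count, "error_count": error_count}
        return observed()


observer = ToyObserver()
checked = observer.observe([{"_error": None}, {"_error": "order_id is null"}])

print(observer.metrics)  # {} - nothing collected yet
rows = list(checked)     # the "action" fully consumes the data
print(observer.metrics)  # {'input_count': 2, 'error_count': 1}
```

This is why the sections below trigger a write (or count/display) before reading metrics.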
+ +### Batch Metrics via Observation + +```python +from databricks.labs.dqx.metrics_observer import DQMetricsObserver +from databricks.labs.dqx.config import OutputConfig + +observer = DQMetricsObserver() +dq_engine = DQEngine(ws, observer=observer) + +# Apply checks (metrics collected lazily) +checked_df = dq_engine.apply_checks(input_df, checks) + +# Trigger an action (count, write, display) to collect metrics +checked_df.write.mode("overwrite").saveAsTable("catalog.schema.output") + +# Save metrics to a Delta table +dq_engine.save_summary_metrics( + observed_metrics=observer.get, + metrics_config=OutputConfig(location="catalog.schema.quality_metrics"), +) +``` + +### Streaming Metrics via Listener + +```python +listener = dq_engine.get_streaming_metrics_listener( + metrics_config=OutputConfig(location="catalog.schema.quality_metrics") +) +spark.streams.addListener(listener) +``` + +### Default Metrics Captured + +| Metric | Description | +|--------|-------------| +| `input_count` | Total rows in input | +| `error_count` | Rows with errors | +| `warning_count` | Rows with warnings | +| `valid_count` | Rows passing all error checks | + +### Custom Quality Metrics + +You can extend the default metrics with custom aggregations by adding user metadata to your checks and querying the metrics table. + +--- + +## Monitoring Dashboard + +DQX workspace installation includes a quality dashboard: + +```bash +# Open the dashboard +databricks labs dqx open-dashboards +``` + +You can also build custom AI/BI dashboards on the metrics Delta table using the `databricks-aibi-dashboards` skill. 
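For reference, the default metrics can be recomputed from the result columns DQX appends. A minimal pure-Python sketch of the same arithmetic (the `_error`/`_warning` field names follow the defaults described in this skill; this helper is illustrative, not a DQX API):

```python
def summarize_quality(rows):
    """Recompute DQX-style summary metrics from checked rows.

    Each row is a dict whose "_error"/"_warning" fields hold failure
    details (None when the row passed that class of checks).
    """
    input_count = len(rows)
    error_count = sum(1 for r in rows if r.get("_error"))
    warning_count = sum(1 for r in rows if r.get("_warning"))
    return {
        "input_count": input_count,
        "error_count": error_count,
        "warning_count": warning_count,
        "valid_count": input_count - error_count,  # rows passing all error checks
    }


rows = [
    {"order_id": 1, "_error": None, "_warning": None},
    {"order_id": None, "_error": "order_id is null", "_warning": None},
    {"order_id": 3, "_error": None, "_warning": "amount out of range"},
]
print(summarize_quality(rows))
# {'input_count': 3, 'error_count': 1, 'warning_count': 1, 'valid_count': 2}
```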
+ +--- + +## Workflow Orchestration + +### CLI Workflows + +```bash +# List available workflows +databricks labs dqx workflows + +# View workflow logs +databricks labs dqx logs --workflow quality-checker +``` + +### Scheduling with Databricks Jobs + +Use the `databricks-jobs` skill to schedule periodic quality checks: + +```python +# Example: Schedule DQX quality checks as a job task +# See databricks-jobs skill for full details +``` diff --git a/databricks-skills/databricks-dqx/7-check-functions-reference.md b/databricks-skills/databricks-dqx/7-check-functions-reference.md new file mode 100644 index 00000000..86514b05 --- /dev/null +++ b/databricks-skills/databricks-dqx/7-check-functions-reference.md @@ -0,0 +1,269 @@ +# Check Functions Reference + +All functions are in `databricks.labs.dqx.check_funcs`. + +```python +from databricks.labs.dqx import check_funcs +``` + +--- + +## Row-Level Check Functions + +Used with `DQRowRule`. Each operates on a single row and returns a PySpark Column. 
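The returned Column follows a simple contract: null when the row passes, an error message when it fails. A plain-Python analogue of that contract (illustrative; the function below and its null handling are simplifications, real checks build PySpark Column expressions):

```python
def is_in_range(value, min_limit, max_limit):
    """Plain-Python analogue of a row check: returns None when the row
    passes, an error message when it fails. Nulls pass here; pair with a
    not-null check when nulls should be rejected."""
    if value is None or min_limit <= value <= max_limit:
        return None
    return f"value {value} is not in range [{min_limit}, {max_limit}]"


print(is_in_range(50, 0, 100))   # None - row passes
print(is_in_range(150, 0, 100))  # value 150 is not in range [0, 100]
```

DQX collects these per-check results into the `_error`/`_warning` result columns.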
+ +### Null and Empty Checks + +| Function | Description | Key Args | +|----------|-------------|----------| +| `is_not_null(column)` | Column is not null | `column` | +| `is_null(column)` | Column is null | `column` | +| `is_not_empty(column, trim_strings=False)` | Not empty string (may be null) | `column`, `trim_strings` | +| `is_empty(column, trim_strings=False)` | Is empty string (may be null) | `column`, `trim_strings` | +| `is_not_null_and_not_empty(column, trim_strings=False)` | Not null AND not empty | `column`, `trim_strings` | +| `is_null_or_empty(column, trim_strings=False)` | Null OR empty | `column`, `trim_strings` | +| `is_not_null_and_not_empty_array(column)` | Array not null and not empty | `column` | + +### List Membership + +| Function | Description | Key Args | +|----------|-------------|----------| +| `is_in_list(column, allowed, case_sensitive=True)` | Value in allowed list (null OK) | `column`, `allowed`, `case_sensitive` | +| `is_not_null_and_is_in_list(column, allowed, case_sensitive=True)` | Not null AND in allowed list | `column`, `allowed`, `case_sensitive` | +| `is_not_in_list(column, forbidden, case_sensitive=True)` | Value not in forbidden list | `column`, `forbidden`, `case_sensitive` | + +**Example:** +```python +DQRowRule( + criticality="error", + check_func=check_funcs.is_in_list, + column="status", + check_func_args=[["active", "inactive", "pending"]], +) +``` + +### Comparison Checks + +| Function | Description | Key Args | +|----------|-------------|----------| +| `is_equal_to(column, value, abs_tolerance, rel_tolerance)` | Equal to value | `column`, `value`, tolerances | +| `is_not_equal_to(column, value, abs_tolerance, rel_tolerance)` | Not equal to value | `column`, `value`, tolerances | +| `is_not_less_than(column, limit)` | Value >= limit | `column`, `limit` | +| `is_not_greater_than(column, limit)` | Value <= limit | `column`, `limit` | +| `is_in_range(column, min_limit, max_limit)` | min <= value <= max | `column`, 
`min_limit`, `max_limit` |
+| `is_not_in_range(column, min_limit, max_limit)` | value < min OR value > max | `column`, `min_limit`, `max_limit` |
+
+**Example:**
+```python
+DQRowRule(
+    criticality="warn",
+    check_func=check_funcs.is_in_range,
+    column="price",
+    check_func_kwargs={"min_limit": 0.01, "max_limit": 99999.99},
+)
+```
+
+### Pattern Matching
+
+| Function | Description | Key Args |
+|----------|-------------|----------|
+| `regex_match(column, regex, negate=False)` | Matches regex pattern | `column`, `regex`, `negate` |
+
+**Example:**
+```python
+DQRowRule(
+    name="valid_phone",
+    criticality="error",
+    check_func=check_funcs.regex_match,
+    column="phone",
+    check_func_kwargs={"regex": r"^\+?[1-9]\d{1,14}$"},
+)
+```
+
+### Date and Time Checks
+
+| Function | Description | Key Args |
+|----------|-------------|----------|
+| `is_valid_date(column, date_format)` | Valid date format | `column`, `date_format` |
+| `is_valid_timestamp(column, timestamp_format)` | Valid timestamp format | `column`, `timestamp_format` |
+| `is_data_fresh(column, max_age_minutes, base_timestamp)` | Data not stale | `column`, `max_age_minutes` |
+| `is_older_than_n_days(column, days, curr_date, negate)` | At least N days old | `column`, `days` |
+| `is_older_than_col2_for_n_days(column1, column2, days, negate)` | col1 at least N days older than col2 | `column1`, `column2`, `days` |
+| `is_not_in_future(column, offset, curr_timestamp)` | Not in the future | `column`, `offset` |
+| `is_not_in_near_future(column, offset, curr_timestamp)` | Not in the near future | `column`, `offset` |
+
+**Example:**
+```python
+DQRowRule(
+    criticality="error",
+    check_func=check_funcs.is_not_in_future,
+    column="order_date",
+)
+```
+
+### Network Checks
+
+| Function | Description | Key Args |
+|----------|-------------|----------|
+| `is_valid_ipv4_address(column)` | Valid IPv4 address | `column` |
+| `is_ipv4_address_in_cidr(column, cidr_block)` | IPv4 in CIDR range | `column`, `cidr_block` |
+|
`is_valid_ipv6_address(column)` | Valid IPv6 address | `column` | +| `is_ipv6_address_in_cidr(column, cidr_block)` | IPv6 in CIDR range | `column`, `cidr_block` | + +### Custom SQL Expression + +| Function | Description | Key Args | +|----------|-------------|----------| +| `sql_expression(expression, msg, name, negate, columns)` | Custom SQL expression | `expression` | + +**Example:** +```python +DQRowRule( + criticality="error", + check_func=check_funcs.sql_expression, + check_func_kwargs={ + "expression": "order_total = quantity * unit_price", + "msg": "Order total does not match quantity * unit_price", + }, +) +``` + +--- + +## Dataset-Level Check Functions + +Used with `DQDatasetRule`. These operate across groups of rows. + +### Uniqueness + +| Function | Description | Key Args | +|----------|-------------|----------| +| `is_unique(columns, nulls_distinct=True)` | Values are unique | `columns`, `nulls_distinct` | + +**Example:** +```python +# Single column +DQDatasetRule(criticality="error", check_func=check_funcs.is_unique, columns=["order_id"]) + +# Composite key +DQDatasetRule(criticality="error", check_func=check_funcs.is_unique, columns=["customer_id", "order_date"]) +``` + +### Foreign Key Validation + +| Function | Description | Key Args | +|----------|-------------|----------| +| `foreign_key(columns, ref_columns, ref_df_name, ref_table, negate)` | FK constraint check | `columns`, `ref_columns`, `ref_df_name` or `ref_table` | + +**Example:** +```python +DQDatasetRule( + criticality="error", + check_func=check_funcs.foreign_key, + columns=["customer_id"], + check_func_kwargs={ + "ref_columns": ["id"], + "ref_table": "catalog.schema.customers", + }, +) +``` + +### Outlier Detection + +| Function | Description | Key Args | +|----------|-------------|----------| +| `has_no_outliers(column)` | MAD-based outlier detection | `column` | + +### Aggregation Checks + +| Function | Description | Key Args | +|----------|-------------|----------| +| 
`is_aggr_not_greater_than(column, limit, aggr_type, group_by, aggr_params)` | Aggregation <= limit | `column`, `limit`, `aggr_type` | +| `is_aggr_not_less_than(column, limit, aggr_type, group_by, aggr_params)` | Aggregation >= limit | `column`, `limit`, `aggr_type` | +| `is_aggr_equal(column, limit, aggr_type, group_by)` | Aggregation == limit | `column`, `limit`, `aggr_type` | +| `is_aggr_not_equal(column, limit, aggr_type, group_by)` | Aggregation != limit | `column`, `limit`, `aggr_type` | + +**Supported aggregation types**: `count`, `sum`, `avg`, `min`, `max`, `count_distinct`, `stddev`, `percentile`, and any Databricks built-in aggregate function. + +**Example:** +```python +# Max 100 orders per customer +DQDatasetRule( + criticality="warn", + check_func=check_funcs.is_aggr_not_greater_than, + column="order_id", + check_func_kwargs={ + "aggr_type": "count", + "group_by": ["customer_id"], + "limit": 100, + }, +) + +# Average order value at least $10 +DQDatasetRule( + criticality="warn", + check_func=check_funcs.is_aggr_not_less_than, + column="amount", + check_func_kwargs={ + "aggr_type": "avg", + "limit": 10, + }, +) +``` + +### Custom SQL Query + +| Function | Description | Key Args | +|----------|-------------|----------| +| `sql_query(query, merge_columns, msg, name, negate, condition_column, input_placeholder)` | Custom SQL query check | `query`, `merge_columns` | + +--- + +## Helper Function + +```python +from databricks.labs.dqx.check_funcs import make_condition + +# Create a custom condition for use in custom check functions +condition = make_condition( + condition=col("amount") > 0, + message="Amount must be positive", + alias="positive_amount_check", +) +``` + +--- + +## Custom Check Functions + +Register custom check functions using the `@register_rule` decorator: + +### Row-Level Custom Rule + +```python +from databricks.labs.dqx.rule import register_rule +from pyspark.sql import Column +from pyspark.sql.functions import col, when, lit + 
+@register_rule("row") +def is_valid_sku(column: str, prefix: str = "SKU-") -> Column: + """Check that SKU starts with expected prefix.""" + return when( + ~col(column).startswith(prefix), + lit(f"{column} does not start with {prefix}") + ) +``` + +### Dataset-Level Custom Rule + +```python +from databricks.labs.dqx.rule import register_rule +from pyspark.sql import Column +from typing import Callable, Tuple + +@register_rule("dataset") +def custom_dataset_check(columns: list[str], threshold: float) -> Tuple[Column, Callable]: + """Custom dataset-level check.""" + # Return (condition_column, merge_function) + ... +``` diff --git a/databricks-skills/databricks-dqx/SKILL.md b/databricks-skills/databricks-dqx/SKILL.md new file mode 100644 index 00000000..2228c3a6 --- /dev/null +++ b/databricks-skills/databricks-dqx/SKILL.md @@ -0,0 +1,256 @@ +--- +name: databricks-dqx +description: "Databricks Labs DQX (Data Quality Extensions) framework for defining and enforcing data quality rules on Delta tables using PySpark. Use when building data quality checks, profiling datasets, generating quality rules, quarantining invalid data, integrating quality checks into Lakeflow Declarative Pipelines (DLT), or when the user mentions DQX, data quality, quality checks, data validation, profiling, quarantine, quality rules, or data quality monitoring." +--- + +# Databricks Labs DQX (Data Quality Extensions) + +DQX is a Python framework for defining, applying, and monitoring data quality rules on PySpark DataFrames and Delta tables. It supports both batch and streaming, integrates with Lakeflow Declarative Pipelines (DLT), and provides profiling-based auto-generation of quality rules. 
+ +**Repository**: https://github.com/databrickslabs/dqx +**Docs**: https://databrickslabs.github.io/dqx/ +**Package**: `databricks-labs-dqx` (PyPI) +**Status**: Databricks Labs project (no formal SLA) + +--- + +## Critical Rules (always follow) + +- **MUST** install DQX before use: `%pip install databricks-labs-dqx` +- **MUST** use Unity Catalog 3-layer namespace for table locations +- **MUST** use `WorkspaceClient()` to initialize `DQEngine` +- **PREFER** DQRule classes (programmatic) over metadata dicts for type safety +- **PREFER** profiler-generated rules as a starting point, then customize + +## Quick Start + +### Minimal Example: Apply Quality Checks to a DataFrame + +```python +%pip install databricks-labs-dqx +dbutils.library.restartPython() + +from databricks.sdk import WorkspaceClient +from databricks.labs.dqx.engine import DQEngine +from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule +from databricks.labs.dqx import check_funcs + +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# Define checks +checks = [ + DQRowRule(criticality="error", check_func=check_funcs.is_not_null, column="order_id"), + DQRowRule(criticality="error", check_func=check_funcs.is_not_null_and_not_empty, column="customer_name"), + DQRowRule(criticality="warn", check_func=check_funcs.is_in_range, column="amount", + check_func_kwargs={"min_limit": 0, "max_limit": 100000}), + DQDatasetRule(criticality="error", check_func=check_funcs.is_unique, columns=["order_id"]), +] + +# Apply checks - split valid and invalid rows +input_df = spark.table("catalog.schema.orders") +valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks) + +# Save results +valid_df.write.mode("overwrite").saveAsTable("catalog.schema.orders_valid") +quarantine_df.write.mode("overwrite").saveAsTable("catalog.schema.orders_quarantine") +``` + +### Minimal Example: Auto-Generate Rules with Profiler + +```python +from databricks.labs.dqx.profiler.profiler import DQProfiler +from 
databricks.labs.dqx.profiler.generator import DQGenerator + +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(input_df) + +generator = DQGenerator(ws) +checks = generator.generate_dq_rules(profiles) +# Review and customize generated checks, then apply +``` + +--- + +## Quick Reference + +| Concept | Details | +|---------|---------| +| **Package** | `pip install databricks-labs-dqx` | +| **Engine** | `DQEngine(WorkspaceClient())` | +| **Rule Types** | `DQRowRule` (per-row), `DQDatasetRule` (across rows), `DQForEachColRule` (multi-column) | +| **Criticality** | `"error"` = quarantine only, `"warn"` = both valid + quarantine | +| **Result Columns** | `_error` and `_warning` appended to output (customizable) | +| **Apply Methods** | `apply_checks()`, `apply_checks_and_split()`, `apply_checks_and_save_in_table()` | +| **Metadata Methods** | `apply_checks_by_metadata()`, `apply_checks_by_metadata_and_split()` | +| **Profiler** | `DQProfiler.profile(df)` + `DQGenerator.generate_dq_rules(profiles)` | +| **Storage** | YAML files, Delta tables, UC Volumes, Lakebase, workspace files | +| **Streaming** | All apply methods work with streaming DataFrames | +| **DLT/Lakeflow** | Apply checks in `@dlt.view`, split to silver + quarantine tables | +| **Extras** | `[llm]` for AI rules, `[pii]` for PII detection, `[datacontract]` for ODCS | + +--- + +## Detailed Guides + +**Installation and setup**: See [1-installation-setup.md](1-installation-setup.md) for all installation methods (pip, CLI workspace install, DABs, extras). (Keywords: install, setup, pip, CLI, workspace, extras, llm, pii, datacontract) + +**Defining quality rules**: See [2-defining-quality-rules.md](2-defining-quality-rules.md) for rule definition patterns using DQRule classes, YAML metadata, and storage backends. 
(Keywords: rules, checks, DQRowRule, DQDatasetRule, YAML, metadata, storage, save, load) + +**Applying checks**: See [3-applying-checks.md](3-applying-checks.md) for applying checks to DataFrames, splitting valid/invalid rows, and saving results. (Keywords: apply, split, quarantine, valid, invalid, save, DQEngine, batch) + +**Profiler and auto-generation**: See [4-profiler-auto-generation.md](4-profiler-auto-generation.md) for profiling datasets and auto-generating quality rules. (Keywords: profile, profiler, auto-generate, DQProfiler, DQGenerator, AI-assisted, data contract) + +**Lakeflow/DLT integration**: See [5-lakeflow-integration.md](5-lakeflow-integration.md) for integrating DQX with Lakeflow Declarative Pipelines (DLT). (Keywords: DLT, Lakeflow, pipeline, streaming table, expectations, quarantine) + +**Streaming and metrics**: See [6-streaming-metrics.md](6-streaming-metrics.md) for streaming support, quality metrics, and monitoring. (Keywords: streaming, metrics, monitoring, observer, listener, foreachBatch) + +**Check functions reference**: See [7-check-functions-reference.md](7-check-functions-reference.md) for the complete list of built-in check functions. (Keywords: is_not_null, is_in_range, is_unique, regex, SQL expression, aggregation, foreign key) + +--- + +## Workflow + +1. 
Determine the task type: + + **Installing DQX?** -> Read [1-installation-setup.md](1-installation-setup.md) + **Defining quality rules?** -> Read [2-defining-quality-rules.md](2-defining-quality-rules.md) + **Applying checks to data?** -> Read [3-applying-checks.md](3-applying-checks.md) + **Profiling data / auto-generating rules?** -> Read [4-profiler-auto-generation.md](4-profiler-auto-generation.md) + **Integrating with Lakeflow/DLT?** -> Read [5-lakeflow-integration.md](5-lakeflow-integration.md) + **Streaming or metrics monitoring?** -> Read [6-streaming-metrics.md](6-streaming-metrics.md) + **Looking up a specific check function?** -> Read [7-check-functions-reference.md](7-check-functions-reference.md) + +2. Follow the instructions in the relevant guide + +3. Use the profiler as a starting point, then customize rules for your domain + +--- + +## Common Patterns + +### Pattern 1: Profile -> Generate -> Review -> Apply + +The recommended workflow for new datasets: + +```python +from databricks.labs.dqx.profiler.profiler import DQProfiler +from databricks.labs.dqx.profiler.generator import DQGenerator +from databricks.labs.dqx.engine import DQEngine +from databricks.sdk import WorkspaceClient + +ws = WorkspaceClient() +dq_engine = DQEngine(ws) + +# Step 1: Profile +profiler = DQProfiler(ws) +summary_stats, profiles = profiler.profile(spark.table("catalog.schema.my_table")) + +# Step 2: Generate rule candidates +generator = DQGenerator(ws) +checks = generator.generate_dq_rules(profiles) + +# Step 3: Review and save +from databricks.labs.dqx.config import FileChecksStorageConfig +dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="checks.yml")) + +# Step 4: Apply +input_df = spark.table("catalog.schema.my_table") +valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks) +``` + +### Pattern 2: End-to-End (Read, Check, Write) + +```python +from databricks.labs.dqx.config import InputConfig, OutputConfig + 
+dq_engine.apply_checks_and_save_in_table( + checks=checks, + input_config=InputConfig(location="catalog.schema.input_table"), + output_config=OutputConfig(location="catalog.schema.valid_table", mode="overwrite"), + quarantine_config=OutputConfig(location="catalog.schema.quarantine_table", mode="overwrite"), +) +``` + +### Pattern 3: DLT/Lakeflow Pipeline Integration + +```python +import dlt +from databricks.labs.dqx.engine import DQEngine +from databricks.sdk import WorkspaceClient + +dq_engine = DQEngine(WorkspaceClient()) + +@dlt.view +def bronze_checked(): + df = dlt.read_stream("bronze_raw") + return dq_engine.apply_checks(df, checks) + +@dlt.table +def silver_valid(): + return dq_engine.get_valid(dlt.read_stream("bronze_checked")) + +@dlt.table +def quarantine(): + return dq_engine.get_invalid(dlt.read_stream("bronze_checked")) +``` + +### Pattern 4: Multi-Table Quality Checks with Wildcards + +```python +dq_engine.apply_checks_and_save_in_tables_for_patterns( + patterns=["catalog.schema.*"], + checks_location="catalog.schema.checks_table", + exclude_patterns=["*_dq_output", "*_dq_quarantine"], + output_table_suffix="_dq_output", + quarantine_table_suffix="_dq_quarantine", +) +``` + +--- + +## MCP Tools + +Use Databricks MCP tools (from `databricks-ai-dev-kit` plugin) alongside DQX: + +| Tool | Use With DQX | +|------|-------------| +| `execute_sql` | Test queries on input/output tables, verify quarantine data | +| `execute_databricks_command` | Run DQX Python code on clusters | +| `manage_uc_objects` | Create catalogs/schemas/volumes for checks storage | +| `get_table_details` | Inspect table schemas before defining rules | + +--- + +## Official Documentation + +- **[DQX Documentation](https://databrickslabs.github.io/dqx/)** - Main docs +- **[DQX GitHub](https://github.com/databrickslabs/dqx)** - Source code and issues +- **[Tutorial](https://databrickslabs.github.io/dqx/tutorial/)** - Getting started tutorial +- **[User 
Guide](https://databrickslabs.github.io/dqx/user_guide/)** - Detailed user guide +- **[API Reference](https://databrickslabs.github.io/dqx/reference/)** - Full API reference + +--- + +## Common Issues + +| Issue | Solution | +|-------|----------| +| **`ModuleNotFoundError: databricks.labs.dqx`** | Run `%pip install databricks-labs-dqx` then `dbutils.library.restartPython()` | +| **`WorkspaceClient` auth fails** | Ensure Databricks auth is configured (profile, env vars, or notebook context) | +| **Profiler slow on large tables** | Use `sample_fraction` (default 0.3) and `limit` (default 1000) options | +| **Rules not catching issues** | Check criticality: `"error"` quarantines, `"warn"` keeps in both outputs | +| **Custom result column names** | Use `ExtraParams(result_column_names={"errors": "my_errors", "warnings": "my_warnings"})` | +| **Streaming checkpoint errors** | Provide `checkpointLocation` in `OutputConfig.options` | +| **DLT integration fails** | Ensure `databricks-labs-dqx` is installed as a pipeline library | + +--- + +## Related Skills + +If using the `databricks-ai-dev-kit` plugin, these skills complement DQX: +- **spark-declarative-pipelines** - for building Lakeflow pipelines that use DQX checks +- **databricks-unity-catalog** - for catalog/schema/volume management +- **databricks-jobs** - for scheduling DQX quality check jobs +- **databricks-aibi-dashboards** - for building quality monitoring dashboards +- **synthetic-data-generation** - for generating test data with known quality issues