Add Spark support for Custom Expectations
This guide will help you implement native Spark support for your Custom ExpectationAn extension of the `Expectation` class, developed outside of the Great Expectations library..
Prerequisites
Great Expectations supports a number of Execution EnginesA system capable of processing data to compute Metrics., including a Spark Execution Engine. These Execution Engines provide the computing resources used to calculate the MetricsA computed attribute of data such as the mean of a column. defined in the Metric class of your Custom Expectation.
If you decide to contribute your ExpectationA verifiable assertion about data., its entry in the Expectations Gallery will reflect the Execution Engines that it supports.
We will add Spark support for the Custom Expectations implemented in our guides on how to create Custom Column Aggregate Expectations and how to create Custom Column Map Expectations.
Specify your backends
To avoid surprises and help clearly define your Custom Expectation, it can be helpful to determine beforehand what backends you plan to support, and test them along the way.
Within the examples
defined inside your Expectation class, the optional only_for
and suppress_test_for
keys specify which backends to use for testing. If a backend is not specified, Great Expectations attempts testing on all supported backends. Run the following command to add entries corresponding to the functionality you want to add:
examples = [
{
"data": {"x": [1, 2, 3, 4, 5], "y": [0, -1, -2, 4, None]},
"only_for": ["pandas", "spark", "sqlite", "postgresql"],
"tests": [
{
"title": "basic_positive_test",
"exact_match_out": False,
"include_in_gallery": True,
"in": {
"column": "x",
"min_value": 4,
"strict_min": True,
"max_value": 5,
"strict_max": False,
},
"out": {"success": True},
},
{
"title": "basic_negative_test",
"exact_match_out": False,
"include_in_gallery": True,
"in": {
"column": "y",
"min_value": -2,
"strict_min": False,
"max_value": 3,
"strict_max": True,
},
"out": {"success": False},
},
],
}
]
The optional only_for
and suppress_test_for
keys may be specified at the top-level (next to data
and tests
) or within specific tests (next to title
, etc).
Allowed backends include: "bigquery", "mssql", "mysql", "pandas", "postgresql", "redshift", "snowflake", "spark", "sqlite", "trino"
Implement the Spark logic for your Custom Expectation
Great Expectations provides a variety of ways to implement an Expectation in Spark. Two of the most common include:
- Defining a partial function that takes a Spark DataFrame column as input
- Directly executing queries on Spark DataFrames to determine the value of your Expectation's metric directly
- Partial Function
- Query Execution
Great Expectations allows for much of the PySpark DataFrame logic to be abstracted away by specifying metric behavior as a partial function.
To do this, we use one of the @column_*_partial
decorators:
@column_aggregate_partial
for Column Aggregate Expectations@column_condition_partial
for Column Map Expectations@column_pair_condition_partial
for Column Pair Map Expectations@multicolumn_condition_partial
for Multicolumn Map Expectations
These decorators expect an appropriate engine
argument. In this case, we'll pass our SparkDFExecutionEngine
.
The decorated method takes in a Spark Column
object and will either return a pyspark.sql.functions.function
or a pyspark.sql.Column.function
that Great Expectations will use to generate the appropriate SQL queries.
For our Custom Column Aggregate Expectation ExpectColumnMaxToBeBetweenCustom
, we're going to leverage PySpark's max
SQL Function and the @column_aggregate_partial
decorator.
@column_aggregate_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, _table, _column_name, **kwargs):
"""Spark Max Implementation"""
return F.max(column)
If we need a builtin function from pyspark.sql.functions
, usually aliased to F
, the import logic in
from great_expectations.compatibility.pyspark import functions as F
from great_expectations.compatibility import pyspark
allows us to access these functions even when PySpark is not installed.
Details
Applying Python Functions
F.udf
allows us to use a Python function as a Spark User Defined Function for Column Map Expectations,
giving us the ability to define custom functions and apply them to our data.Here is an example of F.udf
applied to ExpectColumnValuesToEqualThree
:
@column_condition_partial(engine=SparkDFExecutionEngine)
def _spark(cls, column, strftime_format, **kwargs):
def is_equal_to_three(val):
return (val == 3)
success_udf = F.udf(is_equal_to_three, pyspark.types.BooleanType())
return success_udf(column)
For more on F.udf
and the functionality it provides, see the Apache Spark UDF documentation.
The most direct way of implementing a metric is by computing its value by constructing or directly executing querys using objects provided by the @metric_*
decorators:
@metric_value
for Column Aggregate Expectations- Expects an appropriate
engine
,metric_fn_type
, anddomain_type
- Expects an appropriate
@metric_partial
for all Map Expectations- Expects an appropriate
engine
,partial_fn_type
, anddomain_type
- Expects an appropriate
Our engine
will reflect the backend we're implementing (SparkDFExecutionEngine
), while our fn_type
and domain_type
are unique to the type of Expectation we're implementing.
These decorators enable a higher-complexity workflow, allowing you to explicitly structure your queries and make intermediate queries to your database. While this approach can result in extra roundtrips to your database, it can also unlock advanced functionality for your Custom Expectations.
For our Custom Column Map Expectation ExpectColumnValuesToEqualThree
, we're going to implement the @metric_partial
decorator,
specifying the type of value we're computing (MAP_CONDITION_FN
) and the domain over which we're computing (COLUMN
):
@metric_partial(
engine=SparkDFExecutionEngine,
partial_fn_type=MetricPartialFunctionTypes.MAP_CONDITION_FN,
domain_type=MetricDomainTypes.COLUMN,
)
def _spark(
cls,
execution_engine: SparkDFExecutionEngine,
metric_domain_kwargs,
metric_value_kwargs,
metrics,
runtime_configuration,
):
The decorated method takes in a valid Execution Engine and relevant kwargs
,
and will return a tuple of:
- A
pyspark.sql.column.Column
defining the query to be executed compute_domain_kwargs
accessor_domain_kwargs
These will be used to execute our query and compute the results of our metric.
To do this, we need to access our Compute Domain directly:
(
selectable,
compute_domain_kwargs,
accessor_domain_kwargs,
) = execution_engine.get_compute_domain(
metric_domain_kwargs, MetricDomainTypes.COLUMN
)
column_name = accessor_domain_kwargs["column"]
column = F.col(column_name)
This allows us to build and return a query to be executed, providing the result of our metric:
query = F.when(column == 3, F.lit(False)).otherwise(F.lit(True))
return (query, compute_domain_kwargs, accessor_domain_kwargs)
Because in Spark we are implementing the window function directly, we have to return the unexpected condition: False
when column == 3
, otherwise True
.
Verify your implementation
If you now run your file, print_diagnostic_checklist()
will attempt to execute your example cases using this new backend.
If your implementation is correctly defined, and the rest of the core logic in your Custom Expectation is already complete, you will see the following in your Diagnostic Checklist:
✔ Has at least one positive and negative example case, and all test cases pass
If you've already implemented the Pandas backend covered in our How-To guides for creating Custom Expectations and the SQLAlchemy backend covered in our guide on how to add SQLAlchemy support for Custom Expectations, you should see the following in your Diagnostic Checklist:
✔ Has core logic that passes tests for all applicable Execution Engines and SQL dialects
Congratulations!
🎉 You've successfully implemented Spark support for a Custom Expectation! 🎉
Contribution (Optional)
This guide will leave you with core functionality sufficient for contribution to Great Expectations at an Experimental level.
If you're interested in having your contribution accepted at a Beta level, your Custom Expectation will need to support SQLAlchemy, Spark, and Pandas.
For full acceptance into the Great Expectations codebase at a Production level, we require that your Custom Expectation meets our code standards, including test coverage and style. If you believe your Custom Expectation is otherwise ready for contribution at a Production level, please submit a Pull Request, and we will work with you to ensure your Custom Expectation meets these standards.
For more information on our code standards and contribution, see our guide on Levels of Maturity for Expectations.
To view the full scripts used in this page, see them on GitHub: