From Big Data to Better Data: Ensuring Data Quality with Verity | by Michael McPhillips

Verity Goals

- Make adding checks, viewing their result histories, and receiving alerts as easy and transparent as possible.
- Avoid tight coupling to any particular data orchestration engine, data store, or compute technology.
- Be reliable, fault-tolerant, and highly scalable — in particular, handle extreme request volume spikes from daily event-processing ETLs.

High-level Concepts

- Check Definition — The user-written configuration file defining the data quality contract and who to notify if it is breached.
- Check Result — The numeric measurement of data quality at a point in time, a boolean pass/fail value, and metadata about the run.
- Check Development — The process of interactively creating and testing Check Definitions with our VerityCLI.
- Check Orchestration — How a previously configured Check Definition is dispatched.
- Check Execution — How the data quality check is performed within Verity's web services after being initiated by Check Orchestration.
- Verity UI — The Verity website where Check Results and Check Definitions can be viewed.

Check Definition

The Verity product begins with a user-written YAML definition of data quality called the Check Definition. It has four main parts:

- Query — Generates a numeric measurement of data quality, such as a count or an average, using either raw SQL or our domain-specific language (DSL). The DSL provides a fast, SQL-less shorthand for the most common queries.
- Condition — Describes how the query result is evaluated into a pass or a fail. It can be a fixed threshold or a statistical one.
- Metadata — A human-readable name, a universally unique identifier (UUID), ownership information, and tags (arbitrary semantic groupings like 'ML-feature' or 'business-reporting').
- Notifier — The target(s) to notify upon failure, via PagerDuty, Slack, or email.

Three Example Check Definitions

The first check addresses the completeness issue from our first example — that rider_events.session_id is never null.

```yaml
core rider events session_id is not null: # check name
  metadata:
    id: 90bde4fa-148b-4f06-bd5f-f15b3d2ad759
    ownership_slack: "#dispatch-service-dev"
    tags: [rides, core-data, high-priority]
  query:
    type: dsl
    data_source_id: hive.core.rider_events
    filters:
      - session_id = null
  condition:
    type: fixed_threshold
    max: 0
  notifier_group:
    pagerduty_policy: dispatch-service
    email: dispatch-service@lyft.pagerduty.com
```

The next check addresses the issue from our second example. It ensures that the day's count of canceled rides does not deviate from the 90-day historic mean by more than 3 standard deviations. This check is dispatched daily at 4 AM by our scheduler.

```yaml
core rider events daily canceled volume is inside 3 SDs: # check name
  metadata:
    id: 3cb75736-1fbe-4f6d-bad5-67bf613f5d62
    ownership_slack: "#dispatch-service-dev"
  query:
    type: dsl
    data_source_id: hive.core.rider_events
    filters:
      - final_state = canceled
  condition:
    type: z_score
    min: -3
    max: 3
    history: 90 days
  schedule: ## field read by VerityScheduler
    type: cron
    expression: "0 0 4 1/1 * ? *" ## 4 AM daily
  notifier_group:
    slack: "#dispatch-service-alerts"
    email: dispatch-service@lyft.pagerduty.com
```
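To make the statistical condition concrete, the sketch below shows one way a z_score condition like the one above could be evaluated: compare today's query result against the mean and standard deviation of the previous 90 daily results. This is an illustration only, not Verity's implementation; the function name, signature, and the synthetic history are assumptions made for the example.

```python
import statistics

def z_score_condition_passes(todays_result: float, history: list[float],
                             z_min: float = -3.0, z_max: float = 3.0) -> bool:
    """Illustrative only: pass if today's result lies within [z_min, z_max]
    standard deviations of the mean of the historic results (e.g. 90 days)."""
    mean = statistics.mean(history)
    std = statistics.stdev(history)
    if std == 0:
        # Degenerate case (no variation in history): assume only an exact match passes.
        return todays_result == mean
    z = (todays_result - mean) / std
    return z_min <= z <= z_max

# Synthetic example: ~90 days of canceled-ride counts near 1,000, then a spike to 1,800.
history = [1000 + (day % 7) * 20 for day in range(90)]
print(z_score_condition_passes(1800, history))  # False -> the check would fail and alert
```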
This last check addresses the timeliness issue from our third example. It ensures that the raw_events.bike_sessions table has no entries where the last_updated_ms (Hive timestamp) is more than 5 minutes later than the occurred_at_ms (generation timestamp).

```yaml
raw bike sessions data is more than 5 minutes late: # check name
  metadata:
    id: 6eb84cc-efe3-466e-ab48-a7e1fec6ddq6
    ownership_slack: "#tbs-dev"
  query:
    type: dsl
    data_source_id: hive.raw_events.bike_sessions
    filters:
      - last_updated_ms > occurred_at_ms + 5 * 60 * 1000
  condition:
    type: fixed_threshold
    max: 0
```

To develop these Check Definitions, we built the VerityCLI. It lets customers validate, backtest, and backfill their checks across date ranges interactively before committing them. The same validation runs on every pull request via GitHub Actions CI, so no bad configurations can be committed.

For example, backtesting the completeness check looks like this:

```
(veritydata venv) mmcphillips@ABCHELLOO veritydata % veritycli backtest \
    --check_id 90bde4fa-148b-4f06-bd5f-f15b3d2ad759 --ds 2023-10-15
=!========================================================================
Beginning backtest for 1 date(s) and 1 check(s).
 check_ids: ['90bde4fa-148b-4f06-bd5f-f15b3d2ad759']
 ds_dates: ['2023-10-15T00:00:00+00:00']
=!========================================================================
SQL Query:
SELECT COUNT(*) as result
FROM "hive"."core"."rider_events"
WHERE ds = '2023-10-15'
AND session_id IS null

Result Set:
result
2.00

Maximum Value: 0.00
Check Result: FAILURE
=!========================================================================
overall command finished in : 2.176988840103149 seconds
aggregate results:
SUCCESS : 0
FAILURE : 1
INTERNAL_ERROR : 0
CLIENT_ERROR : 0
=!========================================================================
```

Airflow and Flyte

Data engineers can dispatch these checks from Flyte, Airflow, or other systems that create or consume Hive data. To do this, we created the VerityAirflowOperator and VerityFlyteOperator. These operators dispatch checks and poll for their results. They are, however, merely delegators: they add their own typed exceptions and retry strategies, and delegate the real business logic to our VeritySDK for better maintainability (a rough sketch of this dispatch-and-poll flow follows the DAG example below).

In Airflow, we instantiate the VerityAirflowOperator by citing the check_id previously created in the Check Definition, then add it to the Airflow DAG (Directed Acyclic Graph) in the desired position:

```python
with DAG(dag_id="rider_events_example") as dag:
    ...
    dq_check_blocking = VerityOperator(
        task_id="completeness_dq_blocking",
        check_id="90bde4fa-148b-4f06-bd5f-f15b3d2ad759",
        check_datetime=ds,
        is_blocking=True,
    )

    create_staged_data >> dq_check_blocking >> exchange_data
```

The VerityAirflowOperator can be used in a blocking fashion to halt a DAG upon a check failure, preventing bad data from ever reaching production. This uses the "Stage-Check-Exchange" pattern: we create data in a staged schema, verify it with a blocking operator, and promote it to production only if it passes its quality checks.
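The sketch below illustrates the dispatch-and-poll shape that the operators delegate to. It is a rough illustration under our own assumptions, not the actual VeritySDK API: the client object, method names, and result fields are hypothetical, and only the terminal states mirror the ones shown in the CLI output above.

```python
import time

class VerityCheckFailure(Exception):
    """Raised so a blocking orchestrator task (e.g. an Airflow operator) fails the run."""

def run_check_blocking(client, check_id: str, check_datetime: str,
                       poll_interval_s: float = 30.0, timeout_s: float = 1800.0) -> dict:
    """Hypothetical sketch: submit a check run, poll until it reaches a terminal
    state, and raise on failure so a blocking operator can halt the DAG."""
    run_id = client.submit_check(check_id=check_id, check_datetime=check_datetime)
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        result = client.get_check_result(run_id)  # e.g. {"state": ..., "value": ...}
        if result["state"] in ("SUCCESS", "FAILURE", "INTERNAL_ERROR", "CLIENT_ERROR"):
            if result["state"] != "SUCCESS":
                raise VerityCheckFailure(f"check {check_id} finished as {result['state']}")
            return result
        time.sleep(poll_interval_s)
    raise TimeoutError(f"check {check_id} did not finish within {timeout_s}s")
```

A non-blocking operator could follow the same shape but log the failure and let the DAG proceed instead of raising.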
Verity Scheduled Checks

Data analysts and data scientists can dispatch Verity Scheduled Checks, which are checks orchestrated at specified times by Verity's job scheduler. This matters because these users are often unfamiliar with ETL tooling: to orchestrate a Scheduled Check, they simply write a cron expression into the Check Definition, as in Example 2 above.

Now that we have seen what a user must do to onboard to Verity, let us look at what happens after they do. Our system design largely follows an asynchronous compute engine pattern. The back end consists of four web services (shown in teal in the architecture diagram): the Scheduler, API Server, Executor, and Notifier. This loosely coupled, service-oriented approach lets us evolve and scale each component independently while limiting the blast radius of failures.
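As a purely schematic illustration of that asynchronous pattern (not Lyft's implementation; every name and the in-memory queue below are stand-ins), the division of labor can be pictured like this: the API Server accepts and enqueues check runs, the Executor works them off a queue and evaluates conditions, and the Notifier alerts owners on failure.

```python
import queue

check_run_queue: "queue.Queue[dict]" = queue.Queue()

def api_server_submit(check_id: str, check_datetime: str) -> None:
    # API Server role: accept the request and enqueue it for asynchronous execution.
    check_run_queue.put({"check_id": check_id, "check_datetime": check_datetime})

def executor_loop(run_query, evaluate_condition, notify) -> None:
    # Executor role: take runs off the queue, measure data quality, evaluate the condition.
    while True:
        run = check_run_queue.get()
        value = run_query(run["check_id"], run["check_datetime"])   # numeric measurement
        passed = evaluate_condition(run["check_id"], value)         # pass/fail
        if not passed:
            notify(run["check_id"], value)                          # Notifier role: page/Slack/email owners
        check_run_queue.task_done()
```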