Version Controlled Data Pipelines: Metaflow + Dolt

Background

This post details how to use Metaflow with Dolt. Metaflow is a framework for defining data science and data engineering workflows, with the ability to define local experiments and scale those experiments to production jobs from a single API. Dolt is a version controlled relational database. It provides a familiar SQL interface along with Git-like version control features, and each commit corresponds to the state of the database at the time the commit was created. Both Dolt and Metaflow are open source.

We show how the combination of Dolt and Metaflow can be used to address three common challenges in modern data science and data engineering projects and architectures:

- Reproducibility: how to make sure that we can repeat past results consistently and improve results incrementally. Disciplined science and engineering requires that goalposts don’t shift unexpectedly, for example due to changes in input data.
- Experiment tracking: how to keep track of all changes and experiments executed. Today, it is hard to imagine software development without a version control system like Git — data science needs a similar tool to stay organized.
- Lineage and auditing: finally, when our system is in production, we want to understand the exact model and input data that contribute to the system’s output, for example predictions. This is especially important when results are surprising and we need to understand why.

Both Dolt and Metaflow are built around the idea of strong versioning: Dolt versions data, while Metaflow versions code, execution environments, and the state of execution. To illustrate how Metaflow and Dolt work together to solve these challenges, we use an example Metaflow pipeline to derive results from an input dataset. We read the input data into a Pandas DataFrame and, in an intermediate step, convert that DataFrame into permanent storage in Dolt for use by application-layer services. Dolt SQL Server can then serve this versioned and reproducible data over MySQL connectors to other parts of the infrastructure.

Let’s dive into the details of our example pipeline.

Example Pipeline

Our pipeline consists of two flows, one of which consumes the results written by the other. The first flow computes the state-level median price for a hospital procedure. The second flow computes the variance of the price for a procedure across states. For input data we chose a new public dataset of prices from 1,400 US hospitals, created using DoltHub data bounties. Our pipeline will look something like the schematic below.

We will use the end result to illustrate how integrating Dolt and Metaflow provides Metaflow users with the ability to traverse versions of their final data, as well as trace back through the pipeline to examine various stages. Users can do this via the Metaflow API and Dolt integration in a familiar environment like a console or a notebook.

All the code from the example pipeline is open source, and can be found in the dolt-integrations GitHub repository.

How it Works

Our design goal with this integration was to give Metaflow users additional capabilities directly from Metaflow while minimizing additional API surface area.

Workflows in Metaflow are called “flows,” and each execution of a flow is a “run.” Metaflow stores metadata about every run. Each time a run interacts with Dolt, we capture a small amount of metadata to make that interaction reproducible, creating a mapping between Metaflow runs and Dolt commits that provides users with powerful lineage and reproducibility features.

Runs of a flow read and write Pandas DataFrame objects to Dolt.

Individual steps of a run can create separate commits that snapshot the state of the database following a write.

When a flow reads data from Dolt, it records exactly how that data was read inside Metaflow. When a flow writes to Dolt, it creates a commit, captures the associated metadata, and formats the commit message. This allows users to browse the inputs and outputs of their flows directly from the Metaflow API without having to know much of anything about Dolt. Furthermore, users can retrieve the flow that last touched a table at a given branch or commit, also directly from the Metaflow API.

This is all abstract, so let’s install a few dependencies, grab a dataset, and get stuck into running our pipeline.

Setup

Let’s get the boring stuff out of the way. We need the following:

- Dolt and dolt-integrations installed
- Metaflow installed
- the sample dataset we will use, which we can easily clone from DoltHub

Install Dolt

The first step is to install Dolt on a *nix system:
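A minimal sketch, assuming a recent *nix shell; the script URL below matches Dolt’s published install instructions, but check the current docs for the latest command:

```bash
# Download and run Dolt's install script (requires sudo to place the binary on your PATH)
sudo bash -c 'curl -L https://github.com/dolthub/dolt/releases/latest/download/install.sh | bash'

# Verify the install
dolt version
```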

There are Windows distributions and a Homebrew cask. Find more details about installation in the Dolt documentation.

Install dolt-integrations[metaflow]

Next let’s install the Metaflow + Dolt integration. It comes packaged with both Metaflow and Dolt’s Python CLI wrapper, doltcli. It’s easy enough to install via pip:
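A one-liner, using the package name and extra given above:

```bash
pip install 'dolt-integrations[metaflow]'
```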

Get The Data

The final step is to acquire the dataset. Recall Dolt is a SQL database with Git-like version control features, and it comes with the ability to clone a remote database to your local machine. We can use that feature to easily acquire a Dolt database:
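Something along these lines; the remote path is illustrative, so substitute the actual hospital price database you want from DoltHub:

```bash
# Clone the hospital price database from DoltHub (remote path is an assumption)
dolt clone dolthub/hospital-price-transparency
```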

Note this dataset is nearly 20 gigabytes, and could take a few minutes to clone. Once it’s landed it’s straightforward to jump right into SQL:
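For example, assuming the clone above (table names are inferred from the commit history and may differ):

```bash
cd hospital-price-transparency
dolt sql -q "SHOW TABLES;"
dolt sql -q "SELECT * FROM hospitals LIMIT 5;"
```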

We are now ready to start running our Metaflow based pipeline.

Using Metaflow

Before we get into the details, let’s first produce a run of our pipeline off the latest version of the upstream database. The first flow computes the median cost of a given hospital procedure at the state level:
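Flows are started with Metaflow’s standard run command; the script name here is a stand-in for the medians flow in the dolt-integrations examples:

```bash
python hospital_procedure_price_state_medians.py run
```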

The second flow computes the variance in median procedure price across states:
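Again with a stand-in script name for the variance flow:

```bash
python hospital_procedure_price_variance_by_state.py run
```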

We now have our first result set computed. Let’s access the computed variances via the integration, using the flow as an entry point. You can execute the following snippet, and subsequent examples, using the Python interpreter on the command line or in a notebook. The snippet uses Metaflow’s Client API to access results of past runs:
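A sketch of what that looks like; the flow name, table name, and the DoltDT constructor and read method are assumptions about the example flows and the integration’s API rather than verified signatures:

```python
from metaflow import Flow

from dolt_integrations.metaflow import DoltDT  # import path is an assumption

# Most recent successful run of the variance flow (flow name must match your FlowSpec class)
run = Flow("HospitalProcedurePriceVarianceByState").latest_successful_run
print(run.pathspec)

# The integration records how the run wrote to Dolt, so a DoltDT handle built
# from the run can load the result table back as a DataFrame.
dolt = DoltDT(run=run)
variances = dolt.read("procedure_variances")
print(variances.head())
```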

We see that we have successfully computed procedure level variance:

We have seen it’s relatively straightforward to run our pipeline, and access our versioned results via a reference to the flow that produced them. We now dive into some of the capabilities this provides Metaflow users who choose to use Dolt in their data infrastructure.

Back-testing

In this example our input dataset is stored in Dolt. We used a DoltHub dataset because it’s easy to clone the dataset and get started, and after all this is about integrating Dolt with Metaflow. But having our input dataset in Dolt isn’t just a matter of convenience for this post. Because every Dolt commit represents the complete state of the database at a point in time, we can easily point our pipeline at historical versions of the data. Let’s examine the Dolt commit graph and grab a commit straight from the SQL console:
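From the dolt sql console in the input database, the commit graph is exposed through the dolt_log system table:

```sql
SELECT commit_hash, date, message
FROM dolt_log
ORDER BY date DESC
LIMIT 10;
```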

Suppose we’d like to run our pipeline with input data as of commit gstcq5loi9ieqdv1elrljab9hcgr090p, the first one labeled Updating hospitals with changes from hospitals.csv. That’s easy enough. First, let’s name the commit with a branch:
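From the input database’s directory (the branch name is just an illustrative label):

```bash
dolt branch hospitals-backtest gstcq5loi9ieqdv1elrljab9hcgr090p
```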

Before recomputing the medians, let’s create a branch in our hospital-price-analysis database, pointed at the root commit, to store these experiments:
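From the hospital-price-analysis directory; the branch name and the root commit hash are placeholders:

```bash
# Find the root commit with `dolt log`, then branch from it
dolt branch backtest <root-commit-hash>
```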

Now let’s kick off recomputing the medians. Since we are recomputing our medians from a historical version of the raw pricing data we will write them to a separate experimentation branch:
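A sketch of the invocation, assuming the flow exposes Metaflow Parameters for the branches it reads from and writes to (the parameter names are hypothetical):

```bash
python hospital_procedure_price_state_medians.py run \
    --input-branch hospitals-backtest \
    --output-branch backtest
```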

And the variances can be run similarly, again using the experimentation branch:
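With the same hypothetical parameter names:

```bash
python hospital_procedure_price_variance_by_state.py run \
    --input-branch backtest \
    --output-branch backtest
```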

We can now query the results directly from Python:

Since this commit was taken from far earlier in the data gathering process, we can see far fewer unique procedure codes, 21K vs 1.17M:

Or we can use Dolt SQL’s AS OF syntax to query the results of our backtest:
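For example, from the dolt sql console in hospital-price-analysis (the table name is the illustrative one used above):

```sql
SELECT *
FROM procedure_variances AS OF 'backtest'
LIMIT 10;
```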

In this section we saw how storing flow inputs in Dolt makes back-testing straightforward. This is enabled by how easy Dolt’s commit graph makes it for users to specify a historical version of the data.

Reproducibility

Our pipeline contains two steps: one computes state-level procedure price medians, and the second computes procedure price variances across states. Suppose now that we would like to tweak the way we compute variances. We might like to exclude some outliers or invalid procedure codes. In a production setting we might own the variances computation but not the medians computation, and have stricter criteria for excluding invalid data. Let’s update our variances job and then recompute using an updated flow definition.

Let’s first look at our updated flow definition, which excludes corrupt procedure codes:
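A trimmed-down sketch of what such a flow might look like. The real definition lives in the dolt-integrations examples; the DoltDT calls, table names, column names, and the validity filter here are all illustrative:

```python
from metaflow import FlowSpec, step

from dolt_integrations.metaflow import DoltDT  # import path is an assumption


class HospitalProcedurePriceVarianceByState(FlowSpec):

    @step
    def start(self):
        # Exact DoltDT constructor/read/write signatures are assumptions
        dolt = DoltDT(run=self)
        medians = dolt.read("state_procedure_medians")

        # New logic: keep only rows whose procedure code looks like a
        # well-formed five-character CPT/HCPCS code.
        valid = medians[medians["code"].str.fullmatch(r"[A-Z0-9]{5}", na=False)]

        variances = (
            valid.groupby("code")["median_price"]
            .var()
            .reset_index(name="variance")
        )
        dolt.write(variances, "procedure_variances")
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HospitalProcedurePriceVarianceByState()
```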

Reproducibility comes from passing the path to a previous run:

We use that path as a parameter to DoltDT, which in turn causes DoltDT to read data in exactly the same way as the run specified by the provided run path:
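Inside the flow, that might look roughly like this; the run_path parameter name and the keyword DoltDT accepts are assumptions meant only to sketch the mechanism:

```python
from dolt_integrations.metaflow import DoltDT  # import path is an assumption

# self.run_path is a (hypothetical) Metaflow Parameter holding a pathspec of
# the form "<FlowName>/<run id>". Handing it to DoltDT makes this run's reads
# identical to that earlier run's reads.
dolt = DoltDT(run=self, audit=self.run_path)
medians = dolt.read("state_procedure_medians")
```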

Looking back to our first run of the flow computing the medians, the run ID was 1617810636925188. We can also see this in Dolt:

Let’s create a branch pinned to the commit we want to reproduce from:
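From the hospital-price-analysis directory; the branch name is illustrative and the commit hash is a placeholder for the commit associated with that run:

```bash
dolt branch reproduce-run-1617810636925188 <commit-hash>
```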

We can pass this branch straight into our job to achieve the desired data read isolation for testing our code changes:
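Reusing the hypothetical branch parameter from earlier:

```bash
python hospital_procedure_price_variance_by_state.py run \
    --input-branch reproduce-run-1617810636925188
```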

We can now see that our procedure-level variances have been filtered appropriately and that we no longer have corrupt procedure codes in our dataset. First, we grab the result using the run path:

And we have eliminated the corrupt procedure codes:

By simply retrieving a run path and kicking off our variances flow, we were able to reproduce the exact inputs of a historical run.

Lineage

In the previous section we showed how to run one Flow using the inputs of a previous run. We did this to achieve data version isolation for the purposes of testing our code changes. The same mechanism we used for achieving this kind of reproducibility also allows us to track data lineage. Recall that our pipeline has two steps, each a separate flow:

Obviously, a real-world example might have a much more complicated data dependency graph, making this kind of tracking all the more important. Let’s see how we would trace the lineage of the final variances. The first thing to do is grab the run that created the current production data:
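One way to do that with Metaflow’s Client API (the flow name is a stand-in; since the integration also formats its Dolt commit messages, the Dolt commit log offers another starting point):

```python
from metaflow import Flow

# The latest successful run of the variance flow is the one that produced the
# data currently on the production branch.
run = Flow("HospitalProcedurePriceVarianceByState").latest_successful_run
print(run.pathspec)
```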

Which produces a Metaflow run path:

We now have the run ID of the flow that produced our variances. We can access the flow that wrote the medians in a similar manner:

And we have traced back to the medians job that originally produced this data:

As a final step, we can pull the input data, since we stored it in Dolt:
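One way to pull it, assuming a dolt sql-server is running against the input database; the connection string, database name, and table and branch names are all illustrative:

```python
import pandas as pd
import sqlalchemy

# Dolt SQL Server speaks the MySQL wire protocol, so standard MySQL drivers work.
engine = sqlalchemy.create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/hospital_price_transparency"
)

# AS OF lets us read the table exactly as it was on the historical branch.
hospitals = pd.read_sql(
    "SELECT * FROM hospitals AS OF 'hospitals-backtest' LIMIT 10", engine
)
print(hospitals)
```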

Which yields the original input dataset we started out with:

By storing Metaflow results in Dolt, result sets can be associated with both the flows and the input datasets that produced them. When results from Metaflow are put into other data stores, we don’t have a way to trace a table back to the flow run that produced it.

Conclusion

In this post we demonstrated how to use Dolt alongside Metaflow. Metaflow provides a framework for defining data engineering and data science workflows. Using Dolt for inputs and outputs augments pipelines defined in Metaflow with additional capabilities. Users can examine a table in their Dolt database and locate the flow that produced that table, and if that flow used Dolt as an input, locate the flows that produced the input data, and so on. Users can also run a flow pinned to a historical version of the data, providing reproducible runs that use data version isolation to ensure code changes are properly tested. Finally, when Dolt is used as an input, the commit graph can be used for back-testing against historical versions of the data.

If you want to learn more about using Dolt with Metaflow, join the Dolt team on Discord, or check out the Metaflow docs.

DoltHub sponsors Dolt, the version controlled SQL database, and builds DoltHub, the hosting and collaboration platform for Dolt databases.
