Unleash the power of Azure Data Factory

Bringing data to the cloud is a massive undertaking. In order to tackle this challenge, we used Azure Data Factory to create a maintenance-free, generic parameter-driven framework to ingest data and track changes within the data. In this blog we’ll be covering this journey from beginning to end, starting with relatively straightforward data ingestion, then working our way up to more complex techniques in Dataflows. All with our tooling of choice: Azure Data Factory!

Tristan Kloppenburg
Data Integration Lead
7 minutes
February 20, 2023
Image 1. Azure Data Factory.

Data to the cloud

While the concept of bringing data to the cloud sounds relatively straightforward, I can tell you that the whole journey, from data sitting in a legacy source system somewhere on-premises to a fully standardized historical dataset on Azure Cloud Storage, is quite an adventure.

With our framework, we create and maintain over a dozen different datasets from various sources and make them available within Rabobank on its Global Data Platform.

It’s maintenance-free because we use features of the Parquet file format that support schema drift. That means that if attributes are added to the source tables, they are picked up automatically, like magic!

In order to track changes in the data, we use a well-established data warehousing concept, Slowly Changing Dimensions (type 2), in combination with Delta Lake.

A word of caution: this blog is aimed at data integrators with some experience in Data Factory. It is not meant as a full tutorial or comprehensive guide to Data Factory. Not a data integrator, or not familiar with data integration tools? Use it as inspiration to understand Data Factory’s capabilities!

Image 2. Copy the data and then transform the data.

Use the power of parameters

The basis for our framework is the extensive use of parameters. Everything is built generically and nothing is hard-coded. This allows us to have a single data pipeline that does everything. OK, maybe not everything, but it does copy the data from a relational database source to Azure storage and convert it to Parquet format.

For our first use case, this was around twelve different tables. But you can imagine that it easily scales to hundreds.

To copy data in Data Factory you need a Copy Data activity which uses a source and target Dataset. Datasets make use of Linked Services which are the connections to your systems.

What to parameterize? Well, everything!

  • Oracle - Linked service
    Parameterize the connection string (a secret in Azure Key Vault), which allows us to connect to different environments if we want to.
  • Oracle – Dataset (1)
    Parameterize the Schema and Table (2)
  • Oracle – Query (3)
    In the query we use the schema and table parameters, and for our particular use case we also want to select data for a particular day, so that’s another parameter.
  • Azure Data Lake Storage Gen2 - Linked service
    Parameterize the storage URL string so we can use one linked service for all our Azure connections.
  • Parquet – Dataset (4)
    Parameterize the source container and subfolder (5) (and optionally the filename)
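To make the parameterization concrete, here is a small Python sketch of the values a single generic copy pipeline computes per run. It is not Data Factory code: in Data Factory these strings would be built with dynamic content expressions. The function names, the EXTRACT_DATE filter column and the load_date=YYYY-MM-DD partition folder are assumptions for illustration only. The values are interpolated directly into SQL, which is only acceptable because they come from trusted configuration, not user input.

```python
from datetime import date
from typing import Optional


def build_source_query(schema: str, table: str, load_date: date,
                       date_column: str = "EXTRACT_DATE") -> str:
    """Build the per-table Oracle query from the schema, table and load-date parameters."""
    # Assumption: the day is selected on a DATE column called EXTRACT_DATE.
    return (f"SELECT * FROM {schema}.{table} "
            f"WHERE {date_column} = DATE '{load_date.isoformat()}'")


def build_sink_path(container: str, subfolder: str, load_date: date,
                    filename: Optional[str] = None) -> str:
    """Build the ADLS Gen2 target path; each load gets its own partition folder."""
    # Assumption: the partition folder is named load_date=YYYY-MM-DD.
    path = f"{container}/{subfolder}/load_date={load_date.isoformat()}"
    return f"{path}/{filename}" if filename else path


# One pipeline definition, many tables: only the parameter values differ.
for table in ("ORDERS", "CUSTOMERS"):
    print(build_source_query("SALES", table, date(2023, 2, 20)))
    print(build_sink_path("raw", f"sales/{table.lower()}", date(2023, 2, 20)))
```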

Note: in the copy activity we do not specify any schema for the data, as we want to capture everything. And because every load is written to a separate partition, the schema is allowed to change over time.
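Why do separate partitions tolerate schema changes? Each Parquet file carries its own schema, so a reader can take the union of the columns of all partitions and leave values missing where an older load did not have a column (Spark's mergeSchema option for Parquet works in this spirit). The stdlib-only sketch below mimics that on plain Python dicts; it illustrates the idea and is not how Data Factory reads the files.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merged_columns(partitions: Dict[str, List[Row]]) -> List[str]:
    """Union of all column names, in first-seen order, oldest partition first."""
    columns: List[str] = []
    for load_date in sorted(partitions):
        for row in partitions[load_date]:
            for name in row:
                if name not in columns:
                    columns.append(name)
    return columns


def read_all(partitions: Dict[str, List[Row]]) -> List[Row]:
    """Read every partition against the merged schema; absent columns become None."""
    columns = merged_columns(partitions)
    return [
        {name: row.get(name) for name in columns}
        for load_date in sorted(partitions)
        for row in partitions[load_date]
    ]


partitions = {
    "2023-02-19": [{"ID": 1, "NAME": "Alice"}],
    "2023-02-20": [{"ID": 2, "NAME": "Bob", "EMAIL": "bob@example.com"}],  # new column
}
for row in read_all(partitions):
    print(row)
```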

Image 3. Copy Data activity with Source and Target (Sink) configuration.

Where do these values come from?

Now that we have these parameters everywhere, the next question is: where do the values come from?

Actually, you can do this in multiple ways: the values could come from a configuration file that you prepare in a CI/CD pipeline or from a separate control table. In our project we drive these values from our scheduling platform, Airflow. We call the same Data Factory pipeline from Airflow, but with different parameters every time.

A pragmatic and straightforward way, which I will show you here, is to drive these values from Data Factory itself using a Set Variable activity. Within a For Each loop we can then refer to these values using the item() reference.

In the For Each activity you can choose whether to run the iterations sequentially or in parallel.

Example: @item().sink_data_set_name refers to the sink_data_set_name value set in the JSON (see the image below).

Image 4. Our parameters provided in a json structure.
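Outside Data Factory, the For Each pattern boils down to running one parameterized activity once per JSON entry, with each entry playing the role of item(). The Python sketch below simulates that. Only sink_data_set_name comes from the post; the other keys, run_copy and for_each are hypothetical. The batch_count argument loosely mirrors the For Each batch count setting used when the loop is not sequential.

```python
import json
from concurrent.futures import ThreadPoolExecutor

# Hypothetical parameter list, shaped like the JSON passed to the For Each activity.
PARAMETERS = json.loads("""
[
  {"source_schema": "SALES", "source_table": "ORDERS", "sink_data_set_name": "orders"},
  {"source_schema": "SALES", "source_table": "CUSTOMERS", "sink_data_set_name": "customers"}
]
""")


def run_copy(item: dict) -> str:
    """Stand-in for the parameterized Copy Data activity; item plays the role of item()."""
    return f"{item['source_schema']}.{item['source_table']} -> {item['sink_data_set_name']}"


def for_each(items: list, sequential: bool = True, batch_count: int = 4) -> list:
    """Run run_copy once per item, either one at a time or concurrently."""
    if sequential:
        return [run_copy(item) for item in items]
    with ThreadPoolExecutor(max_workers=batch_count) as pool:
        # map() returns results in input order even though the work runs in parallel.
        return list(pool.map(run_copy, items))


print(for_each(PARAMETERS, sequential=False))
```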

Track changes in the Data using SCD2 logic

So far that was all pretty basic, but now that we have our data in the cloud we can start building our historical timelines. We have our existing historical set, we join it with the new incoming data and determine which records are new and which records have changed, then we write the result back to our historical set. Rinse and repeat!

What about the first day? On the first day we don’t have any historical data yet. Luckily for us, the first day is very straightforward: we just insert all the records we receive, with the added technical fields. This then serves as the starting point of our historical datasets.
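The join-and-compare loop described above can be sketched in plain Python. This is a minimal illustration, not the Dataflow implementation. Assumptions: incoming rows already carry KEY_HASH and NON_KEY_HASH, an open version is marked with a VALID_TO of 9999-12-31, and VALID_TO is exclusive (a closed version ends on the day its successor starts). Deletes are left out here because they depend on the snapshot type.

```python
from datetime import date
from typing import Dict, List

# Assumption: an open (current) version is marked with this far-future VALID_TO.
OPEN_END = date(9999, 12, 31)


def scd2_merge(history: List[dict], incoming: List[dict], load_date: date) -> List[dict]:
    """Merge one day's load into the historical set using SCD type 2.

    Rows are dicts that already carry KEY_HASH and NON_KEY_HASH.
    VALID_TO is exclusive: a closed version ends when its successor starts.
    """
    result = [dict(row) for row in history]  # never mutate the caller's data
    current: Dict[str, dict] = {
        row["KEY_HASH"]: row for row in result if row["VALID_TO"] == OPEN_END
    }
    for row in incoming:
        existing = current.get(row["KEY_HASH"])
        if existing is not None and existing["NON_KEY_HASH"] == row["NON_KEY_HASH"]:
            continue  # unchanged: keep the open version as it is
        if existing is not None:
            existing["VALID_TO"] = load_date  # changed: close the old version
        new_version = dict(row, VALID_FROM=load_date, VALID_TO=OPEN_END)
        result.append(new_version)
        current[row["KEY_HASH"]] = new_version
    return result


# First day: no history yet, so every record is simply inserted.
day1 = scd2_merge([], [{"KEY_HASH": "k1", "NON_KEY_HASH": "a"}], date(2023, 2, 19))
```

Note how the first-day case needs no special branch: with an empty history every key is new.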

If you’re not familiar with the Slowly Changing Dimension concept, the wiki entry has a very good explanation on the topic. You can find the link below.

Image 5. Schematic overview Slowly Changing Dimension type 2.

Track changes in the Data using SCD2, but now in Dataflow

Applying the SCD2 logic in Dataflow requires quite a few transformations, and explaining them all in detail would require an additional blog post. But the picture and the input parameters should give you an idea of what we’re trying to achieve here.

To process the data generically, without a fixed schema, the most important function we use is byName(), which selects a column value by name in the stream. This allows us to select and use columns from our source data based on our input parameters (see the byName() entry in the Data Flow expression language documentation).

A tip: some transformations, like the window transformation, don’t allow dynamically selecting attributes in the input stream. What you can do is first map those attributes to static columns using a Derived Column transformation. Later on you can drop the introduced columns if you don’t want them in your target dataset.
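The tip above can be mimicked in Python to show the order of operations: look up the parameter-driven columns by name, copy them into fixed helper columns, run a window-style "next value per key" over those, then drop the helpers. This is only an analogy for the Dataflow steps. by_name, add_next_valid_date, the _KEY/_SORT helper names and NEXT_VALID_DATE are hypothetical, and a single key column is assumed for brevity.

```python
from itertools import groupby
from typing import Any, List


def by_name(row: dict, name: str) -> Any:
    """Rough analogue of Dataflow's byName(): look up a column by a runtime name."""
    return row[name]


def add_next_valid_date(rows: List[dict], key_column: str, sort_column: str) -> List[dict]:
    # Step 1: map the parameter-driven columns to fixed helper columns,
    # like a Derived Column transformation placed before the window.
    staged = [dict(row, _KEY=by_name(row, key_column), _SORT=by_name(row, sort_column))
              for row in rows]
    # Step 2: a window over _KEY ordered by _SORT; the next row's _SORT is a "lead" value.
    staged.sort(key=lambda r: (r["_KEY"], r["_SORT"]))
    out = []
    for _, group in groupby(staged, key=lambda r: r["_KEY"]):
        versions = list(group)
        for current, following in zip(versions, versions[1:] + [None]):
            current["NEXT_VALID_DATE"] = following["_SORT"] if following is not None else None
            # Step 3: drop the helper columns so they never reach the target dataset.
            out.append({k: v for k, v in current.items() if k not in ("_KEY", "_SORT")})
    return out
```

The helper columns exist only so the window step can use fixed names; the output rows contain the original columns plus NEXT_VALID_DATE.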

For our dataflow to function we need the following parameters:

  • Dataset
    The name of our dataset. We use a common pattern for reading and writing the data that comes from this parameter.
  • Key columns
    The columns that identify the (composite) primary key. We calculate a hash value on those columns, which is the basis for the delta detection.
  • Non key exclude columns
    We calculate the hash value on all the columns minus these columns to determine if a record has changed or not.

In our process we have two technical fields: KEY_HASH and NON_KEY_HASH. With just these two columns we are able to apply the complete SCD2 logic.

  • Snapshot type
    Deleted records need separate logic, depending on whether you receive all records (a full snapshot) or only the changed records (a delta).
  • sortingColumns, validDateColumn and partitionColumn
    We need to know in which order to process the data and which field to base the technical attributes (VALID_FROM and VALID_TO) on. The last parameter is the partition key, which determines how we partition the target dataset.
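The difference between the two snapshot types for deletes can be shown in a few lines of Python. This is a sketch under assumptions: "delta" is the value used in the example JSON, while "full" is an assumed name for the all-records case, and deleted_keys is a hypothetical helper. With a full snapshot, a key that is open in the history but absent from the load has been deleted; with a delta, absence only means "not changed", so deletes cannot be inferred that way.

```python
from typing import Iterable, Set


def deleted_keys(open_keys: Iterable[str], incoming_keys: Iterable[str],
                 snapshot_type: str) -> Set[str]:
    """Return the KEY_HASH values whose open version should be closed as deleted."""
    if snapshot_type == "full":  # assumed value for "we receive all records"
        # A full snapshot contains every live record, so a missing key was deleted.
        return set(open_keys) - set(incoming_keys)
    if snapshot_type == "delta":
        # A delta only contains changed records: a missing key is simply unchanged,
        # so deletes cannot be inferred from absence.
        return set()
    raise ValueError(f"unknown snapshotType: {snapshot_type!r}")
```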

Putting these parameters together, our example input JSON looks like this:

{
  "seq": "1",
  "dataset": "OUR_DATASET_01",
  "keyColumns": "PRIMARY_KEY_1||PRIMARY_KEY_2",
  "nonkeyExcludeColumns": "PRIMARY_KEY_1||PRIMARY_KEY_2",
  "snapshotType": "delta",
  "sortingColumns": "EXTRACT_DATE",
  "validDateColumn": "EXTRACT_DATE",
  "partitionColumn": "EXTRACT_DATE"
}
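The two technical fields can be sketched in Python from the keyColumns and nonkeyExcludeColumns parameters. Assumptions, all mine: column names inside a parameter are separated by "||", SHA-256 from hashlib stands in for whatever hash the Dataflow uses (the post does not say), and the helper names are hypothetical. In the usage check, EXTRACT_DATE is also excluded from the non-key hash so that reloading an unchanged record is not seen as a change.

```python
import hashlib
from typing import List

# Assumption: multiple column names in a parameter are separated by "||".
DELIMITER = "||"


def split_columns(param: str) -> List[str]:
    """Turn 'PK_1||PK_2' into ['PK_1', 'PK_2'], ignoring stray spaces."""
    return [name.strip() for name in param.split(DELIMITER) if name.strip()]


def hash_values(values: list) -> str:
    # A separator byte keeps ('ab', 'c') and ('a', 'bc') apart; None hashes like "".
    joined = "\x1f".join("" if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def add_hashes(row: dict, key_columns: str, nonkey_exclude_columns: str) -> dict:
    """Return a copy of row with KEY_HASH and NON_KEY_HASH added."""
    keys = split_columns(key_columns)
    excluded = set(split_columns(nonkey_exclude_columns))
    # Sorted column order, so the hash does not depend on column order in the source.
    non_key = sorted(c for c in row if c not in excluded)
    return dict(row,
                KEY_HASH=hash_values([row[c] for c in keys]),
                NON_KEY_HASH=hash_values([row[c] for c in non_key]))
```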

Note in the picture below that the existing historical set uses the Delta format, while the incoming data is in Parquet format.

Image 6. Slowly Changing Dimension logic captured in Dataflow.

Concluding thoughts about Azure Data Factory

Now that we are at the end of our data-to-the-cloud journey, I want to share some final thoughts with you. Our team has been using the principles of this framework successfully for about two years now, and for our latest data ingestion project we extended the functionality to handle text files. The principles are the same, but with text files we also need to provide an input schema for the data. The benefits of working with such a framework should be clear: when a similar use case comes along, it is very easy to reuse, and we spend our time gathering the required parameters (the functional side) instead of on development (the technical side).

I hope this blog post was informative and gave you some ideas about the possibilities and capabilities of Data Factory. And of course that it can serve as inspiration to create your own generic frameworks that you can use for your future data integration projects!

Resources


About the author

Tristan Kloppenburg
Data Integration Lead
Tristan Kloppenburg is a data integration specialist. He is currently the development lead for Data & Factory Services, part of the Data & Analytics Tribe within Rabobank. He and his team work tirelessly to make data from a diverse set of sources available on the Global Data Platform, the Governed Data Marketplace for all of Rabobank. Today he works entirely in Azure Cloud, but he previously worked with on-premises infrastructure, building the Data Warehouse for Customer Data. His earlier endeavours involved large-scale data migration projects following bank mergers. In total he has close to 20 years of experience in IT, including 8 at Rabobank.