Azure Data Factory V2 – Incremental loading with configuration stored in a table – Complete solution, step by step.

This post explains things that are difficult to find even in English. That’s why I will break my rule and not write it in my native language! For the Polish version, I invite you to Google Translate :>

Introduction


Loading data using Azure Data Factory v2 is really simple. Just drop a Copy activity into your pipeline, choose a source and sink table, configure some properties and that’s it – done with just a few clicks!

But what if you have dozens or hundreds of tables to copy? Are you gonna do it for every object?

Fortunately, you do not have to do this! All you need is dynamic parameters and a few simple tricks 🙂

Also, this will give you the option of creating incremental feeds, so that on the next run it will transfer only newly added data.

Mappings

Before we start diving into details, let’s demystify some basic ADFv2 mapping principles.

  • Copy activity doesn’t need to have column mappings defined at all,
  • it can map them dynamically using its own mechanism, which retrieves source and destination (sink) metadata,
  • if you use PolyBase, it will map by column order (1st column from source to 1st column at destination etc.),
  • if you do not use PolyBase, it will map by column name, but watch out – the matching is case sensitive!
  • So all you have to do is keep the same structure and data types in the destination (sink) tables as in the source database.

Bear in mind that if your columns differ between source and destination, you will have to provide custom mappings. This tutorial doesn’t show how to do it, but it is possible: use the “Get metadata” activity to retrieve the column specification from the source, parse it, and pass it as a JSON structure into the mapping dynamic input. You can read about mappings in the official documentation: https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-schema-and-type-mapping

String interpolation – the key to success

My entire solution is based on one cool feature called string interpolation. It is a part of the built-in expression engine that lets you inject any value from a JSON object or an expression directly into a string input, without any concatenation functions or operators. It’s fast and easy. Just wrap your expression in  @{ ... } . The result is always returned as a string.

The official documentation has examples that clarify how this feature works.

Read more about JSON expressions at https://docs.microsoft.com/en-us/azure/data-factory/control-flow-expression-language-functions#expressions

 

So what are we going to do? :>


Good question 😉

In my example, I will show you how to transfer data incrementally from Oracle and PostgreSQL tables into Azure SQL Database.

All of this using configuration stored in a table, which in short, keeps information about Copy activity settings needed to achieve our goal 🙂

Adding new definitions into config will also automatically enable transfer for them, without any need to modify Azure Data Factory pipelines.

So you can transfer as many tables as you want, in one pipeline, at once. Triggering with one click 🙂

 

Every process needs a diagram :>

 

 

Basically, we will do:

  1. Get the configuration from our config table inside Azure SQL Database using a Lookup activity, then pass it to Filter activities to split the configs for Oracle and PostgreSQL.
  2. In a ForEach activity created for each type of database, we will build simple logic that retrieves the maximum update date from every table.
  3. Then we will dynamically prepare expressions for the SOURCE and SINK properties of the Copy activity. MAX UPDATEDATE, retrieved above, and the previous WATERMARK DATE, retrieved from the config, will set our boundaries in the WHERE clause. Every detail, like the table name or table columns, will be passed into the query using string interpolation, directly from a JSON expression. The sink destination will also be parametrized.
  4. Now Azure Data Factory can execute queries evaluated dynamically from JSON expressions, and it will run them in parallel to speed up the data transfer.
  5. Every successfully transferred portion of incremental data for a given table has to be marked as done. We can do this by saving MAX UPDATEDATE in the configuration, so that the next incremental load knows what to take and what to skip. We will use a Stored procedure activity here.

This example simplifies the process as much as possible. Remember, in your solution you have to implement logic for every unsuccessful operation. You can achieve that using the On Failure control flow with some activities (chosen depending on your needs) and the timeout/retry options set individually for every activity in your pipeline.

 

About sources

I will use PostgreSQL 10 and Oracle 11 XE installed on my Ubuntu 18.04 inside a VirtualBox machine.

In Oracle, tables and data were generated from the EMP/DEPT samples delivered with the XE version.

In PostgreSQL – from dvd rental sample database: http://www.postgresqltutorial.com/postgresql-sample-database/

 

I simply chose the three largest tables from each database. You can find them in the configuration shown below this section.

 

Every database is accessible from my Self-hosted Integration Runtime. I will show an example of how to add the server to Linked Services, but skip configuring the Integration Runtime. You can read about creating a self-hosted IR here: https://docs.microsoft.com/en-us/azure/data-factory/create-self-hosted-integration-runtime.

 

About configuration

In my Azure SQL Database I have created a simple configuration table:

Id is just an identity value; SRC_name is the type of source server (ORA or PG).

SRC_tab and DST_tab map the source and destination objects. Cols defines the selected columns; WatermarkColumn and WatermarkValue store the incremental metadata.

And finally, Enabled just enables a particular configuration (table data import).

As Andy rightly noted in a comment below this post, “Cols” can also be used to implement SQL logic, like functions, aliases etc. The value from this column is copied directly into the query (more precisely, it is placed between the SELECT and FROM clauses). So you can use it according to your needs.

 

This is how it looks with initial configuration:

Create script:
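To make the column layout concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for Azure SQL Database. The column names follow the description above; the table name `cfg`, the column types, the sample rows and the idea of selecting only rows with Enabled = 1 are assumptions made for illustration, not the original T-SQL script.

```python
import sqlite3

# In-memory SQLite database as a stand-in for Azure SQL Database (assumption).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows can be read by column name

# Column names follow the article; the types and the table name "cfg" are assumptions.
conn.execute(
    """
    CREATE TABLE cfg (
        Id              INTEGER PRIMARY KEY AUTOINCREMENT,
        SRC_name        TEXT    NOT NULL,  -- 'ORA' or 'PG'
        SRC_tab         TEXT    NOT NULL,
        DST_tab         TEXT    NOT NULL,
        Cols            TEXT    NOT NULL,
        WatermarkColumn TEXT    NOT NULL,
        WatermarkValue  TEXT    NOT NULL,
        Enabled         INTEGER NOT NULL DEFAULT 1
    )
    """
)

# Hypothetical initial configuration; table and column names are made up.
initial_rows = [
    ("ORA", "EMP", "stg_emp", "*", "UPDATE_DATE", "1900-01-01T00:00:00Z", 1),
    ("PG", "payment", "stg_payment", "*", "payment_date", "1900-01-01T00:00:00Z", 1),
    ("PG", "rental", "stg_rental", "rental_id, last_update", "last_update", "1900-01-01T00:00:00Z", 0),
]
conn.executemany(
    "INSERT INTO cfg (SRC_name, SRC_tab, DST_tab, Cols, WatermarkColumn, WatermarkValue, Enabled) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)",
    initial_rows,
)

# Assumed behaviour: only enabled rows are handed to the pipeline.
enabled = [dict(row) for row in conn.execute("SELECT * FROM cfg WHERE Enabled = 1 ORDER BY Id")]
for row in enabled:
    print(row["Id"], row["SRC_name"], row["SRC_tab"], "->", row["DST_tab"])
```

Setting the initial WatermarkValue far in the past means the first run would copy every row, and later runs only the delta.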

 

EDIT 19.10.2018

Microsoft announced that you can now parametrize linked services too!

https://azure.microsoft.com/en-us/blog/parameterize-connections-to-your-data-stores-in-azure-data-factory/

Let’s get started (finally :P)


Preparations!

Go to your Azure Data Factory portal @ https://adf.azure.com/

Select Author button with pencil icon:

 

Creating server connections (Linked Services)

We can’t do anything without defining Linked Services, which are just connections to your servers (on-prem and cloud).

  1. Go to the connections (linked services) list and click the button that adds a new one.
  2. Find your database type, select it and continue.
  3. Provide all the required data, like server IP/host, port, SID (Oracle needs this), login and password.
  4. You can test the connection to check that everything is OK. Click Finish to save your connection definition.

Some types of servers, such as PostgreSQL or MySQL, require separate .NET drivers. Check the connector page for your server type in Microsoft Docs and look for the Prerequisites section that matches your scenario.

I have created three connections. Here are their names and server types:

 

Creating datasets

Creating linked services just tells ADF what the connection settings are (like connection strings).

Datasets, on the other hand, point directly to database objects.

BUT they can be parametrized, so you can create just ONE dataset and pass different parameters to it to get data from multiple tables within the same source database 🙂

Source datasets

Source datasets don’t need any parameters. We will later use built-in query parametrization to pass object names.

  1. Go to the datasets list, click + and choose to add a new dataset.
  2. Choose your dataset type, for example Oracle.
  3. Rename it as you like. We will use the name “ORA”.
  4. Set the Linked service option to the matching connection, for example the Oracle one for this dataset.
  5. And that’s it! No need to set anything else. Just repeat these steps for every source database that you have.

In my example, I’ve created two source datasets, ORA and PG.

We also need to create a third dataset. It will work as a source too, BUT also as a parametrizable sink (destination). So creating it is a little different from the others.

Sink dataset

The sink needs one extra parameter, which will store the destination table name.

  1. Create a dataset just like in the previous example, and choose your destination type. In my case, it will be Azure SQL Database.
  2. Go to Parameters and declare one String parameter called “TableName”. Set the value to anything you like. It’s just a dummy value; ADF doesn’t like empty parameters, so we have to set a default.
  3. Now, in the dataset’s connection settings, set Table as dynamic content. This will be tricky :). Just click “Select…”, don’t choose any value, and click somewhere in empty space. The magic option “Add dynamic content” now appears! You have to click it or hit alt+p.
  4. The “Add Dynamic Content” window is now visible. Type: “@dataset().TableName” or just click “TableName” in the “Parameters” section below “Functions”.
  5. The table name is now parameterized.

 

Parametrizable PIPELINE with dynamic data loading.


Ok, our connections are defined. Now it’s time to copy data :>

 

Creating pipeline

  1. Go to your ADF, click the PLUS symbol near the search box on the left and choose “Pipeline”.
  2. Rename it. I will use “LOAD DELTA”.
  3. Go to Parameters and create a new String parameter called ConfigTable. Set its value to our configuration table name: load.cfg . This simply parametrizes your configuration source, so that in the future it will be possible to load a completely different set of sources by changing only one parameter :>
  4. In case you missed it, SAVE your work by clicking “Save All” if you’re using GIT or “Publish All” if not ;]

 

Creating Lookup – GET CFG

First, we have to get configuration. We will use Lookup activity to retrieve it from the database.

Bear in mind that the Lookup activity has some limits. Currently, the maximum number of rows that can be returned by a Lookup activity is 5000, and the result can be up to 2MB in size. Also, the maximum duration of a Lookup activity before timeout is one hour. Go to the documentation for the latest info and updates.

  1. Drag and drop a Lookup activity into your pipeline.
  2. Rename it. This is important, because we will use this name later in our solution. I will use the value “GET CFG”.
  3. In “Settings”, choose the parametrized Azure SQL Database dataset as the source dataset.
  4. Don’t worry that TableName is set to a dummy value :> Just set “Use Query” to “Query”, click “Add dynamic content” and type a query that reads the rows from the table named in the ConfigTable parameter, which you can reference as  @{pipeline().parameters.ConfigTable} .
  5. Unmark “First row only”; we need all rows, not just the first one.

 

Creating Filters – ORA CFG & PG CFG

Now we have to split the configs for Oracle and PostgreSQL. We will use a Filter activity on the rows retrieved by the “GET CFG” lookup.

  1. Drag and drop a Filter activity twice.
  2. Rename the first block to “ORA CFG” and the second to “PG CFG”.
  3. Now go to “ORA CFG”, then “Settings”.
  4. In Items, click Add dynamic content and type:  @activity('GET CFG').output.value . As you can probably guess, this points directly to the GET CFG output rows 🙂
  5. In Condition, click Add dynamic content and type: @equals(item().SRC_name,'ORA') . We have to match the rows with Oracle settings. We know that the config table has a column called “SRC_name”, so we can use it to filter out all rows except those with the value ‘ORA’ 🙂 .
  6. Do the same with the “PG CFG” filter activity. Of course, change the value in the condition to ‘PG’.

It should look like this:

Creating ForEach – FOR EACH ORA & FOR EACH PG

Now it’s time to iterate over each row filtered into the separate containers (ORA CFG and PG CFG).

  1. Drag and drop two ForEach blocks and rename them to “FOR EACH ORA” and “FOR EACH PG”. Connect each one to the proper filter activity.
  2. Click “FOR EACH ORA”, go to “Settings”, in Items click Add dynamic content and type:  @activity('ORA CFG').output.value . We are telling ForEach that it has to iterate over the results returned by “ORA CFG”. They are stored in a JSON array.
  3. Do this also in FOR EACH PG. Type:  @activity('PG CFG').output.value
  4. Now you can edit Activities and add just a “WAIT” activity to debug your pipeline. I will skip this part. Just remember to delete the WAIT block at the end of your tests.
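The Lookup, Filter and ForEach chain can be pictured with plain Python data structures. This is only a rough sketch of the data flow, not how ADF works internally: the `filter_activity` helper and the sample rows are hypothetical, and activity outputs are modelled as dicts with a `value` list to mirror the `.output.value` expressions used above.

```python
def filter_activity(items, src_name):
    """Mimic a Filter activity whose condition is @equals(item().SRC_name, src_name)."""
    return {"value": [item for item in items if item["SRC_name"] == src_name]}

# Hypothetical output of the GET CFG lookup: one dict per config row.
get_cfg_output = {
    "value": [
        {"Id": 1, "SRC_name": "ORA", "SRC_tab": "EMP"},
        {"Id": 2, "SRC_name": "PG", "SRC_tab": "payment"},
        {"Id": 3, "SRC_name": "ORA", "SRC_tab": "DEPT"},
    ]
}

ora_cfg = filter_activity(get_cfg_output["value"], "ORA")  # ORA CFG
pg_cfg = filter_activity(get_cfg_output["value"], "PG")    # PG CFG

# FOR EACH ORA / FOR EACH PG: each iteration sees one config row, like @item().
ora_tables = [item["SRC_tab"] for item in ora_cfg["value"]]
pg_tables = [item["SRC_tab"] for item in pg_cfg["value"]]
print(ora_tables, pg_tables)
```

A Python loop is sequential, while ADF can run the ForEach iterations in parallel; the sketch only shows which rows end up in which loop.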

 

Inside ForEach – GET MAX ORA -> COPY ORA -> UPDATE WATERMARK ORA

Place these blocks into FOR EACH ORA. Just go there, click “Activities” and then the edit button.

Every row that the ForEach activity iterates over is accessible using @item() .

And every column in that row can be reached just by using  @item().ColumnName .

Remember that you can surround every expression with @{ }  to use it in string interpolation. Then you can concatenate it with other strings and expressions just like this:  Value of the parameter WatermarkColumn is: @{item().WatermarkColumn}
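If it helps to see the substitution spelled out, here is a minimal Python sketch that imitates string interpolation for the `@{item().ColumnName}` form only. The `interpolate` helper and the sample row are hypothetical; the real ADF expression engine supports far more than this one pattern.

```python
import re

# Matches placeholders of the form @{item().ColumnName}; other expressions are not handled.
PLACEHOLDER = re.compile(r"@\{item\(\)\.(\w+)\}")

def interpolate(template, item):
    """Replace each @{item().Name} with the string form of item[Name]."""
    return PLACEHOLDER.sub(lambda match: str(item[match.group(1)]), template)

# Hypothetical config row, as seen by one ForEach iteration.
row = {"Id": 7, "WatermarkColumn": "UPDATE_DATE", "SRC_tab": "EMP"}

query = interpolate("SELECT MAX(@{item().WatermarkColumn}) as maxd FROM @{item().SRC_tab}", row)
label = interpolate("Config row @{item().Id}", row)
print(query)
print(label)
```

Note that the numeric Id comes out as text, which matches the rule that interpolation always returns a string.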

 

GET MAX ORA

  1. Go to “GET MAX ORA” (a Lookup activity), then Settings.
  2. Choose your source dataset “ORA”, set Use Query to “Query” and click Add dynamic content.
  3. Type  SELECT MAX(@{item().WatermarkColumn}) as maxd FROM @{item().SRC_tab} . This will get the maximum date in your watermark column. We will use it as the RIGHT BOUNDARY for the delta slice.
  4. Check that First row only is turned on.

It should look like this:

 

COPY ORA

Now the most important part :> Copy activity with a lot of parametrized things… So pay attention, it’s not so hard to understand but every detail matters.

Source

  1. In the source settings, set Source Dataset to ORA and in Use query select Query.
  2. Below the Query input, click Add dynamic content and paste the query expression built from the pieces explained below.

Now, this needs some explanation 🙂

 

 

  • The ORA CFG output has all the columns and their values from our config.
  • We will use SRC_tab as the table name, Cols as the columns for the SELECT query, WatermarkColumn as the name of the LastChange DateTime column and WatermarkValue as the LEFT BOUNDARY (greater than, >).
  • The GET MAX ORA output stores the date of the last updated row in the source table. This is why we are using it as the RIGHT BOUNDARY (less than or equal, <=).
  • And the tricky thing: ORACLE doesn’t support implicit conversion from a string with an ISO 8601 date. So we need to convert it properly with the TO_DATE function.
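The pieces listed above can be assembled as in the following Python sketch. It only illustrates the shape of the delta query; it is not the exact expression from the pipeline. The `build_delta_query` helper, the sample values and the Oracle format mask are assumptions, and the mask would need adjusting if your timestamps carry fractional seconds or a different layout.

```python
# Assumed Oracle format mask for ISO 8601 strings such as 2018-10-01T12:30:00Z.
ORACLE_ISO_MASK = 'YYYY-MM-DD"T"HH24:MI:SS"Z"'

def build_delta_query(item, max_value):
    """Build a SELECT bounded by the stored watermark (>) and the current maximum (<=)."""
    column = item["WatermarkColumn"]
    return (
        f"SELECT {item['Cols']} FROM {item['SRC_tab']} "
        f"WHERE {column} > TO_DATE('{item['WatermarkValue']}', '{ORACLE_ISO_MASK}') "
        f"AND {column} <= TO_DATE('{max_value}', '{ORACLE_ISO_MASK}')"
    )

# Hypothetical config row and GET MAX ORA result.
cfg_row = {
    "SRC_tab": "EMP",
    "Cols": "ENAME, SAL",
    "WatermarkColumn": "UPDATE_DATE",
    "WatermarkValue": "2018-01-01T00:00:00Z",
}
query = build_delta_query(cfg_row, "2018-10-01T12:30:00Z")
print(query)
```

Using a strict lower bound and an inclusive upper bound means a row sitting exactly on the previous watermark is not copied twice, while the newest row is still included.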

So the source is a query from ORA dataset:

 

Sink

Sink is our destination. Here we will set parametrized table name and truncate query.

  1. Select the parametrized sink dataset.
  2. Parametrize TableName as dynamic content with the value:  @{item().DST_tab}
  3. Also, do the same with the Pre-copy script and put there:  TRUNCATE TABLE @{item().DST_tab}

As De jan rightly noticed in the comments, you are not obliged to use any Pre-copy script here. You can leave this box empty or run other commands, like partition switching.

It should look like this:

 

Mappings and Settings

All other things should just be set to defaults. You don’t have to parametrize mappings if you just copy data from and to tables that have the same structure.

Of course, you can dynamically create them if you want, but it is a good practice to transfer data 1:1 – both structure and values from source to staging.

 

UPDATE WATERMARK ORA

Now we have to confirm that the load has finished and then update the previous watermark value with the new one.

We will use a stored procedure. The code is simple:

Create it on your Azure SQL database. Then use it in ADF:

  1. Drop a Stored Procedure activity into the pipeline and connect the constraint from COPY ORA to it. Rename it to “UPDATE WATERMARK ORA” and view its properties.
  2. In SQL Account, set the linked service of your Azure SQL Database.
  3. Now go to “Stored Procedure”, select our procedure name and click “Import parameter”.
  4. Now we have to pass values for the procedure parameters, and we will parametrize them too. Id should be  @{item().id}  and NewWatermark has to be:  @{activity('GET MAX ORA').output.firstRow.MAXD} .
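What the watermark update boils down to can be sketched with Python and sqlite3 standing in for the Azure SQL stored procedure. The `update_watermark` function, the reduced `cfg` table and the sample values are assumptions for illustration; the real procedure is written in T-SQL and takes the Id and NewWatermark parameters described above.

```python
import sqlite3

def update_watermark(conn, cfg_id, new_watermark):
    """Store the new watermark for one config row; returns the number of rows changed."""
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            "UPDATE cfg SET WatermarkValue = ? WHERE Id = ?",
            (new_watermark, cfg_id),
        )
    return cursor.rowcount

# Reduced, hypothetical version of the config table with a single row.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cfg (Id INTEGER PRIMARY KEY, WatermarkValue TEXT)")
conn.execute("INSERT INTO cfg VALUES (1, '2018-01-01T00:00:00Z')")

# The new value would come from GET MAX ORA after COPY ORA succeeds.
changed = update_watermark(conn, 1, "2018-10-01T12:30:00Z")
stored = conn.execute("SELECT WatermarkValue FROM cfg WHERE Id = 1").fetchone()[0]
print(changed, stored)
```

Because the update runs only after the copy has succeeded, a failed copy leaves the old watermark in place and the same slice is picked up again on the next run.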

 

And basically, that’s all! This logic should copy rows from all Oracle tables defined in the configuration.

We can now test it. This can be done with “Debug” or just by triggering pipeline run.

If everything is working fine, we can just copy/paste all the content from “FOR EACH ORA” into “FOR EACH PG”.

Just remember to rename all activities properly to reflect the new source/destination names (PG). Also, all parameters and SELECT queries have to be redefined. Luckily, PostgreSQL supports ISO dates out of the box.

Source code


Here are all components in JSON. You can use them to copy/paste logic directly inside ADF V2 code editor or save as files in GIT repository.

Below is the source code for the pipeline only. All other things can be downloaded as a zip file from “Download all” at the bottom of this article.

Pipeline

 

Download all

IncrementalCopy_ADFv2.zip

 

80 thoughts on “Azure Data Factory V2 – Incremental loading with configuration stored in a table – Complete solution, step by step.”

  1. Dear Mr. Pawlikowski,
    This is the best post on ADF v2, that I found so far on the internet. I have passed the link to this blog post to a couple of colleagues in Bengaluru, India.
    Thank you very much for sharing your knowledge.

  2. Hi, thank you for this blog post – it’s really good. I was looking around for a way to simplify and use configurations for loading data from several source databases and this is perfect. I would also say that the “columns” config could actually also contain the SQL itself (if you needed to use any functions on the source data while loading or even join multiple tables together in the source query).

    Anyway, great job on this and very helpful.

  3. Very Impressive Azure Tutorial. The content seems to be pretty exhaustive and excellent and will definitely help in learning Azure Tutorial.I’m also a learner taken up Azure Training and I think your content has cleared some concepts of mine. While browsing for Azure Course on YouTube i found this fantastic video on Azure Course. Do check it out if you are interested to know more on Azure Tutorial.:-https://www.youtube.com/watch?v=8_0qGTdHZSs&t=51s

  4. Hi Michał ,

    Thanks for your sharing. While when I follow your steps, don’t know why there is error in CopyORA activity, which showing no source dataset found. While when I did a simply copy activity, it’s no problem to define the same source dataset.

    We had tried troubleshooting for many ways, but not able to resolve it, can you help to give some suggestion? Thank you very much. =)

    “Code”: 11000,
    “Message”: “‘Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=The column ‘UPDATE_DATE1’ as defined in the source DataSet could not be located in the actual source. Check the configuration to ensure that all columns in the source DataSet does exist in the actual source.,Source=Microsoft.DataTransfer.ClientLibrary,'”,

      1. Hi Michał ,
        Thanks for the reply. “UPDATE_DATE1” is the column in the Oracle source table, which is used to compare data in the ORA Copy activity. We tested that the column exists, and the previous activity also referred to the dataset successfully. It would be helpful if you could give some suggestions for troubleshooting.

      1. Oh.. Sorry to hear that, and hoping you’re getting better and recover soon. =)
        The issue was just a mapping error; after we adjusted to the correct column name, the issue was resolved. Thank you.

  5. It’s an awesome post and really very detailed thanks for writing this. I am new to ADF and currently Working on a project to load multiple JSON files (in diff structures) to target tables using ADF v2 so can that also be dynamically build like it? If you have answer please throw some light

    1. Devendra Kumar, oh well, honestly it depends.
      Everything that has “Add dynamic content” can be parametrized.
      And if you are asking about dynamic content remapping it also depends on the structure of json files. They can have a set of objects or arrays, can have a lot of tables defined inside or just only one. Unfortunately, there will be always a problem with semi-structured files, which requires to parse and check their structure 🙁
      If the schema is the same for all files – i think it will be possible to do it dynamically. If not – well, hard to say 😐

    2. Look also at this site:
      https://docs.microsoft.com/en-us/azure/data-factory/supported-file-formats-and-compression-codecs#json-format

      Head to the example starting with:
      Sample 2: cross apply multiple objects with the same pattern from array

      Note this:

      If the structure and jsonPathDefinition are not defined in the Data Factory dataset, the Copy Activity detects the schema from the first object and flatten the whole object.
      If the JSON input has an array, by default the Copy Activity converts the entire array value into a string. You can choose to extract data from it using jsonNodeReference and/or jsonPathDefinition, or skip it by not specifying it in jsonPathDefinition.

      It means that it can be controlled, but it will be quite of challenge 😐

  6. Hi Michal, thank you for the great explanation. Looking at your example, I was able to create an ADF pipeline loading on-prem Oracle tables to Data Lake Gen2 blob containers. Did you come across the issue of loading tables as files incrementally instead of overwriting previous loads?
    Bruce.

    1. Hi Bruce.
      I’m afraid you must describe it in a little more detail 🙂
      I did not use Data Lake Storage as a sink, but as far as I understand this is not the issue (everything is working fine with copying and detecting the delta on the Oracle side?)

      So maybe you just want to know how to handle deltas (incremental load) in a structures like Data Lake Storage?
      As far as I know ADLS does not have any mechanism to apply new portion of data into already existing file. So you have to implement it as a deltas partitioned by folder path and file names. So every new portion of data will sink into different folder and different file, but automatically 🙂

      Look at this article and it’s point number 2:
      https://www.blue-granite.com/blog/four-tips-for-using-azure-data-factory-to-load-your-data-to-azure-data-lake-store

      Choose your scenario.

      Then look at documentation of ADLS (by the way, are you using v1 or v2?)
      https://docs.microsoft.com/en-us/azure/data-factory/v1/data-factory-azure-datalake-connector#the-partitionedby-property

      You have to use the partitionedBy property to store every increment in a separate path/file, then – depending on your architecture – implement a mechanism that will parse all the data and build what you want, e.g. a particular type of slowly changing dimension, a history table or maybe a current snapshot? Sky is the limit ;D

      Summing it up:
      make your incremental loads from Oracle sink in ADLS with partitioning, it will put them in different paths and files. Then use them as you want with a tool as you like (U-SQL, Spark, Data Warehouse T-SQL etc…)

    1. I believe fileName can be parametrized same as folderPath. They are just expression fields.

      "fileName": {
        "value": "EBC.rpt_BriefingActivitySummary.tsv",
        "type": "Expression"
      },
      "folderPath": {
        "value": "@concat('/Snapshots/EBC/rpt_BriefingActivitySummary/', formatDateTime(pipeline().parameters.scheduledRunTime, 'yyyy'), '/', formatDateTime(pipeline().parameters.scheduledRunTime, 'MM'), '/', formatDateTime(pipeline().parameters.scheduledRunTime, 'dd'), '/')",
        "type": "Expression"
      }

  7. Hi Michal, you are absolutely correct, fileName declared as pipeline parameter and filled with value at sink destination. Folder path can be simplified:

    "folderPath": {
      "value": "@concat('/dev-raw-data-zone/oracle_erp_full_tables/',formatDateTime(pipeline().parameters.windowStart, 'yyyy/MM/dd'))",
      "type": "Expression"
    }

    Want to thank you again, your example is the most complete, understandable and comprehensive learning guideline I was able to find online.
    Bruce.

  8. How is it that you can have truncate table in your pipeline? Your case is not appending new records to a table that holds everything, but just coping new records to a staging table, right?
    Simply removing truncate table statement would append records. it would work for system that never updates records. If records are modified and they come over again, there would be a need to do some merging or overwriting.

    1. @De jan
      Thank you for pointing that out 🙂

      Of course, it always depends on our needs.
      This example shows, that you can use “Pre-copy script” before loading will start.
      In another case, this field could be empty or, for example, run partition switching or even partition truncating (yes, since MSSQL 2016 version we can truncate partitions).

      ETL staging is a huge topic. I used to work with many scenarios in different projects, so maybe this was too obvious for me, that there is a choice 🙂
      I have added relevant information under the example of “pre-script”. Thanks!

  9. Hello Michał,

    First of all, thank you very much for such a detailed guide. It’s been a great help for me while trying to implement an incremental load solution from Azure SQL Database to Azure SQL Data Warehouse.

    However, if it’s not too much trouble, I’d appreciate some help in the COPY ORA section, step 3: “Also, do the same with Pre-copy script and put there: TRUNCATE TABLE @{item().DST_tab}”.

    Could you please elaborate on why this TRUNCATE TABLE is needed? As it stands, my baseline version of the target table is being truncated and once the pipeline ends it remains empty.

    I’m struggling to understand what the role of this TRUNCATE command is.

    Thanks in advance.

    Regards,
    Pedro

  10. Hello Michal

    Thanks for sharing the post, it’s really nice,
    Can you please suggest me something about:

    I have a csv file in blob storage that is updated daily with change in name of file (e.g. Products.25-03-2018 and data is continuously changed every day). I created a data factory to take csv file from blob storage to Azure SQL database, I cannot figure out how I can resolve this issue? may be dynamic content?
    if you can suggest me something to do, that would be great !

    Second, in your example above if the source and sink is same that is Azure SQL database, how it will work?

    Thanks in advance

    Fraz

    1. Hi Fraz!
      So, if I understand it correctly, you have a problem with dynamic parametrization of a source file.
      And the pattern of the filename is always: Products.DD-MM-YYYY.csv

      If this is a case when you just need to take Products.25-03-2018.csv by the day 25.03.2018 and for that particular day pipeline will be run – then it’s a matter of using two (or three) functions for dynamic concatenation in your source file expression.

      Look at the documentation here:
      https://docs.microsoft.com/en-us/azure/data-factory/control-flow-expression-language-functions#date-functions

      1. THE CURRENT DATE
      You can always return the current date and time in UTC format using: utcnow().
      Now the question is – do you have files with date taken from UTC timestamp or maybe it’s in another timezone? And if this timezone has daylight saving time you should also consider it in your expression. Depending on the answers you can use function addhours() because it looks like Data Factory expressions do not have any timezone aware date functions (yet?)

      2. FORMATTING TO YOUR NEEDS
      Then, after using utcnow() and anything else that will give you proper date, now you can format it to the string and using custom format.
      Look at formatDateTime() function, you can use it with custom format declaration taken directly from .NET platform. Its described here: https://docs.microsoft.com/en-us/dotnet/standard/base-types/custom-date-and-time-format-strings

      So finally it should look something like this:
      formatDateTime(utcnow(), 'dd-MM-yyyy')

      3. CONCATENATE STRING TO MATCH YOUR FILES:
      So in the final stage, it should be concatenated with all names required to identify your file. Here we can use another function, but of course, you can use string interpolation explained in the article 🙂

      concat('Products.', formatDateTime(utcnow(), 'dd-MM-yyyy'), '.csv')

      The answer to your question regarding if it does matter which db source and db sink you will have … Well in general – no, but of course I’m using some special functions like TO_DATE which does not exist in SQL Server so you have to replace anything that applies to ORACLE or PG with the proper replacement available in MS database 😉 That’s all, everything else should work just like in a example above ;]

      Regards,
      m.

      1. Hi Michal
        Thank you very much for the detailed explanation, I am going to try it today and if there is anything I could not do, I may bother you once more,

        Thanks again, appreciate your support!
        Fraz

  11. My files are on ADLS gen 2 raw area. I want to copy few columns from individual files to a ADLS gen 2 STAGE area . I want to achieve it dynamically. I just have one copy activity. I want file mappings to be stored in sql table, read it and copy only required columns for individual files as per mapping.. Any help please.

    If I understand correctly, you have to define exact column names of individual files on data lake storage to utilize schema/column mapping option of ADF. That way it can’t be dynamic. The only way seems to be with databricks.

    1. Hi Sanjeet.
      This requires a different approach.
      The SQL commands are fortunately flexible, so you can construct a query as you like. Column lists can be just a definition hardcoded dynamically as a query string. Then we can easily store them just as a text in the database.

      With semi-structured files like CSV this is something different. Unfortunately, we do not use SQL to query them.

      Nevertheless, you need to dig deeper into the topic of schema mappings:
      https://docs.microsoft.com/bs-latn-ba/azure/data-factory/copy-activity-schema-and-type-mapping
      With some definitions stored in your database, you need to build dynamically a mapping section "mappings": [] . TIP: if your database support JSON formatting, maybe you should try to convert your query to JSON before you will retrieve mappings from config table. At least I would do it if I was to submit JSON in ADF 🙂
      Then you can pass this mapping definition as a dynamic parameter in your copy activity:
      http://sql.pawlikowski.pro/wp-content/uploads/2019/05/mapping_add_dynamic_content.png

    2. Just one more thing.
      Remember that you can use Get Metadata Activity to dig into the specific file and check the structure.
      https://docs.microsoft.com/bs-latn-ba/azure/data-factory/control-flow-get-metadata-activity
      I didn’t test it, but it looks like ADLS Gen2 is supported and you can get structure and even columnCount. This is something that can also be used to get something dynamically.
      I imagine a scenario, where I can use it to retrieve the structure and then parse it (with ADF expressions or maybe in my SQL Server Database with some additional JSON parsing commands) 🙂

  12. Dear Michal,

    Thanks, this article is really helpful.

    Please if you could advise that if this will also take care of Updated columns ? i.e., if any value has been updated in source database then it would be updated in dest.

    Regards,
    Ashish

    1. Ashish,
      Hmm, If I understood you correctly, the answer is no.
      This mechanism is using watermark columns to detect the change. It’s a simple one, if value has been changed since last upload – take the entire row.

      Updates on source – that’s just a different issue and I do not describe it in my article, since it is one of the most commonly used these days.
      So if you update one column, for example COUNTRY_NAME, without marking the change in the watermark column, which is UPDATE_DATE, the change will not be tracked and no update will be sent to the destination system.

      1. Dear Michal,

        Many thanks for your prompt response.

        Apologies, I wasn’t clear in my query. My original question was: ‘If a column is updated in a table along with its watermark column, would that be updated in the target table as well?’

        For example, if ‘COUNTRY_NAME’ has been updated along with the watermark column, which is ‘UPDATE_DATE’, would that be updated in the target database?

        Thanks ,
        Ashish


        1. Ok, I think your question comes from confusing a stage load with replication 🙂 With my example you copy the entire row from the source table to the destination staging table. A record in the destination table is never updated; it is always inserted as a new row. This is part of a process that is very common in data warehousing: you first load all rows with all column values into an empty table, and then, according to your needs, you merge them or refresh partitions in your core tables, replacing old rows with new ones.
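
A rough sketch of that two-step idea in Python, for illustration only (in a warehouse this would be a MERGE or a partition rebuild, not application code). The row layout and the `merge_staging_into_core` helper are hypothetical; the staging list is append-only, and the merge step decides which version of a key survives.

```python
def merge_staging_into_core(core, staging, key="id", watermark="UPDATE_DATE"):
    """Return a new core table where each staged row replaces the older row with the same key."""
    merged = {row[key]: row for row in core}
    # Apply staged rows oldest first so the newest version of a key wins.
    for row in sorted(staging, key=lambda r: r[watermark]):
        current = merged.get(row[key])
        if current is None or row[watermark] >= current[watermark]:
            merged[row[key]] = row
    return sorted(merged.values(), key=lambda r: r[key])


core = [
    {"id": 1, "COUNTRY_NAME": "Polska", "UPDATE_DATE": "2019-01-01T00:00:00Z"},
    {"id": 2, "COUNTRY_NAME": "Germany", "UPDATE_DATE": "2019-01-01T00:00:00Z"},
]
# Staging holds whole rows copied since the last watermark, never updated in place.
staging = [
    {"id": 1, "COUNTRY_NAME": "Poland", "UPDATE_DATE": "2019-08-01T12:37:43Z"},
    {"id": 3, "COUNTRY_NAME": "France", "UPDATE_DATE": "2019-08-01T12:37:43Z"},
]

result = merge_staging_into_core(core, staging)
names = [row["COUNTRY_NAME"] for row in result]
print(names)
```

The ISO 8601 strings compare correctly as plain text because they share one format and time zone.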

          1. Thank you so much Michal, this is the perfect article I have found on the internet for Azure 🙂

            To maintain our database, we will add one more activity to our flow, which deletes and inserts the updated rows from the staging table into the target data system 🙂

            Thanks,
            Ashish

            1. That should be fine, as long as it fits your scale and time window.

              You see, for bigger systems (dozens of terabytes of storage, dozens or even hundreds of gigabytes of data to load), DELETE operations cost the engine too much and generate bloat in your transaction logs.
              In Azure Data Warehouse (or Parallel Data Warehouse, APS systems or any other MPP system) it’s better to divide the load into partitions and rebuild them by UNIONing old and new data, switching the partitions and dropping the old ones. A good example is the use of CTAS (CREATE TABLE AS SELECT) mentioned in the best practices here: https://docs.microsoft.com/en-us/azure/sql-data-warehouse/sql-data-warehouse-best-practices#minimize-transaction-sizes

              You can try to build it in your data warehouse as well, but of course only if the scale is (or will be) that big.

              That sounds more complicated to implement, but in the end it is cheaper than a DELETE process (bulk inserts are generally faster, and deletes also require cleanup mechanisms such as ghost record cleanup in SQL Server or the VACUUM process over dead tuples in PostgreSQL). But this is another topic 🙂

  13. Hi Michal,

    Great Article!!!

    I have a question about the timestamp column. Suppose we have thousands of tables that have no timestamp column, and we do not want to alter those tables to add one.

    How would I fetch the delta for thousands of tables which don’t have a timestamp value?

    Please suggest

    1. That is a really tricky question, because the answer is of course: “it depends”.

      The first and most obvious scenario is to use a trigger on the table and create logic that stores the updated row identifier and the timestamp somewhere else (in a prepared structure).
      But again, if we are talking about a critical transactional system that requires INSERTs/UPDATEs to be as short as possible, triggers are the ugliest and most inefficient way to do it. But sometimes there is no other way.

      Anyway, if we are talking about SQL Server as a source, there are other possible solutions that are less CPU- and IO-intensive than triggers: change data capture or change tracking. You can check them in the documentation. But first read all the requirements and limitations; there are always some points to consider.

  14. Dear Michal,

    We have implemented this solution and this is working as expected , thank you.

    We need a little help on the SQL procedure side, to update the timestamp in the watermark table. We have created a parameterised procedure that also receives the table name in a variable, so we had to build dynamic SQL to achieve this.

    This gives us the error ‘Conversion failed when converting date and/or time from character string’. We are using the syntax below in our procedure:

    set @SQL = ‘update’ + @tablename + ‘set watermarkvalue =’ + @Newwatermarkvalue + ‘where id =’ + @id;

    Execcute sp__executesql @SQL

    please can you advise on any workaround for this

    Regards,
    Ashish

    1. Hi Ashish,

      first of all – what stands behind your parameters? What types are they and what values do they have?
      Your error message indicates that you have a problem with conversion to a date. But without any values, I cannot suggest a solution.

      Below is just a simple example; whether it is helpful depends on your scenario.
      It uses temporary objects and rolls back at the end, so you can run it anywhere, anytime.


      SET NOCOUNT ON;
      BEGIN TRAN;

      CREATE TABLE #test_watermark
      (
          id INT DEFAULT 1,
          watermarkvalue DATETIME DEFAULT GETDATE()
      );

      GO

      INSERT INTO #test_watermark
      DEFAULT VALUES;
      GO

      CREATE PROC #up_watermark
      (
          @tablename sysname,
          @Newwatermarkvalue NVARCHAR(100),
          @id NCHAR(1)
      )
      AS
      BEGIN

          DECLARE @SQL NVARCHAR(MAX);
          SET @SQL = N'UPDATE ' + @tablename + N' SET watermarkvalue = ' + @Newwatermarkvalue + N' where id = ' + @id;
          PRINT @SQL;
          EXEC sp_executesql @SQL;
      END;

      GO

      SELECT *
      FROM #test_watermark;

      EXEC #up_watermark @tablename = '#test_watermark', -- sysname
                         @Newwatermarkvalue = '''2010-01-01''', -- nvarchar(100)
                         @id = '1'; -- nchar(1)

      SELECT *
      FROM #test_watermark;

      ROLLBACK;

      1. Hi Michal,

        Thank you.

        I have created the procedure below, and I can see from ADF that the values below are being passed.

        create PROCEDURE sp_UpdateWatermark
        (
            @id nvarchar(10),
            @NewWatermark datetime,
            @TableName NVARCHAR(200)
        )
        AS
        BEGIN
            SET NOCOUNT ON;
            DECLARE @Sql NVARCHAR(500)
            SET @Sql = 'Update ' + @TableName + 'set WatermarkValue=' + convert(varchar(200), @NewWatermark,120 ) + 'Where id =' + @id;
            EXECUTE sp_executesql @Sql
        END

        —————values passed from ADF ———-
        "id": {
            "value": "4",
            "type": "String"
        },
        "NewWatermark": {
            "value": "2019-08-01T12:37:43Z",
            "type": "DateTime"
        },
        "TableName": {
            "value": "CFG.DW_ACCOUNT_CHARGE_DTL_HIST_V",
            "type": "String"
        }

        1. Hi Ashish,
          ok, now I get it. So the problem is with the Zulu time passed from ADF expressions, and you want to save the new value in the config table. That is strange; I remember that my solution worked out of the box in Azure SQL Database and no conversion was needed. Nevertheless, you can pass the date with Zulu time as a string, then convert it to a datetime (style 127) and back to a string (style 120):
          SELECT '''' + CONVERT(NVARCHAR(100), CONVERT(datetime, '2019-08-01T12:37:43Z', 127), 120) + ''''

          Entire procedure:
          https://paste.ofcode.org/zH7rp7aFpHzhKL3a492LMH
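
To make the two pitfalls visible (the Zulu suffix, and the missing spaces and quotes in the concatenated statement), here is a small Python sketch that mirrors the string assembly. It is an illustration under assumptions, not the procedure from the link above: the helper names are hypothetical, and the table name is concatenated without any validation, as in the thread.

```python
from datetime import datetime


def zulu_to_sql_literal(value):
    """Turn '2019-08-01T12:37:43Z' into the quoted literal '2019-08-01 12:37:43'."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")
    return "'" + parsed.strftime("%Y-%m-%d %H:%M:%S") + "'"


def build_update(table_name, new_watermark, row_id):
    """Assemble the UPDATE text; note the spaces around every concatenated piece."""
    return (
        "UPDATE " + table_name
        + " SET WatermarkValue = " + zulu_to_sql_literal(new_watermark)
        + " WHERE id = " + str(int(row_id))  # int() rejects anything that is not a number
    )


sql = build_update("CFG.DW_ACCOUNT_CHARGE_DTL_HIST_V", "2019-08-01T12:37:43Z", "4")
print(sql)
```

Printing the statement before executing it, as the T-SQL example earlier in this thread does with PRINT, is the quickest way to spot a missing space or an unquoted date.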

  15. Hi Michal,

    thank you. It’s me again 🙂 apologies for so many queries.

    We have now copied data into our staging tables. You already suggested the CTAS option for maintaining the data warehouse. Can that be written in ADF itself?

    We want to update and insert in dimension tables in the same pipeline.

    Regards,
    Ashish

  16. Hi Michal,

    Please ignore my above question. I have created a parameterised procedure to achieve this in ADF pipeline 🙂

    Thanks,
    Ashish

  17. Hi Michal, good afternoon. I am working on a change tracking mechanism for my project. We are on a hybrid model with Azure and are modernizing all our legacy applications. I have already completed the initial data loads from multiple on-prem sources to the new system (SQLMI on Azure). Now I have to implement a change tracking mechanism: whenever there are updates and inserts in the on-prem sources, I should run the change tracking process in ADF to do the delta loads. I did a POC in ADF for a single source and a single target; my ADF pipeline loads new inserts but does not pick up updated columns. Please help. I now need to develop a complex process with multiple data sources and multiple destinations.

    1. Hi Lalitha,

      Are there any watermark columns available in your on-prem tables? If yes, then you can use a MERGE query to take care of your updates and inserts.

      Obviously, you need to create a dynamic MERGE stored procedure. So do a delta copy from the source to a staging layer, and then do a MERGE from the staging layer.

      Hope this helps.

      Regards,
      Ashish

  18. Simply to the point for the implementation. I did it and it is working as expected.

    Great Article Michal !!!

  19. Yes Michal. It was really.

    I am having a project requirement where I need to source the data from different source systems like On-Premise SQL Server, SalesForce, SAP System, SharePoint, SFTP Drive. I was thinking about how I can implement it in ADF 2 with some simplification (The control table approach)

    The approach here mentioned was really helpful and I am thinking to implement the same for my project requirements (For the Structured source systems like SQL Server, Salesforce & SAP System).

    1. Cool, glad to hear that 🙂 They (Microsoft) should use it as general template 😀 Anyway, you can always create the template yourself and use it multiple times in your pipelines.
      Good luck in your projects!

  20. impressive. I am new to ADF. and looking for delta and truncate reload pipelines. this one is great.
    without using blob storage.

    my case is from one source MSSQL(on-prem) to Azure DataWarehouse. will this ADF pipeline work as well? Please let me know.

    1. Hi eduardo,
      Sure, it should work 🙂
      As a matter of fact, the example above was originally created on Azure Data Warehouse in the project I was involved in. We had to load data from different sources (mysql, oracle, mssql) and different tables.

  21. Hi Michael,

    I am following along with your ADF implementation. For the time being I only have one source, an on-prem MSSQL server, loading to Azure DW. QQ: I am at the COPY ORA part (in my case, COPY on-prem).
    How do I convert this query to plain MSSQL? Instead of TO_DATE (which is Oracle-specific), my column in MSSQL is a datetime and is displayed like this: 2019-09-18 12:18:24.617. Any help is highly appreciated.

    SELECT
    @{item().Cols} FROM @{item().SRC_tab}

    WHERE

    @{item().WatermarkColumn} >
    TO_DATE('@{item().WatermarkValue}', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')
    AND
    @{item().WatermarkColumn} <=
    TO_DATE('@{activity('GET MAX ONPREM').output.firstRow.MAXD}', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')

  22. Michal,

    When I debug it, COPY ORA (in my case ONPREM) tells me: TABLE IS REQUIRED FOR COPY ACTIVITY. Below is the query for my source tab.

    SELECT
    @{item().Cols} FROM @{item().SRC_tab}

    WHERE

    @{item().WatermarkColumn} >
    CONVERT('@{item().WatermarkValue}', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')
    AND
    @{item().WatermarkColumn} <=
    CONVERT('@{activity('GET MAX ONPREM').output.firstRow.MAXD}', 'YYYY-MM-DD"T"HH24:MI:SS"Z"')

    and SINK TAB is set to
    @{item().DST_tab}

    Any Help highly appreciated.

    1. Hi Daniel,
      TO_DATE was just the way to convert a string holding an ISO 8601 date into a date in Oracle. The lack of native support for ISO dates makes the conversion harder. But Oracle allows us to set the format string manually, so the ugly YYYY-MM-DD"T"HH24:MI:SS"Z" was the only way to make it possible 🙂

      In SQL Server things are different. There is native support for ISO 8601, including the time zone, which is Zulu in our case (the letter ‘Z’ at the end). Zulu means the GMT time zone, which means the UTC (UCT) standard 🙂
      Well, to make it easier to understand, let’s quote Wikipedia:

      Zulu (short for “Zulu time”) is used in the military and in navigation generally as a term for Universal Coordinated Time (UCT), sometimes called Universal Time Coordinated ( UTC ) or Coordinated Universal Time (but abbreviated UTC), and formerly called Greenwich Mean Time.

      Anyway, to properly convert a string value from ADF into a DATETIME or SMALLDATETIME (the only supported types here), see the documentation: CAST and CONVERT (Transact-SQL)

      Look at the notation of CONVERT function and date convert number 127 – ISO8601 with time zone Z. You can easily test the conversion on SQL Server:


      DECLARE @string VARCHAR(100) = '2019-10-21T10:01:22Z';

      SELECT CONVERT(DATETIME, @string, 127);

      The result will be: 2019-10-21 10:01:22.000

      Now use this conversion in your example, without an explicit format string like in Oracle’s TO_DATE and with the proper style number 127, something like this:


      SELECT
      @{item().Cols} FROM @{item().SRC_tab}

      WHERE

      @{item().WatermarkColumn} >
      CONVERT(DATETIME, '@{item().WatermarkValue}', 127)
      AND
      @{item().WatermarkColumn} <=
      CONVERT(DATETIME, '@{activity('GET MAX ONPREM').output.firstRow.MAXD}', 127)

      I am not able to test it right now, but it should work. Please check it and tell me what the result was 🙂
      Regards,
      m.

  23. Michal,

    Great explanation. It does compile. However, when I run debug, I still get TABLE IS REQUIRED FOR COPY ACTIVITY.

    this is the sink activity.

    "sink": {
    "type": "SqlDWSink",
    "allowPolyBase": false,
    "preCopyScript": "TRUNCATE TABLE @{item().DST_tab}",
    "disableMetricsCollection": false
    },
    "enableStaging": false
    },
    "inputs": [
    {
    "referenceName": "ONPREM",
    "type": "DatasetReference"
    }
    ],
    "outputs": [
    {
    "referenceName": "AzureSqlDW",
    "type": "DatasetReference",
    "parameters": {
    "TableName": {
    "value": "@{item().DST_tab}",
    "type": "Expression"

  24. Michal,

    QQ: does the table have to pre-exist in the target database? I still get TABLE IS REQUIRED FOR COPY ACTIVITY. I cannot get past that error.

    Here is the sink activity updated .
    "sink": {
    "type": "SqlDWSink",
    "allowPolyBase": false,
    "writeBatchSize": 10000,
    "preCopyScript": {
    "value": "TRUNCATE TABLE @{item().DST_tab}",
    "type": "Expression"
    },
    "tableOption": {
    "value": "@{item().DST_tab}",
    "type": "Expression"
    },
    "disableMetricsCollection": false
    },
    "enableStaging": false
    },
    "inputs": [
    {
    "referenceName": "ONPREM",
    "type": "DatasetReference"
    }
    ],
    "outputs": [
    {
    "referenceName": "AzureSqlDW",
    "type": "DatasetReference",
    "parameters": {
    "TableName": {
    "value": "@{item().DST_tab}",
    "type": "Expression"
    }

      1. I created the target table. However, I got the same error message when debugging it. TABLE IS REQUIRED FOR COPY ACTIVITY.

  25. Michal

    I am trying to copy from ServiceNow to on-premises SQL and keep getting the error “inner activity failed”. I would really appreciate your assistance.

  26. Hi Michal – Thanks for the wonderful post.

    How could I join the Lookup output with the Get Metadata output before passing it to ForEach?
    I would like to process files from Gen2 into a SQL Database based on modified time.
    1) I am using the Lookup to get the destination table name and the source directory name.
    2) I am using Get Metadata to get the file names from the passed source directory, based on modified time.
    I need to combine the information from steps 1 and 2 and pass it to ForEach, to process the list of files added to a source directory into the destination SQL table.

    Thanks

    1. Well, it depends. Is there any other column that indicates the update/insert date? If not, then it all depends on your database system and whether it has any mechanism that can track the history of changes, like change data capture in SQL Server. If there is nothing, you can still use triggers to mark the update date, perhaps in a different table that stores only the key and the watermark date. But triggers are the last resort, as they can bring a lot of transactional and performance issues in many scenarios.

      1. Hi Michal,

        Thanks for your comment. Please assume that you are not allowed to change anything at the source (no schema changes); you have only read permissions. Please suggest an approach.

        Thanks
        Venkat

        1. Well, this is of course the worst scenario. Even if your key is an incremental number or a lexicographically ordered string, you cannot detect update/delete changes to your data.
          In such situations I have seen several ways of detecting changes by building a mechanism that calculates differences using some algorithm.
          For example, in SQL Server you can use two functions, CHECKSUM and BINARY_CHECKSUM (two rather than one, since a single one generates collisions too often), over all columns, then compare the calculated values with the previously generated ones. Of course, you first need to store them somewhere: the primary key and the checksum numbers. This mechanism is not ideal; a pair of checksums can come out exactly the same even if some of the columns were changed. That is unfortunate, but rare (though I did manage to hit such a case once in my life).
          You can use a better hash function, like HASHBYTES, but you need to cast every value to a string, and it is really slow… like 20 times slower than CHECKSUM. Deletes and inserts you detect with a traditional full join, checking for NULL and NOT NULL.

          It always depends on the size of the data. Sometimes it is better to just do a full load and, in the meantime, push through the change of adding watermark columns, even if it takes months…
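
The compare-with-stored-hashes idea can be sketched outside the database as well. The Python below is only an illustration under assumptions: SHA-256 from `hashlib` stands in for HASHBYTES, the dictionary comparison plays the role of the full join, and the row layout and helper names are hypothetical.

```python
import hashlib


def row_hash(row, columns):
    """Hash all column values of a row, cast to strings as HASHBYTES would require."""
    # The separator keeps ('ab', 'c') distinct from ('a', 'bc'); NULL gets its own marker.
    joined = "\x1f".join("\x00" if row[c] is None else str(row[c]) for c in columns)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def detect_changes(source_rows, stored_hashes, key, columns):
    """Compare current source rows with previously stored {key: hash} pairs."""
    current = {row[key]: row_hash(row, columns) for row in source_rows}
    inserted = sorted(k for k in current if k not in stored_hashes)
    deleted = sorted(k for k in stored_hashes if k not in current)
    updated = sorted(
        k for k in current if k in stored_hashes and current[k] != stored_hashes[k]
    )
    return inserted, updated, deleted, current


columns = ["id", "COUNTRY_NAME"]
previous = [{"id": 1, "COUNTRY_NAME": "Polska"}, {"id": 2, "COUNTRY_NAME": "Germany"}]
stored = {row["id"]: row_hash(row, columns) for row in previous}

source = [{"id": 1, "COUNTRY_NAME": "Poland"}, {"id": 3, "COUNTRY_NAME": "France"}]
inserted, updated, deleted, new_hashes = detect_changes(source, stored, "id", columns)
print(inserted, updated, deleted)
```

After each run, `new_hashes` would replace the stored pairs, so that the next comparison starts from the current state.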
