Executing a Dataform action only after all dependencies of this action are executed

 

@ms4446 @maffka123 

Hello, I have a request similar to this one:

https://www.googlecloudcommunity.com/gc/Data-Analytics/How-to-add-dependencies-from-js-script-in-Dat...

The logic of the flow :

There are three data transformation steps : 

  • Receiving raw data into the raw_table1 and raw_table2 tables
  • Transforming raw data and inserting the result into trans_table1, trans_table2 tables
  • Aggregating transformed data into agg_table

When the raw data arrives in raw_table1 / raw_table2, we transform it and insert the result into trans_table1 and trans_table2 (this is done by executing the actions trans_table1.sqlx and trans_table2.sqlx). The data for raw_table1 and raw_table2 arrives at different times, so we cannot schedule the actions; instead, we need to trigger an update automatically when the data arrives (in other words, trans_table1.sqlx and trans_table2.sqlx are executed independently of each other). There is also no fixed order of arrival: the data for raw_table2 can arrive earlier than the data for raw_table1, or the other way round. Finally, there is agg_table with aggregated data, which depends on these two tables (trans_table1 and trans_table2). agg_table needs to be updated as soon as both trans_table1.sqlx and trans_table2.sqlx have been executed (in other words, when the later of the two actions has finished successfully).

My question is the following : 

How can I execute the action agg_table.sqlx only after making sure that both trans_table1.sqlx and trans_table2.sqlx were executed successfully? If I just list trans_table1 and trans_table2 in the dependencies of agg_table.sqlx and then execute the actions trans_table1.sqlx and trans_table2.sqlx, agg_table.sqlx is never executed.

 

HERE IS THE CODE : 

// To keep it simple, I don't insert the data from the raw tables; I insert the values by hand instead. The logic stays the same.

trans_table1.sqlx:

config {
  type: "operations",
}

TRUNCATE TABLE `dataformtraining2024.transformed.trans_table1`;

INSERT INTO `dataformtraining2024.transformed.trans_table1`
VALUES
("Steven", "Spielberg", "Schindler's list"),
("Steven", "Spielberg", "Jurassic Park")

 

trans_table2.sqlx:

config {
  type: "operations",
}

TRUNCATE TABLE `dataformtraining2024.transformed.trans_table2`;

INSERT INTO `dataformtraining2024.transformed.trans_table2`
VALUES
("Steven", "Spielberg", "Schindler's list"),
("Steven", "Spielberg", "Jurassic Park")

 

agg_table.sqlx:

config {
  type: "operations",
  dependencies : ["trans_table1.sqlx", "trans_table2.sqlx"] // mention all dependencies
}

INSERT INTO `dataformtraining2024.aggregated.agg_table`
SELECT *
  FROM `dataformtraining2024.transformed.trans_table1`
UNION ALL
SELECT *
  FROM `dataformtraining2024.transformed.trans_table2`

1 REPLY

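Dataform runs actions in parallel where it can, and it orders them only through dependencies that it can resolve to actions in the project. The dependencies in your agg_table.sqlx appear to list file names ("trans_table1.sqlx") rather than action names ("trans_table1"). In addition, when trans_table1.sqlx and trans_table2.sqlx are started as separate runs, agg_table.sqlx is not part of either run unless it is selected as well.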

Here is a possible solution that uses table actions and explicit dependencies:

  1. Table actions:

    • Define trans_table1 and trans_table2 as "table" actions. Dataform then creates the tables itself and knows which action produces each of them.
  2. Explicit dependencies:

    • Keep agg_table.sqlx as an "operations" action and list both tables in its dependencies, so that Dataform places it after them in the dependency graph.

-- trans_table1.sqlx (table type)
config { 
  type: "table",
  schema: "transformed",
  name: "trans_table1" 
}
SELECT "Steven" AS firstname, "Spielberg" AS lastname, "Schindler's list" AS movie UNION ALL
SELECT "Steven", "Spielberg", "Jurassic Park"

-- trans_table2.sqlx (table type)
config {
  type: "table",
  schema: "transformed",
  name: "trans_table2"
}

SELECT "Steven" AS firstname, "Spielberg" AS lastname, "Schindler's list" AS movie UNION ALL
SELECT "Steven", "Spielberg", "Jurassic Park"


-- agg_table.sqlx (operations type)
config {
  type: "operations",
  dependencies: [
    "dataformtraining2024.transformed.trans_table1",
    "dataformtraining2024.transformed.trans_table2"
  ] 
}

INSERT INTO `dataformtraining2024.aggregated.agg_table`
SELECT * FROM `dataformtraining2024.transformed.trans_table1`
UNION ALL
SELECT * FROM `dataformtraining2024.transformed.trans_table2`;

Key Changes

  • Table Type: Both trans_table1.sqlx and trans_table2.sqlx are now explicitly defined as "table" type, allowing Dataform to manage their execution and dependencies effectively.
  • Dependencies: The dependencies property in agg_table.sqlx uses fully qualified table names to ensure unambiguous dependency tracking.
  • Terminology: trans_table1 and trans_table2 are plain "table" actions, not incremental tables; the note at the end describes what a truly incremental version would need.
  • Action type: agg_table.sqlx keeps the "operations" type, because it runs an INSERT statement instead of defining a table with a SELECT.
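
As an alternative to listing dependencies by hand, agg_table.sqlx could reference the two tables with ref(), which both fills in the table name and registers the dependency. This is only a minimal sketch: it assumes that dataformtraining2024 is the project's default database and that the action names trans_table1 and trans_table2 are unique in the project.

```sqlx
config {
  type: "operations"
}

-- Each ref() call resolves to the full table name and adds the referenced
-- action to this action's dependencies, so no dependencies array is needed.
INSERT INTO `dataformtraining2024.aggregated.agg_table`
SELECT * FROM ${ref("trans_table1")}
UNION ALL
SELECT * FROM ${ref("trans_table2")}
```

With this form, renaming or moving one of the tables only has to be done in its own config block, and the dependency graph stays in sync with the SQL.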

Explanation

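  1. Initial Run: When all three actions are part of the same run, Dataform executes the table actions (trans_table1.sqlx, trans_table2.sqlx) first. Once both have finished successfully, Dataform executes agg_table.sqlx, because its dependencies are met.
  2. Subsequent Runs:
    • Dataform does not start a run on its own when source data changes; something outside the project (a schedule, a workflow, an API call) has to start the run and choose which actions it contains.
    • A run that selects only trans_table1.sqlx or trans_table2.sqlx does not execute agg_table.sqlx. For agg_table.sqlx to run after both tables, the run has to contain it as well, for example by selecting all three actions or by including the dependents of the selected actions. To aggregate only complete data, start that run once both raw tables have been loaded.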
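
If the three actions need to be started together as one run, a shared tag is one way to select them. The sketch below shows the config for trans_table1.sqlx; the tag name agg_pipeline is hypothetical, and the same tags line would have to be added to trans_table2.sqlx and agg_table.sqlx.

```sqlx
config {
  type: "table",
  schema: "transformed",
  name: "trans_table1",
  // Hypothetical tag name; use the same tag in trans_table2.sqlx and agg_table.sqlx.
  tags: ["agg_pipeline"]
}

SELECT "Steven" AS firstname, "Spielberg" AS lastname, "Schindler's list" AS movie UNION ALL
SELECT "Steven", "Spielberg", "Jurassic Park"
```

A run that selects this tag contains all three actions, and Dataform orders them by their declared dependencies, so agg_table.sqlx runs last and only if both table actions succeed.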

Note:

To make your trans_table actions truly incremental, you would need to change their type to "incremental" and modify the SQL logic so that each run only inserts or updates rows that match specific criteria (e.g., new dates, changed values, etc.). This is a more advanced topic that requires additional custom code.
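
As a rough illustration of what that could look like for trans_table1.sqlx, here is a minimal sketch. It rests on assumptions that are not in the original post: raw_table1 is declared as a data source in the project, it has the columns firstname, lastname, and movie, and it has a timestamp column, here called loaded_at (a hypothetical name), that marks when each row arrived.

```sqlx
config {
  type: "incremental",
  schema: "transformed",
  name: "trans_table1"
}

SELECT firstname, lastname, movie, loaded_at
FROM ${ref("raw_table1")}
-- On incremental runs, only pick up rows that are newer than the rows
-- already stored in this table; on the first run the filter is omitted.
${when(incremental(), `WHERE loaded_at > (SELECT MAX(loaded_at) FROM ${self()})`)}
```

The filter column and the comparison are the part that has to be adapted to the real data; any criterion that separates new rows from already processed ones would serve the same purpose.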