Hello, I have a request similar to this one:
The logic of the flow:
There are three data transformation steps:
When the raw data from raw_table1 / raw_table2 arrives, we transform it and insert it into trans_table1 / trans_table2 (this is done by executing the actions trans_table1.sqlx and trans_table2.sqlx). The data for raw_table1 and raw_table2 arrives at different times, which means we cannot schedule the actions. Instead, we need to trigger an update automatically when the data arrives (in other words, trans_table1.sqlx and trans_table2.sqlx are executed independently of each other). There is also no fixed order of arrival: the data for raw_table2 can arrive before or after the data for raw_table1. Finally, there is agg_table, which holds aggregated data and depends on both trans_table1 and trans_table2. agg_table needs to be updated as soon as both trans_table1.sqlx and trans_table2.sqlx have been executed (in other words, when the later of the two actions has finished successfully).
My question is the following:
How can I execute agg_table.sqlx only after making sure that both trans_table1.sqlx and trans_table2.sqlx have executed successfully? If I just list trans_table1 and trans_table2 in the dependencies of agg_table.sqlx, then agg_table.sqlx is never executed when I execute trans_table1.sqlx and trans_table2.sqlx.
HERE IS THE CODE:
// To keep it simple, I don't insert the data from the raw tables; instead I insert the values by hand. The logic stays the same.
trans_table1.sqlx:
config {
type: "operations",
}
TRUNCATE TABLE `dataformtraining2024.transformed.trans_table1`;
INSERT INTO `dataformtraining2024.transformed.trans_table1`
VALUES
("Steven", "Spielberg", "Schindler's list"),
("Steven", "Spielberg", "Jurassic Park")
trans_table2.sqlx:
config {
type: "operations",
}
TRUNCATE TABLE `dataformtraining2024.transformed.trans_table2`;
INSERT INTO `dataformtraining2024.transformed.trans_table2`
VALUES
("Steven", "Spielberg", "Schindler's list"),
("Steven", "Spielberg", "Jurassic Park")
agg_table.sqlx:
config {
type: "operations",
dependencies : ["trans_table1.sqlx", "trans_table2.sqlx"] // mention all dependencies
}
INSERT INTO `dataformtraining2024.aggregated.agg_table`
SELECT *
FROM `dataformtraining2024.transformed.trans_table1`
UNION ALL
SELECT *
FROM `dataformtraining2024.transformed.trans_table2`
The issue you faced arises because Dataform typically executes actions in parallel when possible. If trans_table1.sqlx and trans_table2.sqlx run concurrently, Dataform might attempt to execute agg_table.sqlx before they've finished.
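Whatever ends up running the actions, the behaviour asked for in the question is a "wait for both" gate. Each transformation reports when it has succeeded, in any order, and the aggregation runs only after both have reported. Below is a minimal sketch of that logic in plain JavaScript (Node.js). The AggregationGate class and its callback are hypothetical stand-ins for agg_table.sqlx. The sketch models only the ordering and is not part of Dataform's API.

```javascript
// Hypothetical "wait for both" gate: it remembers which transformation
// actions have finished for the current batch of data and fires the
// aggregation exactly once, when the last required action reports success.
class AggregationGate {
  constructor(required, onReady) {
    this.required = new Set(required); // e.g. ["trans_table1", "trans_table2"]
    this.completed = new Set();
    this.onReady = onReady; // stands in for running agg_table.sqlx
  }

  // Call when a transformation action has finished successfully.
  markDone(name) {
    if (!this.required.has(name)) {
      throw new Error(`Unknown action: ${name}`);
    }
    this.completed.add(name);
    if (this.completed.size === this.required.size) {
      this.completed.clear(); // reset so the next batch waits for both again
      this.onReady();
    }
  }
}

// Usage: the arrival order does not matter.
const runs = [];
const gate = new AggregationGate(["trans_table1", "trans_table2"], () =>
  runs.push("agg_table")
);
gate.markDone("trans_table2"); // raw_table2 arrived first: nothing runs yet
gate.markDone("trans_table1"); // the later action finished: agg_table runs
console.log(runs);
```

In a real setup, each run is a separate execution, so the set of completed actions would have to be stored somewhere durable (for example, a small status table) rather than in memory. That part is left open here.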
Here is a possible solution with incremental tables and declarative actions:
Incremental tables: define trans_table1 and trans_table2 as "incremental" tables. With an incremental table, each run inserts only new rows into the existing table instead of rebuilding it from scratch. (The example below still uses plain "table" actions; see the note at the end on making them truly incremental.)
Declarative actions (sql instead of sqlx): convert agg_table.sqlx to a declarative sql action. This allows Dataform to build a dependency graph based on the relationships between tables.
-- trans_table1.sqlx (table type)
config {
type: "table",
schema: "transformed",
name: "trans_table1"
}
SELECT "Steven" AS firstname, "Spielberg" AS lastname, "Schindler's list" AS movie UNION ALL
SELECT "Steven", "Spielberg", "Jurassic Park"
-- trans_table2.sqlx (table type)
config {
type: "table",
schema: "transformed",
name: "trans_table2"
}
SELECT "Steven" AS firstname, "Spielberg" AS lastname, "Schindler's list" AS movie UNION ALL
SELECT "Steven", "Spielberg", "Jurassic Park"
-- agg_table.sqlx (operations type)
config {
type: "operations",
dependencies: [
"dataformtraining2024.transformed.trans_table1",
"dataformtraining2024.transformed.trans_table2"
]
}
INSERT INTO `dataformtraining2024.aggregated.agg_table`
SELECT * FROM `dataformtraining2024.transformed.trans_table1`
UNION ALL
SELECT * FROM `dataformtraining2024.transformed.trans_table2`;
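The execution order derived from these dependencies can be pictured as a topological sort. Actions with no unmet dependencies can run together, and agg_table.sqlx becomes ready only after both transformation actions are done. Below is a minimal sketch in plain JavaScript. It assumes a hypothetical `actions` map from each action name to its declared dependencies and only models the ordering; it does not use Dataform's own API.

```javascript
// Hypothetical, simplified model of the three actions and their declared
// dependencies (names mirror the SQLX files above).
const actions = {
  trans_table1: [],
  trans_table2: [],
  agg_table: ["trans_table1", "trans_table2"],
};

// Group actions into "waves": every action in a wave depends only on actions
// from earlier waves, so actions inside one wave may run in parallel.
function executionWaves(graph) {
  const done = new Set();
  const waves = [];
  let remaining = Object.keys(graph);
  while (remaining.length > 0) {
    const ready = remaining.filter((name) =>
      graph[name].every((dep) => done.has(dep))
    );
    if (ready.length === 0) {
      // A dependency that names no known action (or a cycle) never resolves.
      throw new Error(`Unresolvable dependencies: ${remaining.join(", ")}`);
    }
    ready.forEach((name) => done.add(name));
    waves.push(ready);
    remaining = remaining.filter((name) => !done.has(name));
  }
  return waves;
}

console.log(executionWaves(actions));
```

With the map above, trans_table1 and trans_table2 land in the first wave and agg_table in the second. An action whose dependency name matches no defined action never becomes ready, and the sketch raises an error in that case.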
Key Changes
- trans_table1.sqlx and trans_table2.sqlx are now explicitly defined with the "table" type, which lets Dataform manage their execution and dependencies.
- The dependencies property in agg_table.sqlx uses fully qualified table names to ensure unambiguous dependency tracking.
- … agg_table.sqlx.
Explanation
- Dataform executes the table actions (trans_table1.sqlx, trans_table2.sqlx) first. Once both tables have been created, Dataform executes agg_table.sqlx, since its dependencies are met.
- When trans_table1.sqlx or trans_table2.sqlx is triggered by an update in its source data, Dataform re-executes the corresponding table action.
- Since agg_table.sqlx explicitly depends on both tables, Dataform then re-executes agg_table.sqlx so that the aggregated table reflects the latest data, provided the triggered run also includes dependent actions (for example, via the "Include dependents" option when starting the execution).
Note:
To make your trans_table actions truly incremental, you would need to change the SQL logic so that it only inserts or updates rows that meet specific criteria (e.g., new dates or changed values). This is a more advanced topic that would require additional custom code.
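As a rough illustration of the kind of criterion meant here, the sketch below keeps only source rows that are newer than the newest row already loaded. It assumes a hypothetical loaded_at column on the raw tables, which the example tables above do not have. In a Dataform incremental table, a filter of this shape is typically written in SQL inside ${when(incremental(), ...)}, comparing against ${self()}. The JavaScript below only models which rows would be selected.

```javascript
// Hypothetical rows from a raw table; `loaded_at` marks when each row arrived.
const rawRows = [
  { movie: "Schindler's list", loaded_at: 1 },
  { movie: "Jurassic Park", loaded_at: 2 },
  { movie: "Jaws", loaded_at: 3 },
];

// Keep only rows newer than anything already in the target table, mirroring
// a filter such as WHERE loaded_at > (SELECT MAX(loaded_at) FROM target).
function newRows(source, target) {
  const maxLoaded = target.length
    ? Math.max(...target.map((row) => row.loaded_at))
    : -Infinity; // empty target (first run): every row is new
  return source.filter((row) => row.loaded_at > maxLoaded);
}

const target = rawRows.slice(0, 2); // rows already loaded by a previous run
console.log(newRows(rawRows, target)); // only the "Jaws" row
```

Using a timestamp or load-date column is only one possible criterion. Detecting changed values would need a key-based comparison (for example, a MERGE), which this sketch does not cover.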