Dataform workflow orchestration

I have set up a workflow to run a specific tag in a branch, following the guide here, but the workflow triggers a new reload regardless of the state of the previous reload.

I was expecting Workflow or Dataform to wait for the existing task to complete.

Is there a way to configure the Workflow or Dataform to:

  1. skip a reload if the same reload is already running
  2. retry a reload?

Maybe by using an API call in the workflow to get the state of the last execution?

ACCEPTED SOLUTION

Thanks for the clarification. Dataform will indeed re-create/update all tables every time a Workflow is executed. 

In Cloud Workflows, you can add a condition that checks if there are invocations running with a particular tag and end the workflow if that's the case. We'll also add that option when we release a scheduler in the Dataform app. 

Here's an example:

main:
    steps:
    - init:
        assign:
          - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
    - current_invocations:
        call: http.get
        args:
          url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations?filter=invocationConfig.includedTags:daily&filter=state:RUNNING"}
          auth:
            type: OAuth2
        result: running_invocations
    - decide_to_run:
        switch:
          - condition: ${len(running_invocations.body) > 0}
            next: end
    - createCompilationResult:
        call: http.post
        args:
          url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
          auth:
            type: OAuth2
          body:
            gitCommitish: GIT_COMMITISH
        result: compilationResult
    - createWorkflowInvocation:
        call: http.post
        args:
          url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
          auth:
            type: OAuth2
          body:
            compilationResult: ${compilationResult.body.name}
        result: workflowInvocation
    - complete:
        return: ${workflowInvocation.body.name}

EDIT: updated code snippet with reply below
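
The example covers skipping an overlapping run, but not the retry part of the question. One possible approach, shown here only as a sketch, is to poll the invocation that was just created and create it again if it ends in the FAILED state. The step names, the 30-second polling interval, and the attempts / max_attempts variables are assumptions for illustration and are not part of the original example. The fragment is meant to replace the final complete step and assumes attempts: 0 and max_attempts: 3 have been added to the assign list of the init step.

```yaml
    # Sketch only: replaces the final `complete` step.
    # Assumes `attempts: 0` and `max_attempts: 3` were assigned in `init`.
    - wait_before_poll:
        call: sys.sleep
        args:
          seconds: 30            # assumed polling interval
    - get_invocation:
        call: http.get
        args:
          # workflowInvocation.body.name is the full resource name of the invocation
          url: ${"https://dataform.googleapis.com/v1beta1/" + workflowInvocation.body.name}
          auth:
            type: OAuth2
        result: invocation
    - check_state:
        switch:
          # Still in progress: keep polling
          - condition: ${invocation.body.state == "RUNNING" or invocation.body.state == "CANCELING"}
            next: wait_before_poll
          # Failed: count the attempt and decide whether to retry
          - condition: ${invocation.body.state == "FAILED"}
            next: count_attempt
        # SUCCEEDED or CANCELLED: finish normally
        next: complete
    - count_attempt:
        assign:
          - attempts: ${attempts + 1}
    - retry_or_give_up:
        switch:
          # Reuse the existing compilation result and invoke again
          - condition: ${attempts < max_attempts}
            next: createWorkflowInvocation
        next: give_up
    - give_up:
        raise: ${"Dataform invocation failed: " + workflowInvocation.body.name}
    - complete:
        return: ${workflowInvocation.body.name}
```

Because the workflow now stays alive until the invocation finishes, a 5-minute schedule can still start a second workflow execution while the first one is polling; the decide_to_run check is what keeps that second execution from starting another Dataform invocation.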

REPLIES

Hi Frans,

By new reload, do you mean a create or replace table? 

If you are using type: "table", each workflow trigger would recreate the table from scratch. You can also configure your table to update incrementally so your data is not reprocessed unnecessarily. 

Would that solve your use case?

The new reload I'm referring to is an execution of a tag in dataform - as seen under the "workflow execution logs".

If a "start execution" is done on a tag, then multiple tables can be reloaded at once.

The problem is that nothing prevents multiple executions of the same tag from overlapping when it runs on a schedule.

We have a use case where we want to schedule a reload on a tag every 5 minutes, but the reload sometimes takes longer than 5 minutes. In that case we want to skip creating a new reload if a previously triggered reload is still running.

Thanks for the suggestion. Just needed a minor change

    - decide_to_run:
        switch:
          - condition: ${len(running_invocations.body) > 0}
            next: end

Hi All,

I'm invoking Dataform using Cloud Workflows. I can invoke a single SQLX file using tags (includedTags), but instead of tags I want to invoke it using Dataform actions. Is there any way to do that? Please find the YAML file below.

- createCompilationResult:
    call: http.post
    args:
      url: ${"https://dataform.googleapis.com/v1beta1/projects/" + project_id + "/locations/" + location + "/repositories/" + repository_id + "/compilationResults"}
      auth:
        type: OAuth2
      body:
        gitCommitish: git_branch
    result: compilationResult
- createWorkflowInvocation:
    call: http.post
    args:
      url: ${"https://dataform.googleapis.com/v1beta1/projects/" + project_id + "/locations/" + location + "/repositories/" + repository_id + "/workflowInvocations"}
      auth:
        type: OAuth2
      body:
        compilationResult: ${compilationResult.body.name}
        invocationConfig:
          includedTags:
            - mdt
          transitiveDependenciesIncluded: true
    result: workflowInvocation
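
For selecting specific actions rather than tags, the invocationConfig of the Dataform API also has an includedTargets field, where each target is identified by database (the BigQuery project), schema (the dataset), and name (the action). The sketch below swaps includedTags for includedTargets in the step above. TARGET_PROJECT_ID, my_dataset, and my_table are hypothetical placeholders, not values from this thread, and would need to match the compiled action's target.

```yaml
- createWorkflowInvocation:
    call: http.post
    args:
      url: ${"https://dataform.googleapis.com/v1beta1/projects/" + project_id + "/locations/" + location + "/repositories/" + repository_id + "/workflowInvocations"}
      auth:
        type: OAuth2
      body:
        compilationResult: ${compilationResult.body.name}
        invocationConfig:
          includedTargets:
            # One entry per action to run; all three values are placeholders
            - database: TARGET_PROJECT_ID   # BigQuery project of the action
              schema: my_dataset            # BigQuery dataset of the action
              name: my_table                # action (table/view) name
          transitiveDependenciesIncluded: true
    result: workflowInvocation
```

Several targets can be listed under includedTargets, which may be more convenient than assigning a dedicated tag to each SQLX file.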