Chapter 4: Task Definition & Execution Workflow
Welcome to Chapter 4! In Chapter 3: Metadata Persistence (Database), we learned how our cloud-accelerator-function uses a database to remember important information, like the status of processed files and records of what it has done. This "memory" is crucial for tracking individual pieces of information.
But what if processing data involves multiple steps that need to happen in a specific order? For example, you can't create a sales summary report until after the latest sales data has been loaded into the database. And you can't refresh your Power BI dashboard with this new summary until after the report itself is generated. This is where the Task Definition & Execution Workflow comes in.
The Recipe Card: Why We Need Workflows
Imagine you're baking a cake. You have a recipe with several steps:
- Mix ingredients for the batter.
- Bake the batter in the oven.
- Let the cake cool.
- Prepare the frosting.
- Frost the cake.
You can't frost the cake (Step 5) before it's baked and cooled (Steps 2 & 3). And you probably want to prepare frosting (Step 4) while the cake is cooling, but definitely not before you've even mixed the batter! Some steps depend on others being completed first.
Our cloud-accelerator-function often needs to perform a series of operations just like this. These "recipes" for processing data are called workflows. This chapter explains how we define these steps (tasks) and how the system ensures they run in the correct order.
Our Use Case: Let's say we want to automatically refresh a Power BI sales report. This report uses data from a BigQuery table called daily_sales_summary. This summary table itself is created by a Dataform query. So, the order must be:
- Run the Dataform query to update daily_sales_summary.
- Then, refresh the Power BI report.
Key Concepts: The Building Blocks of Our Recipes
- Task (The Steps in the Recipe): A Task is the most basic unit of work in our system. Think of it as a single instruction in our recipe. Examples of tasks include:
- Loading a specific table from a file into BigQuery.
- Running a Dataform SQL query.
- Refreshing a Power BI dataset or report.
- Moving a file to an archive folder.
- Task Dependencies (Order of Steps): Tasks can have dependencies on each other. This means one task (a "child" task) cannot start until another task (a "parent" task) has successfully completed.
- In our cake analogy: "Frost the cake" (child) depends on "Bake the cake" (parent) and "Let the cake cool" (parent).
- In our use case: "Refresh Power BI report" (child) depends on "Run Dataform query for daily_sales_summary" (parent).
- Workflow / Directed Acyclic Graph (DAG) (The Whole Recipe Card): A workflow is a collection of tasks and their dependencies, organized to achieve a larger goal. It shows the complete sequence of operations.
- Because tasks depend on previous ones, and we don't want loops (like a task depending on itself indirectly), these workflows form what's known in computer science as a Directed Acyclic Graph (DAG). "Directed" means the dependencies have a direction (Task A must finish before Task B). "Acyclic" means no circular dependencies.
- It's like your complete cake recipe, showing all steps and their order.
- TaskBatch (One Baking Session): When a workflow is triggered (e.g., by a new file arriving, or on a schedule), it starts a new TaskBatch. A TaskBatch is a specific instance of that workflow running. All tasks that are part of this single execution belong to the same TaskBatch.
- This is like deciding, "I'm going to bake a cake today." All the steps you do for this specific cake are part of today's "baking session" or TaskBatch.
- As we saw in Chapter 3: Metadata Persistence (Database), TaskBatch records are stored in our database to group related task executions.
- TaskLog (Tracking Each Step's Outcome): For every task that attempts to run within a TaskBatch, the system creates a TaskLog entry. This log records:
- Which task it was.
- Which TaskBatch it belongs to.
- When it started.
- When it finished (or if it failed).
- Its status (NEW, PROCESSING, SUCCESS, ERROR).
- Any error messages if it failed.
- This is like making a note on your recipe card: "Mixed batter - check!", "Baked cake - check! (took 30 mins)", "Frosting - oops, ran out of sugar, failed!". TaskLog entries are also stored in the database, as discussed in Chapter 3: Metadata Persistence (Database). A short sketch after this list shows how these pieces fit together.
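To make these building blocks concrete, here is a minimal, illustrative sketch of the use-case workflow expressed as plain Python data structures. The task names, the dictionary layout, and the is_ready helper are assumptions made for this example only; in the real system, Task, TaskBatch, and TaskLog live in the database from Chapter 3: Metadata Persistence (Database).

```python
from datetime import datetime, timezone

# Hypothetical task definitions (in the real system these are database rows).
TASKS = {
    "run_dataform_summary": {"type": "DATAFORM_EXECUTION"},
    "refresh_powerbi_report": {"type": "POWERBI_REFRESH"},
}

# The DAG: each child task lists the parent tasks that must succeed first.
DEPENDENCIES = {
    "refresh_powerbi_report": ["run_dataform_summary"],
}

# One TaskBatch is one run of the workflow; TaskLog entries track each task in it.
batch_id = "Batch_20240316_001"
task_logs = {
    ("run_dataform_summary", batch_id): {
        "status": "SUCCESS",
        "ended_at": datetime.now(timezone.utc),
    },
}


def is_ready(task_id: str, batch_id: str) -> bool:
    """A task is ready when every parent has a SUCCESS TaskLog in this batch."""
    parents = DEPENDENCIES.get(task_id, [])
    return all(
        task_logs.get((parent, batch_id), {}).get("status") == "SUCCESS"
        for parent in parents
    )


print(is_ready("refresh_powerbi_report", batch_id))  # True: the Dataform task succeeded
```

Because refresh_powerbi_report only becomes ready once every parent has a SUCCESS log for the same batch, the ordering constraint of the DAG is enforced without any hard-coded sequencing.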
How It Solves Our Use Case: Refreshing Power BI After Dataform
Let's revisit our goal: Run a Dataform query, and then refresh a Power BI report.
- Task Definition (Stored in the Database): Somewhere in our system's configuration, likely within the database described in Chapter 3: Metadata Persistence (Database), we define these tasks:
- Task A: "Run Dataform Sales Summary Query"
- Type: Dataform Execution
- Details: Which Dataform project, which specific query to run.
- Task B: "Refresh Power BI Sales Report"
- Type: Power BI Refresh
- Details: Which Power BI workspace, which dataset/report.
And we define the dependency:
- Task B depends on Task A. (Task A is a parent of Task B).
- Execution Workflow Trigger: Let's say a new daily sales file has been processed (as in Chapter 2: Data Ingestion & Transformation Pipeline). The completion of this file processing might be configured to trigger Task A.
- A message is sent to our system (via Pub/Sub, as seen in Chapter 1: Pub/Sub Event Handling & Routing) saying, "Start Task A for a new TaskBatch (e.g., Batch_20240316_001)."
- Processing Task A:
- The system receives the message for Task A.
- It creates a TaskLog entry for Task A in Batch_20240316_001 with status PROCESSING.
- It executes Task A (runs the Dataform query). We'll see more about interacting with external services like Dataform in Chapter 5: External Services Integration (Dataform & Power BI).
- Let's say Task A completes successfully. The system updates its TaskLog to SUCCESS.
- Crucially, it then sends another message to itself (again, via Pub/Sub): "Task A in Batch_20240316_001 just completed."
- Checking Dependencies and Triggering Task B:
- The system receives the "Task A completed" message. The function handle_task_recursive (which we'll see soon) is invoked.
- It looks up Task A in the database.
- It finds that Task B depends on Task A.
- It then checks: "Are all parents of Task B completed for Batch_20240316_001?"
- Task B only has one parent: Task A.
- The system checks the TaskLog for Task A in Batch_20240316_001. It's SUCCESS.
- Yes! All parents of Task B are complete.
- The system now triggers Task B: It creates a TaskLog for Task B (status NEW or PROCESSING) in the same Batch_20240316_001 and sends a Pub/Sub message: "Execute Task B for Batch_20240316_001."
- Processing Task B:
- The system receives the message for Task B.
- It executes Task B (refreshes the Power BI report).
- If successful, it updates Task B's TaskLog to SUCCESS.
- It then sends a "Task B completed" message.
- handle_task_recursive is called again. It checks if Task B has any children. If not, this branch of the workflow for Batch_20240316_001 is complete.
This way, Task B (Power BI refresh) only runs after Task A (Dataform query) is confirmed to be successful. The system automatically manages this "baton pass."
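To make the baton pass a little more tangible, here is roughly what the sequence of Pub/Sub message payloads for this batch could look like. The field names (trigger_type, task_id, batch_id, previous_task_id, status) follow the code excerpts shown later in this chapter, but the task IDs and exact payload shape are illustrative assumptions.

```python
# 1. File processing finishes and kicks off the workflow: execute Task A.
start_task_a = {
    "trigger_type": "task",
    "task_id": "TaskA_DataformSalesSummary",  # hypothetical ID
    "batch_id": "Batch_20240316_001",
}

# 2. Task A finishes and publishes its own completion so dependents can be checked.
task_a_completed = {
    "trigger_type": "task",
    "task_id": "TaskA_DataformSalesSummary",
    "batch_id": "Batch_20240316_001",
    "previous_task_id": "TaskA_DataformSalesSummary",
    "status": "SUCCESS",
}

# 3. handle_task_recursive sees that all parents of Task B are done and triggers it.
start_task_b = {
    "trigger_type": "task",
    "task_id": "TaskB_PowerBIRefresh",  # hypothetical ID
    "batch_id": "Batch_20240316_001",
}
```

Every one of these messages flows through the same Pub/Sub topic and the same entry point, so a single generic handler can drive the whole chain.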
Under the Hood: How the System Manages the Flow
The magic of orchestrating these tasks happens primarily within a function often called handle_task_recursive, which gets triggered by Pub/Sub messages indicating a task has been initiated or has completed.
A Step-by-Step Walkthrough:
Imagine Task A (parent) has just finished, and Task B (child) depends on it.
Diving into the Code:
- Receiving a Task Message (run/pubsub/main.py): As we saw in Chapter 1: Pub/Sub Event Handling & Routing, main.py receives messages. If it identifies a message as being related to a task, it routes it to a specialized handler.

```python
# File: run/pubsub/main.py
# ... (inside the index() function, after validate_file_trigger) ...
elif trigger_type == "task":
    data_obj = TaskObj(data)  # Prepare task-specific data
    logger.info("Processing task %s", data_obj.task_id)
    # Call the specialized function for task events
    result = handle_task_recursive(
        data_obj.task_id, data_obj.batch_id, db, data_obj.previous_task_id
    )  # Route to task handler
# ...
```

Here, if the incoming message is about a task (e.g., "Task X just completed" or "Execute Task Y"), the handle_task_recursive function is called. data_obj would contain details like task_id, batch_id, and potentially previous_task_id (the ID of the task that just finished, triggering this check). The db object is our connection to the database from Chapter 3: Metadata Persistence (Database).
- The Task Handler (handle_task_recursive - conceptually): The actual handle_task_recursive function (likely in a file like run/pubsub/handlers/task_handler.py) is the brain of the workflow. Here's a simplified conceptual outline of what it does:

```python
# Conceptual: run/pubsub/handlers/task_handler.py
import logging

# from database.models import Task, TaskLog, TaskBatch  # Import necessary models
# from helpers.pubsub_publisher import publish_message  # To send new task messages

logger = logging.getLogger(__name__)


def handle_task_recursive(task_id_to_process, batch_id, db, previous_task_id=None):
    # If previous_task_id is present, it means a parent task just completed.
    # We need to log its completion and then find its children.
    if previous_task_id:
        # Step 1: Log completion of the previous_task_id (parent task)
        # parent_task_log = db.session.query(TaskLog).filter_by(
        #     task_id=previous_task_id, batch_id=batch_id
        # ).first()
        # if parent_task_log:
        #     parent_task_log.status = "SUCCESS"  # Or ERROR if it failed
        #     parent_task_log.ended_at = datetime.utcnow()
        #     db.session.commit()
        logger.info(f"Parent task {previous_task_id} in batch {batch_id} logged as complete.")

        # Step 2: Find child tasks that depend on this 'previous_task_id'
        # child_tasks = db.session.query(Task).join(TaskDependencyTable)...
        #     .filter(TaskDependencyTable.parent_id == previous_task_id).all()
        # For simplicity, let's imagine we get a list of child task IDs:
        child_task_ids = get_child_task_ids_from_db(previous_task_id, db)

        for child_id in child_task_ids:
            # Step 3 & 4: Check if ALL parents of this child_id are complete
            if are_all_parents_complete(child_id, batch_id, db):
                logger.info(f"Child task {child_id} is ready to run!")
                # Step 5a: Create a new TaskLog for this child
                # new_log = TaskLog(task_id=child_id, batch_id=batch_id, status="NEW")
                # db.session.add(new_log)
                # db.session.commit()

                # Step 5b: Publish a new Pub/Sub message to trigger this child task
                # message_data = {"task_id": child_id, "batch_id": batch_id}
                # publish_message(message_data)  # This sends it back to Pub/Sub
                logger.info(f"Published message to trigger child task {child_id}.")
            else:
                logger.info(f"Child task {child_id} not ready, waiting for other parents.")
    else:
        # This 'else' block means task_id_to_process is a task to BE EXECUTED now.
        # (e.g., it was triggered by the logic above, or it's the first task in a batch)
        # current_task_log = db.session.query(TaskLog).filter_by(
        #     task_id=task_id_to_process, batch_id=batch_id
        # ).first()
        # current_task_log.status = "PROCESSING"
        # current_task_log.started_at = datetime.utcnow()
        # db.session.commit()
        logger.info(f"Executing task {task_id_to_process} in batch {batch_id}...")

        # Perform the actual work of the task (e.g., call Dataform, Power BI)
        # success = execute_actual_task_logic(task_id_to_process, db)
        success = True  # Placeholder for actual task execution

        # After execution, send a "completion" message for THIS task,
        # which will re-invoke handle_task_recursive with this task_id as 'previous_task_id'.
        # completion_message = {
        #     "task_id": task_id_to_process,  # This task itself
        #     "batch_id": batch_id,
        #     "previous_task_id": task_id_to_process,  # Mark itself as completed
        #     "status": "SUCCESS" if success else "ERROR",
        # }
        # publish_message(completion_message)
        logger.info(f"Task {task_id_to_process} finished. Publishing its completion.")

    return "Task processing initiated/logged.", 200


# --- Helper function stubs (conceptual) ---
def get_child_task_ids_from_db(parent_task_id, db):
    # In reality, this queries database tables defining task dependencies,
    # e.g., SELECT child_id FROM task_dependencies WHERE parent_id = parent_task_id
    logger.info(f"DB Query: Get children of {parent_task_id}")
    if parent_task_id == "DataformTask123":
        return ["PowerBITask789"]  # Dummy
    return []


def are_all_parents_complete(child_task_id, batch_id, db):
    # In reality, this queries:
    # 1. Find all parent_ids for child_task_id from task_dependencies.
    # 2. For each parent_id, check its TaskLog status in the given batch_id.
    # Return True if all are "SUCCESS".
    logger.info(f"DB Query: Check parents of {child_task_id} for batch {batch_id}")
    return True  # Dummy
```

Key things to note in this conceptual code:
- Dual Role: The handle_task_recursive function cleverly handles two scenarios:
- When a task has just completed (previous_task_id is provided): It logs this completion and then checks for/triggers any ready child tasks.
- When a task is to be executed (previous_task_id is None or not the focus): It performs the actual work of the task and then, upon its own completion, sends a message that will lead back to scenario 1 for itself.
- Database Interaction: It heavily relies on the database (db) to:
- Update TaskLog statuses.
- Discover parent-child task relationships (using hypothetical functions like get_child_task_ids_from_db and are_all_parents_complete, which would query dependency tables). A fuller sketch of these two helpers appears right after this list.
- Recursive Nature via Pub/Sub: When a child task is ready, handle_task_recursive doesn't call itself directly. Instead, it publishes a new Pub/Sub message. This new message will eventually be picked up by the cloud-accelerator-function again, leading to another call to handle_task_recursive to process that new task. This is a robust way to handle distributed task execution.
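The helper stubs in the conceptual code above only log and return dummy values. As a hedged illustration, here is one way they might be implemented with SQLAlchemy-style queries against hypothetical task_dependencies and task_log tables; the table and column names are assumptions, and the real schema belongs to the models from Chapter 3: Metadata Persistence (Database).

```python
from sqlalchemy import text


def get_child_task_ids_from_db(parent_task_id, db):
    # Hypothetical dependency table: task_dependencies(parent_id, child_id)
    rows = db.session.execute(
        text("SELECT child_id FROM task_dependencies WHERE parent_id = :parent"),
        {"parent": parent_task_id},
    )
    return [row.child_id for row in rows]


def are_all_parents_complete(child_task_id, batch_id, db):
    # Hypothetical tables: task_dependencies(parent_id, child_id)
    # and task_log(task_id, batch_id, status)
    rows = db.session.execute(
        text(
            """
            SELECT d.parent_id, l.status
            FROM task_dependencies AS d
            LEFT JOIN task_log AS l
              ON l.task_id = d.parent_id AND l.batch_id = :batch
            WHERE d.child_id = :child
            """
        ),
        {"child": child_task_id, "batch": batch_id},
    )
    # Ready only if every parent has a TaskLog in this batch with status SUCCESS.
    return all(row.status == "SUCCESS" for row in rows)
```

The LEFT JOIN matters here: a parent that hasn't produced a TaskLog for this batch yet shows up with a NULL status, so the child correctly keeps waiting.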
The definitions for tasks (their names, types, and what they do) and their dependencies would typically be set up beforehand, often stored in database tables (e.g., a Tasks table and a TaskDependencies table) as part of the system's overall setup, which is covered in Chapter 6: Configuration Management.
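As an illustration of what such configuration tables might look like, here is a minimal, hypothetical sketch in SQLAlchemy model form; every name and column is an assumption for this example, not the project's actual schema.

```python
from sqlalchemy import Column, ForeignKey, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Task(Base):
    """One step in a workflow, e.g. 'Run Dataform Sales Summary Query'."""
    __tablename__ = "tasks"
    id = Column(String, primary_key=True)        # e.g. "DataformTask123"
    name = Column(String, nullable=False)
    task_type = Column(String, nullable=False)   # e.g. "DATAFORM_EXECUTION", "POWERBI_REFRESH"
    details = Column(String)                     # e.g. Dataform project or Power BI dataset


class TaskDependency(Base):
    """Edge in the DAG: child_id cannot start until parent_id has succeeded."""
    __tablename__ = "task_dependencies"
    parent_id = Column(String, ForeignKey("tasks.id"), primary_key=True)
    child_id = Column(String, ForeignKey("tasks.id"), primary_key=True)
```

A workflow is then simply the set of rows in task_dependencies that connects a group of tasks.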
Conclusion
You've now seen how the cloud-accelerator-function can manage complex, multi-step processes using Task Definitions and an Execution Workflow. It's like having an automated chef that follows a recipe meticulously:
- It knows the Tasks (steps).
- It respects Dependencies (order of steps).
- It groups executions into a TaskBatch (one cooking session).
- It records everything in TaskLogs (notes on how each step went).
- It ensures dependent tasks only run when their prerequisites are met, forming a reliable Workflow (DAG).
This powerful abstraction allows you to build sophisticated data pipelines where the completion of one operation automatically and reliably triggers the next, ensuring data flows correctly through your system.
So far, we've talked about tasks like "run a query" or "refresh a report" somewhat abstractly. In the next chapter, we'll dive into the specifics of how our cloud-accelerator-function actually communicates with external systems to perform these actions. Get ready for Chapter 5: External Services Integration (Dataform & Power BI)!