Chapter 1: Pub/Sub Event Handling & Routing
Welcome to the cloud-accelerator-function tutorial! This project is designed to help you build robust data processing pipelines in the cloud. In this first chapter, we'll explore the very first step in our system: how it receives and understands different kinds of messages.
The Mailroom: Why Do We Need Event Handling and Routing?
Imagine a busy office building. It constantly receives mail: letters with new customer orders, packages containing reports, internal memos about project updates, and so on. If all this mail were just dumped in a pile, it would be chaos! Nobody would know what to do with any of it.
This is where a central mailroom comes in. The mailroom staff:
- Receives all incoming mail.
- Looks at each item to see what it is (an order, a report, a memo).
- Sends (routes) each item to the correct department that knows how to handle it.
Our cloud-accelerator-function faces a similar challenge. It needs to react to various "events" happening in your cloud environment. For example:
- A new data file (like sales_data_january.csv) is uploaded to a storage bucket.
- A data processing task (like "transform customer data") has just finished.
- A signal arrives from another service like Dataform, indicating a data model has been updated.
Each of these events is a "message" that our system needs to process. Pub/Sub Event Handling & Routing is the "central mailroom" of our cloud-accelerator-function. It's the main entry point that receives all these different messages and directs them to the appropriate specialized logic for processing.
Key Concepts: The Parts of Our Mailroom
Let's break down the components involved:
- Google Cloud Pub/Sub (The Postal Service): Think of Pub/Sub as a super-efficient postal service provided by Google Cloud. When an event occurs (like a file upload), other Google Cloud services can publish a "message" about this event to a Pub/Sub "topic" (like a specific mailbox). Our application then "subscribes" to this topic to receive these messages.
- Events/Messages (The Mail): These are the actual pieces of information delivered by Pub/Sub. Each message contains details about what happened. For example, a message about a new file might include the file's name and where it's stored.
- The Flask Application (main.py) (The Mailroom Clerk): At the heart of our cloud-accelerator-function is a web application built with a Python framework called Flask. This application, defined primarily in the run/pubsub/main.py file, is our "mailroom clerk." It is constantly listening for new messages arriving from Pub/Sub.
- Decoding (Opening the Envelope): Pub/Sub messages arrive in a specific format. Our Flask application needs to "decode" them, like opening the envelope to read the letter inside, to understand their content. (A short sketch of this step follows this list.)
- Routing (Sorting the Mail): Once the message is decoded, the Flask application inspects its content to determine its "type" (is this about a new file, a completed task, or a Dataform update?). Based on this type, it "routes" the message to a specific Python function designed to handle that exact kind of event.
- Handler Functions (The Specialist Departments): These are specialized Python functions within our project. Each handler is an expert in one particular type of event:
  - handle_raw_file: Knows what to do when a new file arrives. (We'll learn more about this in Chapter 2: Data Ingestion & Transformation Pipeline.)
  - handle_task_recursive: Manages the workflow of tasks. (Covered in Chapter 4: Task Definition & Execution Workflow.)
  - handle_completed_dataform: Reacts to signals from Dataform. (See Chapter 5: External Services Integration (Dataform & Power BI).)
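Before we meet the real code, here is a minimal, self-contained sketch of the "opening the envelope" step. The envelope shape (a JSON body with a base64-encoded message.data field and a message.attributes map) is standard Pub/Sub push delivery; the function name and example payload below are illustrative, not taken from the project.

```python
import base64
import json

def decode_push_envelope(body: dict):
    """Illustrative only: unpack a standard Pub/Sub push envelope.

    Pub/Sub delivers push messages as JSON, with the payload
    base64-encoded under message.data and metadata under
    message.attributes.
    """
    message = body["message"]
    attributes = message.get("attributes", {})
    data = base64.b64decode(message.get("data", ""))  # the "letter" itself
    return attributes, data

# Example envelope, shaped like a Cloud Storage notification:
envelope = {
    "message": {
        "data": base64.b64encode(b'{"name": "uploads/new_sales_data.csv"}').decode(),
        "attributes": {"eventType": "OBJECT_FINALIZE", "bucketId": "my-data-bucket"},
    }
}

attrs, payload = decode_push_envelope(envelope)
print(attrs["eventType"], json.loads(payload)["name"])
# OBJECT_FINALIZE uploads/new_sales_data.csv
```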
How It Works: From Event to Action
Let's trace the journey of a message, using a "new file upload" as an example:
- Event Occurs: You upload a file named new_sales_data.csv to a Google Cloud Storage bucket.
- Pub/Sub Notified: Google Cloud Storage is configured to send a message to a Pub/Sub topic whenever a new file is uploaded to that bucket.
- Message Delivered: Pub/Sub delivers this message to our cloud-accelerator-function (specifically, to the Flask application in main.py). A simplified version of the message content looks like this:

```json
{
  "message": {
    "data": "base64-encoded-string-representing-file-info",
    "attributes": {
      "eventType": "OBJECT_FINALIZE", // Means a new object/file was created
      "bucketId": "my-data-bucket",
      "objectId": "uploads/new_sales_data.csv"
    }
  }
}
```

(The actual data part is encoded, but it contains details like the bucket name and file name.)
- Flask App Receives & Decodes: The main.py application receives this HTTP POST request from Pub/Sub. The decode_pubsub function extracts the meaningful information.
- Type Identification & Routing: The application looks at the message attributes (or sometimes the decoded data) to figure out that it's a "file" event.
- Handler Called: The routing logic then calls the handle_raw_file function, passing along the details of new_sales_data.csv.
- Action: The handle_raw_file function now takes over to begin processing this new file. Exactly what it does is the subject of Chapter 2: Data Ingestion & Transformation Pipeline. For now, what matters is that the message got to the right place! (You can replay this exact message yourself; see the sketch after this list.)
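If you'd like to see this routing fire without waiting for a real upload, you can replay the simplified message against a locally running copy of the service. This is a hypothetical exercise: it assumes the Flask app is listening on http://localhost:8080/ (a common Cloud Run default), and the base64 payload content here is invented for illustration.

```python
import base64
import json

import requests  # pip install requests

# Invented payload for illustration; a real Cloud Storage notification
# carries the full object metadata in the data field.
file_info = {"bucket": "my-data-bucket", "name": "uploads/new_sales_data.csv"}

envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(file_info).encode()).decode(),
        "attributes": {
            "eventType": "OBJECT_FINALIZE",
            "bucketId": "my-data-bucket",
            "objectId": "uploads/new_sales_data.csv",
        },
    }
}

# Assumes the Flask app is running locally, e.g. started from run/pubsub/.
resp = requests.post("http://localhost:8080/", json=envelope)
print(resp.status_code, resp.text)
```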
Under the Hood: A Peek Inside main.py
Let's look at how this is implemented step-by-step.
A Visual Overview:
Here's the simplified sequence of what happens: Pub/Sub POSTs a message to the Flask app's index() route, decode_pubsub unpacks it, validate_file_trigger identifies its type, and the matching handler (handle_raw_file, handle_task_recursive, or handle_completed_dataform) takes over.
Diving into the Code (run/pubsub/main.py):
Our Flask application has an endpoint (a specific URL) that Pub/Sub sends messages to.
- Receiving the Message: The @app.route("/", methods=["POST"]) line tells Flask that the index() function below it should handle incoming HTTP POST requests to the base URL (/) of our application. Pub/Sub sends its messages as POST requests.

```python
# File: run/pubsub/main.py
# ... (imports and app setup) ...

@app.route("/", methods=["POST"])  # type: ignore
def index():
    """Base url method to process pub sub messages"""
    try:
        logger.info("Received request for new file")
        # decode pubsub
        data, msg, code = decode_pubsub(request)  # Step 1: Open the envelope
        if code != 200:
            # If decoding fails, report error and stop
            return data, msg
        # ... more processing happens here ...
```

In this snippet, decode_pubsub(request) takes the raw incoming request from Pub/Sub and tries to extract the useful data from it. If it can't understand the message, the function stops early. The logger.info(...) call helps us track what's happening, which is part of Chapter 7: Structured Logging and Alerting.
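The tutorial doesn't show decode_pubsub's body, but from how it's called (returning data, a message, and an HTTP-style status code), a plausible sketch might look like the following. Treat it as an assumption about its shape, not the project's actual implementation:

```python
import base64
import json

from flask import Request

def decode_pubsub(request: Request):
    """Hypothetical sketch: unpack a Pub/Sub push request.

    Returns (data, message, code) to match how main.py uses it.
    """
    envelope = request.get_json(silent=True)
    if not envelope or "message" not in envelope:
        return {}, "Bad Request: no Pub/Sub message received", 400
    message = envelope["message"]
    data = dict(message.get("attributes", {}))
    if message.get("data"):
        # The payload is base64-encoded JSON; merge it with the attributes.
        data.update(json.loads(base64.b64decode(message["data"])))
    return data, "OK", 200
```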
- Identifying the Message Type: After successfully decoding, the system needs to figure out what kind of event this message represents.

```python
# File: run/pubsub/main.py
# ... (inside the index() function, after decode_pubsub) ...

        # Validate the message type (e.g., Cloud Storage event, Task, Dataform)
        code, trigger_type = validate_file_trigger(data)  # Step 2: Check letterhead
        if code != 200:
            # Not a valid or expected message type for us
            return data, code
        logger.info(data)  # Log the content for debugging
        # ...
```

The validate_file_trigger(data) function examines the decoded data. It returns a trigger_type, which could be "file", "task", or "dataform". This tells us which "department" should handle the message.
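The exact checks inside validate_file_trigger live in the project, but a validator like this typically keys off fields that only one event source sets. The sketch below is an assumption for illustration: the OBJECT_FINALIZE attribute comes from Cloud Storage notifications, and the @type value matches the Dataform log filter we'll see later in this chapter.

```python
def validate_file_trigger(data: dict):
    """Hypothetical sketch: classify a decoded message.

    Returns (code, trigger_type) to match how main.py uses it;
    the specific fields checked here are illustrative assumptions.
    """
    if data.get("eventType") == "OBJECT_FINALIZE":
        return 200, "file"  # Cloud Storage object created
    if "task_id" in data:
        return 200, "task"  # internal task message
    if data.get("@type", "").endswith("WorkflowInvocationCompletionLogEntry"):
        return 200, "dataform"  # Dataform completion log entry
    return 400, "unknown"  # not a message type we handle
```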
- Routing to the Correct Handler: Now comes the actual routing. An if/elif/else structure directs the flow based on the trigger_type.

```python
# File: run/pubsub/main.py
# ... (inside the index() function, after validate_file_trigger) ...

        if trigger_type == "file":
            data_obj = StorageTrigger(data)  # Prepare file-specific data
            logger.info("Processing file %s", data_obj.full_name)
            # Call the specialized function for file events
            result = handle_raw_file(data_obj, db)  # Step 3a: Send to File Dept.
        elif trigger_type == "task":
            data_obj = TaskObj(data)  # Prepare task-specific data
            logger.info("Processing task %s", data_obj.task_id)
            # Call the specialized function for task events
            result = handle_task_recursive(
                data_obj.task_id, data_obj.batch_id, db, data_obj.previous_task_id
            )  # Step 3b: Send to Task Dept.
        elif trigger_type == "dataform":
            logger.info("Processing dataform execution completion trigger...")
            # Call the specialized function for Dataform events
            result = handle_completed_dataform(data, db)  # Step 3c: Send to Dataform Dept.

        # ... (logging results and returning response to Pub/Sub) ...
        return result
    # ... (exception handling) ...
```

- If trigger_type is "file", handle_raw_file is called. The StorageTrigger(data) part likely converts the generic data into an object specifically structured for file events. The db parameter you see passed relates to Chapter 3: Metadata Persistence (Database).
- If it's "task", handle_task_recursive is called. Similarly, TaskObj(data) prepares task-specific data.
- If it's "dataform", handle_completed_dataform gets the call.

Each of these handler functions (handle_raw_file, handle_task_recursive, handle_completed_dataform) contains the specialized logic for its event type. We'll explore them in detail in their respective chapters. A dispatch-table alternative to the if/elif chain is sketched below.
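As a design note, the same routing can be expressed as a dispatch table: a dictionary mapping each trigger_type to a small adapter function. The sketch below is a hypothetical refactoring, not the project's code; it assumes the handler functions and wrapper classes shown above are importable under the same names.

```python
# Hypothetical alternative: a dispatch table instead of if/elif/else.
# Assumes handle_raw_file, handle_task_recursive, handle_completed_dataform,
# StorageTrigger, and TaskObj are importable from the project.

def _route_file(data, db):
    return handle_raw_file(StorageTrigger(data), db)

def _route_task(data, db):
    task = TaskObj(data)
    return handle_task_recursive(task.task_id, task.batch_id, db, task.previous_task_id)

def _route_dataform(data, db):
    return handle_completed_dataform(data, db)

HANDLERS = {"file": _route_file, "task": _route_task, "dataform": _route_dataform}

def route(trigger_type, data, db):
    handler = HANDLERS.get(trigger_type)
    if handler is None:
        return "unknown trigger type", 400  # mirrors the non-200 path above
    return handler(data, db)
```

With this shape, supporting a new event type means registering one entry in HANDLERS rather than adding another elif branch.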
A Note on Setting Up Triggers (e.g., for Dataform):
Sometimes, you need to explicitly tell Google Cloud services to send messages to Pub/Sub when certain events happen. For instance, to get messages when a Dataform workflow completes, a "log sink" is created. This is done by a script like run/scripts/addDataFormCompletionTrigger.sh.
```bash
# File: run/scripts/addDataFormCompletionTrigger.sh
# This script tells Google Cloud Logging to send specific Dataform
# completion messages to our Pub/Sub topic.

gcloud logging sinks create $dataformlogsinkname \
  pubsub.googleapis.com/projects/$PROJECT_ID/topics/$pubsubtopic \
  --log-filter='jsonPayload.@type="type.googleapis.com/google.cloud.dataform.logging.v1.WorkflowInvocationCompletionLogEntry"'
```
This command essentially says: "Hey Google Cloud Logging, if you see a log entry that matches this specific Dataform completion filter, please send a copy of it as a message to this Pub/Sub topic ($pubsubtopic)." Our application is listening to this topic. This is an example of how events from Chapter 5: External Services Integration (Dataform & Power BI) make their way into our system.
Conclusion
You've now seen how the cloud-accelerator-function acts as a central hub, receiving various event messages via Google Cloud Pub/Sub. You learned that the Flask application in main.py decodes these messages and, like a mailroom clerk, intelligently routes them to specialized handler functions based on the message type. This routing mechanism is crucial for organizing how different events are processed.
This ensures that a "new file" message is handled by logic that understands files, while a "task completed" message is handled by logic that understands task workflows.
Now that we understand how messages arrive and are sorted, we're ready to see what happens next. In Chapter 2: Data Ingestion & Transformation Pipeline, we'll follow the journey of a "new file" message through the processing pipeline.