Chapter 1: Pub/Sub Event Handling & Routing

Welcome to the cloud-accelerator-function tutorial! This project is designed to help you build robust data processing pipelines in the cloud. In this first chapter, we'll explore the very first step in our system: how it receives and understands different kinds of messages.

The Mailroom: Why Do We Need Event Handling and Routing?

Imagine a busy office building. It constantly receives mail: letters with new customer orders, packages containing reports, internal memos about project updates, and so on. If all this mail were just dumped in a pile, it would be chaos! Nobody would know what to do with any of it.

This is where a central mailroom comes in. The mailroom staff:

  1. Receives all incoming mail.
  2. Looks at each item to see what it is (an order, a report, a memo).
  3. Sends (routes) each item to the correct department that knows how to handle it.

Our cloud-accelerator-function faces a similar challenge. It needs to react to various "events" happening in your cloud environment. For example:

  • A new data file (like sales_data_january.csv) is uploaded to a storage bucket.
  • A data processing task (like "transform customer data") has just finished.
  • A signal arrives from another service like Dataform, indicating a data model has been updated.

Each of these events is a "message" that our system needs to process. Pub/Sub Event Handling & Routing is the "central mailroom" of our cloud-accelerator-function. It's the main entry point that receives all these different messages and directs them to the appropriate specialized logic for processing.

Key Concepts: The Parts of Our Mailroom

Let's break down the components involved:

  1. Google Cloud Pub/Sub (The Postal Service): Think of Pub/Sub as a super-efficient postal service provided by Google Cloud. When an event occurs (like a file upload), other Google Cloud services can publish a "message" about this event to a Pub/Sub "topic" (like a specific mailbox). Our application then "subscribes" to this topic to receive these messages.

  2. Events/Messages (The Mail): These are the actual pieces of information delivered by Pub/Sub. Each message contains details about what happened. For example, a message about a new file might include the file's name and where it's stored.

  3. The Flask Application (main.py) (The Mailroom Clerk): At the heart of our cloud-accelerator-function is a web application built using a Python framework called Flask. This application, primarily defined in the run/pubsub/main.py file, is our "mailroom clerk." It's constantly listening for new messages arriving from Pub/Sub.

  4. Decoding (Opening the Envelope): Pub/Sub messages arrive in a specific format. Our Flask application needs to "decode" them (like opening the envelope to read the letter inside) before it can understand their content.

  5. Routing (Sorting the Mail): Once the message is decoded, the Flask application inspects its content to determine its "type" (e.g., is this about a new file, a completed task, or a Dataform update?). Based on this type, it "routes" the message to a specific Python function designed to handle that exact kind of event.

  6. Handler Functions (The Specialist Departments): These are specialized Python functions within our project. Each handler is an expert in dealing with one particular type of event: handle_raw_file deals with new files, handle_task_recursive with completed processing tasks, and handle_completed_dataform with Dataform updates, as sketched just below.
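
To make the "specialist departments" concrete, here is a hypothetical sketch of how such handlers might be declared. The names and parameters mirror the calls we'll see later in main.py, but the bodies and any details beyond that are placeholders, not the project's actual implementation:

    # Hypothetical handler stubs -- the real project defines these in their own modules.

    def handle_raw_file(data_obj, db):
        """Specialist for "file" events, e.g. a new CSV landing in a storage bucket."""
        ...

    def handle_task_recursive(task_id, batch_id, db, previous_task_id=None):
        """Specialist for "task" events, e.g. a processing step that just finished."""
        ...

    def handle_completed_dataform(data, db):
        """Specialist for "dataform" events, e.g. a completed workflow invocation."""
        ...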

How It Works: From Event to Action

Let's trace the journey of a message, using a "new file upload" as an example:

  1. Event Occurs: You upload a file named new_sales_data.csv to a Google Cloud Storage bucket.
  2. Pub/Sub Notified: Google Cloud Storage is configured to send a message to a Pub/Sub topic whenever a new file is uploaded to that bucket.
  3. Message Delivered: Pub/Sub delivers this message to our cloud-accelerator-function (specifically, to the Flask application in main.py).
    • Example Input (Simplified Pub/Sub message content):
      {
        "message": {
          "data": "base64-encoded-string-representing-file-info",
          "attributes": {
            "eventType": "OBJECT_FINALIZE",  // Means a new object/file was created
            "bucketId": "my-data-bucket",
            "objectId": "uploads/new_sales_data.csv"
          }
        }
      }
      (The actual data part is encoded, but it contains details like the bucket name and file name).
  4. Flask App Receives & Decodes: The main.py application receives this HTTP POST request from Pub/Sub. The decode_pubsub function extracts the meaningful information (a tiny decoding illustration follows this list).
  5. Type Identification & Routing: The application looks at the message attributes (or sometimes the decoded data) to figure out it's a "file" event.
  6. Handler Called: The routing logic then calls the handle_raw_file function, passing along the details of new_sales_data.csv.
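
Pub/Sub push messages base64 encode the data field, so "opening the envelope" in step 4 boils down to a base64 decode plus a JSON parse. Here is a tiny, self-contained illustration (not the project's code; the field names inside data are made up for the example):

    import base64
    import json

    # A made-up payload shaped like the simplified example above.
    envelope = {
        "message": {
            "data": base64.b64encode(
                json.dumps(
                    {"bucket": "my-data-bucket", "name": "uploads/new_sales_data.csv"}
                ).encode("utf-8")
            ).decode("utf-8"),
            "attributes": {"eventType": "OBJECT_FINALIZE"},
        }
    }

    # "Opening the envelope": decode the base64 data back into a dictionary.
    raw = base64.b64decode(envelope["message"]["data"]).decode("utf-8")
    file_info = json.loads(raw)
    print(file_info["bucket"], file_info["name"])
    # -> my-data-bucket uploads/new_sales_data.csv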

Under the Hood: A Peek Inside main.py

Let's look at how this is implemented step-by-step.

A Visual Overview:

Here's a simplified sequence of what happens:

  1. Pub/Sub delivers the event as an HTTP POST request to the Flask app's "/" endpoint.
  2. decode_pubsub "opens the envelope" and extracts the message data.
  3. validate_file_trigger identifies the trigger type ("file", "task", or "dataform").
  4. The matching handler function (handle_raw_file, handle_task_recursive, or handle_completed_dataform) does the actual work.

Diving into the Code (run/pubsub/main.py):

Our Flask application has an endpoint (a specific URL) that Pub/Sub sends messages to.
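
The snippets below skip over the imports and app setup with a "# ..." comment. For orientation, a minimal sketch of what that boilerplate might look like (an assumption for illustration, not the project's exact code, which also imports the handler functions and the database client):

    # File: run/pubsub/main.py (illustrative setup only)
    import logging

    from flask import Flask, request

    app = Flask(__name__)
    logger = logging.getLogger(__name__)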

  1. Receiving the Message: The @app.route("/", methods=["POST"]) line tells Flask that the index() function below it should handle incoming HTTP POST requests to the base URL (/) of our application. Pub/Sub sends its messages as POST requests.

    # File: run/pubsub/main.py
    # ... (imports and app setup) ...

    @app.route("/", methods=["POST"])  # type: ignore
    def index():
        """Base url method to process pub sub messages"""
        try:
            logger.info("Received request for new file")
            # decode pubsub
            data, msg, code = decode_pubsub(request)  # Step 1: Open the envelope
            if code != 200:
                # If decoding fails, report error and stop
                return data, msg
            # ... more processing happens here ...

    In this snippet, decode_pubsub(request) takes the raw incoming request from Pub/Sub and tries to extract the useful data from it. If it can't understand the message, it stops early. logger.info(...) helps us track what's happening, which is part of Chapter 7: Structured Logging and Alerting.

  2. Identifying the Message Type: After successfully decoding, the system needs to figure out what kind of event this message represents.

    # File: run/pubsub/main.py
    # ... (inside the index() function, after decode_pubsub) ...

    # Validate the message type (e.g., Cloud Storage event, Task, Dataform)
    code, trigger_type = validate_file_trigger(data)  # Step 2: Check letterhead
    if code != 200:
        # Not a valid or expected message type for us
        return data, code

    logger.info(data)  # Log the content for debugging
    # ...

    The validate_file_trigger(data) function examines the decoded data. It returns a trigger_type, which could be "file", "task", or "dataform". This tells us which "department" should handle the message. (A rough, hypothetical sketch of both decode_pubsub and validate_file_trigger appears after this list.)

  3. Routing to the Correct Handler: Now comes the actual routing. An if/elif/else structure directs the flow based on the trigger_type.

    # File: run/pubsub/main.py
    # ... (inside the index() function, after validate_file_trigger) ...

    if trigger_type == "file":
        data_obj = StorageTrigger(data)  # Prepare file-specific data
        logger.info("Processing file %s", data_obj.full_name)
        # Call the specialized function for file events
        result = handle_raw_file(data_obj, db)  # Step 3a: Send to File Dept.

    elif trigger_type == "task":
        data_obj = TaskObj(data)  # Prepare task-specific data
        logger.info("Processing task %s", data_obj.task_id)
        # Call the specialized function for task events
        result = handle_task_recursive(
            data_obj.task_id, data_obj.batch_id, db, data_obj.previous_task_id
        )  # Step 3b: Send to Task Dept.

    elif trigger_type == "dataform":
        logger.info("Processing dataform execution completion trigger...")
        # Call the specialized function for Dataform events
        result = handle_completed_dataform(data, db)  # Step 3c: Send to Dataform Dept.

    # ... (logging results and returning response to Pub/Sub) ...
    return result
    # ... (exception handling) ...
    • If trigger_type is "file", handle_raw_file is called. The StorageTrigger(data) part likely converts the generic data into an object specifically structured for file events. The db parameter you see passed relates to Chapter 3: Metadata Persistence (Database).
    • If it's "task", handle_task_recursive is called. Similarly, TaskObj(data) prepares task-specific data.
    • If it's "dataform", handle_completed_dataform gets the call.

    Each of these handler functions (handle_raw_file, handle_task_recursive, handle_completed_dataform) contains the specialized logic for its event type. We'll explore them in detail in their respective chapters.
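
To round out the picture, here is a rough, hypothetical sketch of what the two helper functions used above might do. The markers used to tell the trigger types apart are pure assumptions for illustration; the project's real decode_pubsub and validate_file_trigger will differ in their details:

    # A rough sketch only -- the project's real helpers may look quite different.
    import base64
    import json

    def decode_pubsub(request):
        """Open the envelope: pull the payload out of a Pub/Sub push request."""
        envelope = request.get_json(silent=True)
        if not envelope or "message" not in envelope:
            return {}, "Bad Request: no Pub/Sub message received", 400
        message = envelope["message"]
        data = {}
        if message.get("data"):
            data = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
        data["attributes"] = message.get("attributes", {})
        return data, "OK", 200

    def validate_file_trigger(data):
        """Check the letterhead: decide which kind of event this is."""
        event_type = data.get("attributes", {}).get("eventType", "")
        if event_type == "OBJECT_FINALIZE":           # assumed marker for storage events
            return 200, "file"
        if "task_id" in data:                         # assumed marker for task events
            return 200, "task"
        if "workflowInvocation" in json.dumps(data):  # assumed marker for Dataform events
            return 200, "dataform"
        return 204, ""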

A Note on Setting Up Triggers (e.g., for Dataform):

Sometimes, you need to explicitly tell Google Cloud services to send messages to Pub/Sub when certain events happen. For instance, to get messages when a Dataform workflow completes, a "log sink" is created. This is done by a script like run/scripts/addDataFormCompletionTrigger.sh.

# File: run/scripts/addDataFormCompletionTrigger.sh
# This script tells Google Cloud Logging to send specific Dataform
# completion messages to our Pub/Sub topic.

gcloud logging sinks create $dataformlogsinkname \
  pubsub.googleapis.com/projects/$PROJECT_ID/topics/$pubsubtopic \
  --log-filter='jsonPayload.@type="type.googleapis.com/google.cloud.dataform.logging.v1.WorkflowInvocationCompletionLogEntry"'

This command essentially says: "Hey Google Cloud Logging, if you see a log entry that matches this specific Dataform completion filter, please send a copy of it as a message to this Pub/Sub topic ($pubsubtopic)." Our application is listening to this topic. This is an example of how events from Chapter 5: External Services Integration (Dataform & Power BI) make their way into our system.
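
One practical detail the script above doesn't show (so treat this as an assumption about the setup rather than part of the tutorial's code): a log sink writes through its own service account, called the "writer identity", and that identity needs permission to publish to the topic. Granting it could look like this:

# Look up the sink's writer identity and allow it to publish to our topic.
writer_identity=$(gcloud logging sinks describe $dataformlogsinkname \
  --format='value(writerIdentity)')

gcloud pubsub topics add-iam-policy-binding $pubsubtopic \
  --member="$writer_identity" \
  --role="roles/pubsub.publisher"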

Conclusion

You've now seen how the cloud-accelerator-function acts as a central hub, receiving various event messages via Google Cloud Pub/Sub. You learned that the Flask application in main.py decodes these messages and, like a mailroom clerk, intelligently routes them to specialized handler functions based on the message type. This routing mechanism is crucial for organizing how different events are processed.

This ensures that a "new file" message is handled by logic that understands files, while a "task completed" message is handled by logic that understands task workflows.

Now that we understand how messages arrive and are sorted, we're ready to see what happens next. In the next chapter, we'll follow the journey of a "new file" message and explore Chapter 2: Data Ingestion & Transformation Pipeline.