Chapter 4: Main Event Router
In the last chapter, we learned how External Data Extractors act as specialist agents to fetch data from outside services. We now have multiple ways for data and tasks to enter our system: files can be uploaded directly, or extractors can go out and get them. This brings up a new question: with so much going on, how does our application know what to do next?
The Problem: A Chaotic Switchboard
Imagine you're an operator at a busy telephone switchboard from the 1950s. Calls are coming in constantly. One person wants to talk to the fire department, another needs the police, and a third is calling a friend across town. If you don't have a clear system for plugging the right wires into the right sockets, you'll connect the wrong people, cause chaos, and nobody will get the help they need.
Our data platform faces a similar challenge. We have many different kinds of incoming "calls" or events:
- A new sales CSV file has been uploaded.
- An External Data Extractor has finished its job.
- A scheduled task needs to start.
- A data processing job in BigQuery has completed.
We need a central, intelligent "operator" to handle all these messages, understand what each one means, and connect it to the correct part of our application.
The Solution: A Smart Dispatcher
The hsr-cloud-accelerator uses a Main Event Router to solve this. Think of it as the application's central nervous system or a smart dispatcher in an emergency call center. It's a web service that is always listening for one thing: messages.
When a message arrives, the Main Event Router's job is to:
- Receive the incoming message (the "call").
- Inspect it to understand what kind of event it is (a new file, a completed task, etc.).
- Route it to the correct specialist handler (the "department").
This ensures every request is handled promptly and correctly, keeping our entire system organized and efficient.
How It Works: From Event to Action
The router listens for messages from a Google Cloud service called Pub/Sub. Pub/Sub is like a reliable postal service for applications. Other services (like Cloud Storage) can "publish" a message to a specific topic (an "address"), and our router, which is "subscribed" to that topic, will receive it instantly.
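To make the "publish" side concrete, here is a minimal sketch of how any service or quick script could publish a message to a topic using the official google-cloud-pubsub client. The project and topic names below are placeholders, not values from this repository.
# A minimal publisher sketch (project and topic names are placeholders)
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "platform-events")

# Pub/Sub payloads are bytes, so we serialize a dict to JSON first
payload = json.dumps({"task": "run_swiggy"}).encode("utf-8")
future = publisher.publish(topic_path, data=payload)
print("Published message with ID:", future.result())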
Let's see how it handles a new file arriving in our system.
- Event Occurs: A .parquet file lands in our archived storage bucket. (This could be the output of our Data Ingestion Pipeline.)
- Message Sent: Cloud Storage automatically sends a notification message to our Pub/Sub topic (an example of what that message looks like follows this list).
- Router Receives and Decodes: The Main Event Router gets the message. It opens it up and sees that the event is for a file named bills.parquet in the archived bucket.
- Router Dispatches: Based on this information, the router knows exactly what to do. It calls the specialist handler designed for loading Parquet files into BigQuery.
- Handler Acts: The Parquet-to-BQ Handler takes over and begins its work. The router's job for this event is now complete.
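Before diving into the router's code, it helps to see roughly what the HTTP request from Pub/Sub looks like. In push mode, Pub/Sub wraps each message in a JSON envelope and base64-encodes the payload. The snippet below builds an illustrative envelope for the bills.parquet event; the field names (message, data, messageId, publishTime, subscription) are part of the standard push format, while the values are made up.
# Roughly what Pub/Sub POSTs to the router (illustrative values)
import base64
import json

payload = {"bucket": "my-archive-bucket", "name": "bills.parquet"}
envelope = {
    "message": {
        # Pub/Sub delivers the payload as base64-encoded bytes
        "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        "messageId": "1234567890",
        "publishTime": "2024-01-01T00:00:00Z",
    },
    "subscription": "projects/my-gcp-project/subscriptions/router-push",
}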
Under the Hood: A Peek at the Code
Our Main Event Router is built using Flask, a lightweight Python framework for creating web services. Let's look at a simplified version of its core logic, which you can find in run/pubsub/main.py.
The first piece is the "listener" itself. This is a Flask route that accepts incoming web requests (POST requests) at the root URL (/).
# File: run/pubsub/main.py (Simplified)
from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=['POST'])
def process_pubsub_message():
    # 1. Get the raw message data from the request
    envelope = request.get_json()
    # ... more code to follow
This code sets up a simple web server. When Google Pub/Sub sends a message, it makes a POST request to this server, and this function is automatically called.
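In practice, the route should also sanity-check the envelope before trusting it, so malformed requests are rejected early. A small sketch of what that could look like inside the same function (the real main.py may or may not include this check):
# ... inside process_pubsub_message(), right after get_json() (a sketch)
if not envelope or 'message' not in envelope:
    # Reject malformed requests early with a 400 error
    return 'Bad Request: invalid Pub/Sub envelope', 400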
Next, the message payload from Pub/Sub arrives base64-encoded; this is how arbitrary binary data travels safely inside the JSON request body. Our router needs to decode it before it can read the contents.
# ... continuing from above
import base64
import json
# 2. Decode the message to read its contents
pubsub_message = envelope['message']
message_data = base64.b64decode(pubsub_message['data']).decode('utf-8')
data = json.loads(message_data)
After this step, data is a clean Python dictionary that we can easily work with. It might look something like this: {'bucket': 'my-archive-bucket', 'name': 'bills.parquet'}.
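That dictionary is all the Parquet-to-BQ handler from the walkthrough needs to do its job. We won't open handlers/parquet_to_bq.py here, but conceptually it has to turn that bucket and file name into a BigQuery load job. Here is a rough sketch of that idea using the official google-cloud-bigquery client; the function name, project, dataset, and table are made up, and the real handler also receives a database session.
# A conceptual sketch only, not the code in handlers/parquet_to_bq.py
from google.cloud import bigquery

def load_parquet_sketch(data):
    """Load the file described by `data` from Cloud Storage into BigQuery."""
    client = bigquery.Client()
    uri = f"gs://{data['bucket']}/{data['name']}"   # e.g. gs://my-archive-bucket/bills.parquet
    table_id = "my-gcp-project.analytics.bills"     # made-up destination table
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    client.load_table_from_uri(uri, table_id, job_config=job_config).result()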
Now for the most important part: the routing logic. The router inspects the data dictionary to decide which handler to call.
# ... continuing from above
# Import our specialist handlers
from handlers.parquet_to_bq import add_parquet_to_bigquery
from handlers.swiggy_extractor import swiggy_extractor

# 3. The "Routing" Logic
# Is it a parquet file from our pipeline?
if 'parquet' in data.get('name', ''):
    add_parquet_to_bigquery(data, db.session)
# Is it a scheduled task to run the Swiggy extractor?
elif data.get('task') == 'run_swiggy':
    swiggy_extractor()
else:
    print("Unknown event type. Ignoring.")

# A 2xx response tells Pub/Sub the message was handled and should not be redelivered
return ('', 204)
This is the heart of the dispatcher. It's a simple set of if/elif conditions that check the message's content, and the final return sends a 2xx status so Pub/Sub knows the message was handled and won't redeliver it.
- If the filename contains .parquet, it knows to call the add_parquet_to_bigquery function (from handlers/parquet_to_bq.py).
- If the message contains a key 'task' with the value 'run_swiggy', it calls the swiggy_extractor function (from handlers/swiggy_extractor.py).
By adding more conditions, we can teach the router to handle new kinds of events without touching any of the existing handlers.
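Because the router is just an HTTP endpoint, you can also exercise the whole path locally without Pub/Sub: build a fake envelope, post it with Flask's test client, and see which handler gets called. A sketch follows; in a real test you would stub out the handlers so no BigQuery or database calls are made.
# Simulate a Pub/Sub push locally (stub the handlers in a real test)
import base64
import json

fake_payload = {"bucket": "my-archive-bucket", "name": "bills.parquet"}
fake_envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(fake_payload).encode("utf-8")).decode("utf-8")
    }
}

with app.test_client() as client:
    response = client.post("/", json=fake_envelope)
    print(response.status_code)  # 204 means the message was acknowledged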
Conclusion
You've just learned about the Main Event Router, the central nervous system of the hsr-cloud-accelerator.
- It acts as a smart dispatcher for all incoming events.
- It listens for messages from Google Pub/Sub.
- It decodes and inspects each message to understand its purpose.
- It routes the message to the correct specialist handler for processing.
This simple but powerful pattern keeps our application decoupled and organized. It allows different parts of the system to communicate without needing to know about each other directly.
So far, we've seen how to get data in and how to route events. But what happens when one task can only start after another one is finished? How do we build complex workflows with multiple steps?
Next up: Task & Dependency Engine