Chapter 2: Data Ingestion Pipeline
In the previous chapter, we learned how the hsr-cloud-accelerator uses a Configuration Hub as a single source of truth for all its settings. Now, let's see how those settings are put to use in our first major process: getting data into the system.
The Problem: A Messy Loading Dock
Imagine you run a restaurant. Every morning, suppliers drop off ingredients: bags of flour, crates of vegetables, and boxes of spices. If they simply dumped everything in the middle of the kitchen floor, it would be chaos: some vegetables might be rotten, the flour bags could be torn, and everything would be in the wrong place. Before any cooking can happen, a chef has to sort, clean, and prep everything. This is slow, messy, and error-prone.
Handling data is very similar. We receive raw data files (such as CSV or JSON files) from various sources. If we just "dump" them into our database, we might load bad data, break our reports, or cause the whole system to fail. We need a clean, organized, and automatic way to receive and prepare our data.
The Solution: A Data Assembly Line
The hsr-cloud-accelerator solves this with the Data Ingestion Pipeline. Think of it as a smart, automated assembly line for data. It's the official front door for all raw data, ensuring that everything that enters our system is clean, standardized, and ready to use.
This pipeline automatically kicks in whenever a new file is dropped into a specific "inbox" folder in our cloud storage. It then performs a series of crucial steps:
- Validation: It checks the file to make sure it has the right columns and structure.
- Transformation: It converts the file from its original format (like CSV) into a highly efficient format called Parquet, which is perfect for big data analysis.
- Loading: It loads the clean, transformed data into our data warehouse, BigQuery.
- Archiving: It moves the original raw file to an archive folder for safekeeping.
Let's see how this works with a simple example.
How It Works: From Raw File to Clean Data
Our assembly line is event-driven. This means it doesn't run all the time; it patiently waits for something to happen. The "something" is a new file arriving in our raw data bucket.
This whole process is handled by a serverless function—a small piece of code that runs in the cloud without us needing to manage a server.
Step 1: A File Arrives
A user or an automated process uploads a file named sales_report.csv into a specific folder in Google Cloud Storage. The location of this folder is defined in our main_config.yaml from Chapter 1.
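If we sketch this in code, the function might load those settings when it starts. The key names below are purely illustrative, not the accelerator's actual configuration schema:
# A minimal sketch of loading ingestion settings from main_config.yaml
# (the key names are assumptions made for illustration)
import yaml

with open("main_config.yaml") as f:
    config = yaml.safe_load(f)

raw_bucket_name = config["ingestion"]["raw_bucket"]          # the "inbox" bucket
archive_bucket_name = config["ingestion"]["archive_bucket"]  # where originals are kept
bigquery_table_id = config["ingestion"]["bigquery_table"]    # destination table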
Step 2: The Pipeline Wakes Up
Google Cloud Storage sees the new file and sends a notification. This notification automatically triggers our Data Ingestion Pipeline function.
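The notification itself is just a small event object describing the file. Here is a simplified, made-up example of what it might contain (the field names follow Cloud Storage object notifications, but the values are invented):
# A simplified example of the event payload our function receives (values are illustrative)
example_event = {
    "bucket": "my-raw-data-bucket",          # which bucket the file landed in
    "name": "incoming/sales_report.csv",     # the object's path within the bucket
    "contentType": "text/csv",               # the file's MIME type
    "timeCreated": "2024-01-15T08:00:00Z",   # when the upload finished
}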
Step 3 & 4: Process the File
The function reads the sales_report.csv, checks if it has the expected columns (like date, product_id, amount), and converts it into the sales_report.parquet format.
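The CSV-to-Parquet conversion can be done with pandas. Here is a minimal sketch, assuming the pyarrow engine is installed (the file names are illustrative):
# A minimal sketch of the CSV-to-Parquet conversion (file names are illustrative)
import pandas as pd

df = pd.read_csv("sales_report.csv")
df.to_parquet("sales_report.parquet", index=False)  # columnar format, ideal for analytics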
Step 5 & 6: Load and Archive
The pipeline then tells BigQuery to load the data from the new Parquet file. Once BigQuery confirms the data is loaded successfully, the pipeline moves the original sales_report.csv into an "archive" folder. This keeps our "inbox" clean and ready for the next file.
Under the Hood: A Peek at the Code
Let's imagine what the Python code for our serverless function might look like. We'll simplify it to focus on the key ideas.
The function starts by receiving information about the file that triggered it.
# A simplified conceptual function
def process_new_file(event, context):
# 1. Get the file details from the event notification
bucket_name = event['bucket']
file_name = event['name']
print(f"New file received: {file_name} in bucket: {bucket_name}")
# ... more code to follow
This first step just unpacks the notification to figure out which file it needs to work on.
Next, it reads the raw data from the CSV file into memory. We use the popular pandas library for this.
# ... continuing from above
# 2. Read the raw CSV file from Cloud Storage
import io  # needed to wrap the downloaded bytes for pandas
from google.cloud import storage
import pandas as pd
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
csv_data = blob.download_as_bytes()
df = pd.read_csv(io.BytesIO(csv_data))
Here, we're using the Google Cloud library to access the file and pandas to easily load the CSV data into a table-like structure called a DataFrame.
Now for the crucial validation step. We check if the data looks right. For this example, we'll just check if the required columns are present.
# ... continuing from above
# 3. Validate the data
required_columns = {'date', 'product_id', 'amount'}
if not required_columns.issubset(df.columns):
raise ValueError("File is missing required columns!")
print("Validation successful!")
If the file doesn't have the columns we expect, the function stops and reports an error. This prevents bad data from ever reaching our database.
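Real pipelines usually check more than column names. Purely as an illustration (these specific rules are not part of the accelerator), we could also validate the values themselves:
# Illustrative extra checks, beyond what the simplified example above does
if df["amount"].isnull().any():
    raise ValueError("Column 'amount' contains missing values!")
if (df["amount"] < 0).any():
    raise ValueError("Column 'amount' contains negative values!")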
With the data validated, we can now load it into our BigQuery data warehouse. The Google Cloud library makes this straightforward.
# ... continuing from above
# 4. Load the clean DataFrame into BigQuery
from google.cloud import bigquery
bq_client = bigquery.Client()
# The table ID comes from our configuration!
table_id = "my_project.my_dataset.sales"
job = bq_client.load_table_from_dataframe(df, table_id)
job.result() # Wait for the job to complete
print(f"Loaded {job.output_rows} rows into {table_id}")
Notice how table_id would be pulled from the settings we defined in our Configuration Hub. This is how our pipeline knows the correct destination for the data without hardcoding it.
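Building on the configuration sketch from Step 1, the hardcoded table ID above could instead come from main_config.yaml, and the load job could be told to append rather than replace existing rows. The key name and the write behaviour below are assumptions, not the accelerator's actual settings:
# A sketch of a config-driven, appending load; "config" is the dictionary
# loaded from main_config.yaml in the earlier sketch (key name is an assumption)
table_id = config["ingestion"]["bigquery_table"]
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = bq_client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result()  # Wait for the append to complete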
The final step is to clean up by archiving the original file. Cloud Storage has no true "move" operation, so the function copies the file to the archive location and then deletes the original.
# ... continuing from above
# 5. Archive the original file
archive_bucket = storage_client.bucket('my-archive-bucket')
bucket.copy_blob(blob, archive_bucket)
blob.delete()
print(f"Archived original file {file_name}")
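If everything lives in a single bucket, the same "move" could instead target an archive/ prefix within that bucket. A small alternative sketch (the folder name is an assumption):
# An alternative sketch: keep everything in one bucket and "move" the file
# into an archive/ prefix instead (the folder name is an assumption)
bucket.copy_blob(blob, bucket, new_name=f"archive/{file_name}")
blob.delete()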
And that's it! Our data is now safely loaded and the pipeline is ready for the next file. This entire process happens in seconds, automatically, every single time.
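Because the handler is ordinary Python, we could also exercise it locally by handing it a made-up event instead of waiting for a real upload (keeping in mind it would still talk to real Cloud Storage and BigQuery):
# Invoking the handler locally with a fake event, purely for illustration
fake_event = {"bucket": "my-raw-data-bucket", "name": "incoming/sales_report.csv"}
process_new_file(fake_event, context=None)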
Conclusion
You've just learned about the Data Ingestion Pipeline, the automated and reliable front door for all data entering the hsr-cloud-accelerator.
- It acts as an assembly line for raw data files.
- It's triggered automatically when a new file arrives in Cloud Storage.
- It validates, transforms, and loads the data into BigQuery.
- It ensures that only clean and standardized data enters our system.
This pipeline is perfect for handling files that we generate or receive. But what about getting data from external systems like Google Analytics or third-party APIs? For that, we need a way to fetch the data first.
Next up: External Data Extractors