
Chapter 2: Data Ingestion & Transformation Pipeline

Welcome to Chapter 2! In Chapter 1: Pub/Sub Event Handling & Routing, we learned how our cloud-accelerator-function acts like a mailroom, receiving messages about events like new file uploads and routing them to the correct handler. Now, let's follow one of those messages – specifically, one announcing a new data file – and see what happens next. We're about to dive into the heart of data processing: the Data Ingestion & Transformation Pipeline.

The Data Factory: Why an Ingestion & Transformation Pipeline?

Imagine you run a small online cookie shop. Every day, you get a list of online orders as a simple text file (a CSV file, perhaps daily_orders_20231115.csv). This file contains raw, unprocessed order details. To understand your sales, track popular cookies, or see which regions order most, you can't just stare at a pile of these CSV files. You need to:

  1. Collect these files (Ingestion).
  2. Clean them up and organize them into a standard format (Transformation).
  3. Store them in a way that's easy to ask questions from, like in a structured database (Loading).

This whole process, from raw file to usable data, is what a Data Ingestion & Transformation Pipeline does. Think of it as a mini-factory or an assembly line for your data. Raw materials (your data files) go in one end, pass through several processing stages, and come out as a finished product (clean, queryable data).

Our cloud-accelerator-function provides this "data factory" to automatically handle new files.

Key Stages of Our Data Assembly Line

When a new data file arrives, it kicks off a series of steps:

  1. Raw File Arrival (The Loading Dock):

    • A new data file, say customer_updates_batch001.csv, is uploaded to a designated "raw" bucket in Google Cloud Storage (GCS).
    • As we saw in Chapter 1, this upload triggers an event, and our handle_raw_file function is called.
  2. Initial Checks & Understanding the File (Quality Control):

    • The system first peeks at the file. Is it a CSV? Is it a JSON file meant to be a schema for a table?
    • It often expects filenames to follow a certain pattern so it can understand what the data is about. For example, a name following the domain_tablename_batchid.csv pattern, such as sales_orders_001.csv, tells us the data belongs to the "sales" domain, the "orders" table, and batch "001".
    # Simplified concept from helpers/utils.py
    def get_info_from_filename(filename: str):
        # Example: "sales_orders_001.csv"
        parts = filename.replace(".csv", "").split("_")
        domain_name = parts[0]    # "sales"
        table_name = parts[1]     # "orders"
        batch_id_str = parts[2]   # "001"
        # ... more logic to convert batch_id_str to a number ...
        return domain_name, table_name, int(batch_id_str)

    This helps the system automatically know where the data belongs.
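
    For example, calling the simplified helper above on a typical filename would give:

    # Illustrative call to the simplified helper above
    domain, table, batch_id = get_info_from_filename("sales_orders_001.csv")
    # domain == "sales", table == "orders", batch_id == 1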

  3. Transformation (Processing & Repackaging):

    • Why transform? Raw CSV files are human-readable, but for efficient storage and super-fast analysis on large datasets, other formats are better. Apache Parquet is a popular choice. Parquet files are typically smaller than CSVs and can be queried much faster by systems like BigQuery.
    • The Change: The system takes the data from the CSV file and converts it into Parquet format.
    • (The exact tools or libraries used for this conversion are handled internally by the accelerator, ensuring the data is correctly translated.)
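
    The exact conversion code is internal to the accelerator, but conceptually the step looks something like the minimal sketch below. It assumes the pandas and pyarrow libraries, which are one common way to convert CSV to Parquet, not necessarily what the accelerator uses.

    # Minimal CSV-to-Parquet sketch (illustrative only; pandas/pyarrow are
    # assumed here, not confirmed to be what the accelerator uses internally).
    import pandas as pd

    def csv_to_parquet(csv_path: str, parquet_path: str) -> None:
        df = pd.read_csv(csv_path)                # read the raw CSV into a DataFrame
        df.to_parquet(parquet_path, index=False)  # write it back out as Parquet
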
  4. Loading into BigQuery (Storing the Finished Product):

    • The newly transformed Parquet data is then loaded into a table in Google BigQuery. BigQuery is a powerful data warehouse that lets you run SQL queries on very large datasets.
    • Table Schemas: How does BigQuery know what columns to create (e.g., order_id, product_name, quantity, price) and their data types (number, text, date)?
      • Sometimes, you might upload a separate JSON schema file (e.g., sales_orders_schema.json) alongside your data. This file explicitly defines the table structure.
      • Our system can read this schema file from GCS.
    # Simplified concept from helpers/storage_helper.py
    def read_json_schema_from_gcs(bucket_name: str, schema_file_path: str):
        # ... (connects to GCS) ...
        # content = download schema_file_path from bucket_name
        # schema_data = json.loads(content)
        # return schema_data
        # Example schema_data:
        # [
        #     {"name": "order_id", "type": "STRING"},
        #     {"name": "customer_name", "type": "STRING"},
        #     {"name": "order_date", "type": "DATE"}
        # ]
        print(f"Reading schema from {bucket_name}/{schema_file_path}")  # Placeholder
        return [{"name": "column1", "type": "STRING"}]  # Dummy schema

    This schema guides BigQuery in creating the table correctly before loading the Parquet data.
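
    To make the loading step concrete, here is a minimal sketch using the google-cloud-bigquery client library. The table ID, GCS URI, and schema below are made up for illustration; this is not the accelerator's actual loading code.

    # Illustrative Parquet load from GCS into BigQuery
    # (table ID, URI, and schema are hypothetical).
    from google.cloud import bigquery

    client = bigquery.Client()
    table_id = "my-project.sales.orders"  # hypothetical project.dataset.table
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        schema=[
            bigquery.SchemaField("order_id", "STRING"),
            bigquery.SchemaField("customer_name", "STRING"),
            bigquery.SchemaField("order_date", "DATE"),
        ],
    )
    load_job = client.load_table_from_uri(
        "gs://my-raw-bucket/sales_orders_001.parquet", table_id, job_config=job_config
    )
    load_job.result()  # wait for the load job to finish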

  5. File Housekeeping (Cleaning Up the Workspace):

    • Once the data is successfully processed and loaded into BigQuery, we don't need the original raw file cluttering the "raw" bucket.
    • Success: The original file (e.g., customer_updates_batch001.csv) is moved to an "archived" GCS bucket. This keeps your raw bucket clean and provides a backup.
    • Problems: If something goes wrong during any step (e.g., the CSV is malformed, the schema is missing), the original file is moved to an "error" GCS bucket. This isolates problematic files for later inspection. Information about the error is often stored with the file.

    The system uses helper functions to manage these file movements. It also needs to know which buckets hold raw, archived, and error files; those bucket names come from configuration, covered in Chapter 6: Configuration Management.

    # Simplified concept from helpers/storage_helper.py
    def get_configured_bucket_path(config_key: str, default_folder: str):
        # path = os.environ.get(config_key)  # Reads from environment variable
        # if not path:
        #     path = f"default_raw_bucket/{default_folder}"  # Fallback
        # return path
        print(f"Path for {config_key} would be looked up or defaulted to {default_folder}")
        return f"some-bucket/{default_folder}"  # Dummy path

    # Simplified concept from helpers/storage_helper.py
    def move_file_to_archived(original_file_info, batch_id):
        archive_path_root = get_configured_bucket_path("CA_ARCHIVED", "archived")
        # Creates a dated folder like "archived/2023/11/15/"
        # date_folder = datetime.today().strftime('%Y/%m/%d')
        # destination_path = f"{archive_path_root}/{date_folder}"
        # move_blob(original_file_info.bucket, destination_path, original_file_info.full_name)
        print(f"Moving {original_file_info.full_name} to ARCHIVED area.")

    # Simplified concept from helpers/storage_helper.py
    def move_file_to_error(original_file_info, batch_id, error_message):
        error_path_root = get_configured_bucket_path("CA_ERROR", "error")
        # date_folder = datetime.today().strftime('%Y/%m/%d')
        # destination_path = f"{error_path_root}/{date_folder}"
        # metadata = {"error_message": error_message, "batch_id": batch_id}
        # move_blob(original_file_info.bucket, destination_path, original_file_info.full_name, metadata)
        print(f"Moving {original_file_info.full_name} to ERROR area due to: {error_message}")

How It Works: Following a CSV File's Journey

Let's visualize the journey of sales_data_jan_2024.csv:

In simplified form, the flow looks like this:

  1. You upload sales_data_jan_2024.csv to the raw GCS bucket.
  2. Google Cloud sends a message (via Pub/Sub, as covered in Chapter 1) to our cloud-accelerator-function.
  3. The function (specifically, the handle_raw_file logic) kicks in:
    • It validates the file.
    • It transforms the CSV data into the more efficient Parquet format.
    • If a schema file (like sales_data_schema.json) exists, it uses it to ensure the BigQuery table is set up correctly.
    • It loads the Parquet data into the appropriate BigQuery table.
  4. Finally, if all went well, it moves the original sales_data_jan_2024.csv to an archive bucket. If things went wrong, it would move it to an error bucket.
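
Putting these steps together, the orchestration might look roughly like the sketch below. The function and attribute names here (process_raw_file, transform_csv_to_parquet, load_parquet_into_bigquery, file_info) are hypothetical placeholders that tie together the simplified helpers shown earlier; the real handle_raw_file logic is more involved.

# Rough orchestration sketch (illustrative only; names are hypothetical).
def process_raw_file(file_info):
    batch_id = None
    try:
        # 1. Understand the file from its name (domain, table, batch)
        domain, table, batch_id = get_info_from_filename(file_info.name)

        # 2. Transform the CSV into Parquet
        parquet_uri = transform_csv_to_parquet(file_info)

        # 3. Read the JSON schema (if one exists) and load the data into BigQuery
        schema = read_json_schema_from_gcs(file_info.bucket, f"{domain}_{table}_schema.json")
        load_parquet_into_bigquery(parquet_uri, domain, table, schema)

        # 4. Success: archive the original raw file
        move_file_to_archived(file_info, batch_id)
    except Exception as error:
        # Any failure: isolate the file and record why it failed
        move_file_to_error(file_info, batch_id, str(error))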

Under the Hood: A Peek at File Management

The cloud-accelerator-function relies on robust helper functions, primarily in helpers/storage_helper.py, to manage files in Google Cloud Storage.

Knowing Where Files Go:

The system needs to know the names of your raw, archived, and error buckets. As mentioned, this information usually comes from environment variables, which are part of the function's configuration (see Chapter 6: Configuration Management). The get_bucket_path_from_config function (shown earlier in simplified form as get_configured_bucket_path) is responsible for fetching these paths.

# Simplified from: run/pubsub/helpers/storage_helper.py
import os

def get_bucket_path_from_config(
    environ_key: str,               # e.g., "CA_ARCHIVED"
    default_environ_key: str = "",  # e.g., "CA_RAW"
    default_folder: str = "",       # e.g., "archived"
) -> str:
    result_bucket_path = os.environ.get(environ_key)
    if result_bucket_path is None:
        # If the specific bucket isn't set, fall back to a default base bucket
        # plus a subfolder (the actual logic is more detailed).
        raw_bucket_path = os.environ.get(default_environ_key, "ca_raw")  # Fallback bucket
        result_bucket_path = f"{raw_bucket_path}/{default_folder}"
    return result_bucket_path  # e.g., "my-project-archived-bucket/daily_files"

This function ensures that even if a specific bucket variable (like CA_ARCHIVED) isn't explicitly configured, the system can default to a known structure (e.g., a folder named archived inside your main CA_RAW bucket).
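
As a purely hypothetical illustration of that fallback (the bucket names and environment values below are made up):

import os

# Made-up environment values for illustration only
os.environ["CA_RAW"] = "my-project-raw-bucket"
os.environ.pop("CA_ARCHIVED", None)
print(get_bucket_path_from_config("CA_ARCHIVED", "CA_RAW", "archived"))
# -> "my-project-raw-bucket/archived"

os.environ["CA_ARCHIVED"] = "my-project-archived-bucket"
print(get_bucket_path_from_config("CA_ARCHIVED", "CA_RAW", "archived"))
# -> "my-project-archived-bucket"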

The Actual Move:

The core file moving operation is handled by a function like move_blob.

# Simplified from: run/pubsub/helpers/storage_helper.py
import logging

logger = logging.getLogger(__name__)

def move_blob(
    src_bucket_path: str,  # e.g., "my-raw-bucket"
    dst_bucket_path: str,  # e.g., "my-archive-bucket/2023/11/15"
    blob_name: str,        # e.g., "uploads/sales.csv"
    # ... (client and custom_metadata are also params) ...
) -> None:
    # 1. Get source and destination bucket objects and the new blob name
    # src_bucket = client.bucket(src_bucket_name)
    # dst_bucket = client.bucket(dst_bucket_name)
    # src_blob = src_bucket.blob(blob_name)
    # new_blob_name = f"{dst_folder_path}/{original_filename}"

    # 2. Copy the blob to the new destination
    # src_bucket.copy_blob(src_blob, dst_bucket, new_blob_name)

    # 3. (Optionally) Add metadata to the newly copied blob
    # if custom_metadata:
    #     destination_blob.metadata = custom_metadata
    #     destination_blob.patch()

    # 4. Delete the original blob from the source bucket
    # src_bucket.delete_blob(blob_name)
    logger.info(f"Moved {blob_name} from {src_bucket_path} to {dst_bucket_path}")

This function essentially copies the file to its new home (archive or error bucket) and then deletes the original from the raw bucket, effectively "moving" it. It can also add custom metadata (like an error message or a batch_id) to the moved file.
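
For reference, the copy-then-delete pattern looks roughly like this minimal sketch using the google-cloud-storage client library; it illustrates the idea rather than reproducing the accelerator's exact code.

# Minimal copy-then-delete "move" with google-cloud-storage
# (illustrative only; bucket and blob names are hypothetical).
from google.cloud import storage

def move_blob_sketch(src_bucket_name, dst_bucket_name, blob_name, new_blob_name, metadata=None):
    client = storage.Client()
    src_bucket = client.bucket(src_bucket_name)
    dst_bucket = client.bucket(dst_bucket_name)
    src_blob = src_bucket.blob(blob_name)

    # Copy the blob into the destination bucket under its new name
    new_blob = src_bucket.copy_blob(src_blob, dst_bucket, new_blob_name)

    # Optionally attach custom metadata (e.g., an error message or batch_id)
    if metadata:
        new_blob.metadata = metadata
        new_blob.patch()

    # Delete the original so the raw bucket stays tidy
    src_bucket.delete_blob(blob_name)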

Conclusion

You've now seen the journey of data within the cloud-accelerator-function! It's not just about receiving a file; it's an entire automated assembly line that:

  • Validates and understands incoming data.
  • Transforms it into an optimal format (like Parquet).
  • Loads it into a queryable system like BigQuery, often using schema files for structure.
  • Keeps the digital workspace tidy by archiving processed files or isolating errored ones.

This pipeline turns raw, potentially messy data into a valuable, organized asset ready for analysis. But how does the system keep track of all these files, their processing status, and any potential issues or dependencies? That's where metadata comes in.

In our next chapter, we'll explore Chapter 3: Metadata Persistence (Database) to understand how the system records and uses information about the data it processes.