
Chapter 3: Metadata Persistence (Database)

In Chapter 2: Data Ingestion & Transformation Pipeline, we saw how our cloud-accelerator-function processes raw data files, transforms them, and loads them into BigQuery. It's like an efficient factory assembly line. But imagine this factory processes hundreds or thousands of files every day! How does it keep track of everything?

  • Which files have been processed successfully?
  • Which ones encountered errors?
  • If a file fails, what was the problem?
  • If we try to process the same file again, will it do all the work twice or be smart about it?

This is where our system's "memory" comes into play. This chapter is all about Metadata Persistence – how our cloud-accelerator-function remembers crucial information using a database.

The System's Librarian: Why We Need a Database

Think of a large library. It has thousands of books. How does the librarian know:

  • Which books are currently checked out?
  • Which books are overdue?
  • Where to find a specific book (which shelf, which section)?
  • How many copies of "Moby Dick" does the library own?

The librarian doesn't keep all this in their head! They use a catalog system – traditionally card catalogs, now often a digital database. This catalog stores metadata: data about the books (title, author, ISBN, status, location).

Our cloud-accelerator-function needs a similar "catalog" or "memory" for all the data it handles and all the tasks it performs. This memory is a PostgreSQL database.

PostgreSQL is a powerful and reliable open-source database system. To help our Python code "talk" to this PostgreSQL database easily, we use a tool called SQLAlchemy. SQLAlchemy is like a skilled translator that lets our Python code interact with the database using Python objects and methods, instead of having to write complex SQL (Structured Query Language) commands directly.

What Kind of Information Do We Store? (The "Index Cards")

Our database stores several types of "index cards" or records, each tracking different aspects of the system:

  1. File Processing States (Files model):

    • For every file that comes in (e.g., sales_data_jan.csv), we create a record.
    • This record tracks its journey: Is it NEW, PROCESSING, SUCCESS, or did it ERROR out? What was the error message?
    • This is like the librarian knowing if a book is "on shelf," "checked out," or "sent for repair."
  2. Table Definitions and Schemas (Domain, Tables, Schemas models):

    • Remember how in Chapter 2 we might use a JSON schema file to define the structure of our tables in BigQuery?
    • The database stores information about these table definitions: What "domain" (e.g., "Sales," "Marketing") does a table belong to? What are its columns (product_id, customer_name) and their data types (NUMBER, TEXT)?
    • This is like the library catalog knowing that "Moby Dick" is a "Novel" (domain), has chapters (structure), and is written in English (schema detail).
  3. Dataform Queries or Power BI Reports (Query, Repo, PowerBiReport models):

    • Our system can also interact with other tools like Dataform (for complex data transformations) and Power BI (for creating reports). We'll learn more about this in Chapter 5: External Services Integration (Dataform & Power BI).
    • The database keeps track of these external items: definitions of Dataform queries, locations of Power BI reports, etc.
    • This is like the catalog knowing about special collections or digital resources the library provides.
  4. Task Execution History (TaskLog, TaskBatch models):

    • The system often performs sequences of actions, called "tasks." For example, "process new sales file" could be a task that involves several steps.
    • The database logs every time a task runs: When did it start? When did it finish? Was it part of a larger "batch" of tasks?
    • This is the librarian's daily logbook, recording all significant activities. We'll delve into tasks in Chapter 4: Task Definition & Execution Workflow.

Why is This "Memory" So Important?

Having this persistent metadata stored in a database is crucial for several reasons:

  • Tracking: We always know the current status of any file or task. Is it done? Is it stuck?
  • Auditing: We can look back at the history. If a customer asks why their data from last Tuesday wasn't updated, we can check the logs and file statuses in the database.
  • Reliability: If our cloud-accelerator-function crashes and restarts (computers can be like that!), it can look at the database to see what it was doing and pick up where it left off, or at least know what needs attention.
  • Idempotency: This is a fancy word meaning that performing the same operation multiple times has the same effect as performing it once. For example, if we accidentally receive a signal to process sales_data_jan.csv again, the system can check the database. If it sees "sales_data_jan.csv - status: SUCCESS," it can wisely say, "I've already done that!" and skip re-processing, avoiding duplicate data.

How It Works: Remembering a File's Journey

Let's imagine our cloud-accelerator-function receives a new file, orders_20240315.csv. Here's a simplified idea of how the database helps:

  1. A new file orders_20240315.csv arrives.
  2. The cloud-accelerator-function (CAF) first asks the Database: "Have I seen this file before?"
  3. The Database checks its Files table. Let's say it's a brand new file.
  4. The CAF tells the Database: "Okay, create a new record for orders_20240315.csv with the status NEW." The database stores this.
  5. The CAF then starts processing the file (transforming it, loading it to BigQuery, as we saw in Chapter 2: Data Ingestion & Transformation Pipeline).
  6. If everything goes well: The CAF tells the Database: "Update the record for orders_20240315.csv. Set its status to SUCCESS."
  7. If something goes wrong (e.g., the file is corrupted): The CAF tells the Database: "Update the record for orders_20240315.csv. Set its status to ERROR and maybe add an error message like 'File format not recognized'."

Now, if the same file signal comes again, when the CAF asks the database in step 2, the database will say, "Yes, I have a record for it, and its status is SUCCESS (or ERROR)." The CAF can then decide not to reprocess it if it's already successful.
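
To make these steps concrete, here's a minimal sketch of how this check-then-process flow could look in Python. It leans on the Files model and db object explained later in this chapter, and the names handle_incoming_file and process_file are hypothetical placeholders, not the actual functions in cloud-accelerator-function:

# Hypothetical sketch of the file-journey logic described above (not the real CAF code)
def handle_incoming_file(db, file_name):
    # Steps 2-3: ask the database whether we've seen this file before
    record = db.session.query(Files).filter_by(file_name=file_name).first()

    if record and record.status == "SUCCESS":
        return  # Idempotency: already processed successfully, nothing to do

    if record is None:
        # Step 4: create a new record with status NEW
        record = Files(file_name=file_name, status="NEW")
        db.session.add(record)
        db.session.commit()

    try:
        # Step 5: transform the file and load it into BigQuery (placeholder)
        process_file(file_name)
        record.status = "SUCCESS"        # Step 6: happy path
    except Exception as exc:
        record.status = "ERROR"          # Step 7: record the failure...
        record.error_message = str(exc)  # ...and why it happened

    db.session.commit()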

Under the Hood: SQLAlchemy Models and Database Sessions

How does our Python code actually "talk" to the PostgreSQL database via SQLAlchemy? It uses something called Models and Sessions.

Database Models (The Blueprints):

A "model" in SQLAlchemy is a Python class that describes a table in our database. It's like a blueprint that tells SQLAlchemy what the table is named and what columns it has.

All these model definitions live in files like database/models.py. Let's look at a very simplified version of what a Files model might look like:

# Simplified example from database/models.py
from datetime import datetime

# 'db' is the SQLAlchemy instance, set up elsewhere in the app
class Files(db.Model):
    __tablename__ = 'files'  # The actual table name in PostgreSQL

    id = db.Column(db.Integer, primary_key=True)           # Unique ID for each record
    file_name = db.Column(db.String(255), nullable=False)  # File's name
    status = db.Column(db.String(50), default="NEW")       # e.g., NEW, PROCESSING, SUCCESS
    created_at = db.Column(db.DateTime, default=datetime.utcnow)  # When it was recorded
    # ... other columns like error_message, batch_id, etc.

    def __repr__(self):
        return f"<File {self.file_name} - {self.status}>"

  • class Files(db.Model): This line declares a Python class named Files that SQLAlchemy will manage as a database table.
  • id = db.Column(...): Defines an id column, which will be the primary key (a unique identifier for each row).
  • file_name = db.Column(...): Defines a column to store the file's name as text.
  • status = db.Column(...): Defines a column to store the processing status.

There are similar model classes for Domain, Tables, TaskLog, etc., each defining the structure for their respective tables in the database.
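
As an illustration of that pattern (and only an illustration; the column names below are assumptions, not the project's actual schema), a task-history model along the lines of TaskLog could look like this:

# Hypothetical sketch of a task-history model following the same pattern
class TaskLog(db.Model):
    __tablename__ = 'task_log'  # Illustrative table name

    id = db.Column(db.Integer, primary_key=True)
    task_name = db.Column(db.String(255), nullable=False)   # e.g., "process new sales file"
    started_at = db.Column(db.DateTime, default=datetime.utcnow)
    finished_at = db.Column(db.DateTime, nullable=True)     # Set when the task ends
    status = db.Column(db.String(50), default="RUNNING")
    batch_id = db.Column(db.Integer, nullable=True)         # Links the task to a larger batch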

Database Sessions (The Conversation):

To actually read from or write to the database, our code uses a SQLAlchemy "session." Think of a session as an ongoing conversation with the database. In our code, the session is reached through the db object (as db.session), and we use it to perform operations.

Example: Adding a New File Record

When a new file new_report.pdf arrives, a handler function (like handle_raw_file from Chapter 1, which receives a db object) might do something like this:

# Conceptual Python snippet (inside a handler function)
from database.models import Files  # The Files model defined above

# 'db' is the SQLAlchemy instance passed to the function

# Create a new Python object representing the file record
new_file_entry = Files(file_name="new_report.pdf", status="PROCESSING")

# Add this new object to the session (tell SQLAlchemy we want to save it)
db.session.add(new_file_entry)

# Commit the session (actually save all changes to the database)
db.session.commit()

# What happens: a new row is inserted into the 'files' table in PostgreSQL.

This is much friendlier than writing raw SQL like INSERT INTO files (file_name, status) VALUES ('new_report.pdf', 'PROCESSING');. SQLAlchemy handles that translation for us.

Example: Updating a File's Status

If the processing of new_report.pdf finishes successfully:

# Conceptual Python snippet (inside a handler function)
file_to_update = db.session.query(Files).filter_by(file_name="new_report.pdf").first()

if file_to_update:
    file_to_update.status = "SUCCESS"  # Change the status
    db.session.commit()                # Save the change

# What happens: the 'status' column for the 'new_report.pdf' row
# in the 'files' table is updated to 'SUCCESS'.

Here, db.session.query(Files).filter_by(...).first() is SQLAlchemy's way of saying, "Find me the first record in the Files table where the file_name is 'new_report.pdf'."
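
As a small follow-up (illustrative, not taken from the project code), the same query pattern can fetch every file that ended in an error, which is handy for the auditing scenario mentioned earlier:

# Illustrative query: list all files whose processing ended in ERROR
failed_files = db.session.query(Files).filter_by(status="ERROR").all()

for failed in failed_files:
    print(failed)  # Uses the model's __repr__, e.g. <File orders_20240315.csv - ERROR>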

Connecting to the Database:

You might wonder, "How does the application know where the PostgreSQL database is and how to log in?" This information (like the database address, username, and password) is stored securely as part of the application's configuration, covered in Chapter 6: Configuration Management. When the application starts, it uses this configuration to establish the connection that SQLAlchemy then uses.

When the cloud-accelerator-function is first deployed, scripts (like those in run/scripts/) might also help set up the database, and SQLAlchemy can even create all the tables based on your model definitions if they don't already exist.
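
To show roughly how the pieces fit together at startup, here's a minimal sketch. It assumes a Flask-SQLAlchemy-style setup (consistent with the db.Model / db.session pattern above) and an environment variable named DATABASE_URI; both are assumptions for illustration, not details confirmed by the project:

# Hypothetical startup sketch (Flask-SQLAlchemy-style setup assumed)
import os

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# e.g. "postgresql://user:password@host:5432/metadata_db", taken from configuration
app.config["SQLALCHEMY_DATABASE_URI"] = os.environ["DATABASE_URI"]

db = SQLAlchemy(app)

with app.app_context():
    # Create any tables defined by the models if they don't already exist
    db.create_all()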

Conclusion

You've now learned that the cloud-accelerator-function isn't just a data processor; it has a memory! This memory, implemented as a PostgreSQL database managed with SQLAlchemy, is vital. It stores metadata about:

  • File processing states.
  • Data table definitions.
  • Details of external system integrations.
  • A complete history of task executions.

This persistent metadata allows the system to track progress, audit its actions, recover from interruptions, and operate reliably and efficiently (hello, idempotency!). It's the backbone that supports the complex data pipelines we're building.

So far, we've seen how messages arrive (Chapter 1), how raw data is processed (Chapter 2), and how the system remembers everything (this chapter). But often, processing data isn't just one step; it's a series of coordinated actions or "tasks." In our next chapter, we'll explore Chapter 4: Task Definition & Execution Workflow to see how these multi-step processes are defined and managed.