
Partitioning Data

Introduction

In the "From Scratch" tutorial, you learned how to define a raw transactions table and aggregate daily sales. But what if you need to optimize your partitioning for performance and cost?

In this guide, we’ll extend the existing pipeline by aggregating daily transaction data into monthly partitions. You’ll learn how to:

  1. Transition from day-based to month-based partitions.
  2. Use Datoria to safely aggregate data from daily partitions into monthly ones.
  3. Build confidence that the aggregation logic aligns with your business needs.

Prerequisites

Ensure you’ve completed the "From Scratch" tutorial:

  • Raw transactions table (raw_transactions.ts).
  • Daily aggregation job (sales_aggregation.ts).

Step 1: Define the Monthly Partitioned Table

To store the monthly aggregated results, define a table partitioned by month:

  1. Create a new file: monthly_sales_aggregation.ts
  2. Define the table with monthly partitioning:
import {
  BQField,
  BQFieldType,
  BQPartition,
  BQPartitioning,
  BQTable,
} from "@datoria/sdk";
import { shopDataset } from "../../tutorials/from-scratch.md/shop";

export const monthlySalesTable: BQTable<BQPartition.Month> = BQTable({
  dataset: shopDataset,
  name: "monthly_sales",
  // One partition per calendar month, keyed by the transaction_month column.
  partitioning: BQPartitioning.Month("transaction_month"),
  schema: [
    // The partitioning column must be a DATE holding the month it belongs to.
    BQField("transaction_month", BQFieldType.Date, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("total_sales", BQFieldType.Float, "NULLABLE"),
  ],
});

What’s New Here?

  • We define transaction_month as the partitioning column using BQPartitioning.Month.
  • Partitions are aligned to the first day of each month.
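As a quick illustration of this alignment, the sketch below maps any date to the monthly partition it belongs to. monthPartitionOf is a hypothetical helper, not part of the Datoria SDK; it relies on BigQuery's convention of identifying monthly partitions as YYYYMM.

```typescript
// Hypothetical helper (not Datoria SDK code): map a DATE value "YYYY-MM-DD"
// to its monthly partition. BigQuery names monthly partitions "YYYYMM";
// the aligned date is the first day of that month.
interface MonthPartition {
  firstDay: string; // e.g. "2024-12-01"
  partitionId: string; // e.g. "202412"
}

function monthPartitionOf(date: string): MonthPartition {
  const match = /^(\d{4})-(\d{2})-(\d{2})$/.exec(date);
  if (!match) {
    throw new Error(`Expected a YYYY-MM-DD date, got "${date}"`);
  }
  const [, year, month] = match;
  return { firstDay: `${year}-${month}-01`, partitionId: `${year}${month}` };
}

// Every day in December 2024 lands in the same partition.
console.log(monthPartitionOf("2024-12-17")); // { firstDay: '2024-12-01', partitionId: '202412' }
```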

Step 2: Define the Monthly Aggregation Job

Next, create a job to aggregate daily transactions into monthly partitions.

import {
  BQPartition,
  TransformationJob,
  sql,
  BQExpr,
  Invalidation,
} from "@datoria/sdk";
import { monthlySalesTable } from "./monthly_sales_aggregation";
import { transactionsTable } from "../../tutorials/from-scratch.md/raw_transactions";

export const monthlySalesAggregationJob: TransformationJob<BQPartition.Month> =
  TransformationJob({
    name: "monthly-sales-agg",
    destination: monthlySalesTable,
    query: sql`
      SELECT
        ${BQExpr.ExecutionMonth} AS transaction_month,
        product_id,
        SUM(price_per_unit * quantity) AS total_sales
      FROM ${transactionsTable.range(
        // Read every daily partition of the month being processed.
        BQExpr.ExecutionMonth.atFirstDayOfMonth(),
        BQExpr.ExecutionMonth.atLastDayOfMonth(),
        // Recompute only after upstream data changed and 08:00 Oslo time has passed.
        Invalidation.UpstreamChangedAndClockPassed("Europe/Oslo", "08:00"),
      )}
      GROUP BY 1, 2
    `,
    // Earliest monthly partition the job produces.
    startPartition: BQPartition.Month(2025, 1),
  });
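To see what the query computes for a single execution month, the following plain TypeScript sketch reproduces the same aggregation in memory. It does not use the Datoria SDK; the Transaction row shape and its transactionDate field are hypothetical stand-ins for the raw transactions schema.

```typescript
// Hypothetical row shape for the raw transactions table; only the fields
// the query reads are modeled. transactionDate is an assumed column name.
interface Transaction {
  transactionDate: string; // "YYYY-MM-DD"
  productId: string;
  pricePerUnit: number;
  quantity: number;
}

interface MonthlySales {
  transactionMonth: string; // first day of the month, "YYYY-MM-01"
  productId: string;
  totalSales: number;
}

// In-memory equivalent of:
//   SELECT <month>, product_id, SUM(price_per_unit * quantity) ... GROUP BY 1, 2
// restricted to rows inside one execution month.
function aggregateMonth(executionMonth: string, rows: Transaction[]): MonthlySales[] {
  const prefix = executionMonth.slice(0, 7); // "YYYY-MM"
  const totals = new Map<string, number>();
  for (const row of rows) {
    if (!row.transactionDate.startsWith(prefix)) continue; // outside the input range
    const subtotal = row.pricePerUnit * row.quantity;
    totals.set(row.productId, (totals.get(row.productId) ?? 0) + subtotal);
  }
  // One output row per product, all stamped with the month's first day.
  return Array.from(totals, ([productId, totalSales]) => ({
    transactionMonth: `${prefix}-01`,
    productId,
    totalSales,
  }));
}
```

Rows dated outside the execution month are ignored, which mirrors what the range() bounds do for the real query.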

Key Points:

  1. Input Range:
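    • BQExpr.ExecutionMonth.atFirstDayOfMonth() sets the start of the input range to the first day of the month being processed.
    • BQExpr.ExecutionMonth.atLastDayOfMonth() sets the end of the range to the last day of that month, so every daily partition in the month is read.
  2. Invalidation Window:
    • Invalidation.UpstreamChangedAndClockPassed("Europe/Oslo", "08:00") reprocesses a monthly partition only when its upstream daily data has changed and the local time in Europe/Oslo has passed 08:00. This keeps each monthly partition consistent with its daily inputs.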
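The range bounds and the invalidation rule can also be sketched in plain TypeScript. monthRange, localMinutes, and shouldReprocess are hypothetical helpers, and the reprocessing condition encodes an assumed reading of UpstreamChangedAndClockPassed (upstream changed since the last run, and the local clock in the given time zone at or after the cutoff), not Datoria's actual implementation.

```typescript
// Hypothetical helpers, not Datoria code. shouldReprocess encodes an ASSUMED
// reading of Invalidation.UpstreamChangedAndClockPassed(timeZone, "HH:MM").

const pad2 = (n: number): string => ("0" + n).slice(-2);

// First and last calendar day of a month (month is 1-based), as "YYYY-MM-DD".
function monthRange(year: number, month: number): { first: string; last: string } {
  // Day 0 of the following month is the last day of this month (handles leap years).
  const lastDay = new Date(Date.UTC(year, month, 0)).getUTCDate();
  const ym = `${year}-${pad2(month)}`;
  return { first: `${ym}-01`, last: `${ym}-${pad2(lastDay)}` };
}

// Minutes since local midnight for an instant in an IANA time zone.
function localMinutes(instant: Date, timeZone: string): number {
  const parts = new Intl.DateTimeFormat("en-GB", {
    timeZone,
    hour: "2-digit",
    minute: "2-digit",
    hour12: false,
  }).formatToParts(instant);
  const get = (type: string) => Number(parts.find((p) => p.type === type)?.value);
  // "% 24" guards against engines that render midnight as hour 24.
  return (get("hour") % 24) * 60 + get("minute");
}

// Assumed rule: reprocess only if upstream changed AND the local clock has
// reached the cutoff time (e.g. "08:00") in the given zone.
function shouldReprocess(
  upstreamChangedSinceLastRun: boolean,
  now: Date,
  timeZone: string,
  cutoff: string,
): boolean {
  const [h, m] = cutoff.split(":").map(Number);
  return upstreamChangedSinceLastRun && localMinutes(now, timeZone) >= h * 60 + m;
}
```

Using an IANA zone name rather than a fixed UTC offset means the 08:00 cutoff follows Oslo's switch between winter and summer time.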

Step 3: Migrate and Run the Updated Pipeline

  1. Preview Changes:
    Confirm the deployment plan includes the new table and job:

    datoria migrate --plan
  2. Migrate:
    Apply the changes:

    datoria migrate
  3. Run the Monthly Job:
    Execute the monthly aggregation job:

    datoria run monthly-sales-agg --now

Step 4: Verify the Results

  1. Query the Table:
    In the BigQuery Console, verify the monthly aggregated results:

    SELECT * FROM `shop.monthly_sales`;
  2. Example Output:

    transaction_month | product_id | total_sales
    2024-12-01        | prod1      | 210.0
    2024-12-01        | prod2      | 200.0
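One way to build confidence in these results is to reconcile the monthly table against the daily aggregation: for every month and product, the monthly total should equal the sum of the daily totals. The sketch below assumes you have exported rows from both tables; the DailyRow and MonthlyRow shapes, their field names, and the reconcile helper are hypothetical.

```typescript
// Hypothetical row shapes; adjust field names to match your actual tables.
interface DailyRow { date: string; productId: string; totalSales: number }
interface MonthlyRow { transactionMonth: string; productId: string; totalSales: number }
interface Mismatch { key: string; expected: number; actual: number }

// Sum daily totals per (month, product) and compare them with the monthly rows.
// A key missing on either side counts as 0, so missing rows show up as mismatches.
function reconcile(daily: DailyRow[], monthly: MonthlyRow[], epsilon = 1e-6): Mismatch[] {
  const expected = new Map<string, number>();
  for (const row of daily) {
    const key = `${row.date.slice(0, 7)}-01|${row.productId}`;
    expected.set(key, (expected.get(key) ?? 0) + row.totalSales);
  }
  const actual = new Map<string, number>();
  for (const row of monthly) {
    actual.set(`${row.transactionMonth}|${row.productId}`, row.totalSales);
  }
  const mismatches: Mismatch[] = [];
  const keys = new Set(Array.from(expected.keys()).concat(Array.from(actual.keys())));
  keys.forEach((key) => {
    const e = expected.get(key) ?? 0;
    const a = actual.get(key) ?? 0;
    // Tolerate tiny floating-point differences from summation order.
    if (Math.abs(e - a) > epsilon) mismatches.push({ key, expected: e, actual: a });
  });
  return mismatches;
}
```

An empty result means every monthly total matches its daily inputs within the tolerance.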

What Did We Achieve?

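  1. Improved Performance: Moved month-level reporting from daily to monthly partitions, so a query for one month reads a single pre-aggregated partition instead of up to 31 daily ones.
  2. Pipeline Correctness: Each monthly partition is built from the full range of daily input for its month and is recomputed when upstream data changes (once 08:00 Europe/Oslo time has passed), and you checked the output in BigQuery.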
  3. Scalability: Created a foundation for adding other time-based aggregations.