Partitioning Data
Introduction
In the "From Scratch" tutorial, you learned how to define a raw transactions table and aggregate daily sales. But what if you need to optimize your partitions for performance and cost-efficiency?
In this guide, we’ll extend the existing pipeline by aggregating daily transaction data into monthly partitions. You’ll learn how to:
- Transition from day-based to month-based partitions.
- Use Datoria to safely aggregate data from daily partitions into monthly ones.
- Build confidence that the aggregation logic aligns with your business needs.
Prerequisites
Ensure you’ve completed the "From Scratch" tutorial:
- Raw transactions table (`raw_transactions.ts`).
- Daily aggregation job (`sales_aggregation.ts`).
Step 1: Define the Monthly Partitioned Table
To store the monthly aggregated results, define a table partitioned by month:
- Create a new file: `monthly_sales_aggregation.ts`.
- Define the table with monthly partitioning:
```typescript
import {
  BQField,
  BQFieldType,
  BQPartition,
  BQPartitioning,
  BQTable,
} from "@datoria/sdk";
import { shopDataset } from "../../tutorials/from-scratch.md/shop";

// Monthly sales table: one row per product per month.
export const monthlySalesTable: BQTable<BQPartition.Month> = BQTable({
  dataset: shopDataset,
  name: "monthly_sales",
  // One partition per calendar month, keyed on the transaction_month column.
  partitioning: BQPartitioning.Month("transaction_month"),
  schema: [
    BQField("transaction_month", BQFieldType.Date, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("total_sales", BQFieldType.Float, "NULLABLE"),
  ],
});
```
What’s New Here?
- We define `transaction_month` as the partitioning column using `BQPartitioning.Month`.
- Partitions are aligned to the first day of each month.
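To make the month alignment concrete, here is a small sketch of how a calendar date maps to its monthly partition. `monthPartitionOf` is a hypothetical helper, not part of `@datoria/sdk`. It assumes dates are ISO `YYYY-MM-DD` strings interpreted in UTC and uses BigQuery's `YYYYMM` partition ID format for monthly partitions.

```typescript
// Hypothetical helper (not part of @datoria/sdk): maps a calendar date to the
// monthly partition it belongs to. Dates are parsed as UTC to avoid
// local-timezone shifts.
interface MonthPartition {
  firstDay: string;    // value stored in transaction_month, e.g. "2025-01-01"
  partitionId: string; // BigQuery monthly partition ID format: YYYYMM
}

function monthPartitionOf(isoDate: string): MonthPartition {
  const d = new Date(`${isoDate}T00:00:00Z`);
  if (Number.isNaN(d.getTime())) {
    throw new Error(`Invalid date: ${isoDate}`);
  }
  const year = d.getUTCFullYear();
  const month = String(d.getUTCMonth() + 1).padStart(2, "0");
  return {
    firstDay: `${year}-${month}-01`,
    partitionId: `${year}${month}`,
  };
}

// Every day in January 2025 lands in the same partition.
console.log(monthPartitionOf("2025-01-17"));
// → { firstDay: '2025-01-01', partitionId: '202501' }
```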
Step 2: Define the Monthly Aggregation Job
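Next, create a job that aggregates daily transactions into monthly partitions. Put it in its own file in the same directory as `monthly_sales_aggregation.ts`, because it imports `monthlySalesTable` from there.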
```typescript
import {
  BQPartition,
  TransformationJob,
  sql,
  BQExpr,
  Invalidation,
} from "@datoria/sdk";
import { monthlySalesTable } from "./monthly_sales_aggregation";
import { transactionsTable } from "../../tutorials/from-scratch.md/raw_transactions";

export const monthlySalesAggregationJob: TransformationJob<BQPartition.Month> =
  TransformationJob({
    name: "monthly-sales-agg",
    destination: monthlySalesTable,
    // Sum sales per product over every daily partition of the execution month.
    query: sql`
      SELECT
        ${BQExpr.ExecutionMonth} AS transaction_month,
        product_id,
        SUM(price_per_unit * quantity) AS total_sales
      FROM ${transactionsTable.range(
        BQExpr.ExecutionMonth.atFirstDayOfMonth(),
        BQExpr.ExecutionMonth.atLastDayOfMonth(),
        Invalidation.UpstreamChangedAndClockPassed("Europe/Oslo", "08:00"),
      )}
      GROUP BY 1, 2
    `,
    // First monthly partition the job computes.
    startPartition: BQPartition.Month(2025, 1),
  });
```
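You can model the query's logic in plain TypeScript to check it against your business rules before running it in BigQuery. This sketch is an illustration, not how Datoria executes the job. The `Transaction` row shape is an assumption, especially the `day` field, which stands in for the raw table's daily partition column. The sample rows are also made up.

```typescript
// Hypothetical row shape: `day` stands in for the raw transactions table's
// daily partition column; the other fields match the columns the query uses.
interface Transaction {
  day: string; // ISO date, e.g. "2025-01-05"
  product_id: string;
  price_per_unit: number;
  quantity: number;
}

interface MonthlySales {
  transaction_month: string;
  product_id: string;
  total_sales: number;
}

// Last day of a 1-based month: day 0 of the following month in Date.UTC.
function lastDayOfMonth(year: number, month: number): string {
  return new Date(Date.UTC(year, month, 0)).toISOString().slice(0, 10);
}

// Models the job for one execution month: keep rows whose day lies between the
// first and last day of the month (inclusive), then compute
// SUM(price_per_unit * quantity) grouped by product.
function aggregateMonth(rows: Transaction[], year: number, month: number): MonthlySales[] {
  const first = `${year}-${String(month).padStart(2, "0")}-01`;
  const last = lastDayOfMonth(year, month);
  const totals = new Map<string, number>();
  for (const row of rows) {
    // ISO date strings compare correctly as plain strings.
    if (row.day < first || row.day > last) continue;
    const sale = row.price_per_unit * row.quantity;
    totals.set(row.product_id, (totals.get(row.product_id) ?? 0) + sale);
  }
  return Array.from(totals, ([product_id, total_sales]) => ({
    transaction_month: first,
    product_id,
    total_sales,
  }));
}

// Hypothetical sample data.
const sample: Transaction[] = [
  { day: "2024-12-30", product_id: "prod1", price_per_unit: 10, quantity: 3 }, // previous month, excluded
  { day: "2025-01-01", product_id: "prod1", price_per_unit: 10, quantity: 2 },
  { day: "2025-01-31", product_id: "prod1", price_per_unit: 5, quantity: 4 },
  { day: "2025-01-15", product_id: "prod2", price_per_unit: 20, quantity: 1 },
];
console.log(aggregateMonth(sample, 2025, 1));
// prod1: 10*2 + 5*4 = 40, prod2: 20, both with transaction_month 2025-01-01
```

The range is inclusive at both ends, which mirrors the `atFirstDayOfMonth()` / `atLastDayOfMonth()` pair. Computing the last day with `Date.UTC` handles 28-, 29-, 30-, and 31-day months without a lookup table.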
Key Points:
- Input Range: `BQExpr.ExecutionMonth.atFirstDayOfMonth()` marks the start of the range, so processing begins on the first day of the execution month, and `atLastDayOfMonth()` marks the end. The job therefore reads every daily partition of the transactions table within that month.
- Invalidation Window: `Invalidation.UpstreamChangedAndClockPassed("Europe/Oslo", "08:00")` keeps results consistent by invalidating and reprocessing monthly partitions when upstream data changes, once the clock has passed 08:00 Europe/Oslo time.
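The sketch below models one plausible reading of `UpstreamChangedAndClockPassed`. Under that reading, a partition is reprocessed only when upstream data changed after the last run and the local time in the given zone has reached the configured clock time. This is a hypothetical illustration, not Datoria's implementation. `shouldReprocess`, `localMinutes`, and `parseClock` are invented names, and the real semantics may differ. The sketch reads the wall-clock time in `Europe/Oslo` through the standard `Intl.DateTimeFormat` API, so daylight saving time is accounted for.

```typescript
// Minutes since local midnight for `instant` in the given IANA time zone.
function localMinutes(instant: Date, timeZone: string): number {
  const parts = new Intl.DateTimeFormat("en-GB", {
    timeZone,
    hour: "2-digit",
    minute: "2-digit",
    hour12: false,
  }).formatToParts(instant);
  const get = (type: string) => Number(parts.find((p) => p.type === type)?.value);
  // `% 24` guards against engines that render midnight as "24".
  return (get("hour") % 24) * 60 + get("minute");
}

// Parses "HH:MM" into minutes since midnight.
function parseClock(clock: string): number {
  const m = /^(\d{2}):(\d{2})$/.exec(clock);
  if (!m) throw new Error(`Expected HH:MM, got ${clock}`);
  return Number(m[1]) * 60 + Number(m[2]);
}

// Hypothetical model: reprocess when upstream data changed after the last run
// AND the local clock in `timeZone` has reached `clock` on the current day.
function shouldReprocess(
  upstreamChangedAt: Date,
  lastRunAt: Date,
  now: Date,
  timeZone: string,
  clock: string,
): boolean {
  const upstreamChanged = upstreamChangedAt.getTime() > lastRunAt.getTime();
  const clockPassed = localMinutes(now, timeZone) >= parseClock(clock);
  return upstreamChanged && clockPassed;
}

const lastRun = new Date("2025-01-09T07:30:00Z");
const changed = new Date("2025-01-10T03:00:00Z");
// In January, Oslo is UTC+1.
console.log(shouldReprocess(changed, lastRun, new Date("2025-01-10T06:30:00Z"), "Europe/Oslo", "08:00")); // false: 07:30 in Oslo
console.log(shouldReprocess(changed, lastRun, new Date("2025-01-10T07:15:00Z"), "Europe/Oslo", "08:00")); // true: 08:15 in Oslo
```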
Step 3: Migrate and Run the Updated Pipeline
- Preview Changes: Confirm that the deployment plan includes the new table and job:
  ```shell
  datoria migrate --plan
  ```
- Migrate: Apply the changes:
  ```shell
  datoria migrate
  ```
- Run the Monthly Job: Execute the monthly aggregation job:
  ```shell
  datoria run monthly-sales-agg --now
  ```
Step 4: Verify the Results
- Query the Table: In the BigQuery Console, verify the monthly aggregated results:
  ```sql
  SELECT * FROM `shop.monthly_sales`;
  ```
- Example Output:

  | transaction_month | product_id | total_sales |
  |-------------------|------------|-------------|
  | 2024-12-01        | prod1      | 210.0       |
  | 2024-12-01        | prod2      | 200.0       |
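To build confidence in the result, you can reconcile the monthly table against the daily aggregates from the "From Scratch" tutorial. Assuming the daily job computes `total_sales` with the same `SUM(price_per_unit * quantity)` expression, each monthly total should equal the sum of that product's daily totals within the month. This sketch assumes you have already fetched rows from both tables into arrays. The row shapes (including the `day` field for the daily table's date column), `reconcile`, and the sample values are all hypothetical.

```typescript
// Hypothetical row shapes; `day` stands in for the daily table's date column.
interface DailyRow { day: string; product_id: string; total_sales: number }
interface MonthlyRow { transaction_month: string; product_id: string; total_sales: number }

// Lists every (month, product) whose monthly total differs from the sum of its
// daily totals by more than `tolerance`, plus daily sums missing from the
// monthly table. The tolerance absorbs floating-point summation-order noise.
function reconcile(daily: DailyRow[], monthly: MonthlyRow[], tolerance = 1e-6): string[] {
  const expected = new Map<string, number>();
  for (const d of daily) {
    const key = `${d.day.slice(0, 7)}-01|${d.product_id}`; // first day of month + product
    expected.set(key, (expected.get(key) ?? 0) + d.total_sales);
  }
  const mismatches: string[] = [];
  const seen = new Set<string>();
  for (const m of monthly) {
    const key = `${m.transaction_month}|${m.product_id}`;
    seen.add(key);
    const want = expected.get(key) ?? 0;
    if (Math.abs(want - m.total_sales) > tolerance) {
      mismatches.push(`${key}: monthly=${m.total_sales}, sum of daily=${want}`);
    }
  }
  expected.forEach((want, key) => {
    if (!seen.has(key)) mismatches.push(`${key}: missing from monthly table (sum of daily=${want})`);
  });
  return mismatches;
}

// Hypothetical sample rows.
const daily: DailyRow[] = [
  { day: "2025-01-03", product_id: "prod1", total_sales: 100 },
  { day: "2025-01-20", product_id: "prod1", total_sales: 50 },
  { day: "2025-01-20", product_id: "prod2", total_sales: 80 },
];
const monthly: MonthlyRow[] = [
  { transaction_month: "2025-01-01", product_id: "prod1", total_sales: 150 },
  { transaction_month: "2025-01-01", product_id: "prod2", total_sales: 75 },
];
console.log(reconcile(daily, monthly)); // reports one mismatch: prod2 (75 vs 80)
```

An empty result means every monthly partition agrees with its daily inputs. Anything else points at the exact month and product to investigate.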
What Did We Achieve?
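- Improved Performance: Monthly reports read one pre-aggregated partition per month instead of scanning every daily partition, which reduces query overhead.
- Pipeline Correctness: Each monthly partition is built from the full range of daily partitions in its month and is reprocessed when upstream data changes.
- Scalability: Created a foundation for adding other time-based aggregations.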