Starting from Scratch: Retail Analytics Pipeline
In this tutorial, you’ll build a simple retail analytics pipeline to analyze sales data. You’ll define a source table for raw transactions, a transformation job to calculate total sales per product, and a destination table for storing aggregated results. This domain is extensible, allowing you to add customer segmentation, inventory tracking, and other use cases later.
Part 1: Develop a Simple Retail Analytics Pipeline
Step 1: Prerequisites
Make sure you have completed the Getting Started section, ensuring you have:
- A GCP project authenticated with gcloud.
- The Datoria CLI installed.
- VS Code with the Datoria extension installed.
If you have existing relations you want to work with, follow From existing database instead.
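If you still need to authenticate gcloud for the first prerequisite, a typical flow looks roughly like the one below; your setup may differ, so follow the Getting Started section if in doubt. The project ID is a placeholder.

```sh
# Log in and create application default credentials for local tooling
gcloud auth login
gcloud auth application-default login

# Point gcloud at the project you plan to use (replace the placeholder)
gcloud config set project your-gcp-project
```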
Step 2: Initialize Your Datoria Project
- Open a terminal and navigate to your project directory.
- Run:
datoria workspace init
- Provide the required configuration:
  - GCP Project: Enter a project ID.
  - BigQuery Location: Specify the dataset location. We default to EU in this documentation. A comprehensive list of locations can be found at BigQuery locations.
These values are not used when actually importing data, but are used during development.
This creates a datoria-config.json file for your project.
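Conceptually, this file just records the answers you gave above. Purely as an illustration (the actual key names may differ), it might look something like:

```json
{
  "project": "your-gcp-project",
  "location": "EU"
}
```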
Step 3: Define a Dataset
We'll need a place to store the data. Let's define a dataset for our shop:
- Create a new file named shop.ts.
- Define a dataset for our shop:
import { BQDataset } from "@datoria/sdk";
export const shopDataset = BQDataset({
project: "datoria-test",
name: "shop",
location: "EU",
});
Replace datoria-test with your GCP project ID.
You can set a bunch of interesting things on BQDataset, but we'll just set the (required) location for now.
Step 4: Define the Source Table (Raw Transactions)
We need a table that will serve as the source of raw transaction data.
- Create a new file named raw_transactions.ts.
- Define a table for raw transaction data:
import {
BQFieldType,
BQField,
BQPartitioning,
BQSchema,
BQTable,
} from "@datoria/sdk";
import { shopDataset } from "./shop";
const schema: BQSchema = [
BQField("transaction_id", BQFieldType.String, "REQUIRED"),
BQField("product_id", BQFieldType.String, "REQUIRED"),
BQField("quantity", BQFieldType.Integer, "REQUIRED"),
BQField("price_per_unit", BQFieldType.Float, "REQUIRED"),
BQField("transaction_date", BQFieldType.DateTime, "REQUIRED"),
];
export const transactionsTable = BQTable({
dataset: shopDataset,
name: "raw_transactions",
partitioning: BQPartitioning.Day("transaction_date"),
schema,
});
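The same pattern extends to any other source table you add later, such as the inventory tracking mentioned in the introduction. A minimal sketch, assuming a hypothetical raw_inventory table whose name and columns are purely illustrative:

```ts
import {
  BQFieldType,
  BQField,
  BQPartitioning,
  BQSchema,
  BQTable,
} from "@datoria/sdk";
import { shopDataset } from "./shop";

// Hypothetical inventory source table; the table and column names are
// illustrative only and not used elsewhere in this tutorial.
const inventorySchema: BQSchema = [
  BQField("product_id", BQFieldType.String, "REQUIRED"),
  BQField("stock_level", BQFieldType.Integer, "REQUIRED"),
  BQField("recorded_at", BQFieldType.DateTime, "REQUIRED"),
];

export const inventoryTable = BQTable({
  dataset: shopDataset,
  name: "raw_inventory",
  partitioning: BQPartitioning.Day("recorded_at"),
  schema: inventorySchema,
});
```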
Step 5: Define the Transformation Job (Sales Aggregation)
- Create a new file named sales_aggregation.ts.
- Define a job that calculates total sales per product:
import {
BQFieldType,
BQField,
BQPartitioning,
BQSchema,
BQTable,
TransformationJob,
sql,
} from "@datoria/sdk";
import { shopDataset } from "./shop";
import { transactionsTable } from "./raw_transactions";
const schema: BQSchema = [
BQField("product_id", BQFieldType.String, "REQUIRED"),
BQField("total_sales", BQFieldType.Float, "NULLABLE"),
];
const destination = BQTable({
dataset: shopDataset,
name: "aggregated_sales",
partitioning: BQPartitioning.Unpartitioned,
schema,
});
export const salesAggregationJob = TransformationJob({
name: "sales-agg",
destination,
query: sql`
SELECT
product_id,
SUM(price_per_unit * quantity) AS total_sales
FROM ${transactionsTable.allPartitions()}
GROUP BY product_id
`,
});
Highlights of the Datoria SDK:
- SQL Code Completion: Auto-complete for table and column names while writing SQL queries.
- SQL Rendering: View the exact SQL query generated for execution.
- Inline Documentation: Hover over SDK methods for usage details.
Part 2: Migrate and Run the Pipeline
Step 1: Verify the Migration Plan
- Preview the changes to be applied:
datoria migrate --plan
- Review the tables and jobs being created.
Step 2: Migrate the Pipeline
Apply the changes to BigQuery:
datoria migrate
Step 3: Insert Some Sample Data
- Open the BigQuery Console.
- Insert some test data:
INSERT INTO `shop.raw_transactions` (transaction_id, product_id, quantity, price_per_unit, transaction_date)
VALUES
("txn1", "prod1", 2, 10.5, "2024-12-13T10:00:00"),
("txn2", "prod2", 1, 20.0, "2024-12-13T11:00:00");
Step 4: Execute the Pipeline
Run all jobs in your pipeline:
datoria run
- Datoria executes the transformation job, creating the aggregated_sales table.
- The rendered SQL query and execution logs are displayed for debugging.
The run command checks whether there is anything that needs to be executed. If you run it multiple times without inserting any new rows, it will not execute any jobs.
Step 5: Verify the Results
- Open the BigQuery Console.
- Query the aggregated_sales table:
SELECT * FROM `shop.aggregated_sales`;
- Confirm the output:
| product_id | total_sales |
| ---------- | ----------- |
| prod1      | 21.0        |
| prod2      | 20.0        |
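These totals follow directly from the sample rows: prod1 sold 2 units at 10.5 each (2 × 10.5 = 21.0) and prod2 sold 1 unit at 20.0 (1 × 20.0 = 20.0).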
Next Steps
Now that you have a working retail analytics pipeline, consider extending it with these ideas:
- Partitions and dependencies: Create a chain of jobs that processes a daily partition (see the sketch below for a possible starting point).
- Invalidation: Play around with invalidation. It can be great to look at this in combination with an aggregation job.
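For the first idea, one possible starting point is a second aggregation job whose destination is partitioned by day. The sketch below only reuses constructs shown in this tutorial (a Day-partitioned destination, the sql template, and allPartitions()); the table and job names are illustrative, and Datoria's partition-dependency features may give you finer-grained control than this.

```ts
import {
  BQFieldType,
  BQField,
  BQPartitioning,
  BQSchema,
  BQTable,
  TransformationJob,
  sql,
} from "@datoria/sdk";
import { shopDataset } from "./shop";
import { transactionsTable } from "./raw_transactions";

// Hypothetical daily sales table; names are illustrative only.
const dailySchema: BQSchema = [
  BQField("product_id", BQFieldType.String, "REQUIRED"),
  BQField("sale_day", BQFieldType.DateTime, "REQUIRED"),
  BQField("total_sales", BQFieldType.Float, "NULLABLE"),
];

const dailySalesTable = BQTable({
  dataset: shopDataset,
  name: "daily_sales",
  partitioning: BQPartitioning.Day("sale_day"),
  schema: dailySchema,
});

export const dailySalesJob = TransformationJob({
  name: "daily-sales",
  destination: dailySalesTable,
  query: sql`
    SELECT
      product_id,
      DATETIME_TRUNC(transaction_date, DAY) AS sale_day,
      SUM(price_per_unit * quantity) AS total_sales
    FROM ${transactionsTable.allPartitions()}
    GROUP BY product_id, sale_day
  `,
});
```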