
Starting from Scratch: Retail Analytics Pipeline

In this tutorial, you’ll build a simple retail analytics pipeline to analyze sales data. You’ll define a source table for raw transactions, a transformation job to calculate total sales per product, and a destination table for storing aggregated results. This domain is extensible, allowing you to add customer segmentation, inventory tracking, and other use cases later.


Part 1: Develop a Simple Retail Analytics Pipeline

Step 1: Prerequisites

Make sure you have completed the Getting Started section, ensuring you have:

  • A GCP project authenticated with gcloud.
  • Datoria CLI installed.
  • VS Code with the Datoria extension installed.
Info: If you have existing relations you want to work with, follow From existing database instead.


Step 2: Initialize Your Datoria Project

  1. Open a terminal and navigate to your project directory.
  2. Run:
datoria workspace init
  3. Provide the required configuration:
    • GCP Project: Enter a project ID.
    • BigQuery Location: Specify the dataset location. We default to EU in this documentation. A comprehensive list of locations can be found at BigQuery locations.

These values are not used when data is actually imported; they are only used during development.

This creates a datoria-config.json file for your project.


Step 3: Define a Dataset

We'll need a place to store the data. Let's define a dataset for our shop:

  1. Create a new file named shop.ts.
  2. Define a dataset for our shop:
import { BQDataset } from "@datoria/sdk";

export const shopDataset = BQDataset({
  project: "datoria-test",
  name: "shop",
  location: "EU",
});

Replace datoria-test with your GCP project ID.

You can set a bunch of interesting things on BQDataset, but we'll just set the (required) location for now.
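
Dataset definitions are ordinary TypeScript values, so you can declare and export as many as you need. As a purely illustrative sketch (the shop_staging name is an assumption, not something this tutorial uses), a second dataset would follow the exact same pattern:

import { BQDataset } from "@datoria/sdk";

// Hypothetical second dataset, shown only to illustrate the pattern.
// Keep datasets you intend to query together in the same location.
export const shopStagingDataset = BQDataset({
  project: "datoria-test", // replace with your GCP project ID
  name: "shop_staging",
  location: "EU",
});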


Step 4: Define the Source Table (Raw Transactions)

We need a table to serve as the source for raw transaction data.

  1. Create a new file named raw_transactions.ts.
  2. Define a table for raw transaction data:
import {
  BQFieldType,
  BQField,
  BQPartitioning,
  BQSchema,
  BQTable,
} from "@datoria/sdk";
import { shopDataset } from "./shop";

const schema: BQSchema = [
  BQField("transaction_id", BQFieldType.String, "REQUIRED"),
  BQField("product_id", BQFieldType.String, "REQUIRED"),
  BQField("quantity", BQFieldType.Integer, "REQUIRED"),
  BQField("price_per_unit", BQFieldType.Float, "REQUIRED"),
  BQField("transaction_date", BQFieldType.DateTime, "REQUIRED"),
];

export const transactionsTable = BQTable({
  dataset: shopDataset,
  name: "raw_transactions",
  partitioning: BQPartitioning.Day("transaction_date"),
  schema,
});
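
Because the schema is just an array of BQField entries, extending the table later only means adding fields to it. The snippet below is a hedged sketch (the store_id column is an assumption used purely for illustration, not part of the tutorial data):

// Hypothetical extension of the schema above; "NULLABLE" means existing
// rows do not need a value for the new column.
const extendedSchema: BQSchema = [
  ...schema,
  BQField("store_id", BQFieldType.String, "NULLABLE"),
];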

Step 5: Define the Transformation Job (Sales Aggregation)

  1. Create a new file named sales_aggregation.ts.
  2. Define a job that calculates total sales per product:
import {
  BQFieldType,
  BQField,
  BQPartitioning,
  BQSchema,
  BQTable,
  TransformationJob,
  sql,
} from "@datoria/sdk";

import { shopDataset } from "./shop";
import { transactionsTable } from "./raw_transactions";

const schema: BQSchema = [
  BQField("product_id", BQFieldType.String, "REQUIRED"),
  BQField("total_sales", BQFieldType.Float, "NULLABLE"),
];

const destination = BQTable({
  dataset: shopDataset,
  name: "aggregated_sales",
  partitioning: BQPartitioning.Unpartitioned,
  schema,
});

export const salesAggregationJob = TransformationJob({
  name: "sales-agg",
  destination,
  query: sql`
    SELECT
      product_id,
      SUM(price_per_unit * quantity) AS total_sales
    FROM ${transactionsTable.allPartitions()}
    GROUP BY product_id
  `,
});
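
Note how the sql template interpolates ${transactionsTable.allPartitions()}, so the query always points at the table defined in raw_transactions.ts. The same destination-plus-job pattern works for any other aggregate you might want; the sketch below is illustrative only (the units_sold table and job name are assumptions) and reuses the imports already present in sales_aggregation.ts:

// Hypothetical second aggregate: total units sold per product.
const unitsSoldTable = BQTable({
  dataset: shopDataset,
  name: "units_sold",
  partitioning: BQPartitioning.Unpartitioned,
  schema: [
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("units", BQFieldType.Integer, "NULLABLE"),
  ],
});

export const unitsSoldJob = TransformationJob({
  name: "units-sold",
  destination: unitsSoldTable,
  query: sql`
    SELECT
      product_id,
      SUM(quantity) AS units
    FROM ${transactionsTable.allPartitions()}
    GROUP BY product_id
  `,
});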

Highlights of the Datoria SDK:

  • SQL Code Completion: Auto-complete for table and column names while writing SQL queries.
  • SQL Rendering: View the exact SQL query generated for execution.
  • Inline Documentation: Hover over SDK methods for usage details.

Part 2: Migrate and Run the Pipeline

Step 1: Verify the migration plan

  1. Preview the changes to be applied:
    datoria migrate --plan
  2. Review the tables and jobs being created.

Step 2: Migrate the Pipeline

Apply the changes to BigQuery:

datoria migrate

Step 3: Insert some sample data

  1. Open the BigQuery Console.
  2. Insert some test data:
    INSERT INTO `shop.raw_transactions` (transaction_id, product_id, quantity, price_per_unit, transaction_date)
    VALUES
    ("txn1", "prod1", 2, 10.5, "2024-12-13T10:00:00"),
    ("txn2", "prod2", 1, 20.0, "2024-12-13T11:00:00");

Step 4: Execute the Pipeline

Run all jobs in your pipeline:

datoria run
  • Datoria executes the transformation job, creating the aggregated_sales table.
  • The rendered SQL query and execution logs are displayed for debugging.
Info: The run command checks whether anything actually needs to be executed. If you run it multiple times without inserting new rows, it will not execute any jobs.


Step 5: Verify the Results

  1. Open the BigQuery Console.
  2. Query the aggregated_sales table:
    SELECT * FROM `shop.aggregated_sales`;
  3. Confirm the output:
    product_id | total_sales
    prod1      | 21.0
    prod2      | 20.0

Next Steps

Now that you have a working retail analytics pipeline, consider extending it with these ideas:

  • Partitions and dependencies: Create a chain of jobs that each process a daily partition (see the sketch below).
  • Invalidation: Play around with invalidation. It can be interesting to look at this in combination with an aggregation job.
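
As a starting point for the first idea, the sketch below defines a day-partitioned downstream table and a job that fills it, in a new file that reuses the same imports as sales_aggregation.ts. It is only a rough outline under the APIs shown in this tutorial (the daily_sales names are assumptions): it still reads every partition via allPartitions(), whereas a real daily chain would use the partition-dependency mechanism described in the Partitions and dependencies guide.

// Rough sketch only: a day-partitioned destination fed from raw_transactions.
// A production chain would depend on individual daily partitions instead of
// scanning all of them on every run.
const dailySalesTable = BQTable({
  dataset: shopDataset,
  name: "daily_sales",
  partitioning: BQPartitioning.Day("sale_date"),
  schema: [
    BQField("sale_date", BQFieldType.DateTime, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("total_sales", BQFieldType.Float, "NULLABLE"),
  ],
});

export const dailySalesJob = TransformationJob({
  name: "daily-sales",
  destination: dailySalesTable,
  query: sql`
    SELECT
      DATETIME_TRUNC(transaction_date, DAY) AS sale_date,
      product_id,
      SUM(price_per_unit * quantity) AS total_sales
    FROM ${transactionsTable.allPartitions()}
    GROUP BY sale_date, product_id
  `,
});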