Jobs
Jobs are the workhorses of your data pipeline in Datoria. A job defines how data should be transformed from one form to another using a declarative approach.
TransformationJob Basics
At its core, a TransformationJob consists of:
- A query that defines the transformation logic
- A destination table where results will be written
- Dependencies on source data (implicitly defined in the query)
- Optional settings like startPartition and metadata
Here's a basic example:
import {
  BQField,
  BQFieldType,
  BQPartition,
  BQPartitioning,
  BQTable,
  TransformationJob,
  sql,
} from "@datoria/sdk";
import {
  analyticsDataset,
  dailyEventsTable,
} from "../../core-concepts/tables-and-partitioning.md";
// Define our destination table
const userStatsTable = BQTable({
dataset: analyticsDataset,
name: "user_stats",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("user_id", BQFieldType.String, "REQUIRED"),
BQField("event_count", BQFieldType.Integer, "REQUIRED"),
],
});
const simpleQuery = sql`
SELECT
event_date as date,
user_id,
COUNT(*) as event_count
FROM ${dailyEventsTable.atExecutionDate()}
GROUP BY event_date, user_id
`;
export const simpleStatsJob = TransformationJob({
name: "daily_user_stats",
destination: userStatsTable,
query: simpleQuery,
startPartition: BQPartition.Day(2025, 1, 1),
});
Creating a Complete Job
Let's build a more comprehensive example. We'll extract "flower" orders from a date-partitioned source table and write them into a dedicated destination table.
1. Define Source Tables
import {
BQTable,
BQField,
BQFieldType,
BQPartitioning,
BQSourceTable,
} from "@datoria/sdk";
const ordersTable = BQTable({
dataset: analyticsDataset,
name: "orders",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("user_id", BQFieldType.String, "REQUIRED"),
BQField("product_id", BQFieldType.String, "REQUIRED"),
],
});
const productMetadataTable = BQSourceTable({
dataset: analyticsDataset,
name: "product_metadata",
location: analyticsDataset.location,
partitioning: BQPartitioning.Unpartitioned,
schema: [
BQField("product_id", BQFieldType.String, "REQUIRED"),
BQField("name", BQFieldType.String, "REQUIRED"),
BQField("category", BQFieldType.String, "REQUIRED"),
],
});
2. Define Destination Table
const flowerOrdersTable = BQTable({
dataset: analyticsDataset,
name: "flower_orders",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("user_id", BQFieldType.String, "REQUIRED"),
BQField("product_id", BQFieldType.String, "REQUIRED"),
BQField("name", BQFieldType.String, "REQUIRED"),
],
});
3. Define Transformation Query
const transformQuery = sql`
SELECT o.date, o.user_id, o.product_id, m.name
FROM ${ordersTable.atExecutionDate()} o
JOIN ${productMetadataTable.allPartitions()} m USING (product_id)
WHERE m.category = 'flowers'
`;
4. Define the Job
export const flowerOrdersJob = TransformationJob({
name: "flower_orders-job",
destination: flowerOrdersTable,
query: transformQuery,
startPartition: BQPartition.Day(2025, 1, 1),
metadata: {
description: "Extracts flower orders from the orders table",
owner: "retail-analytics-team",
},
});
Understanding Partition Dependencies
In the example above, the job:
- Reads from ordersTable.atExecutionDate(), processing one day partition at a time
- Joins with productMetadataTable.allPartitions(), an unpartitioned reference table
- Writes results to the corresponding date partition in flowerOrdersTable
This granular approach ensures that only the necessary data is processed and that dependencies are correctly tracked.
Start Partition Inference
Datoria needs to know which partition each job should start from. There are two ways to provide this:
1. Explicit Definition
For jobs with no dependencies on other jobs, set startPartition explicitly:
startPartition: BQPartition.Day(2025, 1, 1);
2. Automatic Inference
For jobs that depend on other jobs, Datoria can infer the start partition automatically:
import { BQExpr } from "@datoria/sdk";
export const flowerOrdersYesterdayJob = TransformationJob({
name: "flower_orders_yesterday_job",
destination: BQTable({
dataset: analyticsDataset,
name: "flower_orders_yesterday",
partitioning: BQPartitioning.Day("date"),
schema: flowerOrdersTable.schema,
}),
query: sql`
SELECT x.*
FROM ${flowerOrdersJob.at(BQExpr.ExecutionDate.dayMinusDays(1))} x
`,
// No startPartition needed - will be inferred from dependencies
});
In this case, Datoria knows that flowerOrdersJob starts at 2025-01-01, so the earliest partition for flowerOrdersYesterdayJob will be 2025-01-02.
With multiple dependencies, Datoria takes the latest start partition from all dependencies.
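For example, here's a minimal sketch of a job with two upstream dependencies. jobA, jobB, and combinedTable are hypothetical stand-ins, assumed to be defined like the jobs and tables above, with jobA starting at 2025-01-01 and jobB at 2025-03-01:
export const combinedJob = TransformationJob({
  name: "combined_job",
  destination: combinedTable,
  query: sql`
    SELECT a.date, a.user_id, b.event_count
    FROM ${jobA.at(BQExpr.ExecutionDate.dayMinusDays(1))} a
    JOIN ${jobB.at(BQExpr.ExecutionDate.dayMinusDays(1))} b USING (date, user_id)
  `,
  // No startPartition: Datoria infers 2025-03-02, one day after the latest
  // upstream start (jobB at 2025-03-01).
});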
Start Partition and Partition Expiration
When working with tables that have partition expiration settings, Datoria automatically harmonizes start partition specifications with what data actually exists:
Automatic Adjustment for Expired Partitions
Datoria calculates an effective start partition that considers both:
- The explicitly defined start partition (or the one inferred from dependencies)
- The oldest partition that has not yet expired in the source data
This means you can set a historical start partition date once and never worry about updating it as older partitions expire. Datoria will intelligently begin processing from the oldest available partition.
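To illustrate, here's a minimal sketch. rawClicksTable and clickStatsTable are hypothetical, and partition expiration itself is assumed to be configured on the table (for example in BigQuery); the exact option is not covered here:
// Hypothetical source table whose partitions are assumed to expire after 90 days.
const rawClicksTable = BQSourceTable({
  dataset: analyticsDataset,
  name: "raw_clicks",
  location: analyticsDataset.location,
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
  ],
});

export const clickStatsJob = TransformationJob({
  name: "click_stats_job",
  destination: clickStatsTable, // hypothetical destination, defined like the tables above
  query: sql`
    SELECT date, user_id
    FROM ${rawClicksTable.atExecutionDate()}
  `,
  // Declared start is 2023-01-01. If partitions older than 90 days have already
  // expired, the effective start becomes the oldest partition that still exists.
  startPartition: BQPartition.Day(2023, 1, 1),
});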
Multiple Dependencies with Different Expiration Settings
For jobs that depend on multiple tables with different partition expiration settings, Datoria takes the most conservative approach. It will only process partitions where all required source data is still available.
This ensures data integrity by preventing partial joins or incomplete processing that could occur if only some of the source data had expired.
Benefits of This Approach
This automatic handling provides several advantages:
- No need to manually update start partitions as data ages out
- Prevents job failures due to references to expired partitions
- Eliminates the need to track and synchronize expiration dates across your codebase
- Makes your pipeline definitions more stable and maintainable
With Datoria, your data pipeline continues to operate smoothly even as the window of available data shifts over time due to partition expiration.
Common Job Patterns
Daily Aggregation
// Define a table representing user sessions
const userSessionsTable = BQTable({
dataset: analyticsDataset,
name: "user_sessions",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("user_id", BQFieldType.String, "REQUIRED"),
BQField("session_duration", BQFieldType.Integer, "REQUIRED"),
],
});
// Define the destination table for our example
const dailyUserStatsTable = BQTable({
dataset: analyticsDataset,
name: "daily_user_stats",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("daily_active_users", BQFieldType.Integer, "REQUIRED"),
BQField("total_session_time", BQFieldType.Integer, "REQUIRED"),
],
});
export const dailyAggregationJob = TransformationJob({
name: "daily_user_stats_job",
destination: dailyUserStatsTable,
query: sql`
SELECT
${BQExpr.ExecutionDate} as date,
COUNT(DISTINCT user_id) as daily_active_users,
SUM(session_duration) as total_session_time
FROM ${userSessionsTable.atExecutionDate()}
GROUP BY date
`,
startPartition: BQPartition.Day(2025, 1, 1),
});
Monthly Rollup from Daily Data
// Define the destination table
const monthlyUserStatsTable = BQTable({
dataset: analyticsDataset,
name: "monthly_user_stats",
partitioning: BQPartitioning.Month("month"),
schema: [
BQField("month", BQFieldType.Date, "REQUIRED"),
BQField("monthly_active_users", BQFieldType.Integer, "REQUIRED"),
BQField("monthly_session_time", BQFieldType.Integer, "REQUIRED"),
],
});
export const monthlyUserStatsJob = TransformationJob({
name: "monthly_user_stats_job",
destination: monthlyUserStatsTable,
query: sql`
SELECT
DATE_TRUNC(${BQExpr.ExecutionMonth.atFirstDayOfMonth()}, MONTH) as month,
SUM(daily_active_users) as monthly_active_users,
SUM(total_session_time) as monthly_session_time
FROM ${dailyAggregationJob.range(
BQExpr.ExecutionMonth.atFirstDayOfMonth(),
BQExpr.ExecutionMonth.atLastDayOfMonth(),
)}
GROUP BY month
`,
startPartition: BQPartition.Month(2025, 1),
});
Job Metadata
Jobs can include metadata to provide additional context:
export const jobWithMetadata = {
...monthlyUserStatsJob,
name: "monthly_user_stats_job-ctx",
metadata: {
description: "Montly aggregation of user statistics",
owner: "data-platform-team",
slack_channel: "#data-alerts",
criticality: "high",
sla_minutes: 120,
tags: ["user-analytics", "daily-metrics"],
webhookURI: "https://api.example.com/job-notifications",
},
};
This metadata can be used by external systems, documentation generators, or custom extensions.
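For example, a small script could fold job metadata into a catalog for a docs generator or alerting system. This is plain TypeScript over the exported job object, not a Datoria API, and assumes jobWithMetadata from above is in scope:
interface CatalogEntry {
  job: string;
  owner?: string;
  description?: string;
  slackChannel?: string;
}

// Pull a few well-known keys out of the free-form metadata object.
function toCatalogEntry(job: {
  name: string;
  metadata?: Record<string, unknown>;
}): CatalogEntry {
  return {
    job: job.name,
    owner: job.metadata?.owner as string | undefined,
    description: job.metadata?.description as string | undefined,
    slackChannel: job.metadata?.slack_channel as string | undefined,
  };
}

console.log(toCatalogEntry(jobWithMetadata));
// => { job: "monthly_user_stats_job-ctx", owner: "data-platform-team", ... }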
Testing Jobs
Jobs can be tested using Datoria's unit testing framework:
import { UnitTest, TestRelation } from "@datoria/sdk";
export const flowerOrdersTest = UnitTest({
testSubject: flowerOrdersJob,
testRelations: [
TestRelation(
ordersTable,
{
date: "2025-01-01",
user_id: "user1",
product_id: "prod1",
},
{
date: "2025-01-01",
user_id: "user2",
product_id: "prod2",
},
),
TestRelation(
productMetadataTable,
{
product_id: "prod1",
name: "Rose Bouquet",
category: "flowers",
},
{
product_id: "prod2",
name: "Desk Lamp",
category: "home",
},
),
],
assertEquals: [
{
date: "2025-01-01",
user_id: "user1",
product_id: "prod1",
name: "Rose Bouquet",
},
],
});
See the Unit Testing section for more details.
Running Jobs
To run a job, use the Datoria CLI:
datoria run job-name
Or to run all jobs:
datoria run
See the Running Jobs section for more details.
Best Practices
- Use descriptive names for jobs to make them easy to identify
- Include metadata like owner and description for documentation
- Set explicit startPartition for jobs without dependencies on other jobs
- Define job-specific destination tables rather than sharing destinations
- Test jobs with unit tests to verify their behavior