Jobs
Jobs are the workhorses of your data pipeline in Datoria. A job defines how data should be transformed from one form to another using a declarative approach.
TransformationJob Basics
At its core, a TransformationJob consists of:
- A query that defines the transformation logic
- A destination table where results will be written
- Dependencies on source data (implicitly defined in the query)
- Optional settings like startPartition and metadata
Here's a basic example:
import {
  BQField,
  BQFieldType,
  BQPartition,
  BQPartitioning,
  BQTable,
  TransformationJob,
  sql,
} from "@datoria/sdk";
import {
  analyticsDataset,
  dailyEventsTable,
} from "../../core-concepts/tables-and-partitioning.md";
// Define our destination table
const userStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "user_stats",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("event_count", BQFieldType.Integer, "REQUIRED"),
  ],
});

const simpleQuery = sql`
  SELECT
    event_date as date,
    user_id,
    COUNT(*) as event_count
  FROM ${dailyEventsTable.atExecutionDate()}
  GROUP BY event_date, user_id
`;

export const simpleStatsJob = TransformationJob({
  name: "daily_user_stats",
  destination: userStatsTable,
  query: simpleQuery,
  startPartition: BQPartition.Day(2025, 1, 1),
});
Creating a Complete Job
Let's build a more comprehensive example. We'll extract "flower" orders from a date-partitioned source table and write them into a dedicated destination table.
1. Define Source Tables
import {
  BQTable,
  BQField,
  BQFieldType,
  BQPartitioning,
  BQSourceTable,
} from "@datoria/sdk";

const ordersTable = BQTable({
  dataset: analyticsDataset,
  name: "orders",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
  ],
});

const productMetadataTable = BQSourceTable({
  dataset: analyticsDataset,
  name: "product_metadata",
  location: analyticsDataset.location,
  partitioning: BQPartitioning.Unpartitioned,
  schema: [
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("name", BQFieldType.String, "REQUIRED"),
    BQField("category", BQFieldType.String, "REQUIRED"),
  ],
});
2. Define Destination Table
const flowerOrdersTable = BQTable({
  dataset: analyticsDataset,
  name: "flower_orders",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("name", BQFieldType.String, "REQUIRED"),
  ],
});
3. Define Transformation Query
const transformQuery = sql`
  SELECT o.date, o.user_id, o.product_id, m.name
  FROM ${ordersTable.atExecutionDate()} o
  JOIN ${productMetadataTable.allPartitions()} m USING (product_id)
  WHERE m.category = 'flowers'
`;
4. Define the Job
export const flowerOrdersJob = TransformationJob({
  name: "flower_orders-job",
  destination: flowerOrdersTable,
  query: transformQuery,
  startPartition: BQPartition.Day(2025, 1, 1),
  metadata: {
    description: "Extracts flower orders from the orders table",
    owner: "retail-analytics-team",
  },
});
Understanding Partition Dependencies
In the example above, the job:
- Reads from ordersTable.atExecutionDate(), processing one day partition at a time
- Joins with productMetadataTable.allPartitions(), an unpartitioned reference table
- Writes results to the corresponding date partition in flowerOrdersTable

This granular approach ensures that only the necessary data is processed and that dependencies are correctly tracked.
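To make that wiring concrete, here is a rough sketch in plain TypeScript of which partitions a single execution date touches. The types and table-name strings are illustrative only; this is not Datoria SDK code.

```typescript
// Hypothetical illustration of the partitions one execution of the job touches.
// Table names mirror the example above; none of this is the Datoria API.
type PartitionRef = { table: string; partition: string };

function partitionsFor(executionDate: string): {
  reads: PartitionRef[];
  writes: PartitionRef;
} {
  return {
    reads: [
      // ordersTable.atExecutionDate(): one day partition per execution
      { table: "orders", partition: executionDate },
      // productMetadataTable.allPartitions(): the whole unpartitioned table
      { table: "product_metadata", partition: "all" },
    ],
    // results land in the matching date partition of the destination
    writes: { table: "flower_orders", partition: executionDate },
  };
}

console.log(partitionsFor("2025-01-05"));
```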
Start Partition Inference
Datoria needs to know which partition should be the first for each job. There are two ways to handle this:
1. Explicit Definition
For jobs with no dependencies on other jobs, set startPartition explicitly:
startPartition: BQPartition.Day(2025, 1, 1);
2. Automatic Inference
For jobs that depend on other jobs, Datoria can infer the start partition automatically:
import { BQExpr } from "@datoria/sdk";

export const flowerOrdersYesterdayJob = TransformationJob({
  name: "flower_orders_yesterday_job",
  destination: BQTable({
    dataset: analyticsDataset,
    name: "flower_orders_yesterday",
    partitioning: BQPartitioning.Day("date"),
    schema: flowerOrdersTable.schema,
  }),
  query: sql`
    SELECT x.*
    FROM ${flowerOrdersJob.at(BQExpr.ExecutionDate.dayMinusDays(1))} x
  `,
  // No startPartition needed - will be inferred from dependencies
});
In this case, Datoria knows that flowerOrdersJob starts at 2025-01-01, so the earliest partition for flowerOrdersYesterdayJob will be 2025-01-02.
With multiple dependencies, Datoria takes the latest start partition from all dependencies.
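The inference rule can be sketched in a few lines of plain TypeScript. This is an illustration of the rule stated above, not Datoria's actual implementation: each dependency contributes its own start shifted by the lag the query applies (e.g. dayMinusDays(1) means partition D reads the dependency's partition D-1), and the latest candidate wins.

```typescript
// Illustrative start-partition inference for day-partitioned jobs.
// Not SDK code; dates are handled as plain UTC JavaScript Dates.
interface Dependency {
  startDate: Date; // the dependency job's first partition
  lagDays: number; // how many days back the query reads (0 = same day)
}

function inferStartDate(deps: Dependency[]): Date {
  const candidates = deps.map((dep) => {
    // Earliest partition this job can compute from this dependency.
    const earliest = new Date(dep.startDate);
    earliest.setUTCDate(earliest.getUTCDate() + dep.lagDays);
    return earliest.getTime();
  });
  // With multiple dependencies, the latest candidate wins.
  return new Date(Math.max(...candidates));
}

// flowerOrdersJob starts 2025-01-01 and is read with a one-day lag,
// so the dependent job's first partition is 2025-01-02.
const inferred = inferStartDate([
  { startDate: new Date(Date.UTC(2025, 0, 1)), lagDays: 1 },
]);
console.log(inferred.toISOString().slice(0, 10)); // 2025-01-02
```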
Common Job Patterns
Daily Aggregation
// Define a table representing user sessions
const userSessionsTable = BQTable({
  dataset: analyticsDataset,
  name: "user_sessions",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("session_duration", BQFieldType.Integer, "REQUIRED"),
  ],
});

// Define the destination table for our example
const dailyUserStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "daily_user_stats",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("daily_active_users", BQFieldType.Integer, "REQUIRED"),
    BQField("total_session_time", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const dailyAggregationJob = TransformationJob({
  name: "daily_user_stats_job",
  destination: dailyUserStatsTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as date,
      COUNT(DISTINCT user_id) as daily_active_users,
      SUM(session_duration) as total_session_time
    FROM ${userSessionsTable.atExecutionDate()}
    GROUP BY date
  `,
  startPartition: BQPartition.Day(2025, 1, 1),
});
Monthly Rollup from Daily Data
// Define the destination table
const monthlyUserStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "monthly_user_stats",
  partitioning: BQPartitioning.Month("month"),
  schema: [
    BQField("month", BQFieldType.Date, "REQUIRED"),
    BQField("monthly_active_users", BQFieldType.Integer, "REQUIRED"),
    BQField("monthly_session_time", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const monthlyUserStatsJob = TransformationJob({
  name: "monthly_user_stats_job",
  destination: monthlyUserStatsTable,
  query: sql`
    SELECT
      DATE_TRUNC(${BQExpr.ExecutionMonth.atFirstDayOfMonth()}, MONTH) as month,
      SUM(daily_active_users) as monthly_active_users,
      SUM(total_session_time) as monthly_session_time
    FROM ${dailyAggregationJob.range(
      BQExpr.ExecutionMonth.atFirstDayOfMonth(),
      BQExpr.ExecutionMonth.atLastDayOfMonth(),
    )}
    GROUP BY month
  `,
  startPartition: BQPartition.Month(2025, 1),
});
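The range dependency above makes each monthly execution read every daily partition of that month. As a rough, SDK-independent illustration, enumerating the partitions such a first-day-to-last-day range covers looks like this:

```typescript
// Enumerate the daily partitions between the first and last day of a month,
// i.e. the set a first-day-to-last-day range dependency covers. Illustrative only.
function dayPartitionsOfMonth(year: number, month: number): string[] {
  const partitions: string[] = [];
  // Day 0 of the following month is the last day of this month.
  const daysInMonth = new Date(Date.UTC(year, month, 0)).getUTCDate();
  for (let day = 1; day <= daysInMonth; day++) {
    const date = new Date(Date.UTC(year, month - 1, day));
    partitions.push(date.toISOString().slice(0, 10)); // YYYY-MM-DD
  }
  return partitions;
}

console.log(dayPartitionsOfMonth(2025, 1).length); // 31 daily partitions in January
```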
Data Quality Checks
// Define the destination table
const dataQualityResultsTable = BQTable({
  dataset: analyticsDataset,
  name: "data_quality_results",
  partitioning: BQPartitioning.Day("check_date"),
  schema: [
    BQField("check_date", BQFieldType.Date, "REQUIRED"),
    BQField("table_name", BQFieldType.String, "REQUIRED"),
    BQField("status", BQFieldType.String, "REQUIRED"),
    BQField("check_description", BQFieldType.String, "REQUIRED"),
  ],
});

export const dataQualityJob = TransformationJob({
  name: "data_quality_checks_job",
  destination: dataQualityResultsTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as check_date,
      'user_events' as table_name,
      CASE WHEN COUNT(*) = 0 THEN 'FAILED' ELSE 'PASSED' END as status,
      'No data received' as check_description
    FROM ${dailyEventsTable.atExecutionDate()}
  `,
  startPartition: BQPartition.Day(2025, 1, 1),
});
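Rows of this shape are easy for downstream tooling to act on. A minimal, SDK-independent sketch of summarizing a day's check results (the row type mirrors dataQualityResultsTable's schema; the helper name is hypothetical):

```typescript
// Summarize quality-check rows of the shape the job above writes. Illustrative only.
interface QualityRow {
  check_date: string;
  table_name: string;
  status: "PASSED" | "FAILED";
  check_description: string;
}

// Return the names of tables whose checks failed, for alerting.
function failedTables(rows: QualityRow[]): string[] {
  return rows
    .filter((row) => row.status === "FAILED")
    .map((row) => row.table_name);
}
```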
Job Metadata
Jobs can include metadata to provide additional context:
export const jobWithMetadata = {
  ...dataQualityJob,
  name: "data_quality_checks_job-ctx",
  metadata: {
    description: "Daily data quality checks for user events",
    owner: "data-platform-team",
    slack_channel: "#data-alerts",
    criticality: "high",
    sla_minutes: 120,
    tags: ["user-analytics", "daily-metrics"],
    webhookURI: "https://api.example.com/job-notifications",
  },
};
This metadata can be used by external systems, documentation generators, or custom extensions.
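For example, a hypothetical external tool could route high-criticality jobs to their alert channels. The metadata shape below mirrors the example above; none of this is part of the Datoria SDK.

```typescript
// Hypothetical external consumer of job metadata; not Datoria API.
interface JobMetadata {
  description?: string;
  owner?: string;
  slack_channel?: string;
  criticality?: string;
  [key: string]: unknown;
}

interface JobLike {
  name: string;
  metadata?: JobMetadata;
}

// Map each high-criticality job to its Slack alert channel.
function alertRoutes(jobs: JobLike[]): Map<string, string> {
  const routes = new Map<string, string>();
  for (const job of jobs) {
    const meta = job.metadata;
    if (meta && meta.criticality === "high" && typeof meta.slack_channel === "string") {
      routes.set(job.name, meta.slack_channel);
    }
  }
  return routes;
}
```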
Testing Jobs
Jobs can be tested using Datoria's unit testing framework:
import { UnitTest, TestRelation } from "@datoria/sdk";

export const flowerOrdersTest = UnitTest({
  testSubject: flowerOrdersJob,
  testRelations: [
    TestRelation(
      ordersTable,
      {
        date: "2025-01-01",
        user_id: "user1",
        product_id: "prod1",
      },
      {
        date: "2025-01-01",
        user_id: "user2",
        product_id: "prod2",
      },
    ),
    TestRelation(
      productMetadataTable,
      {
        product_id: "prod1",
        name: "Rose Bouquet",
        category: "flowers",
      },
      {
        product_id: "prod2",
        name: "Desk Lamp",
        category: "home",
      },
    ),
  ],
  assertEquals: [
    {
      date: "2025-01-01",
      user_id: "user1",
      product_id: "prod1",
      name: "Rose Bouquet",
    },
  ],
});
See the Unit Testing section for more details.
Running Jobs
To run a job, use the Datoria CLI:
datoria run job-name
Or to run all jobs:
datoria run
See the Running Jobs section for more details.
Best Practices
- Use descriptive names for jobs to make them easy to identify
- Include metadata like owner and description for documentation
- Set explicit startPartition for jobs without dependencies on other jobs
- Define job-specific destination tables rather than sharing destinations
- Test jobs with unit tests to verify their behavior