Jobs

Jobs are the workhorses of your data pipeline in Datoria. A job defines how data should be transformed from one form to another using a declarative approach.

TransformationJob Basics

At its core, a TransformationJob consists of:

  • A query that defines the transformation logic
  • A destination table where results will be written
  • Dependencies on source data (implicitly defined in the query)
  • Optional settings like startPartition and metadata

Here's a basic example:

import {
  BQField,
  BQFieldType,
  BQPartition,
  BQPartitioning,
  BQTable,
  TransformationJob,
  sql,
} from "@datoria/sdk";
import {
  analyticsDataset,
  dailyEventsTable,
} from "../../core-concepts/tables-and-partitioning.md";

// Define our destination table
const userStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "user_stats",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("event_count", BQFieldType.Integer, "REQUIRED"),
  ],
});

const simpleQuery = sql`
  SELECT
    event_date as date,
    user_id,
    COUNT(*) as event_count
  FROM ${dailyEventsTable.atExecutionDate()}
  GROUP BY event_date, user_id
`;

export const simpleStatsJob = TransformationJob({
  name: "daily_user_stats",
  destination: userStatsTable,
  query: simpleQuery,
  startPartition: BQPartition.Day(2025, 1, 1),
});

Creating a Complete Job

Let's build a more comprehensive example. We'll extract "flower" orders from a date-partitioned source table and write them into a dedicated destination table.

1. Define Source Tables

import {
  BQField,
  BQFieldType,
  BQPartitioning,
  BQSourceTable,
  BQTable,
} from "@datoria/sdk";
import { analyticsDataset } from "../../core-concepts/tables-and-partitioning.md";

const ordersTable = BQTable({
  dataset: analyticsDataset,
  name: "orders",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
  ],
});

const productMetadataTable = BQSourceTable({
  dataset: analyticsDataset,
  name: "product_metadata",
  location: analyticsDataset.location,
  partitioning: BQPartitioning.Unpartitioned,
  schema: [
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("name", BQFieldType.String, "REQUIRED"),
    BQField("category", BQFieldType.String, "REQUIRED"),
  ],
});

2. Define Destination Table

const flowerOrdersTable = BQTable({
  dataset: analyticsDataset,
  name: "flower_orders",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("product_id", BQFieldType.String, "REQUIRED"),
    BQField("name", BQFieldType.String, "REQUIRED"),
  ],
});

3. Define Transformation Query

const transformQuery = sql`
  SELECT o.date, o.user_id, o.product_id, m.name
  FROM ${ordersTable.atExecutionDate()} o
  JOIN ${productMetadataTable.allPartitions()} m USING (product_id)
  WHERE m.category = 'flowers'
`;

4. Define the Job

export const flowerOrdersJob = TransformationJob({
  name: "flower_orders_job",
  destination: flowerOrdersTable,
  query: transformQuery,
  startPartition: BQPartition.Day(2025, 1, 1),
  metadata: {
    description: "Extracts flower orders from the orders table",
    owner: "retail-analytics-team",
  },
});

Understanding Partition Dependencies

In the example above, the job:

  • Reads from ordersTable.atExecutionDate(), processing one day partition at a time
  • Joins with productMetadataTable.allPartitions(), an unpartitioned reference table
  • Writes results to the corresponding date partition in flowerOrdersTable

This granular approach ensures that only the necessary data is processed and that dependencies are correctly tracked.
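
Every selector on this page follows the same pattern: interpolating a table or job into the sql template both references the data and declares the dependency. For reference, the selectors used in this guide are:

// Read the source partition matching the partition being produced
ordersTable.atExecutionDate();

// Read an unpartitioned reference table in full
productMetadataTable.allPartitions();

// Read an upstream job's output at a relative date (used in the inference example below)
flowerOrdersJob.at(BQExpr.ExecutionDate.dayMinusDays(1));

// Read a range of upstream partitions, e.g. a whole month (used in the rollup example below)
dailyAggregationJob.range(
  BQExpr.ExecutionMonth.atFirstDayOfMonth(),
  BQExpr.ExecutionMonth.atLastDayOfMonth(),
);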

Start Partition Inference

Datoria needs to know which partition each job should start from. There are two ways to handle this:

1. Explicit Definition

For jobs with no dependencies on other jobs, set startPartition explicitly:

startPartition: BQPartition.Day(2025, 1, 1),

2. Automatic Inference

For jobs that depend on other jobs, Datoria can infer the start partition automatically:

import { BQExpr } from "@datoria/sdk";

export const flowerOrdersYesterdayJob = TransformationJob({
  name: "flower_orders_yesterday_job",
  destination: BQTable({
    dataset: analyticsDataset,
    name: "flower_orders_yesterday",
    partitioning: BQPartitioning.Day("date"),
    schema: flowerOrdersTable.schema,
  }),
  query: sql`
    SELECT x.*
    FROM ${flowerOrdersJob.at(BQExpr.ExecutionDate.dayMinusDays(1))} x
  `,
  // No startPartition needed - will be inferred from dependencies
});

In this case, Datoria knows that flowerOrdersJob starts at 2025-01-01, so the earliest partition for flowerOrdersYesterdayJob will be 2025-01-02.

With multiple dependencies, Datoria takes the latest start partition from all dependencies.
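
For instance, a job that joins two upstream jobs starting on different dates will start at the later of the two. Here is a minimal sketch with hypothetical upstream jobs jobA (starting 2025-01-01) and jobB (starting 2025-03-01) and a hypothetical combinedTable destination, assuming a job can also be read at the current execution date via .at(BQExpr.ExecutionDate), mirroring the .at(...) call above:

export const combinedJob = TransformationJob({
  name: "combined_job",
  destination: combinedTable, // hypothetical destination with a matching schema
  query: sql`
    SELECT a.date, a.user_id
    FROM ${jobA.at(BQExpr.ExecutionDate)} a
    JOIN ${jobB.at(BQExpr.ExecutionDate)} b USING (date, user_id)
  `,
  // No startPartition: inferred as 2025-03-01, the later of the two dependencies
});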

Start Partition and Partition Expiration

When working with tables that have partition expiration settings, Datoria automatically harmonizes start partition specifications with what data actually exists:

Automatic Adjustment for Expired Partitions

Datoria calculates an effective start partition that considers both:

  • The explicitly defined start partition (or the one inferred from dependencies)
  • The oldest partition that hasn't yet expired

This means you can set a historical start partition date once and never worry about updating it as older partitions expire. Datoria will intelligently begin processing from the oldest available partition.
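
Conceptually, the effective start partition is simply the later of those two dates. A schematic sketch of the rule in plain TypeScript (illustration only; Datoria performs this internally):

// effectiveStart = max(configuredStart, oldestUnexpiredPartition)
function effectiveStart(configured: Date, oldestUnexpired: Date): Date {
  return configured > oldestUnexpired ? configured : oldestUnexpired;
}

// A configured start of 2025-01-01 on a table whose oldest surviving
// partition is 2025-06-15 yields an effective start of 2025-06-15.
effectiveStart(new Date("2025-01-01"), new Date("2025-06-15"));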

Multiple Dependencies with Different Expiration Settings

For jobs that depend on multiple tables with different partition expiration settings, Datoria takes the most conservative approach: it only processes partitions for which all required source data is still available. For example, if one source retains 90 days of partitions and another retains only 30, the job processes only the most recent 30 days.

This ensures data integrity by preventing partial joins or incomplete processing that could occur if only some of the source data had expired.

Benefits of This Approach

This automatic handling provides several advantages:

  • No need to manually update start partitions as data ages out
  • Prevents job failures due to references to expired partitions
  • Eliminates the need to track and synchronize expiration dates across your codebase
  • Makes your pipeline definitions more stable and maintainable

With Datoria, your data pipeline continues to operate smoothly even as the window of available data shifts over time due to partition expiration.

Common Job Patterns

Daily Aggregation

// Define a table representing user sessions
const userSessionsTable = BQTable({
  dataset: analyticsDataset,
  name: "user_sessions",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("session_duration", BQFieldType.Integer, "REQUIRED"),
  ],
});

// Define the destination table for our example
const dailyUserStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "daily_user_stats",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("daily_active_users", BQFieldType.Integer, "REQUIRED"),
    BQField("total_session_time", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const dailyAggregationJob = TransformationJob({
  name: "daily_user_stats_job",
  destination: dailyUserStatsTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as date,
      COUNT(DISTINCT user_id) as daily_active_users,
      SUM(session_duration) as total_session_time
    FROM ${userSessionsTable.atExecutionDate()}
    GROUP BY date
  `,
  startPartition: BQPartition.Day(2025, 1, 1),
});

Monthly Rollup from Daily Data

// Define the destination table
const monthlyUserStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "monthly_user_stats",
  partitioning: BQPartitioning.Month("month"),
  schema: [
    BQField("month", BQFieldType.Date, "REQUIRED"),
    BQField("monthly_active_users", BQFieldType.Integer, "REQUIRED"),
    BQField("monthly_session_time", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const monthlyUserStatsJob = TransformationJob({
  name: "monthly_user_stats_job",
  destination: monthlyUserStatsTable,
  query: sql`
    SELECT
      DATE_TRUNC(${BQExpr.ExecutionMonth.atFirstDayOfMonth()}, MONTH) as month,
      SUM(daily_active_users) as monthly_active_users,
      SUM(total_session_time) as monthly_session_time
    FROM ${dailyAggregationJob.range(
      BQExpr.ExecutionMonth.atFirstDayOfMonth(),
      BQExpr.ExecutionMonth.atLastDayOfMonth(),
    )}
    GROUP BY month
  `,
  startPartition: BQPartition.Month(2025, 1),
});

Job Metadata

Jobs can include metadata to provide additional context:

export const jobWithMetadata = {
  ...monthlyUserStatsJob,
  name: "monthly_user_stats_job_ctx",
  metadata: {
    description: "Monthly aggregation of user statistics",
    owner: "data-platform-team",
    slack_channel: "#data-alerts",
    criticality: "high",
    sla_minutes: 120,
    tags: ["user-analytics", "monthly-metrics"],
    webhookURI: "https://api.example.com/job-notifications",
  },
};

This metadata can be used by external systems, documentation generators, or custom extensions.
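
Because jobs are plain TypeScript values, this metadata is straightforward to consume. A minimal sketch of a consumer, assuming each job value retains the name and metadata fields it was constructed with:

// Illustration only: print an ownership summary for a set of jobs.
const jobs = [flowerOrdersJob, jobWithMetadata] as Array<{
  name: string;
  metadata?: Record<string, unknown>;
}>;

for (const job of jobs) {
  console.log(`${job.name}: owner=${job.metadata?.owner ?? "unknown"}`);
}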

Testing Jobs

Jobs can be tested using Datoria's unit testing framework:

import { UnitTest, TestRelation } from "@datoria/sdk";

export const flowerOrdersTest = UnitTest({
  testSubject: flowerOrdersJob,
  testRelations: [
    TestRelation(
      ordersTable,
      { date: "2025-01-01", user_id: "user1", product_id: "prod1" },
      { date: "2025-01-01", user_id: "user2", product_id: "prod2" },
    ),
    TestRelation(
      productMetadataTable,
      { product_id: "prod1", name: "Rose Bouquet", category: "flowers" },
      { product_id: "prod2", name: "Desk Lamp", category: "home" },
    ),
  ],
  assertEquals: [
    {
      date: "2025-01-01",
      user_id: "user1",
      product_id: "prod1",
      name: "Rose Bouquet",
    },
  ],
});

See the Unit Testing section for more details.

Running Jobs

To run a job, use the Datoria CLI:

datoria run job-name
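
For example, assuming the CLI argument matches the job's name field:

datoria run flower_orders_job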

Or to run all jobs:

datoria run

See the Running Jobs section for more details.

Best Practices

  • Use descriptive names for jobs to make them easy to identify
  • Include metadata like owner and description for documentation
  • Set explicit startPartition for jobs without dependencies on other jobs
  • Define job-specific destination tables rather than sharing destinations
  • Test jobs with unit tests to verify their behavior