Dependency Management
Understanding Dependencies in Datoria
In Datoria, dependencies are explicit declarations of which data a transformation requires. Unlike traditional approaches that use simple table references, Datoria works with partition-level dependencies that precisely specify which slices of data are needed.
This granular approach allows Datoria to:
- Process only the necessary data
- Track dependencies across complex pipelines
- Validate that required data exists
- Optimize processing based on data changes
Dependency Types
Let's start with a simple partitioned table as our example:
```typescript
import { BQField, BQFieldType, BQPartitioning, BQTable } from "@datoria/sdk";
import { analyticsDataset } from "../tables-and-partitioning.md";

export const userEventsTable = BQTable({
  dataset: analyticsDataset,
  name: "user_events",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("event_type", BQFieldType.String, "REQUIRED"),
    BQField("date", BQFieldType.Date, "REQUIRED"),
  ],
});
```
When you reference this table in SQL, you need to be explicit about which partitions you want to access. There are several ways to do this:
1. All Partitions: allPartitions()
To access all partitions of a table:
```typescript
import { sql, Dependency, Invalidation } from "@datoria/sdk";

// Long form (explicit)
export const allPartitionsQuery = sql`SELECT * FROM ${Dependency.All(userEventsTable, Invalidation.IgnoreUpstreamChanges)}`;

// Short form (convenient)
export const shortFormQuery = sql`SELECT * FROM ${userEventsTable.allPartitions()}`;
```
This is useful for:
- Creating views that need to access the entire table
- Working with reference or dimension tables
- Performing historical analysis across all partitions
2. Single Partition: at()
To access a specific partition:
```typescript
import { BQExpr } from "@datoria/sdk";

// Long form (explicit)
export const longFormQuery = sql`
  SELECT *
  FROM ${Dependency.Single(
    userEventsTable,
    BQExpr.ExecutionDate,
    Invalidation.IgnoreUpstreamChanges,
  )}
`;

// Medium form
export const mediumFormQuery = sql`
  SELECT *
  FROM ${userEventsTable.at(BQExpr.ExecutionDate)}
`;

// Short form (most convenient)
export const todayQuery = sql`
  SELECT *
  FROM ${userEventsTable.atExecutionDate()}
`;
```
At execution time, these references are expanded to target specific partitions. For example, when processing the 2023-01-15 partition, the query becomes:
```sql
SELECT *
FROM (SELECT *
      FROM `my_project.analytics_dataset.user_events` x
      WHERE x.date = DATE('2023-01-15'))
```
This is the most common pattern for day-to-day data processing and ensures you only access the data you need.
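As a rough mental model, the expansion above is a textual rewrite of the table reference into a subquery filtered on the partitioning column. The sketch below assumes a hypothetical `TableRef` shape and `expandSinglePartition` helper, neither of which is part of the Datoria SDK, and reproduces only the output format shown in the example.

```typescript
// Hypothetical sketch of expanding a single-partition reference into a
// filtered subquery. Not part of the Datoria SDK.
interface TableRef {
  project: string;
  dataset: string;
  name: string;
  partitionColumn: string; // column used for daily partitioning
}

// Only accept ISO dates so the value is safe to embed in the SQL text.
function assertIsoDate(day: string): void {
  if (!/^\d{4}-\d{2}-\d{2}$/.test(day)) {
    throw new Error(`Expected YYYY-MM-DD, got "${day}"`);
  }
}

function expandSinglePartition(table: TableRef, day: string): string {
  assertIsoDate(day);
  const fqn = `\`${table.project}.${table.dataset}.${table.name}\``;
  return [
    "(SELECT *",
    `  FROM ${fqn} x`,
    `  WHERE x.${table.partitionColumn} = DATE('${day}'))`,
  ].join("\n");
}

const userEvents: TableRef = {
  project: "my_project",
  dataset: "analytics_dataset",
  name: "user_events",
  partitionColumn: "date",
};

// Prints a query equivalent to the expanded example above.
console.log(`SELECT *\nFROM ${expandSinglePartition(userEvents, "2023-01-15")}`);
```

The point of the filter on the partitioning column is that BigQuery can prune every other partition, so only one day's data is scanned.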
3. Partition Range: range()
To access a range of partitions:
```typescript
// Process all days in a month
export const monthlyQuery = sql`
  SELECT
    DATE_TRUNC(date, MONTH) as month,
    COUNT(DISTINCT user_id) as monthly_active_users
  FROM ${userEventsTable.range(
    BQExpr.ExecutionMonth.atFirstDayOfMonth(),
    BQExpr.ExecutionMonth.atLastDayOfMonth(),
  )}
  GROUP BY month
`;
```
This is useful for aggregations that span multiple partitions, like monthly rollups of daily data.
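The three dependency kinds differ mainly in which partitions they require. The following sketch models that with a hypothetical `Dep` union and two helpers, `requiredPartitions` and `missingPartitions`; these names are illustrative only and do not come from the Datoria SDK. It also shows the kind of check behind validating that required data exists, assuming day partitions are identified by `YYYY-MM-DD` strings.

```typescript
// Hypothetical model of the three dependency kinds; not the Datoria SDK API.
type DayPartition = string; // "YYYY-MM-DD"; ISO dates sort correctly as strings

type Dep =
  | { kind: "all" }
  | { kind: "single"; date: DayPartition }
  | { kind: "range"; from: DayPartition; to: DayPartition }; // inclusive bounds

function addDays(day: DayPartition, n: number): DayPartition {
  const d = new Date(`${day}T00:00:00Z`);
  d.setUTCDate(d.getUTCDate() + n);
  return d.toISOString().slice(0, 10);
}

// Partitions a dependency needs, given the partitions that currently exist.
function requiredPartitions(dep: Dep, existing: DayPartition[]): DayPartition[] {
  if (dep.kind === "all") return [...existing]; // whatever is currently there
  if (dep.kind === "single") return [dep.date];
  const days: DayPartition[] = [];
  for (let d = dep.from; d <= dep.to; d = addDays(d, 1)) days.push(d);
  return days;
}

// Required partitions that do not exist yet.
function missingPartitions(dep: Dep, existing: DayPartition[]): DayPartition[] {
  const present = new Set(existing);
  return requiredPartitions(dep, existing).filter((p) => !present.has(p));
}
```

For example, a range over February 2024 resolves to 29 daily partitions; if only some of them exist, `missingPartitions` lists the rest, which is the information a scheduler would need before deciding whether a job can run.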
Partition Expressions with BQExpr
The `BQExpr` system lets you construct expressions that reference specific partitions relative to the current execution:
```typescript
// Current execution date
BQExpr.ExecutionDate;

// Previous day
BQExpr.ExecutionDate.dayMinusDays(1);

// First day of current month
BQExpr.ExecutionMonth.atFirstDayOfMonth();

// Last day of current month
BQExpr.ExecutionMonth.atLastDayOfMonth();

// Current execution month
BQExpr.ExecutionMonth;

// Previous month
BQExpr.ExecutionMonth.monthMinusMonths(1);
```
These expressions make it easy to specify complex date relationships in a type-safe way.
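To make these relationships concrete, here is plain TypeScript date arithmetic that computes what each expression refers to for a fixed execution date. The helper names mirror the `BQExpr` methods but are hypothetical stand-ins operating on `YYYY-MM-DD` and `YYYY-MM` strings, not the SDK itself.

```typescript
// Hypothetical helpers mirroring the BQExpr expressions with plain UTC date
// arithmetic. They are not the Datoria SDK; they only show which partition
// each expression points at for a given execution date.
type Day = string; // "YYYY-MM-DD"
type Month = string; // "YYYY-MM"

function fmtDay(d: Date): Day {
  return d.toISOString().slice(0, 10);
}

// Like BQExpr.ExecutionDate.dayMinusDays(n)
function dayMinusDays(day: Day, n: number): Day {
  const d = new Date(`${day}T00:00:00Z`);
  d.setUTCDate(d.getUTCDate() - n);
  return fmtDay(d);
}

// Like BQExpr.ExecutionMonth: the month containing the execution date
function monthOf(day: Day): Month {
  return day.slice(0, 7);
}

// Like .atFirstDayOfMonth()
function atFirstDayOfMonth(month: Month): Day {
  return `${month}-01`;
}

// Like .atLastDayOfMonth(): day 0 of the next month is the last day of this one
function atLastDayOfMonth(month: Month): Day {
  const year = Number(month.slice(0, 4));
  const monthNumber = Number(month.slice(5, 7)); // 1-12
  return fmtDay(new Date(Date.UTC(year, monthNumber, 0)));
}

// Like .monthMinusMonths(n); Date.UTC normalizes negative month indexes
function monthMinusMonths(month: Month, n: number): Month {
  const year = Number(month.slice(0, 4));
  const monthNumber = Number(month.slice(5, 7));
  return new Date(Date.UTC(year, monthNumber - 1 - n, 1)).toISOString().slice(0, 7);
}
```

For an execution date of 2023-01-15, these give a previous day of 2023-01-14, a current month of 2023-01 spanning 2023-01-01 to 2023-01-31, and a previous month of 2022-12. Leap years and year boundaries fall out of the built-in `Date` normalization rather than hand-written rules.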
Dependency Chains
Dependencies can form chains across multiple transformations. For example, if job B depends on job A, and job A depends on table X, then job B has an indirect dependency on table X.
Datoria tracks these chains automatically, ensuring that all dependencies are satisfied before running a job.
```typescript
import { TransformationJob } from "@datoria/sdk";

// Define tables for our example
const eventsTable = BQTable({
  dataset: analyticsDataset,
  name: "events",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("user_id", BQFieldType.String, "REQUIRED"),
    BQField("event_type", BQFieldType.String, "REQUIRED"),
  ],
});

const dailyStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "daily_stats-deps",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("users", BQFieldType.Integer, "REQUIRED"),
  ],
});

const monthlyStatsTable = BQTable({
  dataset: analyticsDataset,
  name: "monthly_stats-deps",
  partitioning: BQPartitioning.Month("month"),
  schema: [
    BQField("month", BQFieldType.Date, "REQUIRED"),
    BQField("monthly_users", BQFieldType.Integer, "REQUIRED"),
  ],
});

// First job
export const dailyStatsJob = TransformationJob({
  name: "daily_stats",
  destination: dailyStatsTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as date,
      COUNT(DISTINCT user_id) as users
    FROM ${eventsTable.atExecutionDate()}
    GROUP BY date
  `,
  startPartition: BQPartition.Day(2023, 1, 1),
});

// Second job that depends on the first
export const monthlyStatsJob = TransformationJob({
  name: "monthly_stats",
  destination: monthlyStatsTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionMonth.atFirstDayOfMonth()} as month,
      -- Sums daily distinct counts: a user active on several days is counted once per day.
      SUM(users) as monthly_users
    FROM ${dailyStatsJob.range(
      BQExpr.ExecutionMonth.atFirstDayOfMonth(),
      BQExpr.ExecutionMonth.atLastDayOfMonth(),
    )}
    GROUP BY month
  `,
  startPartition: BQPartition.Month(2023, 1),
});
```
This creates a dependency chain where the monthly job depends on the daily job, which depends on the original events table.
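Conceptually, resolving a chain is a graph traversal: collect every upstream node reachable from a job, and order jobs so each one runs after its dependencies. The sketch below models that with a plain adjacency map; the `Graph` type and both functions are hypothetical and say nothing about how Datoria implements this internally.

```typescript
// Hypothetical job graph: node -> direct upstream dependencies.
// Names mirror the example above; this is not Datoria's internal model.
type Graph = Record<string, string[]>;

// All upstream nodes reachable from `node`, direct and indirect.
function transitiveDeps(graph: Graph, node: string): Set<string> {
  const seen = new Set<string>();
  const stack = [...(graph[node] ?? [])];
  while (stack.length > 0) {
    const next = stack.pop()!;
    if (seen.has(next)) continue;
    seen.add(next);
    stack.push(...(graph[next] ?? []));
  }
  return seen;
}

// Depth-first topological sort: every node appears after its dependencies.
function runOrder(graph: Graph): string[] {
  const order: string[] = [];
  const state = new Map<string, "visiting" | "done">();
  const visit = (node: string): void => {
    const s = state.get(node);
    if (s === "done") return;
    if (s === "visiting") throw new Error(`Cycle detected at ${node}`);
    state.set(node, "visiting");
    for (const dep of graph[node] ?? []) visit(dep);
    state.set(node, "done");
    order.push(node);
  };
  for (const node of Object.keys(graph)) visit(node);
  return order;
}

const jobs: Graph = {
  monthly_stats: ["daily_stats"],
  daily_stats: ["events"],
  events: [], // source table, no upstream
};
```

Here `transitiveDeps(jobs, "monthly_stats")` contains both `daily_stats` and the indirect `events`, and `runOrder(jobs)` places `events` first and `monthly_stats` last. A cycle has no valid order, so the sketch rejects it.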
Inference of Start Partition
For jobs that depend on other jobs, Datoria can infer the appropriate start partition:
```typescript
// Define a destination table for our example
const dependentTable = BQTable({
  dataset: analyticsDataset,
  name: "dependent_data",
  partitioning: BQPartitioning.Day("date"),
  schema: dailyStatsTable.schema, // Reuse schema from previous example
});

// Define a job for our example
const sourceJob = dailyStatsJob; // Reference the job created in the previous example

export const dependentJob = TransformationJob({
  name: "dependent_job",
  destination: dependentTable,
  query: sql`
    SELECT *
    FROM ${sourceJob.atExecutionDate()}
  `,
  // No startPartition needed - will be inferred from dependencies
});
```
If `sourceJob` has a start partition of `2023-01-01`, then `dependentJob` will automatically start from that date.
With multiple dependencies, Datoria takes the latest start partition from all dependencies to ensure that all required data exists.
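In other words, inference takes the maximum over the upstream start partitions. A minimal sketch, assuming day partitions are compared as `YYYY-MM-DD` strings and assuming (for this sketch only) that a job with no dependencies must declare its start explicitly; `inferStartPartition` is a hypothetical name, not an SDK function.

```typescript
// Hypothetical sketch of start-partition inference: take the latest upstream
// start so that every input exists from the first run. Not the Datoria SDK.
type Day = string; // "YYYY-MM-DD"; ISO strings compare correctly as text

function inferStartPartition(upstreamStarts: Day[]): Day {
  if (upstreamStarts.length === 0) {
    // Assumption of this sketch: nothing to infer from, so the job must set it.
    throw new Error("Cannot infer a start partition without dependencies");
  }
  return upstreamStarts.reduce((latest, d) => (d > latest ? d : latest));
}
```

Taking the earliest start instead would schedule runs for dates where at least one input has no data yet. Note that this simple rule ignores look-back windows such as `dayMinusDays(6)`; in this model a job that reads several days of history still needs an explicit, later start partition.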
Practical Examples
Daily Aggregation
```typescript
// Define tables for our example
const dailyAggregateTable = BQTable({
  dataset: analyticsDataset,
  name: "daily_aggregate",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("events", BQFieldType.Integer, "REQUIRED"),
    BQField("users", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const dailyAggregationJob = TransformationJob({
  name: "daily_aggregation",
  destination: dailyAggregateTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as date,
      COUNT(*) as events,
      COUNT(DISTINCT user_id) as users
    FROM ${eventsTable.atExecutionDate()}
    GROUP BY date
  `,
  startPartition: BQPartition.Day(2023, 1, 1),
});
```
Monthly Rollup
```typescript
// Define tables for our example
const monthlyRollupTable = BQTable({
  dataset: analyticsDataset,
  name: "monthly_rollup",
  partitioning: BQPartitioning.Month("month"),
  schema: [
    BQField("month", BQFieldType.Date, "REQUIRED"),
    BQField("total_events", BQFieldType.Integer, "REQUIRED"),
    BQField("total_users", BQFieldType.Integer, "REQUIRED"),
  ],
});

export const monthlyRollupJob = TransformationJob({
  name: "monthly_rollup",
  destination: monthlyRollupTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionMonth.atFirstDayOfMonth()} as month,
      SUM(events) as total_events,
      -- Sums daily distinct counts: a user active on several days is counted once per day.
      SUM(users) as total_users
    FROM ${dailyAggregationJob.range(
      BQExpr.ExecutionMonth.atFirstDayOfMonth(),
      BQExpr.ExecutionMonth.atLastDayOfMonth(),
    )}
    GROUP BY month
  `,
  startPartition: BQPartition.Month(2023, 1),
});
```
Sliding Window Analysis
```typescript
import { BQPartition } from "@datoria/sdk";

// Define tables for our example
const rollingAverageTable = BQTable({
  dataset: analyticsDataset,
  name: "rolling_averages",
  partitioning: BQPartitioning.Day("date"),
  schema: [
    BQField("date", BQFieldType.Date, "REQUIRED"),
    BQField("rolling_7_day_avg", BQFieldType.Float, "REQUIRED"),
  ],
});

export const slidingWindowJob = TransformationJob({
  name: "sliding_window",
  destination: rollingAverageTable,
  query: sql`
    SELECT
      ${BQExpr.ExecutionDate} as date,
      AVG(users) as rolling_7_day_avg
    FROM ${dailyAggregationJob.range(
      BQExpr.ExecutionDate.dayMinusDays(6),
      BQExpr.ExecutionDate,
    )}
    GROUP BY date
  `,
  startPartition: BQPartition.Day(2023, 1, 7), // Need 7 days of history
});
```
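The start partition of 2023-01-07 follows from the window arithmetic: the 7-day window for an execution date D covers D minus 6 days through D, so the first date with a complete window is 6 days after the upstream job's first partition (2023-01-01 for `dailyAggregationJob`). The sketch below checks this with plain date arithmetic; `windowPartitions` and `earliestFullWindowStart` are hypothetical helpers, not Datoria SDK functions.

```typescript
// Hypothetical sketch of sliding-window partition arithmetic. Not the Datoria SDK.
type Day = string; // "YYYY-MM-DD"

function shiftDays(day: Day, n: number): Day {
  const d = new Date(`${day}T00:00:00Z`);
  d.setUTCDate(d.getUTCDate() + n);
  return d.toISOString().slice(0, 10);
}

// Mirrors range(ExecutionDate.dayMinusDays(size - 1), ExecutionDate), oldest first.
function windowPartitions(executionDate: Day, size: number): Day[] {
  const days: Day[] = [];
  for (let i = size - 1; i >= 0; i--) days.push(shiftDays(executionDate, -i));
  return days;
}

// First execution date whose oldest window partition is the upstream start.
function earliestFullWindowStart(upstreamStart: Day, size: number): Day {
  return shiftDays(upstreamStart, size - 1);
}
```

With an upstream start of 2023-01-01 and a window of 7, `earliestFullWindowStart` returns 2023-01-07, and `windowPartitions("2023-01-07", 7)` covers exactly 2023-01-01 through 2023-01-07. Starting earlier would average over fewer than 7 days.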