Invalidation & Efficiency
What is Invalidation?
In Datoria, invalidation refers to the process of determining when transformed data needs to be recomputed due to changes in upstream data. By defining explicit invalidation rules, you can control exactly when and why your jobs should rerun.
Invalidation applies to changes in data, not changes in code. If you modify the SQL used to produce an upstream table, you will typically need a backfill rather than relying on invalidation.
Invalidation Strategies
Datoria provides several invalidation strategies that can be applied to dependencies:
1. Never Invalidate (Default)
The default strategy is to never automatically invalidate based on upstream changes:
import { sql } from "@datoria/sdk";
import { dailyEventsTable } from "../tables-and-partitioning.md";
// Default behavior: ignore upstream changes
export const defaultQuery =
sql`SELECT * FROM ${
dailyEventsTable.atExecutionDate()
}`;
This gives you complete control over when data is reprocessed, which can be useful for:
- Historical data that rarely changes
- Expensive transformations where controlled reprocessing is preferred
- Tables where you want explicit manual control over updates
2. Always Invalidate on Changes
To automatically reprocess whenever upstream data changes:
// Reprocess whenever upstream data changes
export const alwaysInvalidateQuery =
sql`SELECT * FROM ${
dailyEventsTable.atExecutionDate().invalidateAfterUpstreamChanged()
}`;
This strategy ensures your data is always up to date, which is useful for:
- Critical metrics and KPIs
- Real-time or near-real-time dashboards
- Data where freshness is the highest priority
3. Invalidate After Cooldown
To balance freshness with efficiency, you can reprocess only after an upstream change has occurred and a cooldown period has passed:
// Reprocess after changes and a 2-hour cooldown
export const cooldownQuery =
sql`SELECT * FROM ${
dailyEventsTable
.atExecutionDate()
.invalidateAfterUpstreamChangedAndCooldownHasPassed("2 hours")
}`;
The cooldown can be specified in various formats:
"1 day"
- One day"2 hours"
- Two hours"30 minutes"
- Thirty minutes"P1D"
- One day (ISO 8601 format)"PT2H"
- Two hours (ISO 8601 format)
This approach is useful for:
- Balancing data freshness with processing efficiency
- Grouping multiple upstream changes into a single processing job
- Preventing processing storms when many changes happen in quick succession
4. Invalidate at Specific Times
For data that only needs updates at specific times of day:
// Reprocess after changes, but only at 8 AM or 8 PM London time
export const specificTimeQuery =
sql`SELECT * FROM ${
dailyEventsTable
.atExecutionDate()
.invalidateAfterUpstreamChangedAndClockPassed(
"Europe/London",
["08:00", "20:00"]
)
}`;
This approach works well for:
- Batch processing at scheduled times
- Ensuring data is ready for start-of-business
- Aligning updates with business operations
5. Conditional Invalidation
For more complex scenarios, you can apply different invalidation rules based on conditions:
import { Invalidation, TimePredicate } from "@datoria/sdk";
// Apply different invalidation rules based on age
export const conditionalQuery =
sql`SELECT * FROM ${
dailyEventsTable
.atExecutionDate()
.invalidateConditional(
TimePredicate.DataOlderThanDays(3), // Condition
Invalidation.IgnoreUpstreamChanges, // If data is older than 3 days
Invalidation.AlwaysWhenUpstreamChanged, // If data is 3 days old or newer
)
}`;
This approach is powerful for:
- Treating recent data differently from historical data
- Implementing complex business rules about data freshness
- Optimizing processing costs by focusing on the most valuable data
How Invalidation Works
When a job has multiple dependencies with different invalidation rules:
- Each dependency's invalidation status is evaluated independently
- If any dependency indicates invalidation, the entire job is invalidated
- The most aggressive invalidation rule effectively takes precedence
This approach ensures that your data is as fresh as your most demanding requirement.
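To see how this plays out, the sketch below reuses the example tables from this documentation and combines an aggressively invalidated dependency with a default one (the join column is assumed from the earlier queries). Any change to dailyEventsTable invalidates the whole job, even though the second dependency ignores upstream changes.
import { BQExpr, sql } from "@datoria/sdk";
import { dailyEventsTable, hourlyEventsTable } from "../tables-and-partitioning.md";
// Aggressive rule: any upstream change invalidates this dependency
const freshDep = dailyEventsTable
  .atExecutionDate()
  .invalidateAfterUpstreamChanged();
// Default rule: upstream changes to this dependency are ignored
const stableDep = hourlyEventsTable.range(
  BQExpr.ExecutionDate.atFirstHourOfDay(),
  BQExpr.ExecutionDate.atLastHourOfDay(),
);
// A change in dailyEventsTable invalidates the whole job, because the
// most aggressive rule across all dependencies takes precedence
export const combinedQuery = sql`
  SELECT d.user_id
  FROM ${freshDep} d
  JOIN ${stableDep} h ON d.user_id = h.user_id
`;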
Real-World Invalidation Patterns
Recent Data Fresh, Historical Data Stable
import {
BQExpr,
BQField,
BQFieldType,
BQPartition,
BQPartitioning,
BQTable,
TransformationJob,
} from "@datoria/sdk";
import { analyticsDataset } from "../tables-and-partitioning.md";
// Define a destination table
const userMetricsTable = BQTable({
dataset: analyticsDataset,
name: "user_metrics",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("daily_active_users", BQFieldType.Integer, "REQUIRED"),
],
});
// Keep recent data fresh, but leave historical data stable
const eventsDep = dailyEventsTable
.atExecutionDate()
.invalidateConditional(
TimePredicate.DataOlderThanDays(7),
Invalidation.IgnoreUpstreamChanges, // Older than 7 days: ignore changes
Invalidation.AlwaysWhenUpstreamChanged, // Last 7 days: always update
);
export const userMetricsJob = TransformationJob({
name: "user_metrics",
destination: userMetricsTable,
query: sql`
SELECT
${BQExpr.ExecutionDate} as date,
COUNT(DISTINCT user_id) as daily_active_users
FROM ${eventsDep}
GROUP BY date
`,
startPartition: BQPartition.Day(2023, 1, 1),
});
Batch Updates for High-Volume Tables
import { hourlyEventsTable } from "../tables-and-partitioning.md";
// Define a destination table
const batchResultsTable = BQTable({
dataset: analyticsDataset,
name: "batch_results",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("event_count", BQFieldType.Integer, "REQUIRED"),
BQField("unique_users", BQFieldType.Integer, "REQUIRED"),
],
});
// Update only at fixed times so processing is batched
const batchDep = hourlyEventsTable
.range(
BQExpr.ExecutionDate.atFirstHourOfDay(),
BQExpr.ExecutionDate.atLastHourOfDay(),
)
.invalidateAfterUpstreamChangedAndClockPassed("UTC", ["00:00", "12:00"]);
export const batchProcessingJob = TransformationJob({
name: "batch_processing",
destination: batchResultsTable,
query: sql`
SELECT
${BQExpr.ExecutionDate} as date,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM ${batchDep}
GROUP BY date
`,
startPartition: BQPartition.Day(2023, 1, 1),
});
Critical Metrics with Immediate Updates
import { monthlyStatsTable } from "../tables-and-partitioning.md";
// Define a destination table
const businessKPIsTable = BQTable({
dataset: analyticsDataset,
name: "business_kpis",
partitioning: BQPartitioning.Day("date"),
schema: [
BQField("date", BQFieldType.Date, "REQUIRED"),
BQField("monthly_value", BQFieldType.Float, "REQUIRED"),
BQField("metric_count", BQFieldType.Integer, "REQUIRED"),
],
});
// Always update critical metrics immediately
const revenueDep = monthlyStatsTable.at(
BQExpr.ExecutionDate.atWholeMonth(),
Invalidation.AlwaysWhenUpstreamChanged,
);
export const criticalMetricsJob = TransformationJob({
name: "critical_metrics",
destination: businessKPIsTable,
query: sql`
SELECT
${BQExpr.ExecutionDate} as date,
SUM(value) as monthly_value,
COUNT(DISTINCT metric) as metric_count
FROM ${revenueDep}
WHERE metric LIKE 'revenue%'
GROUP BY date
`,
startPartition: BQPartition.Day(2023, 1, 1),
});
Optimizing Cloud Costs with Invalidation
Proper invalidation strategies can significantly reduce cloud costs by:
- Processing only what needs processing - Avoid redundant computation
- Balancing freshness with efficiency - Use cooldown periods to batch processing
- Prioritizing recent data - Apply different strategies based on data age
- Scheduling updates strategically - Align processing with business needs
Monitoring Invalidation
When running jobs, Datoria provides visibility into the invalidation process:
datoria run --plan
This command shows which partitions will be processed and why, including:
- Which upstream changes triggered invalidation
- Which invalidation rules were applied
- When partitions were last processed
Invalidation vs. Backfilling
It's important to understand the difference between invalidation and backfilling:
- Invalidation is for handling changes in upstream data
- Backfilling is for applying new logic to historical data
If you change the SQL logic of a job, you'll typically need to backfill rather than rely on invalidation. Datoria provides tools for both scenarios.
Troubleshooting Invalidation
Job Runs Too Frequently
If a job is running more often than expected:
- Check if multiple upstream dependencies are changing frequently
- Consider using cooldown periods to batch updates (see the sketch after this list)
- Use conditional invalidation to ignore changes to less important data
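For example, a sketch that swaps immediate invalidation for the documented cooldown strategy (the 6-hour window is an arbitrary choice) limits how often upstream changes can trigger a rerun:
import { sql } from "@datoria/sdk";
import { dailyEventsTable } from "../tables-and-partitioning.md";
// Upstream changes only trigger a rerun once the 6-hour cooldown has
// passed, grouping bursts of changes into fewer runs
export const calmerQuery = sql`SELECT * FROM ${
  dailyEventsTable
    .atExecutionDate()
    .invalidateAfterUpstreamChangedAndCooldownHasPassed("6 hours")
}`;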
Job Doesn't Run When Expected
If a job isn't updating when upstream data changes:
- Verify that invalidation rules are properly defined
- Check if the execution schedule aligns with the invalidation rules
- Ensure that partition metadata is being updated correctly
Best Practices
1. Match Invalidation to Business Needs
Different data has different freshness requirements:
- Critical financial metrics might need immediate updates
- Marketing analytics might be fine with daily updates
- Historical analyses might never need reprocessing
Choose invalidation strategies that align with these requirements.
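As an illustration, the three requirements above can be mapped onto the documented strategies. The table choices and the single 06:00 clock time below are assumptions for the sketch, not recommendations.
import { BQExpr, Invalidation } from "@datoria/sdk";
import {
  dailyEventsTable,
  hourlyEventsTable,
  monthlyStatsTable,
} from "../tables-and-partitioning.md";
// Critical financial metrics: reprocess immediately on any upstream change
export const financeDep = monthlyStatsTable.at(
  BQExpr.ExecutionDate.atWholeMonth(),
  Invalidation.AlwaysWhenUpstreamChanged,
);
// Marketing analytics: refresh at most once per day, at a fixed time
export const marketingDep = dailyEventsTable
  .atExecutionDate()
  .invalidateAfterUpstreamChangedAndClockPassed("UTC", ["06:00"]);
// Historical analyses: default behavior, never reprocess automatically
export const historicalDep = hourlyEventsTable.range(
  BQExpr.ExecutionDate.atFirstHourOfDay(),
  BQExpr.ExecutionDate.atLastHourOfDay(),
);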
2. Use Conditional Invalidation for Large Datasets
For large datasets with years of history (see the sketch after this list):
- Keep recent data (days/weeks) fresh with aggressive invalidation
- Apply more conservative rules to older data (months/years)
- Consider no automatic invalidation for archival data
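A minimal sketch of that split, reusing dailyEventsTable and an arbitrary 30-day boundary:
import { Invalidation, TimePredicate, sql } from "@datoria/sdk";
import { dailyEventsTable } from "../tables-and-partitioning.md";
// Last 30 days: reprocess on upstream changes; older partitions: leave alone
export const tieredQuery = sql`SELECT * FROM ${
  dailyEventsTable
    .atExecutionDate()
    .invalidateConditional(
      TimePredicate.DataOlderThanDays(30),
      Invalidation.IgnoreUpstreamChanges,     // older than 30 days
      Invalidation.AlwaysWhenUpstreamChanged, // within the last 30 days
    )
}`;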
3. Coordinate Invalidation Across Pipelines
For complex pipelines with many jobs:
- Use consistent invalidation patterns for related jobs
- Consider the full dependency chain when designing invalidation rules
- Document the expected behavior for clarity
4. Monitor and Adjust
Invalidation strategies aren't set in stone:
- Monitor how often jobs run and why
- Adjust strategies based on observed patterns
- Balance cost efficiency with data freshness