class: title, center, middle # Incremental models `where updated_date >= '2023-05-26'` --- class: subtitle, center, middle # From earlier: --- # What if... | id | name | cohort | is_enrolled | |----|---------|----------------|-------------| | 1 | Rumi | Earth | false | | 2 | Angaline | Wind | true | -- | 3 | Ursula | Fire | true | -- ### What would you do? * **View:** Recalculate every time I'm queried. Always up to date, often slow. * **Table:** Rebuild whole thing from scratch! Brute force always works. * **Ephemeral:** I don't exist in your data platform. No way to query me, even if you wanted to. -- * **Incremental:** Add _new_ data to an existing table --- # Why? * It costs time (and money) to transform data * Historical data shouldn't be changing
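---

### The same choice, in dbt config terms

For reference, each of those behaviors is just a `materialized` setting on the model. A minimal sketch (`my_model` is a placeholder, not a model from this course):

```sql
-- models/my_model.sql
-- materialized can be 'view', 'table', 'ephemeral', or 'incremental'
{{ config(materialized = 'table') }}

select * from {{ ref('stg_events') }}
```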
---

### Table vs Incremental
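---

### Table vs Incremental, in SQL terms

Roughly what your data platform ends up doing in each case. This is illustrative only; the real logs for this model appear later in this module:

```sql
-- table: drop and rebuild everything, on every run
create or replace table analytics.dbt_alice.page_views as (
    select ...   -- full transformation over all history
);

-- incremental: create once, then only add what's new on later runs
insert into analytics.dbt_alice.page_views (
    select ...   -- same transformation, filtered to new rows only
);
```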
---
# Incremental | Focus

- Explain use cases for incremental models
- Build intuition for implementing incremental models
- Understand the tradeoffs incremental models introduce
--- # Example: Snowplow events * Page views and pings on **www.getdbt.com/dbt-learn/lessons**! Snowplow is constantly capturing data regarding how visitors interact with our pages. * Thousands of records daily * Create a source like so: ```yml version: 2 sources: - name: snowplow database: raw tables: - name: events ``` --- ### We've got a model: events → page views .denser-text[ ```sql with events as ( select * from {{ ref('stg_events') }} ), page_views as ( select * from events where event = 'page_view' ), aggregated_page_events as ( select page_view_id, count(*) * 10 as approx_time_on_page, min(derived_tstamp) as page_view_start, max(collector_tstamp) as max_collector_tstamp from events group by 1 ), joined as ( select * from page_views left join aggregated_page_events using (page_view_id) ) select * from joined ``` ] --- # Tactics - How do we want to materialize this model? -- - Start with `view` - When it takes too long to query, switch to `table` - When it takes too long to build, switch to `incremental` -- - How can we make this incremental? --- # Building an incremental model **1. ❓ How do we tell dbt to add new rows instead of recreating the table?** -- ```sql {{ config( materialized = 'incremental' ) }} ``` --- # Building an incremental model **1. ✅ How do we tell dbt to add new rows instead of recreating the table? ** -- **2. ❓ How do we identify new rows?** -- 💡 What if we compared the source data to the already-transformed data? -- As a statement, run: ```sql select max(max_collector_tstamp) from {{ ref('snowplow__page_views') }} ``` How do we integrate this into our model? --- # Building an incremental model **1. ✅ How do we tell dbt to add new rows instead of recreating the table?** **2. ❓ How do we identify new rows _on "subsequent" runs only?_** -- ```sql with events as ( select * from {{ ref('stg_events') }} {% if is_incremental() %} where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }}) {% endif %} ), ... ``` --- # Special Jinja variables ### `{{ this }}` .dense-text[Represents the currently existing database object mapped to this model.] ### `is_incremental()` .dense-text[Checks four conditions:] .denser-text[ 1. Does this model already exist as an object in the database? 2. Is that database object a table? 3. Is this model configured with `materialized = 'incremental'`? 4. Was the `--full-refresh` flag passed to this `dbt run`? ] .dense-text[Yes Yes Yes No == an incremental run] ??? If all of the conditions are met, `is_incremental()` returns `true`. --- # Building an incremental model **1. ✅ How do we tell dbt to add new rows instead of recreating the table?** **2. ✅ How do we identify new rows on "subsequent" runs only? ** --- # Let's try it out! 
---

.denser-text[
```sql
{{ config(
    materialized = 'incremental'
) }}

with events as (

    select * from {{ ref('stg_events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }})
    {% endif %}

),

page_views as (

    select * from events
    where event = 'page_view'

),

aggregated_page_events as (

    select
        page_view_id,
        count(*) * 10 as approx_time_on_page,
        min(derived_tstamp) as page_view_start,
        max(collector_tstamp) as max_collector_tstamp

    from events
    group by 1

),

joined as (

    select *
    from page_views
    left join aggregated_page_events using (page_view_id)

)

select * from joined
```
]

---

## Run 1: `dbt run --select page_views`

--

.denser-text[
Excerpts from logs:

```sql
create or replace transient table analytics.dbt_alice.page_views as (
    with events as (
        select * from raw.snowplow.events
    ),
    ...
)
```
]

---

## Run 2: `dbt run --select page_views`

--

.denser-text[
Excerpts from logs:

```sql
create or replace temporary table analytics.dbt_alice.page_views__dbt_tmp as (
    with events as (
        select * from raw.snowplow.events
        where collector_tstamp >= (select max(max_collector_tstamp) from analytics.dbt_alice.page_views)
    ),
    ...
)
```

```sql
insert into analytics.dbt_alice.page_views (
    "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL",
    "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
)
(
    select
        "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL",
        "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
    from analytics.dbt_alice.page_views__dbt_tmp
);
```
]

???
Teacher note: see the insert! cool!

---

## Run 3 (full refresh): `dbt run --select page_views --full-refresh`

--

.denser-text[
Excerpts from logs:

```sql
create or replace transient table analytics.dbt_alice.page_views as (
    with events as (
        select * from raw.snowplow.events
    ),
    ...
)
```
]

---

# Conceptual framework
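---

### Aside: what `is_incremental()` is really doing

It's a Jinja macro that wraps the four checks from earlier. A simplified sketch of that logic, not dbt's exact source (the macro name here is made up so it won't clash with the real one):

```sql
{% macro is_incremental_sketch() %}

    {# 1 & 2: does this model already exist in the database, and is it a table? #}
    {% set existing = adapter.get_relation(this.database, this.schema, this.identifier) %}
    {% set exists_as_table = existing is not none and existing.type == 'table' %}

    {# 3: is this model configured with materialized = 'incremental'? #}
    {% set is_incremental_config = model.config.materialized == 'incremental' %}

    {# 4: was --full-refresh passed to this dbt run? #}
    {% set full_refresh = flags.FULL_REFRESH %}

    {{ return(exists_as_table and is_incremental_config and not full_refresh) }}

{% endmacro %}
```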
---
class: subtitle

# Checkpoint

* How do we identify the cutoff for our table?
* What happens with a `full refresh`?
* Why would I materialize as an incremental model?

---
class: subtitle

# Hands-On (15 min)
**Build:**
* Use the command that builds your models to your data platform (a reminder of the command follows on the next slide)

**Add:**
* A `config` block to `snowplow__page_views.sql`
* Incremental materialization
* An `if` statement to the import CTE

**Build:**
* Use the command that builds your models to your data platform

**Bonus:**
* Go to learn.getdbt.com and browse a slide link
* Return to the last `Build` step and see how your results change
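---

### Reminder: the Build command

If you want it, the command from the **Build** steps looks like this. The selector name comes from this exercise's model; swap in `dbt run` if you only want to build models without running tests:

```bash
dbt build --select snowplow__page_views

# start over from scratch if you need to
dbt build --select snowplow__page_views --full-refresh
```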
--- # What if: -- **Our data showed up in our data platform late?** -- Our cutoff time might mean we miss these data! -- 💡 What if we widen the window to the last three days? -- .dense-text[ ```sql {{ config( materialized = 'incremental' ) }} with events as ( select * from {{ ref('stg_events') }} {% if is_incremental() %} where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }}) {% endif %} ), ... ``` ] --- # Conceptual framework
We're going to end up with duplicate records! -- 💡Replace existing rows, insert new rows. --- # Handling late arriving data Use the `unique_key` config to avoid duplicates: .dense-text[ ```sql {{ config( materialized = 'incremental', unique_key = 'page_view_id' ) }} with events as ( select * from {{ ref('stg_events') }} {% if is_incremental() %} where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }}) {% endif %} ), ... ``` ] --- # Run 4: `dbt run --select page_views` -- .denser-text[ Excerpts from logs: ```sql create or replace temporary table analytics.dbt_alice.page_views__dbt_tmp as ( with events as ( select * from raw.snowplow.events where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from analytics.dbt_alice.page_views) ), ... ) ``` ```sql merge into analytics.dbt_alice.page_views as DBT_INTERNAL_DEST using analytics.dbt_alice.page_views__dbt_tmp as DBT_INTERNAL_SOURCE on DBT_INTERNAL_SOURCE.page_view_id = DBT_INTERNAL_DEST.page_view_id when matched then update set "PAGE_VIEW_ID" = DBT_INTERNAL_SOURCE."PAGE_VIEW_ID","EVENT_ID" = DBT_INTERNAL_SOURCE."EVENT_ID","ANONYMOUS_USER_ID" = DBT_INTERNAL_SOURCE."ANONYMOUS_USER_ID","SESSION_ID" = DBT_INTERNAL_SOURCE."SESSION_ID","EVENT" = DBT_INTERNAL_SOURCE."EVENT","DEVICE_TYPE" = DBT_INTERNAL_SOURCE."DEVICE_TYPE","PAGE_URL" = DBT_INTERNAL_SOURCE."PAGE_URL","PAGE_TITLE" = DBT_INTERNAL_SOURCE."PAGE_TITLE","PAGE_URLSCHEME" = DBT_INTERNAL_SOURCE."PAGE_URLSCHEME","PAGE_URLHOST" = DBT_INTERNAL_SOURCE."PAGE_URLHOST","PAGE_URLPORT" = DBT_INTERNAL_SOURCE."PAGE_URLPORT","PAGE_URLPATH" = DBT_INTERNAL_SOURCE."PAGE_URLPATH","PAGE_URLQUERY" = DBT_INTERNAL_SOURCE."PAGE_URLQUERY","PAGE_URLFRAGMENT" = DBT_INTERNAL_SOURCE."PAGE_URLFRAGMENT","COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."COLLECTOR_TSTAMP","DERIVED_TSTAMP" = DBT_INTERNAL_SOURCE."DERIVED_TSTAMP","APPROX_TIME_ON_PAGE" = DBT_INTERNAL_SOURCE."APPROX_TIME_ON_PAGE","PAGE_VIEW_START" = DBT_INTERNAL_SOURCE."PAGE_VIEW_START","MAX_COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."MAX_COLLECTOR_TSTAMP" when not matched then insert ("PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE", "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH", "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP") values ("PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE", "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH", "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP")set ``` ] --- class: subtitle #Hands-On (15 min)
**Expand:**
* Widen the incremental cutoff window by 3 days in `snowplow__page_views`

**Add:**
* A unique key to the config block

**Build:**
* Use the command that builds your models to your data platform (a quick way to check your work follows on the next slide)
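---

### Checking your work

After the last **Build** step, an ad-hoc statement like this should return zero rows if the `unique_key` is doing its job (a quick sketch; it assumes the model from the exercise):

```sql
-- any page_view_id appearing more than once would be a duplicate
select
    page_view_id,
    count(*) as occurrences
from {{ ref('snowplow__page_views') }}
group by 1
having count(*) > 1
```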
--- # Under the hood What's the DDL/DML that dbt is running? 1. Create a temp table of new records 2. Reconcile the existing table with the temp table, using one of: - `merge` ("upsert" new field values on row) - `delete+insert` ("delete" entire row and "insert" new row in place) - `insert overwrite` ("replace" _entire partitions_) Depends on: - database support (e.g. Redshift does not have a `merge` statement) - user input: `incremental_strategy` ([docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models/#what-is-an-incremental_strategy)) --- # Conceptual framework
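---

### Setting a strategy explicitly

If the default for your warehouse isn't the behavior you want, `incremental_strategy` goes in the same config block. A sketch; check the `incremental_strategy` docs for which strategies your adapter supports:

```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'page_view_id',
    incremental_strategy = 'delete+insert'   -- e.g. where merge isn't available
) }}
```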
--- # How do you set the cutoff? What if data arrives _really_ late? -- In our opinion, the goal of incremental models is to _approximate_ the "true" table in a fraction of the runtime: - Perform an analysis on the arrival time of data - Figure out your organization's tolerance for correctness - Set the cutoff based on these two inputs - Once a week, perform a `--full-refresh` run to get the "true" table -- "Close enough & performant" --- # Our Snowplow data: .left-column-66[
X: days between firing + collection Y: % of all events ] .right-column-33[ - Dataset: 2.5 months of Snowplow data, 285mm events - 99.82% of events arrive within 1 hr of firing - 99.88% within 24 hr - .highlight[99.93% within 72 hr] - ∴ 3 day cutoff, refresh weekly ] --- # What if... -- **A column is added/removed from my model in the future?** -- Shifts in available data or data types could cause issues -- 💡 Let's implement a configuration to account for these future changes! --- # Accounting for future table changes We can add an `on_schema_change` config to the top of our model ```sql {{ config( materialized = 'incremental', unique_key = 'page_view_id', on_schema_change = 'sync_all_columns' ) }} ``` -- This will: -- - add any new columns to the existing table - remove any columns that are now missing - inclusive of data type changes -- ### Note - this _will_ _not_ backfill values in old records for new columns --- class: subtitle, center, middle # Questions? --- class: subtitle, center, middle # How things fall apart --- # What if... A page view has events either side of our cut off timestamp?
-- We'd end up under-counting the page view duration. But, maybe we're okay with that, page view durations aren't _that_ important. --- # What if... We want to calculate window functions? - `page_view_in_session_index` - `page_view_for_user_index` --- .denser-text[ Replace: ```sql ) select * from joined ``` with: ```sql ... indexed as ( select *, row_number() over ( partition by session_id order by page_view_start ) as page_view_in_session_index, row_number() over ( partition by anonymous_user_id order by page_view_start ) as page_view_for_user_index from joined ) select * from indexed ``` ] --- # What if... We want to calculate window functions? - `page_view_in_session_index` - `page_view_for_user_index` -- Window functions will return incorrect results since they will be performed on a subset of data. --- # "Always correct & slow"
- If a user has a new event, recalculate _all_ page views for that user
- This works with window functions! But it's **_slowwwww_**...

---

# "Always correct & slow"

```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'page_view_id'
) }}

with events as (

    select * from {{ ref('stg_events') }}

    {% if is_incremental() %}
    where anonymous_user_id in (
        select distinct anonymous_user_id
        from {{ source('snowplow', 'events') }}
        where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})
    )
    {% endif %}

)

...
```

---

# Too clever by half
- Whenever a user has a new session, pull the user's _most recent session only_, and perform relative calculations - This takes some hard thinking! E.g. [Segment package](https://github.com/dbt-labs/segment/blob/master/models/sessionization/segment_web_sessions.sql#L67) --- class: middle ## Want even more info?
.center[[Read it!](https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303)] --- # What about _truly_ massive datasets? Bigger datasets (and Big Data technologies) are different cost-optimization problems. - Always rebuild past 3 days. Fully ignore late arrivals. - Always replace data at the _partition level_. - No unique keys. - Targeted lookback? No way: too much extra data to scan. - Avoid full refreshes We're going to talk about this next :) --- ## Should I use an incremental model? ### Good candidates - Immutable event streams: tall + skinny table, append-only, no updates - If there _are_ any updates, a reliable `updated_at` field ### Not-so-good candidates - You have small-ish data - Your data changes constantly: new columns, renamed columns, etc. - Your data is updated in unpredictable ways - Your transformation performs comparisons or calculations that require _other_ rows --- # Incremental models introduce tradeoffs: * Most incremental models are "approximately correct" * They introduce a level of code complexity (i.e. how easy it is for someone to understand your code) * Prioritizing correctness can negate performance gains from incrementality Think of incremental models as an _upgrade_, not a _starting point_. --- # If you do use incremental models: Keep the inputs and transformations of your incremental models as singular, simple, and immutable as possible. - Slowly changing dimensions, like a `product_name` that the company regularly rebrands? Join from a `dim` table. - Window functions for quality-of-life counters? Fix it in post—i.e., a separate model. This is how our Snowplow package calculates each user's `session_index`. --- class: subtitle ## Checkpoint - **Share out:** What are some cases at your organization where you would implement incremental models? -- ## Questions? --- class: subtitle # Knowledge check You should be able to: - Explain use cases for incremental models - Build intuition for implementing incremental models - Understand the tradeoffs incremental models introduce ## Questions? --- # Resources * [Incremental Models: overview](https://docs.getdbt.com/docs/build/incremental-models) * [Incremental Models: in-depth](https://docs.getdbt.com/guides/best-practices/materializations/4-incremental-models) * [Limits of Incrementality](https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303) --- class: subtitle #Zoom Out
* Learn Norms
* Standard Materializations
* Incremental

--

* Snapshots