class: title, center, middle # Incremental models `where updated_date >= '2023-03-21'` --- class: subtitle, center, middle # From earlier: --- .left-column[ # The Big Idea * Let's keep the old table, and we'll just _add_ the _new_ records * Less wasteful and probably faster * We just need to know: * What we mean by "new" * What we mean by "add" * As long as we have good answers, we're in good shape ] .right-column[
] --- # Why? * It costs time (and money) to transform data * Historical data doesn't (shouldn't) change * If you can get away _without_ re-transforming historical data, you save time and money
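
--

In plain SQL, the big idea looks something like the sketch below (hypothetical table names, echoing the `updated_date` filter from the title slide — not dbt syntax yet): append only the rows that are newer than what the existing table already contains.

```sql
-- Illustrative sketch only (hypothetical names: analytics.some_model, raw.some_source)
insert into analytics.some_model
select *
from raw.some_source
-- "new" = anything at least as recent as what we've already transformed
where updated_date >= (select max(updated_date) from analytics.some_model);
```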
---
# Incremental Models | Focus

- Explain use cases for incremental models
- Build intuition for implementing incremental models
- Understand the tradeoffs incremental models introduce
---

# Example: Snowplow events

* Page views and pings on **learn.getdbt.com**! Snowplow is constantly capturing data regarding how visitors interact with our pages.

* Create a source like so:

```yml
version: 2

sources:
  - name: snowplow
    database: raw
    loaded_at_field: collector_tstamp
    freshness:
      error_after: {count: 1, period: hour}
    tables:
      - name: events
```

---

### We've got a model: events → page views

.denser-text[
```sql
with events as (

    select * from {{ source('snowplow', 'events') }}

),

page_views as (

    select * from events
    where event = 'page_view'

),

aggregated_page_events as (

    select
        page_view_id,
        count(*) * 10 as approx_time_on_page,
        min(derived_tstamp) as page_view_start,
        max(collector_tstamp) as max_collector_tstamp

    from events
    group by 1

),

joined as (

    select *
    from page_views
    left join aggregated_page_events using (page_view_id)

)

select * from joined
```
]

---

# Tactics

- How do we want to materialize this model?

--

- Start with `view`
- When it takes too long to query, switch to `table`
- When it takes too long to build, switch to `incremental`

--

- How can we make this incremental?

---

# Building an incremental model

**1. ❓ How do we tell dbt to add new rows instead of recreating the table?**

--

```sql
{{ config(
    materialized = 'incremental'
) }}
```

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

--

**2. ❓ How do we identify new rows?**

--

💡 What if we compared the source data to the already-transformed data?

--

As a statement, run:

```sql
select max(max_collector_tstamp) from {{ ref('page_views') }}
```

How do we integrate this into our model?

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

**2. ❓ How do we identify new rows _on "subsequent" runs only?_**

--

```sql
with events as (

    select * from {{ source('snowplow', 'events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }})
    {% endif %}

),

...
```

---

# Special Jinja variables

### `{{ this }}`
.dense-text[Represents the currently existing database object mapped to this model.]

### `is_incremental()`
.dense-text[Checks four conditions:]
.denser-text[
1. Does this model already exist as an object in the database?
2. Is that database object a table?
3. Is this model configured with `materialized = 'incremental'`?
4. Was the `--full-refresh` flag passed to this `dbt run`?
]

.dense-text[Yes / Yes / Yes / No == an incremental run]

???
If all of the conditions are met, `is_incremental()` returns `true`.

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

**2. ✅ How do we identify new rows on "subsequent" runs only?**

---

# Let's try it out!
--- .denser-text[ ```sql {{ config( materialized = 'incremental' ) }} with events as ( select * from {{ source('snowplow', 'events') }} {% if is_incremental() %} where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }}) {% endif %} ), page_views as ( select * from events where event = 'page_view' ), aggregated_page_events as ( select page_view_id, count(*) * 10 as approx_time_on_page, min(derived_tstamp) as page_view_start, max(collector_tstamp) as max_collector_tstamp from events group by 1 ), joined as ( select * from page_views left join aggregated_page_events using (page_view_id) ) select * from joined ``` ] --- ## Run 1: .denser-text[ `$ dbt run -s page_views` Excerpts from logs: ```sql create or replace transient table analytics.dbt_alice.page_views as ( with events as ( select * from raw.snowplow.events ), ... ) ``` ] --- ## Run 2: .denser-text[ `$ dbt run -s page_views` Excerpts from logs: ```sql create or replace temporary table analytics.dbt_alice.page_views__dbt_tmp as ( with events as ( select * from raw.snowplow.events where collector_tstamp >= (select max(max_collector_tstamp) from analytics.dbt_alice.page_views) ), ... ) ``` ```sql insert into analytics.dbt_alice.page_views ( "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP" ) ( select "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP" from analytics.dbt_alice.page_views__dbt_tmp ); ``` ] ??? Teacher note: see the insert! cool! --- ## Run 3 (full refresh): .denser-text[ `$ dbt run -s page_views --full-refresh` Excerpts from logs: ```sql create or replace transient table analytics.dbt_alice.page_views as ( with events as ( select * from raw.snowplow.events ), ... ) ``` ] --- # Conceptual framework
--- # Conceptual framework
--- # What if: -- **Our data showed up in our data warehouse late?** -- Our cutoff time might mean we miss these data! -- 💡 What if we widen the window to the last three days? -- .dense-text[ ```sql {{ config( materialized = 'incremental' ) }} with events as ( select * from {{ source('snowplow', 'events') }} {% if is_incremental() %} where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }}) {% endif %} ), ... ``` ] --- # Conceptual framework
--- # Conceptual framework
We're going to end up with duplicate records!

--

💡 Replace existing rows, insert new rows.

---

# Handling late-arriving data

Use the `unique_key` config to avoid duplicates:

.dense-text[
```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'page_view_id'
) }}

with events as (

    select * from {{ source('snowplow', 'events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})
    {% endif %}

),

...
```
]

---

# Run 4:

.denser-text[
`$ dbt run -s page_views`

Excerpts from logs:

```sql
create or replace temporary table analytics.dbt_alice.page_views__dbt_tmp as (
    with events as (
        select * from raw.snowplow.events
        where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from analytics.dbt_alice.page_views)
    ),
    ...
)
```

```sql
merge into analytics.dbt_alice.page_views as DBT_INTERNAL_DEST
using analytics.dbt_alice.page_views__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.page_view_id = DBT_INTERNAL_DEST.page_view_id
when matched then update set
    "PAGE_VIEW_ID" = DBT_INTERNAL_SOURCE."PAGE_VIEW_ID",
    "EVENT_ID" = DBT_INTERNAL_SOURCE."EVENT_ID",
    "ANONYMOUS_USER_ID" = DBT_INTERNAL_SOURCE."ANONYMOUS_USER_ID",
    "SESSION_ID" = DBT_INTERNAL_SOURCE."SESSION_ID",
    "EVENT" = DBT_INTERNAL_SOURCE."EVENT",
    "DEVICE_TYPE" = DBT_INTERNAL_SOURCE."DEVICE_TYPE",
    "PAGE_URL" = DBT_INTERNAL_SOURCE."PAGE_URL",
    "PAGE_TITLE" = DBT_INTERNAL_SOURCE."PAGE_TITLE",
    "PAGE_URLSCHEME" = DBT_INTERNAL_SOURCE."PAGE_URLSCHEME",
    "PAGE_URLHOST" = DBT_INTERNAL_SOURCE."PAGE_URLHOST",
    "PAGE_URLPORT" = DBT_INTERNAL_SOURCE."PAGE_URLPORT",
    "PAGE_URLPATH" = DBT_INTERNAL_SOURCE."PAGE_URLPATH",
    "PAGE_URLQUERY" = DBT_INTERNAL_SOURCE."PAGE_URLQUERY",
    "PAGE_URLFRAGMENT" = DBT_INTERNAL_SOURCE."PAGE_URLFRAGMENT",
    "COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."COLLECTOR_TSTAMP",
    "DERIVED_TSTAMP" = DBT_INTERNAL_SOURCE."DERIVED_TSTAMP",
    "APPROX_TIME_ON_PAGE" = DBT_INTERNAL_SOURCE."APPROX_TIME_ON_PAGE",
    "PAGE_VIEW_START" = DBT_INTERNAL_SOURCE."PAGE_VIEW_START",
    "MAX_COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."MAX_COLLECTOR_TSTAMP"
when not matched then insert
    ("PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE", "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH", "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP")
values
    ("PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE", "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH", "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP", "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP")
```
]

---

# Under the hood

What's the DDL/DML that dbt is running?

1. Create a temp table of new records
2. Reconcile the existing table with the temp table, using one of:
    - `merge` ("upsert" new field values on row)
    - `delete+insert` ("delete" entire row and "insert" new row in place)
    - `insert overwrite` ("replace" _entire partitions_)

Depends on:
- database support (e.g., Redshift does not have a `merge` statement)
- user input: `incremental_strategy` ([docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models/#what-is-an-incremental_strategy))

---

# Conceptual framework
--- # How do you set the cutoff? What if data arrives _really_ late? -- In our opinion, the goal of incremental models is to _approximate_ the "true" table in a fraction of the runtime: - Perform an analysis on the arrival time of data - Figure out your organization's tolerance for correctness - Set the cutoff based on these two inputs - Once a week, perform a `--full-refresh` run to get the "true" table -- "Close enough & performant" --- # Our Snowplow data: .left-column-66[
X axis: days between firing and collection
Y axis: % of all events
]

.right-column-33[
- Dataset: 2.5 months of Snowplow data, 285mm events
- 99.82% of events arrive within 1 hr of firing
- 99.88% within 24 hr
- .highlight[99.93% within 72 hr]
- ∴ 3-day cutoff, refresh weekly
]

---

class: subtitle, center, middle

# How things fall apart

---

# What if...

A page view has events on either side of our cutoff timestamp?
---

# What if...

A page view has events on either side of our cutoff timestamp?
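To make this concrete, here's a hypothetical page view whose page pings straddle an assumed cutoff of `2023-03-21 00:00:00` (illustrative timestamps only, not real Snowplow data):

```sql
-- Hypothetical events for a single page_view_id that straddles the cutoff
with events as (
    select 'page_view' as event, '2023-03-20 23:59:40'::timestamp as collector_tstamp union all
    select 'page_ping' as event, '2023-03-20 23:59:50'::timestamp as collector_tstamp union all
    select 'page_ping' as event, '2023-03-21 00:00:00'::timestamp as collector_tstamp union all
    select 'page_ping' as event, '2023-03-21 00:00:10'::timestamp as collector_tstamp
)

-- Over all 4 events, count(*) * 10 = 40 seconds on page.
-- Only the 2 events at or after the cutoff pass the incremental filter,
-- so the re-aggregated row reports 20 seconds.
select count(*) * 10 as approx_time_on_page
from events
where collector_tstamp >= '2023-03-21 00:00:00'::timestamp
```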
--

We'd end up under-counting the page view duration. But maybe we're okay with that; page view durations aren't _that_ important.

---

# What if...

We want to calculate window functions?

- `page_view_in_session_index`
- `page_view_for_user_index`

---

.denser-text[
Replace:

```sql
)

select * from joined
```

with:

```sql
),

indexed as (

    select
        *,
        row_number() over (
            partition by session_id
            order by page_view_start
        ) as page_view_in_session_index,
        row_number() over (
            partition by anonymous_user_id
            order by page_view_start
        ) as page_view_for_user_index

    from joined

)

select * from indexed
```
]

---

# What if...

We want to calculate window functions?

- `page_view_in_session_index`
- `page_view_for_user_index`

--

Window functions will return incorrect results, since they'll be calculated over only the subset of new data.

---

# "Always correct & slow"
- If a user has a new event, recalculate _all_ page views for that user
- This works with window functions! But it's **_slowwwww_**...

---

# "Always correct & slow"

```sql
{{ config(
    materialized = 'incremental',
    unique_key = 'page_view_id'
) }}

with events as (

    select * from {{ source('snowplow', 'events') }}

    {% if is_incremental() %}
    where anonymous_user_id in (

        select distinct anonymous_user_id
        from {{ source('snowplow', 'events') }}
        where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})

    )
    {% endif %}

)

...
```

---

# Too clever by half
- Whenever a user has a new session, pull the user's _most recent session only_, and perform relative calculations - This takes some hard thinking! E.g. [Segment package](https://github.com/dbt-labs/segment/blob/master/models/sessionization/segment_web_sessions.sql#L67) --- class: middle ## Want even more info?
.center[[Read it!](https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303)] --- # What about _truly_ massive datasets? Bigger datasets (and Big Data technologies) are different cost-optimization problems. - Always rebuild past 3 days. Fully ignore late arrivals. - Always replace data at the _partition level_. - No unique keys. - Targeted lookback? No way: too much extra data to scan. - Avoid full refreshes We're going to talk about this next :) --- ## Should I use an incremental model? ### Good candidates - Immutable event streams: tall + skinny table, append-only, no updates - If there _are_ any updates, a reliable `updated_at` field ### Not-so-good candidates - You have small-ish data - Your data changes constantly: new columns, renamed columns, etc. - Your data is updated in unpredictable ways - Your transformation performs comparisons or calculations that require _other_ rows --- # Incremental models introduce tradeoffs: * Most incremental models are "approximately correct" * They introduce a level of code complexity (i.e. how easy it is for someone to understand your code) * Prioritizing correctness can negate performance gains from incrementality Think of incremental models as an _upgrade_, not a _starting point_. --- # If you do use incremental models: Keep the inputs and transformations of your incremental models as singular, simple, and immutable as possible. - Slowly changing dimensions, like a `product_name` that the company regularly rebrands? Join from a `dim` table. - Window functions for quality-of-life counters? Fix it in post—i.e., a separate model. This is how our Snowplow package calculates each user's `session_index`. --- class: subtitle ## Checkpoint - **Share out:** What are some cases at your organization where you would implement incremental models? -- ## Questions? --- class: subtitle # Knowledge check You should be able to: - Explain use cases for incremental models - Build intuition for implementing incremental models - Understand the tradeoffs incremental models introduce ## Questions? --- class: subtitle # Zoom Out
- Polish your dbt project
- Intro to Jinja
- dbtonic Jinja
- Packages and projects
- dbt Materializations
- Incremental Models

--

- Techniques of the trade
- Closing ceremony
---