class: title, center, middle

# Incremental models

`where updated_date >= '2024-09-12'`

---

# What if...

| id | name     | cohort | is_enrolled |
|----|----------|--------|-------------|
| 1  | Rumi     | Earth  | false       |
| 2  | Angaline | Wind   | true        |

???
[instructor notes]
* We have two students in our table, but what if we get another student?

--

| 3  | Ursula   | Fire   | true        |

???
[instructor notes]
* How would each materialization type handle this additional record?

--

### What would you do?

* **View:** Recalculate every time I'm queried. Always up to date, often slow.
* **Table:** Rebuild the whole thing from scratch! Brute force always works.
* **Ephemeral:** I don't exist in your data platform. No way to query me, even if you wanted to.

--

* **Incremental:** Add _new_ data to an existing table

???
[instructor notes]
* Incremental is the right materialization to go with here. Let's talk about why (next slide).

---

# Why?

* It costs time (and money) to transform data
* Historical data shouldn't be changing
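For reference, each of the materializations above is declared with a single line of dbt config; a minimal sketch (the model and source names below are only illustrative):

```sql
-- hypothetical model file, e.g. models/staging/stg_students.sql
-- changing this one config value changes how dbt materializes the model
{{ config(materialized = 'view') }}   -- or 'table', 'ephemeral', 'incremental'

select * from {{ source('school', 'students') }}   -- hypothetical source for the students table above
```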
--- ### Table vs Incremental
???
[instructor notes]
* Here we have new data coming in each day.
* Monday we build a new table.
* Tuesday, more data comes in, so we drop and replace Monday's table, rebuild Monday's rows, and add Tuesday's rows.
* Wednesday, more data comes in, and we do the same thing: drop and replace. And so on.
* This can be costly and might not be necessary.
* If the records built on the previous day haven't really changed, why drop and rebuild them?

---

### Table vs Incremental
???
[instructor notes]
* Here is where incremental models would make sense.
* Rather than dropping and replacing the table each day, we can simply append/merge/delete+insert the new records into the already existing table.

---
Incremental | Focus
Explain use cases for incremental models
Build intuition for implementing incremental models
Understand the tradeoffs incremental models introduce
---

# Example: Snowplow events

* Page views and pings on **www.getdbt.com/dbt-learn/lessons**! Snowplow is constantly capturing data regarding how visitors interact with our pages.
* Thousands of records daily
* Create a source like so:

```yml
version: 2

sources:
  - name: snowplow
    database: raw
    tables:
      - name: events
```

???
[instructor notes]
* Let's look at an example.
* We have a source, Snowplow, that is tracking events on our website.
* This source table has hundreds of thousands of records coming in each day.

---

### We've got a model: stg_snowplow__events → fct_page_views

.denser-text[
```sql
with events as (

    select * from {{ ref('stg_snowplow__events') }}

),

page_views as (

    select * from events
    where event = 'page_view'

),

aggregated_page_events as (

    select
        page_view_id,
        count(*) * 10 as approx_time_on_page,
        min(derived_tstamp) as page_view_start,
        max(collector_tstamp) as max_collector_tstamp
    from events
    group by 1

),

joined as (

    select *
    from page_views
    left join aggregated_page_events using (page_view_id)

)

select * from joined
```
]

???
[instructor notes]
* Here we have a model that looks at just the page view events and how long visitors spend on each page.

---

# Tactics

- How do we want to materialize this model?

???
[instructor notes]
* With all the different materialization strategies, let's talk through when we would use each one and when to switch to a different one.

--

- Start with `view`
- When it takes too long to query, switch to `table`
- When it takes too long to build, switch to `incremental`

--

- How can we make this incremental?

---

# Building an incremental model

**1. ❓ How do we tell dbt to add new rows instead of recreating the table?**

--

```sql
{{
    config(
        materialized = 'incremental'
    )
}}
```

???
[instructor notes]
* To make a model incremental, we first need to tell dbt to materialize it incrementally.
* We do this with a config block at the top of the model.
* Note that while you **can** set the materialization as incremental in yml files, this is not recommended.

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

--

**2. ❓ How do we identify new rows?**

???
[instructor notes]
* Now that dbt knows to add only the new rows, we need to determine what is considered **new**.

--

💡 What if we compared the source data to the already-transformed data?

--

As a statement, run:

```sql
select max(max_collector_tstamp) from {{ ref('fct_page_views') }}
```

How do we integrate this into our model?

???
[instructor notes]
* This statement selects the max timestamp from the model to figure out what the most recently loaded record was.

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

**2. ❓ How do we identify new rows _on "subsequent" runs only?_**

--

```sql
with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }})
    {% endif %}

),

...
```

???
[instructor notes]
* There are two new things we need to add to this query:
* We only want to filter for those **new** records when we've already built the table and have records loaded.
* The **is_incremental()** macro is going to do this work for us.
* The `{{ this }}` in our from clause is not a placeholder to fill in. It lets us select the max timestamp from the model we are currently building.
* We wouldn't want to use the ref macro here, because dbt would error: you can't ref a model from within the query that creates that same model.

---

# Special Jinja variables

### `{{ this }}`
.dense-text[Represents the currently existing database object mapped to this model.]

### `is_incremental()`
.dense-text[Checks four conditions:]

.denser-text[
1. Does this model already exist as an object in the database?
2. Is that database object a table?
3. Is this model configured with `materialized = 'incremental'`?
4. Was the `--full-refresh` flag passed to this `dbt run`?
]

.dense-text[Yes Yes Yes No == an incremental run]

???
If the answers are Yes, Yes, Yes, No, then `is_incremental()` returns `true`.

---

# Building an incremental model

**1. ✅ How do we tell dbt to add new rows instead of recreating the table?**

**2. ✅ How do we identify new rows on "subsequent" runs only?**

---

# Let's try it out!

---

.denser-text[
```sql
{{
    config(
        materialized = 'incremental'
    )
}}

with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }})
    {% endif %}

),

page_views as (

    select * from events
    where event = 'page_view'

),

aggregated_page_events as (

    select
        page_view_id,
        count(*) * 10 as approx_time_on_page,
        min(derived_tstamp) as page_view_start,
        max(collector_tstamp) as max_collector_tstamp
    from events
    group by 1

),

joined as (

    select *
    from page_views
    left join aggregated_page_events using (page_view_id)

)

select * from joined
```
]

--
demo
???
[instructor notes]
DEMO:
* Show the fct_page_views model in the marts folder
* Run `dbt build` and show the DDL that is created (transient table on first run)
* Run `dbt build` again and have learners guess what they think will happen
* Show logs and see if they're correct. Point out the temp table DDL, the added where clause in the sql query, and the insert statement.

---

## Run 1: `dbt run --select fct_page_views`

???
[instructor notes]
* Recap what happens on each run of an incremental model

--

.denser-text[
Excerpts from logs:
```sql
create or replace transient table analytics.dbt_alice.fct_page_views as (

    with events as (

        select * from raw.snowplow.events

    ),

    ...
)
```
]

---

## Run 2: `dbt run --select fct_page_views`

--

.denser-text[
Excerpts from logs:
```sql
create or replace temporary table analytics.dbt_alice.fct_page_views__dbt_tmp as (

    with events as (

        select * from raw.snowplow.events
        where collector_tstamp >= (select max(max_collector_tstamp) from analytics.dbt_alice.fct_page_views)

    ),

    ...
)
```

```sql
insert into analytics.dbt_alice.fct_page_views (
    "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL",
    "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
)
(
    select
        "SESSION_ID", "ANONYMOUS_USER_ID", "PAGE_VIEW_ID", "PAGE_URL",
        "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
    from analytics.dbt_alice.fct_page_views__dbt_tmp
);
```
]

---

## Run 3 (full refresh): `dbt run --select fct_page_views --full-refresh`

???
[instructor notes]
* If we ever want to drop and rebuild the entire table, we can pass the --full-refresh flag, which ignores the incremental logic.

--

.denser-text[
Excerpts from logs:
```sql
create or replace transient table analytics.dbt_alice.fct_page_views as (

    with events as (

        select * from raw.snowplow.events

    ),

    ...
)
```
]
demo
??? [instructor notes] DEMO: * Use the full refresh flag in the IDE and show the logs. Point out the DDL (transient table) and lack of where clause. --- # Conceptual framework
???
[instructor notes]
* Our where clause draws a hard cutoff between the old records and what we consider new records.

---

# Conceptual framework
??? [instructor notes] * Visually we can see the appending of the new records to the existing table. * A full refresh rebuilds everything. --- class: subtitle #Checkpoint * How do we identify the cutoff for our table? * What happens with a `full refresh`? * Why would I materialize as an incremental model? --- class: subtitle #Hands-On (15 min)
Build:
Use the command that builds your models to your data platform
Add:
A `config block` to `fct_page_views.sql`
Incremental materialization
`if` statement to the import CTE (see the sketch below)
Build:
Use the command that builds your models to your data platform
Bonus:
Go to learn.getdbt.com and browse through a few slides
Return to the last `Build` step and see how your results change
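If you get stuck, here is roughly what the `Add` step looks like. It's the same code shown earlier in this lesson, trimmed to just the top of `fct_page_views.sql`:

```sql
{{
    config(
        materialized = 'incremental'
    )
}}

with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select max(max_collector_tstamp) from {{ this }})
    {% endif %}

),

...
```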
---

# What if:

--

**Our data showed up in our data platform late?**

--

Our cutoff time might mean we miss this data!

--

💡 What if we widen the window to the last three days?

--

.dense-text[
```sql
{{
    config(
        materialized = 'incremental'
    )
}}

with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})
    {% endif %}

),

...
```
]

???
[instructor notes]
* Sometimes we have late or missing records that would be excluded by a hard cutoff line.
* We can use a `dateadd()` function to widen the cutoff window to include any late or missing records.
* We use 3 days here, but it can be whatever time window is necessary.

---

# Conceptual framework
???
[instructor notes]
* If we widen the cutoff window, we are now taking both the rows that were missing and the rows that were already loaded in that window, and adding them to the existing table.
* What issue do we see happening here?

---

# Conceptual framework
We're going to end up with duplicate records!

--

💡 Replace existing rows, insert new rows.

---

# Handling late arriving data

Use the `unique_key` config to avoid duplicates:

.dense-text[
```sql
{{
    config(
        materialized = 'incremental',
        unique_key = 'page_view_id'
    )
}}

with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})
    {% endif %}

),

...
```
]

???
[instructor notes]
* A unique key allows us to merge existing records so we don't end up with duplicates.

---

## Run 4: `dbt run --select fct_page_views`

--

.denser-text[
Excerpts from logs:
```sql
create or replace temporary table analytics.dbt_alice.fct_page_views__dbt_tmp as (

    with events as (

        select * from raw.snowplow.events
        where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from analytics.dbt_alice.fct_page_views)

    ),

    ...
)
```

```sql
merge into analytics.dbt_alice.fct_page_views as DBT_INTERNAL_DEST
using analytics.dbt_alice.fct_page_views__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.page_view_id = DBT_INTERNAL_DEST.page_view_id

when matched then update set
    "PAGE_VIEW_ID" = DBT_INTERNAL_SOURCE."PAGE_VIEW_ID",
    "EVENT_ID" = DBT_INTERNAL_SOURCE."EVENT_ID",
    "ANONYMOUS_USER_ID" = DBT_INTERNAL_SOURCE."ANONYMOUS_USER_ID",
    "SESSION_ID" = DBT_INTERNAL_SOURCE."SESSION_ID",
    "EVENT" = DBT_INTERNAL_SOURCE."EVENT",
    "DEVICE_TYPE" = DBT_INTERNAL_SOURCE."DEVICE_TYPE",
    "PAGE_URL" = DBT_INTERNAL_SOURCE."PAGE_URL",
    "PAGE_TITLE" = DBT_INTERNAL_SOURCE."PAGE_TITLE",
    "PAGE_URLSCHEME" = DBT_INTERNAL_SOURCE."PAGE_URLSCHEME",
    "PAGE_URLHOST" = DBT_INTERNAL_SOURCE."PAGE_URLHOST",
    "PAGE_URLPORT" = DBT_INTERNAL_SOURCE."PAGE_URLPORT",
    "PAGE_URLPATH" = DBT_INTERNAL_SOURCE."PAGE_URLPATH",
    "PAGE_URLQUERY" = DBT_INTERNAL_SOURCE."PAGE_URLQUERY",
    "PAGE_URLFRAGMENT" = DBT_INTERNAL_SOURCE."PAGE_URLFRAGMENT",
    "COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."COLLECTOR_TSTAMP",
    "DERIVED_TSTAMP" = DBT_INTERNAL_SOURCE."DERIVED_TSTAMP",
    "APPROX_TIME_ON_PAGE" = DBT_INTERNAL_SOURCE."APPROX_TIME_ON_PAGE",
    "PAGE_VIEW_START" = DBT_INTERNAL_SOURCE."PAGE_VIEW_START",
    "MAX_COLLECTOR_TSTAMP" = DBT_INTERNAL_SOURCE."MAX_COLLECTOR_TSTAMP"

when not matched then insert (
    "PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE",
    "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH",
    "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP",
    "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
)
values (
    "PAGE_VIEW_ID", "EVENT_ID", "ANONYMOUS_USER_ID", "SESSION_ID", "EVENT", "DEVICE_TYPE",
    "PAGE_URL", "PAGE_TITLE", "PAGE_URLSCHEME", "PAGE_URLHOST", "PAGE_URLPORT", "PAGE_URLPATH",
    "PAGE_URLQUERY", "PAGE_URLFRAGMENT", "COLLECTOR_TSTAMP", "DERIVED_TSTAMP",
    "APPROX_TIME_ON_PAGE", "PAGE_VIEW_START", "MAX_COLLECTOR_TSTAMP"
);
```
]

--
demo
??? [instructor notes] DEMO: * Change the where clause to include the lookback window * Add the unique key to the config block --- class: subtitle #Hands-On (15 min)
Expand:
Increase the incremental cutoff by 3 days in `fct_page_views`
Add:
A unique key to the config block (see the sketch below)
Build:
Use the command that builds your models to your data platform
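If you get stuck, here is roughly what the `Expand` and `Add` steps look like. It reuses the code from the previous slides, trimmed to just the top of `fct_page_views.sql`:

```sql
{{
    config(
        materialized = 'incremental',
        unique_key = 'page_view_id'
    )
}}

with events as (

    select * from {{ ref('stg_snowplow__events') }}

    {% if is_incremental() %}
    where collector_tstamp >= (select dateadd('day', -3, max(max_collector_tstamp)) from {{ this }})
    {% endif %}

),

...
```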
---

# Under the hood

What's the DDL/DML that dbt is running?

1. Create a temp table of new records
2. Reconcile the existing table with the temp table, using one of:
    - `merge` ("upsert" new field values on a row)
    - `delete+insert` ("delete" the entire row and "insert" the new row in its place)
    - `insert overwrite` ("replace" _entire partitions_)

Depends on:

- database support (e.g. not every warehouse has a native `merge` statement)
- user input: `incremental_strategy` ([docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models/#what-is-an-incremental_strategy)) (a config sketch follows shortly)

---

# Conceptual framework
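---

# Example: setting an `incremental_strategy`

A minimal sketch of how the strategy from the "Under the hood" slide is chosen in config. The value shown is just an example; which strategies are available depends on your adapter, and dbt picks a default for your warehouse if you don't set one.

```sql
{{
    config(
        materialized = 'incremental',
        unique_key = 'page_view_id',
        incremental_strategy = 'delete+insert'   -- e.g. 'merge', or 'insert_overwrite' on adapters that support it
    )
}}
```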
---

# How do you set the cutoff?

What if data arrives _really_ late?

--

In our opinion, the goal of incremental models is to _approximate_ the "true" table in a fraction of the runtime:

- Perform an analysis on the arrival time of data (a sketch of this analysis is in the appendix at the end of this deck)
- Figure out your organization's tolerance for correctness
- Set the cutoff based on these two inputs
- Once a week, perform a `--full-refresh` run to get the "true" table

--

"Close enough & performant"

---

# Our Snowplow data:

.left-column-66[
X: days between firing + collection Y: % of all events ] .right-column-33[ - Dataset: 2.5 months of Snowplow data, 285mm events - 99.82% of events arrive within 1 hr of firing - 99.88% within 24 hr - .highlight[99.93% within 72 hr] - ∴ 3 day cutoff, refresh weekly ] --- # What if... -- **A column is added/removed from my model in the future?** -- Shifts in available data or data types could cause issues -- 💡 Let's implement a configuration to account for these future changes! --- # Accounting for future table changes We can add an `on_schema_change` config to the top of our model ```sql {{ config( materialized = 'incremental', unique_key = 'page_view_id', on_schema_change = 'sync_all_columns' ) }} ``` -- This will: -- - add any new columns to the existing table - remove any columns that are now missing - inclusive of data type changes -- ### Note - this _will_ _not_ backfill values in old records for new columns --- class: subtitle, center, middle # Questions? --- ## Should I use an incremental model? ### Good candidates - Immutable event streams: tall + skinny table, append-only, no updates - If there _are_ any updates, a reliable `updated_at` field ### Not-so-good candidates - You have small-ish data - Your data changes constantly: new columns, renamed columns, etc. - Your data is updated in unpredictable ways - Your transformation performs comparisons or calculations that require _other_ rows --- # What about _truly_ massive datasets? Bigger datasets (and Big Data technologies) are different cost-optimization problems. - Always rebuild past 3 days. Fully ignore late arrivals. - Always replace data at the _partition level_. - No unique keys. - Targeted lookback? No way: too much extra data to scan. - Avoid full refreshes --- # Incremental models introduce tradeoffs: * Most incremental models are "approximately correct" * They introduce a level of code complexity (i.e. how easy it is for someone to understand your code) * Prioritizing correctness can negate performance gains from incrementality Think of incremental models as an _upgrade_, not a _starting point_. --- # If you do use incremental models: Keep the inputs and transformations of your incremental models as singular, simple, and immutable as possible. - Slowly changing dimensions, like a `product_name` that the company regularly rebrands? Join from a `dim` table. - Window functions for quality-of-life counters? Fix it in post—i.e., a separate model. This is how our Snowplow package calculates each user's `session_index`. --- class: middle ## Want even more info?
.center[[Read it!](https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303)] --- class: subtitle ## Checkpoint - **Share out:** What are some cases at your organization where you would implement incremental models? -- ## Questions? --- class: subtitle # Knowledge check You should be able to: - Explain use cases for incremental models - Build intuition for implementing incremental models - Understand the tradeoffs incremental models introduce ## Questions? --- # Resources * [Incremental Models: overview](https://docs.getdbt.com/docs/build/incremental-models) * [Incremental Models: in-depth](https://docs.getdbt.com/guides/best-practices/materializations/4-incremental-models) * [Limits of Incrementality](https://discourse.getdbt.com/t/on-the-limits-of-incrementality/303)
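---

# Appendix: sketching the arrival-time analysis

The "How do you set the cutoff?" slide suggests analyzing how late your data arrives. Here's a rough sketch of that kind of query, assuming Snowflake SQL and treating `derived_tstamp` as the firing time and `collector_tstamp` as the collection time (run it as a dbt analysis or an ad hoc query):

```sql
with lags as (

    -- how many days after firing was each event collected?
    select
        datediff('day', derived_tstamp, collector_tstamp) as days_late
    from {{ ref('stg_snowplow__events') }}

)

select
    days_late,
    count(*) as events,
    round(100.0 * count(*) / sum(count(*)) over (), 4) as pct_of_events
from lags
group by 1
order by 1
```

Comparing the cumulative percentages against your organization's tolerance for correctness is what leads to a choice like "3 day cutoff, refresh weekly."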