Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove current timestamps and insert ts #76

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ with
, failed_transaction_count
, batch_id
, batch_run_date
, batch_insert_ts
from {{ ref('stg_history_ledgers') }}
where
cast(batch_run_date as date) < date_add(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day)
Expand Down Expand Up @@ -67,7 +66,6 @@ with
, extra_signers
, batch_id
, batch_run_date
, batch_insert_ts
, resource_fee
, soroban_resources_instructions
, soroban_resources_read_bytes
Expand Down Expand Up @@ -195,7 +193,6 @@ with
, type_string
, batch_id
, batch_run_date
, batch_insert_ts
, asset_balance_changes
, parameters
, parameters_decoded
Expand Down Expand Up @@ -390,7 +387,6 @@ with
-- general fields
, hist_ops.batch_id
, hist_ops.batch_run_date
, current_timestamp() as batch_insert_ts
from history_operations as hist_ops
join history_transactions as hist_trans
on hist_ops.transaction_id = hist_trans.transaction_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ with
-- general fields
, enriched.batch_id
, enriched.batch_run_date
, enriched.batch_insert_ts
from {{ ref('enriched_history_operations') }} as enriched
where
enriched.type in (24, 25, 26)
Expand Down
2 changes: 0 additions & 2 deletions models/marts/history_assets.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
, exclude_duplicates.asset_issuer
, new_load.batch_id
, new_load.batch_run_date
, new_load.batch_insert_ts
from exclude_duplicates
left join
new_load on
Expand Down Expand Up @@ -76,7 +75,6 @@
, asset_issuer
, batch_id
, batch_run_date
, batch_insert_ts
from prep_dedup
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ with
, concat(s.account_id, '-', s.signer
) as unique_id
, s.batch_run_date
, s.batch_insert_ts
, row_number()
over (
partition by s.account_id, s.signer
Expand All @@ -39,10 +38,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched
where
s.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(s.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
s.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)
select
Expand All @@ -56,7 +52,5 @@ select
, deleted
, unique_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_signers
where row_nr = 1
8 changes: 1 addition & 7 deletions models/marts/ledger_current_state/accounts_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ with
, a.sequence_ledger
, a.sequence_time
, a.batch_run_date
, a.batch_insert_ts
, row_number()
over (
partition by a.account_id
Expand All @@ -53,10 +52,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched
where
a.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(a.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
a.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)

Expand Down Expand Up @@ -107,6 +103,4 @@ select
, sequence_ledger
, sequence_time
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from get_creation_account
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ with
, cb.deleted
, cb.batch_id
, cb.batch_run_date
, cb.batch_insert_ts
, cb.closed_at
, cb.ledger_sequence
, row_number()
Expand All @@ -37,7 +36,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cb.batch_run_date >= date_sub(current_date(), interval 2 day)
cb.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 2 day)
{% endif %}
)

Expand All @@ -57,7 +56,5 @@ select
, batch_run_date
, closed_at
, ledger_sequence
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_balance
where rn = 1
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ with
, cfg.deleted
, cfg.batch_id
, cfg.batch_run_date
, cfg.batch_insert_ts
, cfg.ledger_sequence
, row_number()
over (
Expand All @@ -72,10 +71,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cfg.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cfg.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
date(cfg.closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)

Expand Down Expand Up @@ -130,7 +126,5 @@ select
, deleted
, batch_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_settings
where rn = 1
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ with
, cc.deleted
, cc.batch_id
, cc.batch_run_date
, cc.batch_insert_ts
, cc.ledger_sequence
, cc.ledger_key_hash
, row_number()
Expand All @@ -33,10 +32,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cc.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cc.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
date(cc.closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)

Expand All @@ -51,7 +47,5 @@ select
, deleted
, batch_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_code
where rn = 1
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ with
, cd.deleted
, cd.batch_id
, cd.batch_run_date
, cd.batch_insert_ts
, cd.ledger_sequence
, cd.ledger_key_hash
, concat(cd.contract_id, '-', cd.ledger_key_hash) as unique_id
Expand All @@ -40,10 +39,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
cd.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(cd.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
date(cd.closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)

Expand All @@ -64,8 +60,6 @@ select
, deleted
, batch_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
, unique_id
from current_data
where rn = 1
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ with
, l.closed_at
, lp.deleted
, lp.batch_run_date
, lp.batch_insert_ts
, row_number()
over (
partition by lp.liquidity_pool_id
Expand All @@ -49,10 +48,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched
where
lp.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(lp.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
lp.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}

)
Expand All @@ -75,7 +71,5 @@ select
, closed_at
, deleted
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_lps
where row_nr = 1
8 changes: 1 addition & 7 deletions models/marts/ledger_current_state/offers_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ with
, o.deleted
, o.sponsor
, o.batch_run_date
, o.batch_insert_ts
, row_number()
over (
partition by o.offer_id
Expand All @@ -43,10 +42,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched
where
o.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(o.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
o.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}

)
Expand All @@ -70,7 +66,5 @@ select
, deleted
, sponsor
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_offers
where row_nr = 1
8 changes: 1 addition & 7 deletions models/marts/ledger_current_state/trust_lines_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ with
, concat(tl.account_id, '-', tl.asset_code, '-', tl.asset_issuer, '-', tl.liquidity_pool_id
) as unique_id
, tl.batch_run_date
, tl.batch_insert_ts
, row_number()
over (
partition by tl.account_id, tl.asset_code, tl.asset_issuer, tl.liquidity_pool_id
Expand All @@ -46,10 +45,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
tl.batch_run_date >= date_sub(current_date(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(tl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
tl.batch_run_date >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}

)
Expand All @@ -71,7 +67,5 @@ select
, deleted
, unique_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_tls
where row_nr = 1
8 changes: 1 addition & 7 deletions models/marts/ledger_current_state/ttl_current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ with
, ttl.deleted
, ttl.batch_id
, ttl.batch_run_date
, ttl.batch_insert_ts
, row_number()
over (
partition by ttl.key_hash
Expand All @@ -31,10 +30,7 @@ with
{% if is_incremental() %}
-- limit the number of partitions fetched incrementally
where
ttl.closed_at >= timestamp_sub(current_timestamp(), interval 30 day)
-- fetch the last week of records loaded
and timestamp_add(ttl.batch_insert_ts, interval 7 day)
> (select max(t.upstream_insert_ts) from {{ this }} as t)
date(ttl.closed_at) >= date_sub(date('{{ dbt_airflow_macros.ds() }}'), interval 30 day)
{% endif %}
)

Expand All @@ -47,7 +43,5 @@ select
, deleted
, batch_id
, batch_run_date
, batch_insert_ts as upstream_insert_ts
, current_timestamp() as batch_insert_ts
from current_expiration
where rn = 1
1 change: 0 additions & 1 deletion models/staging/stg_account_signers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ with
, deleted
, batch_id
, batch_run_date
, batch_insert_ts
, closed_at
, ledger_sequence
from raw_table
Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_accounts.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ with
, sequence_time
, batch_id
, batch_run_date
, batch_insert_ts
, closed_at
, ledger_sequence
from raw_table
Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_claimable_balances.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ with
, deleted
, batch_id
, batch_run_date
, batch_insert_ts
, closed_at
, ledger_sequence
from raw_table
Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_config_settings.sql
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ with
, deleted
, batch_id
, batch_run_date
, batch_insert_ts
from raw_table
)

Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_contract_code.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ with
, deleted
, batch_id
, batch_run_date
, batch_insert_ts
from raw_table
)

Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_contract_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ with
, deleted
, batch_id
, batch_run_date
, batch_insert_ts
from raw_table
)

Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_history_effects.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ with
, closed_at
, batch_id
, batch_run_date
, batch_insert_ts
from raw_table
)

Expand Down
1 change: 0 additions & 1 deletion models/staging/stg_history_ledgers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ with
, soroban_fee_write_1kb
, batch_id
, batch_run_date
, batch_insert_ts
from raw_table
)

Expand Down
Loading
Loading