Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AN-5627/sl-2 (do not merge) #408

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/dbt_run_streamline_chainhead.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_realtime"
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_realtime" "polygon_models,tag:streamline_core_complete_receipts" "polygon_models,tag:streamline_core_realtime_receipts" "polygon_models,tag:streamline_core_complete_confirm_blocks" "polygon_models,tag:streamline_core_realtime_confirm_blocks"

- name: Run Chainhead Tests
run: |
dbt test -m "polygon_models,tag:chainhead"
2 changes: 1 addition & 1 deletion .github/workflows/dbt_run_streamline_history_adhoc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ on:
description: 'DBT Run Command'
required: true
options:
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_history"
- dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m "polygon_models,tag:streamline_core_complete" "polygon_models,tag:streamline_core_history" "polygon_models,tag:streamline_core_complete_receipts" "polygon_models,tag:streamline_core_history_receipts" "polygon_models,tag:streamline_core_complete_confirm_blocks" "polygon_models,tag:streamline_core_history_confirm_blocks"

env:
DBT_PROFILES_DIR: ./
Expand Down
56 changes: 45 additions & 11 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,55 @@ vars:
START_GHA_TASKS: False
STUDIO_TEST_USER_ID: '{{ env_var("STUDIO_TEST_USER_ID", "98d15c30-9fa5-43cd-9c69-3d4c0bb269f5") }}'

API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
#### STREAMLINE 2.0 BEGIN ####

API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]

["INTERNAL_DEV"]
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_POLYGON_API_DEV
EXTERNAL_FUNCTION_URI: rzyjrd54s6.execute-api.us-east-1.amazonaws.com/dev/
ROLES:
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_POLYGON_API
EXTERNAL_FUNCTION_URI: p6dhi5vxn4.execute-api.us-east-1.amazonaws.com/prod/
API_INTEGRATION: aws_polygon_api_stg_v2
EXTERNAL_FUNCTION_URI:
API_AWS_ROLE_ARN:
ROLES:
- AWS_LAMBDA_POLYGON_API
- INTERNAL_DEV
- BI_ANALYTICS_READER

# prod:
# API_INTEGRATION: aws_polygon_api_prod_v2
# EXTERNAL_FUNCTION_URI:
# API_AWS_ROLE_ARN:
# ROLES:
# - AWS_LAMBDA_POLYGON_API
# - INTERNAL_DEV
# - DBT_CLOUD_POLYGON

#### STREAMLINE 2.0 END ####

#### FSC_EVM BEGIN ####
# Visit https://github.com/FlipsideCrypto/fsc-evm/wiki for more information on required and optional variables

### GLOBAL VARIABLES BEGIN ###
## REQUIRED
GLOBAL_PROD_DB_NAME: 'polygon'
GLOBAL_NODE_SECRET_PATH: 'Vault/prod/polygon/quicknode/mainnet'
GLOBAL_BLOCKS_PER_HOUR: 1700
GLOBAL_USES_STREAMLINE_V1: True
GLOBAL_USES_SINGLE_FLIGHT_METHOD: True

### GLOBAL VARIABLES END ###

### MAIN_PACKAGE VARIABLES BEGIN ###

### CORE ###
## REQUIRED

## OPTIONAL
# GOLD_FULL_REFRESH: True
# SILVER_FULL_REFRESH: True

### MAIN_PACKAGE VARIABLES END ###

#### FSC_EVM END ####
226 changes: 226 additions & 0 deletions macros/fsc_evm_temp/_legacy/silver_traces.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
{% macro silver_traces_v1(
full_reload_start_block,
full_reload_blocks,
full_reload_mode = false,
TRACES_ARB_MODE = false,
TRACES_SEI_MODE = false,
TRACES_KAIA_MODE = false,
use_partition_key = false,
schema_name = 'bronze'
) %}
WITH bronze_traces AS (
SELECT
block_number,
{% if use_partition_key %}
partition_key,
{% else %}
_partition_by_block_id AS partition_key,
{% endif %}

VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
{% if TRACES_SEI_MODE %}
DATA :txHash :: STRING AS tx_hash,
{% endif %}
_inserted_timestamp
FROM

{% if is_incremental() and not full_reload_mode %}
{{ ref(
schema_name ~ '__traces'
) }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
AND DATA :result IS NOT NULL {% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}

{% elif is_incremental() and full_reload_mode %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key BETWEEN (
SELECT
MAX(partition_key) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(partition_key) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% else %}
_partition_by_block_id BETWEEN (
SELECT
MAX(_partition_by_block_id) - 100000
FROM
{{ this }}
)
AND (
SELECT
MAX(_partition_by_block_id) + {{ full_reload_blocks }}
FROM
{{ this }}
)
{% endif %}

{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% else %}
{{ ref(
schema_name ~ '__traces_fr'
) }}
WHERE
{% if use_partition_key %}
partition_key <= {{ full_reload_start_block }}
{% else %}
_partition_by_block_id <= {{ full_reload_start_block }}
{% endif %}

{% if TRACES_ARB_MODE %}
AND block_number > 22207817
{% endif %}
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position
ORDER BY
_inserted_timestamp DESC)) = 1
),
flatten_traces AS (
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
IFF(
path IN (
'result',
'result.value',
'result.type',
'result.to',
'result.input',
'result.gasUsed',
'result.gas',
'result.from',
'result.output',
'result.error',
'result.revertReason',
'result.time',
'gasUsed',
'gas',
'type',
'to',
'from',
'value',
'input',
'error',
'output',
'time',
'revertReason'
{% if TRACES_ARB_MODE %},
'afterEVMTransfers',
'beforeEVMTransfers',
'result.afterEVMTransfers',
'result.beforeEVMTransfers'
{% endif %}
{% if TRACES_KAIA_MODE %},
'reverted',
'result.reverted'
{% endif %}
),
'ORIGIN',
REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '')
) AS trace_address,
_inserted_timestamp,
OBJECT_AGG(
key,
VALUE
) AS trace_json,
CASE
WHEN trace_address = 'ORIGIN' THEN NULL
WHEN POSITION(
'_' IN trace_address
) = 0 THEN 'ORIGIN'
ELSE REGEXP_REPLACE(
trace_address,
'_[0-9]+$',
'',
1,
1
)
END AS parent_trace_address,
SPLIT(
trace_address,
'_'
) AS trace_address_array
FROM
bronze_traces txs,
TABLE(
FLATTEN(
input => PARSE_JSON(
txs.full_traces
),
recursive => TRUE
)
) f
WHERE
f.index IS NULL
AND f.key != 'calls'
AND f.path != 'result'
{% if TRACES_ARB_MODE %}
AND f.path NOT LIKE 'afterEVMTransfers[%'
AND f.path NOT LIKE 'beforeEVMTransfers[%'
{% endif %}
{% if TRACES_KAIA_MODE %}
and f.key not in ('message', 'contract')
{% endif %}
GROUP BY
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
partition_key,
trace_address,
_inserted_timestamp
)
SELECT
block_number,
{% if TRACES_SEI_MODE %}
tx_hash,
{% else %}
tx_position,
{% endif %}
trace_address,
parent_trace_address,
trace_address_array,
trace_json,
partition_key,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number'] +
(['tx_hash'] if TRACES_SEI_MODE else ['tx_position']) +
['trace_address']
) }} AS traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id
ORDER BY
_inserted_timestamp DESC)) = 1
{% endmacro %}
Loading