Add EIP-1559 fields (cli, airflow, dataflow) #96

Merged · 16 commits · Dec 19, 2022
35 changes: 31 additions & 4 deletions airflow/dags/polygonetl_airflow/build_load_dag.py
@@ -106,7 +106,7 @@ def add_load_tasks(task, file_format, allow_quoted_newlines=False):
dag=dag
)

def load_task():
def load_task(ds, **kwargs):
client = bigquery.Client()
job_config = bigquery.LoadJobConfig()
schema_path = os.path.join(dags_folder, 'resources/stages/raw/schemas/{task}.json'.format(task=task))
@@ -119,13 +119,35 @@ def load_task():
job_config.ignore_unknown_values = True

export_location_uri = 'gs://{bucket}/export'.format(bucket=output_bucket)
uri = '{export_location_uri}/{task}/*.{file_format}'.format(
export_location_uri=export_location_uri, task=task, file_format=file_format)
table_ref = client.dataset(dataset_name_raw).table(task)
if load_all_partitions:
# Support export files that are missing EIP-1559 fields (exported before EIP-1559 upgrade)
job_config.allow_jagged_rows = True

uri = "{export_location_uri}/{task}/*.{file_format}".format(
export_location_uri=export_location_uri,
task=task,
file_format=file_format,
)
table_ref = client.dataset(dataset_name_raw).table(task)
else:
uri = "{export_location_uri}/{task}/block_date={ds}/*.{file_format}".format(
export_location_uri=export_location_uri,
task=task,
ds=ds,
file_format=file_format,
)
table_name = f'{task}_{ds.replace("-", "_")}'
table_ref = client.dataset(dataset_name_raw).table(table_name)

load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
submit_bigquery_job(load_job, job_config)
assert load_job.state == 'DONE'

if not load_all_partitions:
table = client.get_table(table_ref)
table.expires = datetime.now() + timedelta(days=3)
client.update_table(table, ["expires"])

load_operator = PythonOperator(
task_id='load_{task}'.format(task=task),
python_callable=load_task,
@@ -142,6 +164,11 @@ def enrich_task(ds, **kwargs):
template_context['ds'] = ds
template_context['params'] = environment

if load_all_partitions or always_load_all_partitions:
template_context["params"]["ds_postfix"] = ""
else:
template_context["params"]["ds_postfix"] = "_" + ds.replace("-", "_")

client = bigquery.Client()

# Need to use a temporary table because bq query sets field modes to NULLABLE and descriptions to null
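To make the per-partition loading above easier to follow, here is a minimal sketch of the naming convention it introduces: when load_all_partitions is off, each execution date gets its own GCS prefix and its own short-lived raw table. The task, bucket, and dataset names below are made up for illustration and are not taken from the DAG.

```python
from datetime import datetime, timedelta


def partition_names(task: str, ds: str, output_bucket: str, dataset_name_raw: str):
    """Sketch of the per-date URI and raw table name used when load_all_partitions is False."""
    ds_postfix = "_" + ds.replace("-", "_")                        # e.g. "_2022_12_19"
    uri = f"gs://{output_bucket}/export/{task}/block_date={ds}/*.csv"
    table_name = f"{dataset_name_raw}.{task}{ds_postfix}"          # e.g. "crypto_polygon_raw.blocks_2022_12_19"
    expires = datetime.utcnow() + timedelta(days=3)                # per-date staging tables expire after 3 days
    return uri, table_name, expires


print(partition_names("blocks", "2022-12-19", "my-polygon-etl-bucket", "crypto_polygon_raw"))
```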
7 changes: 6 additions & 1 deletion airflow/dags/resources/stages/enrich/schemas/blocks.json
@@ -92,5 +92,10 @@
"name": "transaction_count",
"type": "INT64",
"description": "The number of transactions in the block"
},
{
"name": "base_fee_per_gas",
"type": "INT64",
"description": "Protocol base fee per gas, which can move up or down"
}
]
22 changes: 21 additions & 1 deletion airflow/dags/resources/stages/enrich/schemas/transactions.json
@@ -90,5 +90,25 @@
"type": "STRING",
"mode": "REQUIRED",
"description": "Hash of the block where this transaction was in"
},
{
"name": "max_fee_per_gas",
"type": "INT64",
"description": "Total fee that covers both base and priority fees"
},
{
"name": "max_priority_fee_per_gas",
"type": "INT64",
"description": "Fee given to miners to incentivize them to include the transaction"
},
{
"name": "transaction_type",
"type": "INT64",
"description": "Transaction type. One of 0 (Legacy), 1 (Legacy), 2 (EIP-1559)"
},
{
"name": "receipt_effective_gas_price",
"type": "INT64",
"description": "The actual value per gas deducted from the senders account. Replacement of gas_price after EIP-1559"
}
]
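For readers unfamiliar with EIP-1559, a rough sketch of how the new columns relate: for a type-2 transaction the price actually paid per gas is capped by max_fee_per_gas and equals the block's base fee plus the priority fee, which is what receipt_effective_gas_price records. The function below only illustrates that relationship; in the pipeline the value comes straight from the receipt.

```python
def effective_gas_price(transaction_type, gas_price, max_fee_per_gas,
                        max_priority_fee_per_gas, base_fee_per_gas):
    """Illustrative formula for the price per gas a sender actually pays."""
    if transaction_type == 2:  # EIP-1559 dynamic-fee transaction
        return min(max_fee_per_gas, base_fee_per_gas + max_priority_fee_per_gas)
    return gas_price           # type 0/1 transactions still carry an explicit gas_price


# 100 gwei fee cap, 2 gwei tip, 30 gwei base fee -> 32 gwei effective price
print(effective_gas_price(2, None, 100_000_000_000, 2_000_000_000, 30_000_000_000))
```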
30 changes: 0 additions & 30 deletions airflow/dags/resources/stages/enrich/sqls/amended_tokens.sql

This file was deleted.

22 changes: 13 additions & 9 deletions airflow/dags/resources/stages/enrich/sqls/balances.sql
@@ -1,7 +1,7 @@
with double_entry_book as (
-- debits
select to_address as address, CAST(value AS FLOAT64) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
and to_address is not null
@@ -10,27 +10,31 @@ with double_entry_book as (
union all
-- credits
select from_address as address, -CAST(value AS FLOAT64) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
and from_address is not null
and status = 1
and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
union all
-- transaction fees debits
select miner as address, sum(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions` as transactions
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks` as blocks on blocks.number = transactions.block_number
select
miner as address,
sum(cast(receipt_gas_used as numeric) * cast((receipt_effective_gas_price - coalesce(base_fee_per_gas, 0)) as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}` as transactions
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks{{params.ds_postfix}}` as blocks on blocks.number = transactions.block_number
where true
and date(transactions.block_timestamp) <= '{{ds}}'
group by blocks.miner
group by blocks.number, blocks.miner
union all
-- transaction fees credits
select from_address as address, -(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions`
select
from_address as address,
-(cast(receipt_gas_used as numeric) * cast(receipt_effective_gas_price as numeric)) as value
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}`
where true
and date(block_timestamp) <= '{{ds}}'
)
select address, sum(value) as eth_balance
from double_entry_book
group by address
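The change to the fee entries above encodes the post-EIP-1559 split: the sender is debited the full fee at receipt_effective_gas_price, while the miner is only credited the portion above the block's base fee (coalesced to 0 for pre-London blocks), and the remainder is burned. A small numeric sketch of that split, using made-up values:

```python
def fee_entries(receipt_gas_used, receipt_effective_gas_price, base_fee_per_gas):
    """Mirror of the fee debit/credit rows in balances.sql, for one transaction."""
    base_fee = base_fee_per_gas or 0                                             # coalesce(base_fee_per_gas, 0)
    sender_debit = receipt_gas_used * receipt_effective_gas_price                # full fee leaves the sender
    miner_credit = receipt_gas_used * (receipt_effective_gas_price - base_fee)   # only the tip reaches the miner
    burned = sender_debit - miner_credit                                         # base fee portion is burned
    return sender_debit, miner_credit, burned


# 21,000 gas at a 32 gwei effective price on a 30 gwei base fee
print(fee_entries(21_000, 32_000_000_000, 30_000_000_000))
# (672000000000000, 42000000000000, 630000000000000)
```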
5 changes: 3 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/blocks.sql
@@ -16,8 +16,9 @@ SELECT
blocks.extra_data,
blocks.gas_limit,
blocks.gas_used,
blocks.transaction_count
FROM {{params.dataset_name_raw}}.blocks AS blocks
blocks.transaction_count,
blocks.base_fee_per_gas
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/contracts.sql
@@ -7,8 +7,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.contracts AS contracts
JOIN {{params.dataset_name_raw}}.blocks AS blocks ON contracts.block_number = blocks.number
FROM {{params.dataset_name_raw}}.contracts{{params.ds_postfix}} AS contracts
JOIN {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks ON contracts.block_number = blocks.number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
6 changes: 3 additions & 3 deletions airflow/dags/resources/stages/enrich/sqls/logs.sql
@@ -8,9 +8,9 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.logs AS logs ON blocks.number = logs.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.logs{{params.ds_postfix}} AS logs ON blocks.number = logs.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
{% endif %}
@@ -20,7 +20,8 @@ insert (
extra_data,
gas_limit,
gas_used,
transaction_count
transaction_count,
base_fee_per_gas
) values (
timestamp,
number,
@@ -39,7 +40,8 @@
extra_data,
gas_limit,
gas_used,
transaction_count
transaction_count,
base_fee_per_gas
)
when not matched by source and date(timestamp) = '{{ds}}' then
delete
@@ -19,7 +19,11 @@ insert (
receipt_status,
block_timestamp,
block_number,
block_hash
block_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
transaction_type,
receipt_effective_gas_price
) values (
`hash`,
nonce,
@@ -37,7 +41,11 @@
receipt_status,
block_timestamp,
block_number,
block_hash
block_hash,
max_fee_per_gas,
max_priority_fee_per_gas,
transaction_type,
receipt_effective_gas_price
)
when not matched by source and date(block_timestamp) = '{{ds}}' then
delete
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/token_transfers.sql
@@ -8,8 +8,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.token_transfers AS token_transfers ON blocks.number = token_transfers.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.token_transfers{{params.ds_postfix}} AS token_transfers ON blocks.number = token_transfers.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
4 changes: 2 additions & 2 deletions airflow/dags/resources/stages/enrich/sqls/tokens.sql
@@ -7,8 +7,8 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.tokens AS tokens ON blocks.number = tokens.block_number
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.tokens{{params.ds_postfix}} AS tokens ON blocks.number = tokens.block_number
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
6 changes: 3 additions & 3 deletions airflow/dags/resources/stages/enrich/sqls/traces.sql
@@ -19,9 +19,9 @@ SELECT
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.traces AS traces ON blocks.number = traces.block_number
JOIN {{params.dataset_name_raw}}.transactions AS transactions
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.traces{{params.ds_postfix}} AS traces ON blocks.number = traces.block_number
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions
ON traces.transaction_index = transactions.transaction_index
and traces.block_number = transactions.block_number
where true
12 changes: 8 additions & 4 deletions airflow/dags/resources/stages/enrich/sqls/transactions.sql
@@ -15,10 +15,14 @@ SELECT
receipts.status AS receipt_status,
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
blocks.number AS block_number,
blocks.hash AS block_hash
FROM {{params.dataset_name_raw}}.blocks AS blocks
JOIN {{params.dataset_name_raw}}.transactions AS transactions ON blocks.number = transactions.block_number
JOIN {{params.dataset_name_raw}}.receipts AS receipts ON transactions.hash = receipts.transaction_hash
blocks.hash AS block_hash,
transactions.max_fee_per_gas,
transactions.max_priority_fee_per_gas,
transactions.transaction_type,
receipts.effective_gas_price as receipt_effective_gas_price
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions ON blocks.number = transactions.block_number
JOIN {{params.dataset_name_raw}}.receipts{{params.ds_postfix}} AS receipts ON transactions.hash = receipts.transaction_hash
where true
{% if not params.load_all_partitions %}
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
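The {{params.ds_postfix}} placeholders in the enrich SQL above resolve to either an empty string (full reloads) or a _YYYY_MM_DD suffix pointing at the per-date raw tables. As a rough illustration of the rendering, assuming Jinja-style templating like the enrich task uses and placeholder dataset names:

```python
from jinja2 import Template

sql = Template(
    "SELECT * FROM {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions"
)
print(sql.render(params={"dataset_name_raw": "crypto_polygon_raw", "ds_postfix": "_2022_12_19"}))
# SELECT * FROM crypto_polygon_raw.transactions_2022_12_19 AS transactions
```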