-- ethereum_erc20_transactions.sql
-- full load
-- Creates target_database.table_name as a Snappy-compressed Parquet table
-- partitioned by date_partition. Output is one row per distinct ERC-20 token
-- transfer, enriched with token metadata, contract flags and rugpull flags.
-- `filter_value` is a placeholder for the lowest block_number to load.
CREATE TABLE target_database.table_name WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    partitioned_by = ARRAY['date_partition'],
    external_location = 'bucket_name/layer/data_source/table_name/'
) AS
WITH tokens_metadata AS (
    -- Most recently refreshed metadata row for each ERC-20 contract.
    SELECT
        contract_address,
        decimals,
        symbol,
        type,
        hash_partition
    FROM (
        SELECT
            contract_address,
            decimals,
            symbol,
            standard AS type,
            hash_partition,
            ROW_NUMBER() OVER (
                PARTITION BY contract_address
                ORDER BY last_refreshed DESC
            ) AS rn
        FROM source_database.ethereum_tokens_metadata
        WHERE standard = 'ERC-20'
    )
    WHERE rn = 1
),
erc20_transactions_source AS (
    -- Transfers with value > 0 from block `filter_value` onwards, restricted to
    -- transactions that have a matching trace with status = 1 and a matching
    -- row in ethereum_transactions.
    -- DISTINCT removes duplicate rows produced by the joins (presumably several
    -- trace rows per transaction -- confirm).
    -- NOTE(review): DISTINCT also merges transfers that are identical in every
    -- selected column; if the source exposes a per-log index, verify that no
    -- legitimate repeated transfers are being dropped.
    SELECT DISTINCT
        tf.block_number,
        tf.block_timestamp AS timestamp,
        tf.transaction_hash AS hash,
        tf.from_address,
        tf.to_address,
        tf.token_address,
        tf.value,
        ts.nonce,
        ts.transaction_index,
        -- The WHERE clause below keeps only tc.status = 1, so this flag is
        -- always false for the rows that reach the output.
        CASE
            WHEN tc.status = 1 THEN false
            ELSE true
        END AS is_error,
        tf.date_partition
    FROM source_database.ethereum_token_transfers AS tf
    INNER JOIN source_database.ethereum_traces AS tc
        ON tc.transaction_hash = tf.transaction_hash
        AND tc.block_number = tf.block_number
        AND tc.date_partition = tf.date_partition
    INNER JOIN source_database.ethereum_transactions AS ts
        ON ts.hash = tf.transaction_hash
        AND ts.block_number = tf.block_number
        AND ts.date_partition = tf.date_partition
    WHERE tf.block_number >= filter_value
        AND tc.status = 1
        AND tf.value > 0
),
erc20_transactions_final AS (
    -- Scales the raw value by the token's decimals and attaches the flags.
    SELECT DISTINCT
        t.block_number,
        t.timestamp,
        t.hash,
        t.from_address,
        t.to_address,
        -- Raw value divided by 10^decimals (18 when decimals is missing or not
        -- numeric). The exponent is used whole: halving a scale-0 DECIMAL
        -- yields an integral result, which gives the wrong power of ten for
        -- tokens with an odd number of decimals.
        t.value / POWER(10, COALESCE(TRY_CAST(tk.decimals AS decimal), 18)) AS current_value,
        tk.contract_address,
        tk.symbol AS token_symbol,
        COALESCE(TRY_CAST(tk.decimals AS decimal), 18) AS token_decimal,
        tk.type AS token_type,
        t.is_error,
        c_1.address IS NOT NULL AS from_is_contract,
        c_2.address IS NOT NULL AS to_is_contract,
        r_1.token_address IS NOT NULL AS is_rugpull,
        r_2.token_address IS NOT NULL AS to_address_is_rugpull,
        SUBSTR(t.from_address, 3, 2) AS from_hash_partition,
        SUBSTR(t.to_address, 3, 2) AS to_hash_partition,
        t.date_partition
    FROM erc20_transactions_source AS t
    INNER JOIN tokens_metadata AS tk
        ON tk.contract_address = t.token_address
        AND tk.hash_partition = SUBSTR(t.token_address, 3, 2)
    -- Matches when the sender address is a known contract.
    LEFT JOIN source_database.ethereum_contracts AS c_1
        ON c_1.address = t.from_address
        AND c_1.hash_partition = SUBSTR(t.from_address, 3, 2)
    -- Matches when the recipient address is a known contract.
    LEFT JOIN source_database.ethereum_contracts AS c_2
        ON c_2.address = t.to_address
        AND c_2.hash_partition = SUBSTR(t.to_address, 3, 2)
    -- Matches when the transferred token has a rugpull record dated on the
    -- transfer day or the day before.
    -- NOTE(review): the date_partition equality may already restrict matches
    -- to a single day -- confirm the partition granularity.
    LEFT JOIN db_analytics_prod.rugpull_market_data AS r_1
        ON LOWER(r_1.token_address) = t.token_address
        AND r_1.date_partition = t.date_partition
        AND DATE(r_1.date_time)
            BETWEEN DATE(FROM_UNIXTIME(t.timestamp)) - INTERVAL '1' DAY
            AND DATE(FROM_UNIXTIME(t.timestamp))
    -- Same check, applied to the recipient address instead of the token.
    LEFT JOIN db_analytics_prod.rugpull_market_data AS r_2
        ON LOWER(r_2.token_address) = t.to_address
        AND r_2.date_partition = t.date_partition
        AND DATE(r_2.date_time)
            BETWEEN DATE(FROM_UNIXTIME(t.timestamp)) - INTERVAL '1' DAY
            AND DATE(FROM_UNIXTIME(t.timestamp))
)
-- Column order defines the table layout; the partition column stays last.
SELECT
    CAST(uuid() AS varchar) AS uuid,
    f.block_number,
    f.timestamp,
    f.hash,
    f.from_address,
    f.to_address,
    f.current_value,
    f.contract_address,
    f.token_symbol,
    f.token_decimal,
    f.token_type,
    f.is_error,
    f.from_is_contract,
    f.to_is_contract,
    f.is_rugpull,
    f.to_address_is_rugpull,
    f.from_hash_partition,
    f.to_hash_partition,
    f.date_partition
FROM erc20_transactions_final AS f;
-- incremental load
-- Appends ERC-20 transfers from block `filter_value` (placeholder) onwards to
-- target_database.table_name, skipping rows whose (hash, block_number,
-- from_address, to_address, date_partition) already exist in the target.
INSERT INTO target_database.table_name
WITH tokens_metadata AS (
    -- Most recently refreshed metadata row for each ERC-20 contract.
    SELECT
        contract_address,
        decimals,
        symbol,
        type,
        hash_partition
    FROM (
        SELECT
            contract_address,
            decimals,
            symbol,
            standard AS type,
            hash_partition,
            ROW_NUMBER() OVER (
                PARTITION BY contract_address
                ORDER BY last_refreshed DESC
            ) AS rn
        FROM source_database.ethereum_tokens_metadata
        WHERE standard = 'ERC-20'
    )
    WHERE rn = 1
),
erc20_transactions_source AS (
    -- Transfers with value > 0 from block `filter_value` onwards, restricted to
    -- transactions that have a matching trace with status = 1 and a matching
    -- row in ethereum_transactions.
    -- DISTINCT removes duplicate rows produced by the joins (presumably several
    -- trace rows per transaction -- confirm).
    -- NOTE(review): DISTINCT also merges transfers that are identical in every
    -- selected column; if the source exposes a per-log index, verify that no
    -- legitimate repeated transfers are being dropped.
    SELECT DISTINCT
        tf.block_number,
        tf.block_timestamp AS timestamp,
        tf.transaction_hash AS hash,
        tf.from_address,
        tf.to_address,
        tf.token_address,
        tf.value,
        ts.nonce,
        ts.transaction_index,
        -- The WHERE clause below keeps only tc.status = 1, so this flag is
        -- always false for the rows that reach the output.
        CASE
            WHEN tc.status = 1 THEN false
            ELSE true
        END AS is_error,
        tf.date_partition
    FROM source_database.ethereum_token_transfers AS tf
    INNER JOIN source_database.ethereum_traces AS tc
        ON tc.transaction_hash = tf.transaction_hash
        AND tc.block_number = tf.block_number
        AND tc.date_partition = tf.date_partition
    INNER JOIN source_database.ethereum_transactions AS ts
        ON ts.hash = tf.transaction_hash
        AND ts.block_number = tf.block_number
        AND ts.date_partition = tf.date_partition
    WHERE tf.block_number >= filter_value
        AND tc.status = 1
        AND tf.value > 0
),
erc20_transactions_final AS (
    -- Scales the raw value by the token's decimals and attaches the flags.
    SELECT DISTINCT
        t.block_number,
        t.timestamp,
        t.hash,
        t.from_address,
        t.to_address,
        -- Raw value divided by 10^decimals (18 when decimals is missing or not
        -- numeric). The exponent is used whole: halving a scale-0 DECIMAL
        -- yields an integral result, which gives the wrong power of ten for
        -- tokens with an odd number of decimals.
        t.value / POWER(10, COALESCE(TRY_CAST(tk.decimals AS decimal), 18)) AS current_value,
        tk.contract_address,
        tk.symbol AS token_symbol,
        COALESCE(TRY_CAST(tk.decimals AS decimal), 18) AS token_decimal,
        tk.type AS token_type,
        t.is_error,
        c_1.address IS NOT NULL AS from_is_contract,
        c_2.address IS NOT NULL AS to_is_contract,
        r_1.token_address IS NOT NULL AS is_rugpull,
        r_2.token_address IS NOT NULL AS to_address_is_rugpull,
        SUBSTR(t.from_address, 3, 2) AS from_hash_partition,
        SUBSTR(t.to_address, 3, 2) AS to_hash_partition,
        t.date_partition
    FROM erc20_transactions_source AS t
    INNER JOIN tokens_metadata AS tk
        ON tk.contract_address = t.token_address
        AND tk.hash_partition = SUBSTR(t.token_address, 3, 2)
    -- Matches when the sender address is a known contract.
    LEFT JOIN source_database.ethereum_contracts AS c_1
        ON c_1.address = t.from_address
        AND c_1.hash_partition = SUBSTR(t.from_address, 3, 2)
    -- Matches when the recipient address is a known contract.
    LEFT JOIN source_database.ethereum_contracts AS c_2
        ON c_2.address = t.to_address
        AND c_2.hash_partition = SUBSTR(t.to_address, 3, 2)
    -- Matches when the transferred token has a rugpull record dated on the
    -- transfer day or the day before.
    -- NOTE(review): the date_partition equality may already restrict matches
    -- to a single day -- confirm the partition granularity.
    LEFT JOIN db_analytics_prod.rugpull_market_data AS r_1
        ON LOWER(r_1.token_address) = t.token_address
        AND r_1.date_partition = t.date_partition
        AND DATE(r_1.date_time)
            BETWEEN DATE(FROM_UNIXTIME(t.timestamp)) - INTERVAL '1' DAY
            AND DATE(FROM_UNIXTIME(t.timestamp))
    -- Same check, applied to the recipient address instead of the token.
    LEFT JOIN db_analytics_prod.rugpull_market_data AS r_2
        ON LOWER(r_2.token_address) = t.to_address
        AND r_2.date_partition = t.date_partition
        AND DATE(r_2.date_time)
            BETWEEN DATE(FROM_UNIXTIME(t.timestamp)) - INTERVAL '1' DAY
            AND DATE(FROM_UNIXTIME(t.timestamp))
)
-- Columns are inserted by position, so the list is spelled out; the partition
-- column stays last.
SELECT
    CAST(uuid() AS varchar) AS uuid,
    source.block_number,
    source.timestamp,
    source.hash,
    source.from_address,
    source.to_address,
    source.current_value,
    source.contract_address,
    source.token_symbol,
    source.token_decimal,
    source.token_type,
    source.is_error,
    source.from_is_contract,
    source.to_is_contract,
    source.is_rugpull,
    source.to_address_is_rugpull,
    source.from_hash_partition,
    source.to_hash_partition,
    source.date_partition
FROM erc20_transactions_final AS source
-- Skip transfers already loaded.
-- NOTE(review): the match key omits the token and the value, so a new transfer
-- sharing hash/block/from/to/date with an existing row is treated as loaded --
-- confirm this is intended.
WHERE NOT EXISTS (
    SELECT 1
    FROM target_database.table_name AS target
    WHERE target.hash = source.hash
        AND target.block_number = source.block_number
        AND target.from_address = source.from_address
        AND target.to_address = source.to_address
        AND target.date_partition = source.date_partition
);