-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
350 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
defmodule AeMdw.Migrations.EventLogsHash do | ||
# credo:disable-for-this-file | ||
@moduledoc """ | ||
Reindex logs to move the hash out of the key and use built-in encoding for the value part of the records. | ||
""" | ||
|
||
alias AeMdw.Db.RocksDb | ||
alias AeMdw.Collection | ||
alias AeMdw.Db.Model | ||
alias AeMdw.Db.WriteMutation | ||
alias AeMdw.Db.State | ||
|
||
require Model | ||
require Logger | ||
|
||
@range_size 200_000 | ||
|
||
@dialyzer :no_return | ||
|
||
defmodule DeleteKeysMutation do | ||
@moduledoc false | ||
alias AeMdw.Db.Model | ||
alias AeMdw.Db.State | ||
alias AeMdw.Db.RocksDb | ||
|
||
require Model | ||
|
||
@derive AeMdw.Db.Mutation | ||
defstruct [:args] | ||
|
||
def new(args) do | ||
%__MODULE__{args: args} | ||
end | ||
|
||
def execute(%__MODULE__{args: [log_key, data_log_key, idx_log_key]}, state) do | ||
txn = Map.get(state.store, :txn) | ||
:ok = RocksDb.delete(txn, Model.ContractLog, :sext.encode(log_key)) | ||
:ok = RocksDb.delete(txn, Model.DataContractLog, :sext.encode(data_log_key)) | ||
:ok = RocksDb.delete(txn, Model.IdxContractLog, :sext.encode(idx_log_key)) | ||
state | ||
end | ||
end | ||
|
||
@spec run(State.t(), boolean()) :: {:ok, non_neg_integer()} | ||
def run(state, _from_start?) do | ||
case State.prev(state, Model.Tx, nil) do | ||
{:ok, db_last_txi} -> | ||
num_ranges = div(db_last_txi, @range_size) + 1 | ||
|
||
only_encoding? = | ||
match?( | ||
{:ok, {_call_txi, _log_idx, _create_txi}}, | ||
State.next(state, Model.ContractLog, nil) | ||
) | ||
|
||
mutations_tasks = | ||
if only_encoding?, do: &reencoding_logs_mutations/4, else: &hash_logs_mutations/4 | ||
|
||
count = | ||
0..(num_ranges - 1) | ||
|> Enum.map(fn i -> | ||
first_txi = i * @range_size | ||
last_txi = min((i + 1) * @range_size, db_last_txi) | ||
|
||
num_tasks = System.schedulers_online() | ||
amount_per_task = trunc(:math.ceil((last_txi - first_txi) / num_tasks)) | ||
|
||
log("num_tasks: #{num_tasks}, amount_per_task: #{amount_per_task}") | ||
log("first_txi: #{first_txi}, last_txi: #{last_txi}") | ||
|
||
:erlang.garbage_collect() | ||
|
||
range_mutations = | ||
state | ||
|> mutations_tasks.(first_txi, num_tasks, amount_per_task) | ||
|> Task.await_many(60_000 * 20) | ||
|> List.flatten() | ||
|> Enum.uniq() | ||
|
||
count = length(range_mutations) | ||
log("commiting #{count} mutations...") | ||
|
||
range_mutations | ||
|> Enum.chunk_every(240_000) | ||
|> Enum.each(fn write_mutations -> | ||
{ts, _state} = :timer.tc(fn -> _state = State.commit(state, write_mutations) end) | ||
IO.puts("commit: #{inspect({length(write_mutations), ts})}") | ||
end) | ||
|
||
count | ||
end) | ||
|> Enum.sum() | ||
|
||
{:ok, count} | ||
|
||
:none -> | ||
{:ok, 0} | ||
end | ||
end | ||
|
||
defp hash_logs_mutations(state, first_txi, num_tasks, amount_per_task) do | ||
Enum.map(0..(num_tasks - 1), fn i -> | ||
task_first_txi = first_txi + i * amount_per_task | ||
task_last_txi = first_txi + (i + 1) * amount_per_task | ||
cursor = {task_first_txi, 0, 0, <<>>} | ||
boundary = {cursor, {task_last_txi, nil, nil, <<>>}} | ||
|
||
Task.async(fn -> | ||
state | ||
|> Collection.stream(Model.IdxContractLog, :forward, boundary, cursor) | ||
|> Enum.flat_map(fn {call_txi, log_idx, create_txi, evt_hash} = idx_log_key -> | ||
log_key = {create_txi, call_txi, evt_hash, log_idx} | ||
|
||
{:contract_log, ^log_key, ext_contract, args, data} = fetch_old!(log_key) | ||
|
||
m_log = | ||
Model.contract_log( | ||
index: {create_txi, call_txi, log_idx}, | ||
ext_contract: ext_contract, | ||
args: args, | ||
data: data, | ||
hash: evt_hash | ||
) | ||
|
||
m_data_log = Model.data_contract_log(index: {data, call_txi, create_txi, log_idx}) | ||
m_idx_log = Model.idx_contract_log(index: {call_txi, log_idx, create_txi}) | ||
|
||
data_log_key = {data, call_txi, create_txi, evt_hash, log_idx} | ||
|
||
[ | ||
DeleteKeysMutation.new([log_key, data_log_key, idx_log_key]), | ||
WriteMutation.new(Model.ContractLog, m_log), | ||
WriteMutation.new(Model.DataContractLog, m_data_log), | ||
WriteMutation.new(Model.IdxContractLog, m_idx_log) | ||
] | ||
end) | ||
end) | ||
end) | ||
end | ||
|
||
defp reencoding_logs_mutations(state, first_txi, num_tasks, amount_per_task) do | ||
Enum.map(0..(num_tasks - 1), fn i -> | ||
task_first_txi = first_txi + i * amount_per_task | ||
task_last_txi = first_txi + (i + 1) * amount_per_task | ||
cursor = {task_first_txi, 0, 0} | ||
boundary = {cursor, {task_last_txi, nil, nil}} | ||
|
||
Task.async(fn -> | ||
state | ||
|> Collection.stream(Model.IdxContractLog, :forward, boundary, cursor) | ||
|> Enum.flat_map(fn {call_txi, log_idx, create_txi} -> | ||
log_key = {create_txi, call_txi, log_idx} | ||
# rewrite with new encoding | ||
try do | ||
[WriteMutation.new(Model.ContractLog, fetch_old!(log_key))] | ||
rescue | ||
_value -> | ||
[] | ||
end | ||
end) | ||
end) | ||
end) | ||
end | ||
|
||
@spec log(binary()) :: :ok | ||
def log(msg) do | ||
Logger.info(msg) | ||
IO.puts(msg) | ||
end | ||
|
||
defp fetch_old!(index) do | ||
key = :sext.encode(index) | ||
|
||
{:ok, value} = RocksDb.get(Model.ContractLog, key) | ||
record_type = Model.record(Model.ContractLog) | ||
|
||
value | ||
|> :sext.decode() | ||
|> Tuple.insert_at(0, index) | ||
|> Tuple.insert_at(0, record_type) | ||
end | ||
end |
161 changes: 161 additions & 0 deletions
161
priv/migrations/20230713120000_names_nested_restructure.ex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
defmodule AeMdw.Migrations.NamesNestedRestructure do | ||
@moduledoc """ | ||
Re-indexes names and auctions nested references into separate tables. | ||
""" | ||
|
||
alias AeMdw.Collection | ||
alias AeMdw.Db.WriteMutation | ||
alias AeMdw.Db.Model | ||
alias AeMdw.Db.State | ||
|
||
require Model | ||
require Record | ||
|
||
Record.defrecord(:auction_bid, | ||
index: nil, | ||
block_index_txi_idx: nil, | ||
expire_height: nil, | ||
owner: nil, | ||
bids: nil | ||
) | ||
|
||
Record.defrecord(:name, | ||
index: nil, | ||
active: nil, | ||
expire: nil, | ||
claims: nil, | ||
updates: nil, | ||
transfers: nil, | ||
revoke: nil, | ||
auction_timeout: nil, | ||
owner: nil, | ||
previous: nil | ||
) | ||
|
||
@chunk_size 1_000 | ||
|
||
@spec run(State.t(), boolean()) :: {:ok, non_neg_integer()} | ||
def run(state, _from_start?) do | ||
count = | ||
[Model.ActiveName, Model.InactiveName, Model.AuctionBid] | ||
|> Enum.map(fn table -> | ||
state | ||
|> Collection.stream(table, nil) | ||
|> Stream.map(&{&1, table}) | ||
end) | ||
|> Collection.merge(:forward) | ||
|> Stream.transform("", fn {plain_name, source}, prev_char -> | ||
first_char = String.at(plain_name, 0) | ||
|
||
if first_char != prev_char do | ||
IO.puts("Processing names and auctions starting with #{first_char}..") | ||
end | ||
|
||
{[{plain_name, source}], first_char} | ||
end) | ||
|> Stream.map(fn {plain_name, source} -> | ||
{State.fetch!(state, source, plain_name), source} | ||
end) | ||
|> Stream.map(fn {old_record, source} -> | ||
with {:ok, new_record, nested_mutations} <- restructure_record(old_record, source) do | ||
{:ok, [WriteMutation.new(source, new_record) | nested_mutations]} | ||
end | ||
end) | ||
|> Stream.filter(&match?({:ok, _mutations}, &1)) | ||
|> Stream.chunk_every(@chunk_size) | ||
|> Stream.map(fn ok_mutations -> | ||
mutations = Enum.flat_map(ok_mutations, fn {:ok, mutations} -> mutations end) | ||
|
||
_new_state = State.commit(state, mutations) | ||
|
||
length(ok_mutations) | ||
end) | ||
|> Enum.sum() | ||
|
||
{:ok, count} | ||
end | ||
|
||
defp restructure_record(nil, _source), do: {:ok, nil, []} | ||
|
||
defp restructure_record(auction_bid() = auction_bid, Model.AuctionBid) do | ||
auction_bid( | ||
index: plain_name, | ||
block_index_txi_idx: block_index_txi_idx, | ||
expire_height: expire_height, | ||
owner: owner, | ||
bids: bids | ||
) = auction_bid | ||
|
||
new_record = | ||
Model.auction_bid( | ||
index: plain_name, | ||
block_index_txi_idx: block_index_txi_idx, | ||
expire_height: expire_height, | ||
owner: owner | ||
) | ||
|
||
nested_mutations = | ||
Enum.map(bids, fn {_bi, bid_txi_idx} -> | ||
WriteMutation.new( | ||
Model.AuctionBidClaim, | ||
Model.auction_bid_claim(index: {plain_name, expire_height, bid_txi_idx}) | ||
) | ||
end) | ||
|
||
{:ok, new_record, nested_mutations} | ||
end | ||
|
||
defp restructure_record(name() = name, source) do | ||
name( | ||
index: plain_name, | ||
active: active, | ||
expire: expire, | ||
claims: claims, | ||
updates: updates, | ||
transfers: transfers, | ||
revoke: revoke, | ||
auction_timeout: auction_timeout, | ||
owner: owner, | ||
previous: previous | ||
) = name | ||
|
||
claims_mutations = | ||
Enum.map(claims, fn {_bi, txi_idx} -> | ||
WriteMutation.new(Model.NameClaim, Model.name_claim(index: {plain_name, active, txi_idx})) | ||
end) | ||
|
||
updates_mutations = | ||
Enum.map(updates, fn {_bi, txi_idx} -> | ||
WriteMutation.new( | ||
Model.NameUpdate, | ||
Model.name_update(index: {plain_name, active, txi_idx}) | ||
) | ||
end) | ||
|
||
transfers_mutations = | ||
Enum.map(transfers, fn {_bi, txi_idx} -> | ||
WriteMutation.new( | ||
Model.NameTransfer, | ||
Model.name_transfer(index: {plain_name, active, txi_idx}) | ||
) | ||
end) | ||
|
||
nested_mutations = claims_mutations ++ updates_mutations ++ transfers_mutations | ||
{:ok, new_previous, previous_mutations} = restructure_record(previous, source) | ||
|
||
new_record = | ||
Model.name( | ||
index: plain_name, | ||
active: active, | ||
expire: expire, | ||
revoke: revoke, | ||
auction_timeout: auction_timeout, | ||
owner: owner, | ||
previous: new_previous | ||
) | ||
|
||
{:ok, new_record, previous_mutations ++ nested_mutations} | ||
end | ||
|
||
defp restructure_record(_record, _source), do: :already_restructured | ||
end |