# Builds the accounts tree for the state just before the given micro-block,
# pre-transformed by the chain's consensus module.
# NOTE(review): semantics inferred from the :aec_* calls — confirm against the
# node's aec_chain_state / aec_block_insertion APIs.
defp block_accounts_tree(mb_hash) do
  {:value, micro_block} = :aec_db.find_block(mb_hash)

  # Resolve the consensus module from the micro-block's own header.
  consensus_mod =
    micro_block
    |> :aec_blocks.to_header()
    |> :aec_headers.consensus_module()

  # Let the node wrap the block into its chain-node structure instead of
  # hand-crafting the {:node, header, hash, :micro} tuple.
  node = :aec_chain_state.wrap_block(micro_block)
  prev_hash = :aec_block_insertion.node_prev_hash(node)

  # State trees as they stood at the previous block (dirty read allowed).
  {:value, trees_in, _tree, _difficulty, _fees, _fraud} =
    :aec_db.find_block_state_and_data(prev_hash, true)

  node
  |> consensus_mod.state_pre_transform_micro_node(trees_in)
  |> :aec_trees.accounts()
end
defmodule AeMdw.Migrations.EventLogsHash do
  # credo:disable-for-this-file
  @moduledoc """
  Reindex logs to move the hash out of the key and use built-in encoding for the value part of the records. 
  """

  alias AeMdw.Db.RocksDb
  alias AeMdw.Collection
  alias AeMdw.Db.Model
  alias AeMdw.Db.WriteMutation
  alias AeMdw.Db.State

  require Model
  require Logger

  # Transactions are reindexed in slices of this many txis to bound memory use.
  @range_size 200_000

  @dialyzer :no_return

  defmodule DeleteKeysMutation do
    @moduledoc false
    alias AeMdw.Db.Model
    alias AeMdw.Db.State
    alias AeMdw.Db.RocksDb

    require Model

    @derive AeMdw.Db.Mutation
    defstruct [:args]

    def new(args), do: %__MODULE__{args: args}

    # Removes the three old-format keys of a single log entry, writing the
    # deletes into the RocksDB transaction held by the state's store.
    def execute(%__MODULE__{args: [log_key, data_log_key, idx_log_key]}, state) do
      txn = Map.get(state.store, :txn)

      :ok = RocksDb.delete(txn, Model.ContractLog, :sext.encode(log_key))
      :ok = RocksDb.delete(txn, Model.DataContractLog, :sext.encode(data_log_key))
      :ok = RocksDb.delete(txn, Model.IdxContractLog, :sext.encode(idx_log_key))

      state
    end
  end

  @spec run(State.t(), boolean()) :: {:ok, non_neg_integer()}
  def run(state, _from_start?) do
    case State.prev(state, Model.Tx, nil) do
      {:ok, db_last_txi} ->
        num_ranges = div(db_last_txi, @range_size) + 1

        # New-format ContractLog keys are 3-tuples (the event hash moved into
        # the value); a 3-tuple first key means only re-encoding remains.
        only_encoding? =
          match?(
            {:ok, {_elem1, _elem2, _elem3}},
            State.next(state, Model.ContractLog, nil)
          )

        mutations_tasks =
          if only_encoding?, do: &reencoding_logs_mutations/4, else: &hash_logs_mutations/4

        count =
          0..(num_ranges - 1)
          |> Enum.map(fn range_index ->
            first_txi = range_index * @range_size
            last_txi = min((range_index + 1) * @range_size, db_last_txi)

            num_tasks = System.schedulers_online()
            amount_per_task = trunc(:math.ceil((last_txi - first_txi) / num_tasks))

            log("num_tasks: #{num_tasks}, amount_per_task: #{amount_per_task}")
            log("first_txi: #{first_txi}, last_txi: #{last_txi}")

            # Keep memory bounded between slices.
            :erlang.garbage_collect()

            range_mutations =
              state
              |> mutations_tasks.(first_txi, num_tasks, amount_per_task)
              |> Task.await_many(60_000 * 20)
              |> List.flatten()
              |> Enum.uniq()

            count = length(range_mutations)
            log("commiting #{count} mutations...")

            range_mutations
            |> Enum.chunk_every(240_000)
            |> Enum.each(fn write_mutations ->
              {ts, _state} = :timer.tc(fn -> _state = State.commit(state, write_mutations) end)
              IO.puts("commit: #{inspect({length(write_mutations), ts})}")
            end)

            count
          end)
          |> Enum.sum()

        {:ok, count}

      :none ->
        # Empty database: nothing to migrate.
        {:ok, 0}
    end
  end

  # Splits [first_txi, first_txi + num_tasks * amount_per_task) across tasks;
  # each task rewrites old 4-tuple index entries into the new 3-tuple layout
  # (hash moved into the ContractLog value) and deletes the old keys.
  defp hash_logs_mutations(state, first_txi, num_tasks, amount_per_task) do
    Enum.map(0..(num_tasks - 1), fn task_index ->
      task_first_txi = first_txi + task_index * amount_per_task
      task_last_txi = first_txi + (task_index + 1) * amount_per_task
      cursor = {task_first_txi, 0, 0, <<>>}
      # nil sorts after any integer in Erlang term order, closing the range.
      boundary = {cursor, {task_last_txi, nil, nil, <<>>}}

      Task.async(fn ->
        state
        |> Collection.stream(Model.IdxContractLog, :forward, boundary, cursor)
        |> Enum.flat_map(fn {call_txi, log_idx, create_txi, evt_hash} = idx_log_key ->
          log_key = {create_txi, call_txi, evt_hash, log_idx}

          {:contract_log, ^log_key, ext_contract, args, data} = fetch_old!(log_key)

          m_log =
            Model.contract_log(
              index: {create_txi, call_txi, log_idx},
              ext_contract: ext_contract,
              args: args,
              data: data,
              hash: evt_hash
            )

          m_data_log = Model.data_contract_log(index: {data, call_txi, create_txi, log_idx})
          m_idx_log = Model.idx_contract_log(index: {call_txi, log_idx, create_txi})

          data_log_key = {data, call_txi, create_txi, evt_hash, log_idx}

          [
            DeleteKeysMutation.new([log_key, data_log_key, idx_log_key]),
            WriteMutation.new(Model.ContractLog, m_log),
            WriteMutation.new(Model.DataContractLog, m_data_log),
            WriteMutation.new(Model.IdxContractLog, m_idx_log)
          ]
        end)
      end)
    end)
  end

  # Same task fan-out as hash_logs_mutations/4, but the keys already have the
  # new 3-tuple shape — entries only need rewriting with the new encoding.
  defp reencoding_logs_mutations(state, first_txi, num_tasks, amount_per_task) do
    Enum.map(0..(num_tasks - 1), fn task_index ->
      task_first_txi = first_txi + task_index * amount_per_task
      task_last_txi = first_txi + (task_index + 1) * amount_per_task
      cursor = {task_first_txi, 0, 0}
      boundary = {cursor, {task_last_txi, nil, nil}}

      Task.async(fn ->
        state
        |> Collection.stream(Model.IdxContractLog, :forward, boundary, cursor)
        |> Enum.flat_map(fn {call_txi, log_idx, create_txi} ->
          log_key = {create_txi, call_txi, log_idx}
          # rewrite with new encoding
          # Best-effort: an entry that no longer decodes/exists is skipped.
          try do
            [WriteMutation.new(Model.ContractLog, fetch_old!(log_key))]
          rescue
            _value ->
              []
          end
        end)
      end)
    end)
  end

  # Mirrors progress to both the Logger and stdout so it is visible when the
  # migration is run from a console.
  @spec log(binary()) :: :ok
  def log(msg) do
    Logger.info(msg)
    IO.puts(msg)
  end

  # Reads a raw ContractLog value by its sext-encoded key and rebuilds the
  # full record tuple (record tag + index prepended to the decoded value).
  defp fetch_old!(index) do
    encoded_key = :sext.encode(index)

    {:ok, raw_value} = RocksDb.get(Model.ContractLog, encoded_key)
    record_type = Model.record(Model.ContractLog)

    raw_value
    |> :sext.decode()
    |> Tuple.insert_at(0, index)
    |> Tuple.insert_at(0, record_type)
  end
end
defmodule AeMdw.Migrations.NamesNestedRestructure do
  @moduledoc """
  Re-indexes names and auctions nested references into separate tables. 
  """

  alias AeMdw.Collection
  alias AeMdw.Db.WriteMutation
  alias AeMdw.Db.Model
  alias AeMdw.Db.State

  require Model
  require Record

  # Old-format auction record: bids were nested inside the record itself.
  Record.defrecord(:auction_bid,
    index: nil,
    block_index_txi_idx: nil,
    expire_height: nil,
    owner: nil,
    bids: nil
  )

  # Old-format name record: claims/updates/transfers were nested lists.
  Record.defrecord(:name,
    index: nil,
    active: nil,
    expire: nil,
    claims: nil,
    updates: nil,
    transfers: nil,
    revoke: nil,
    auction_timeout: nil,
    owner: nil,
    previous: nil
  )

  # Mutations are committed in batches of this many records.
  @chunk_size 1_000

  @spec run(State.t(), boolean()) :: {:ok, non_neg_integer()}
  def run(state, _from_start?) do
    count =
      [Model.ActiveName, Model.InactiveName, Model.AuctionBid]
      |> Enum.map(fn table ->
        state
        |> Collection.stream(table, nil)
        |> Stream.map(&{&1, table})
      end)
      |> Collection.merge(:forward)
      |> Stream.transform("", fn {plain_name, source}, prev_char ->
        first_char = String.at(plain_name, 0)

        # Print a progress marker whenever the leading character changes.
        if first_char != prev_char do
          IO.puts("Processing names and auctions starting with #{first_char}..")
        end

        {[{plain_name, source}], first_char}
      end)
      |> Stream.map(fn {plain_name, source} ->
        {State.fetch!(state, source, plain_name), source}
      end)
      |> Stream.map(fn {old_record, source} ->
        # Already-restructured records fall through the `with` as
        # :already_restructured and are dropped by the filter below.
        with {:ok, new_record, nested_mutations} <- restructure_record(old_record, source) do
          {:ok, [WriteMutation.new(source, new_record) | nested_mutations]}
        end
      end)
      |> Stream.filter(&match?({:ok, _mutations}, &1))
      |> Stream.chunk_every(@chunk_size)
      |> Stream.map(fn ok_mutations ->
        mutations = Enum.flat_map(ok_mutations, fn {:ok, mutations} -> mutations end)

        _new_state = State.commit(state, mutations)

        # Counts restructured records, not individual mutations.
        length(ok_mutations)
      end)
      |> Enum.sum()

    {:ok, count}
  end

  # A nil `previous` chain terminator needs no rewriting.
  defp restructure_record(nil, _source), do: {:ok, nil, []}

  # Auction: strip the nested bids list and emit one AuctionBidClaim per bid.
  defp restructure_record(auction_bid() = auction_bid, Model.AuctionBid) do
    auction_bid(
      index: plain_name,
      block_index_txi_idx: block_index_txi_idx,
      expire_height: expire_height,
      owner: owner,
      bids: bids
    ) = auction_bid

    new_record =
      Model.auction_bid(
        index: plain_name,
        block_index_txi_idx: block_index_txi_idx,
        expire_height: expire_height,
        owner: owner
      )

    nested_mutations =
      Enum.map(bids, fn {_bi, bid_txi_idx} ->
        WriteMutation.new(
          Model.AuctionBidClaim,
          Model.auction_bid_claim(index: {plain_name, expire_height, bid_txi_idx})
        )
      end)

    {:ok, new_record, nested_mutations}
  end

  # Name: move nested claims/updates/transfers into their own tables and
  # recursively restructure the `previous` chain embedded in the record.
  defp restructure_record(name() = name, source) do
    name(
      index: plain_name,
      active: active,
      expire: expire,
      claims: claims,
      updates: updates,
      transfers: transfers,
      revoke: revoke,
      auction_timeout: auction_timeout,
      owner: owner,
      previous: previous
    ) = name

    claims_mutations =
      Enum.map(claims, fn {_bi, txi_idx} ->
        WriteMutation.new(Model.NameClaim, Model.name_claim(index: {plain_name, active, txi_idx}))
      end)

    updates_mutations =
      Enum.map(updates, fn {_bi, txi_idx} ->
        WriteMutation.new(
          Model.NameUpdate,
          Model.name_update(index: {plain_name, active, txi_idx})
        )
      end)

    transfers_mutations =
      Enum.map(transfers, fn {_bi, txi_idx} ->
        WriteMutation.new(
          Model.NameTransfer,
          Model.name_transfer(index: {plain_name, active, txi_idx})
        )
      end)

    nested_mutations = claims_mutations ++ updates_mutations ++ transfers_mutations
    {:ok, new_previous, previous_mutations} = restructure_record(previous, source)

    new_record =
      Model.name(
        index: plain_name,
        active: active,
        expire: expire,
        revoke: revoke,
        auction_timeout: auction_timeout,
        owner: owner,
        previous: new_previous
      )

    {:ok, new_record, previous_mutations ++ nested_mutations}
  end

  # Any record not matching the old formats was already migrated.
  defp restructure_record(_record, _source), do: :already_restructured
end