Skip to content
This repository has been archived by the owner on Feb 24, 2025. It is now read-only.

Commit

Permalink
Obasilakis/save voter active votes to db (#581)
Browse files Browse the repository at this point in the history
* Add celo_voter_votes table

* Add voters_activated_votes_in_last_epoch function

* Add last_non_zero_voter_votes function and format

* Add voter votes to celo_pending_epoch_operations

* Add celo_voter_votes runner and fetcher

* Buffer blocks to fetch epoch rewards and voter votes for

* Add CeloVoterVotes fetcher to supervision tree

* Fix query in voters_activated_votes_in_last_epoch

* Pass array of blocks instead of single blocks

* Add hash and number to account/group pairs in CeloVoterVotes fetcher pipeline

* Lower max batch size

* Remove IO.inspects

* Make sure pending operations are falsified after votes are fetched

* Remove duplicates before fetching votes from archive node

* Reduce max_batch_size to 1 for voter votes fetcher

Also dialyze

* Make sure epoch blocks are ordered when buffered into voter votes fetcher

* Format, credo, dialyze and remove deprecated test

* Fix block fetcher tests

* Set max concurrency to 1

* Change poll_interval to 60 min for voter votes fetcher

* Stop buffering epoch blocks in the block fetcher

The block runner takes care of that

* Add PR to CHANGELOG.md

* Keep large integers consistent

* Set max concurrency and max batch size back to default

The new way of getting account/group pairs doesn't require sequential processing

* Retry unique blocks

* Get all account/group pairs with activated votes instead of just last epoch

* Remove leftover tag from test

* Avoid homonymous multis

* Set default celo_pending_epoch_operations to false

* Format and dialyze

* Fix test

* Lower max_batch_size

* Fetch and save vote only for accounts that have activated votes

* Credo

* Use event block_number since block_hash doesn't exist anymore

* Format
  • Loading branch information
obasilakis authored Mar 17, 2022
1 parent f609297 commit 3393ebf
Show file tree
Hide file tree
Showing 28 changed files with 836 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Current

### Features
- [#581](https://github.com/celo-org/blockscout/pull/581) - Save voter active votes to db

### Fixes
- [#5066](https://github.com/blockscout/blockscout/pull/5066) - Fix read contract page bug
Expand Down
27 changes: 27 additions & 0 deletions apps/explorer/lib/explorer/celo/account_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,33 @@ defmodule Explorer.Celo.AccountReader do
])
end

def active_votes(%{
block_hash: block_hash,
block_number: block_number,
group_hash: group_hash,
account_hash: account_hash
}) do
data =
call_methods([
{:election, "getActiveVotesForGroupByAccount", [to_string(group_hash), to_string(account_hash)], block_number}
])

case data["getActiveVotesForGroupByAccount"] do
{:ok, [active]} ->
{:ok,
%{
account_hash: account_hash,
active_votes: active,
block_hash: block_hash,
block_number: block_number,
group_hash: group_hash
}}

_ ->
:error
end
end

def voter_data(group_address, voter_address) do
data =
call_methods([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule Explorer.Celo.ContractEvents.Election.ValidatorGroupVoteActivatedEvent
);
"""

alias Explorer.Celo.ContractEvents.Common
alias Explorer.Chain.{Block, CeloContractEvent}
alias Explorer.Repo

use Explorer.Celo.ContractEvents.Base,
name: "ValidatorGroupVoteActivated",
topic: "0x45aac85f38083b18efe2d441a65b9c1ae177c78307cb5a5d4aec8f7dbcaeabfe"
Expand All @@ -18,4 +22,21 @@ defmodule Explorer.Celo.ContractEvents.Election.ValidatorGroupVoteActivatedEvent
event_param(:group, :address, :indexed)
event_param(:value, {:uint, 256}, :unindexed)
event_param(:units, {:uint, 256}, :unindexed)

def get_account_group_pairs_with_activated_votes(block_number) do
query =
from(
event in CeloContractEvent,
where: event.name == "ValidatorGroupVoteActivated",
where: event.block_number < ^block_number,
select: %{
account_hash: json_extract_path(event.params, ["account"]),
group_hash: json_extract_path(event.params, ["group"])
}
)

query
|> Repo.all()
|> Enum.map(&%{account_hash: Common.ca(&1.account_hash), group_hash: Common.ca(&1.group_hash)})
end
end
2 changes: 1 addition & 1 deletion apps/explorer/lib/explorer/celo/util.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ defmodule Explorer.Celo.Util do
end

def epoch_by_block_number(bn) do
div(bn, 17280)
div(bn, 17_280)
end
end
2 changes: 1 addition & 1 deletion apps/explorer/lib/explorer/celo/voter_rewards.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Explorer.Celo.VoterRewards do
]

alias Explorer.Celo.{ContractEvents, Events, Util}
alias Explorer.Chain.{Block, CeloContractEvent, CeloValidatorGroupVotes, Wei}
alias Explorer.Chain.{Block, CeloContractEvent}
alias Explorer.Repo

alias ContractEvents.{Election, EventMap}
Expand Down
2 changes: 1 addition & 1 deletion apps/explorer/lib/explorer/celo/voter_rewards_for_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ defmodule Explorer.Celo.VoterRewardsForGroup do

def amount_activated_or_revoked_last_day(voter_activated_or_revoked, block_number) do
voter_activated_or_revoked
|> Enum.filter(&(&1.block_number < block_number && &1.block_number >= block_number - 17280))
|> Enum.filter(&(&1.block_number < block_number && &1.block_number >= block_number - 17_280))
|> Enum.reduce(0, fn x, acc ->
if x.event == @validator_group_vote_activated do
acc + x.amount_activated_or_revoked
Expand Down
18 changes: 18 additions & 0 deletions apps/explorer/lib/explorer/chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2863,6 +2863,24 @@ defmodule Explorer.Chain do
Repo.stream_reduce(query, initial, reducer)
end

@spec stream_blocks_with_unfetched_voter_votes(
initial :: accumulator,
reducer :: (entry :: term(), accumulator -> accumulator)
) :: {:ok, accumulator}
when accumulator: term()
def stream_blocks_with_unfetched_voter_votes(initial, reducer) when is_function(reducer, 2) do
query =
from(
b in Block,
join: celo_pending_ops in assoc(b, :celo_pending_epoch_operations),
where: celo_pending_ops.fetch_voter_votes,
select: %{block_number: b.number, block_hash: b.hash},
order_by: [asc: b.number]
)

Repo.stream_reduce(query, initial, reducer)
end

def remove_nonconsensus_blocks_from_pending_ops(block_hashes) do
query =
from(
Expand Down
41 changes: 23 additions & 18 deletions apps/explorer/lib/explorer/chain/celo_pending_epoch_operation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,24 @@ defmodule Explorer.Chain.CeloPendingEpochOperation do
alias Explorer.Chain.{Block, Hash}
alias Explorer.Repo

@required_attrs ~w(block_hash fetch_epoch_rewards)a
@required_attrs ~w(block_hash fetch_epoch_rewards fetch_validator_group_data fetch_voter_votes)a

@typedoc """
* `block_hash` - the hash of the epoch block that has pending operations.
* `fetch_epoch_rewards` - if the epoch rewards should be fetched (or not)
* `fetch_voter_votes` - if the voter votes should be fetched (or not)
"""
@type t :: %__MODULE__{
block_hash: Hash.Full.t(),
fetch_epoch_rewards: boolean()
fetch_epoch_rewards: boolean(),
fetch_voter_votes: boolean()
}

@primary_key false
schema "celo_pending_epoch_operations" do
field(:fetch_epoch_rewards, :boolean)
field(:fetch_validator_group_data, :boolean)
field(:fetch_voter_votes, :boolean)

timestamps()

Expand All @@ -43,6 +46,7 @@ defmodule Explorer.Chain.CeloPendingEpochOperation do
update: [
set: [
fetch_epoch_rewards: celo_epoch_pending_ops.fetch_epoch_rewards or fragment("EXCLUDED.fetch_epoch_rewards"),
fetch_voter_votes: celo_epoch_pending_ops.fetch_voter_votes or fragment("EXCLUDED.fetch_voter_votes"),
# Don't update `block_hash` as it is used for the conflict target
inserted_at: celo_epoch_pending_ops.inserted_at,
updated_at: fragment("EXCLUDED.updated_at")
Expand All @@ -52,27 +56,28 @@ defmodule Explorer.Chain.CeloPendingEpochOperation do
)
end

@spec falsify_or_delete_celo_pending_epoch_operation(
@spec falsify_celo_pending_epoch_operation(
Hash.Full.t(),
:fetch_epoch_rewards | :fetch_validator_group_data
:fetch_epoch_rewards | :fetch_validator_group_data | :fetch_voter_votes
) :: __MODULE__.t()
def falsify_or_delete_celo_pending_epoch_operation(block_hash, operation_type) do
def falsify_celo_pending_epoch_operation(block_hash, operation_type) do
celo_pending_operation = Repo.get(__MODULE__, block_hash)

new_celo_pending_operation = Map.put(celo_pending_operation, operation_type, false)

%{fetch_epoch_rewards: new_fetch_epoch_rewards, fetch_validator_group_data: new_fetch_validator_group_data} =
new_celo_pending_operation
%{
fetch_epoch_rewards: new_fetch_epoch_rewards,
fetch_validator_group_data: new_fetch_validator_group_data,
fetch_voter_votes: new_fetch_voter_votes
} = new_celo_pending_operation

if new_fetch_epoch_rewards || new_fetch_validator_group_data == true do
celo_pending_operation
|> changeset(%{
block_hash: block_hash,
fetch_epoch_rewards: new_fetch_epoch_rewards,
fetch_validator_group_data: new_fetch_validator_group_data
})
|> Repo.update()
else
Repo.delete(celo_pending_operation)
end
celo_pending_operation
|> changeset(%{
block_hash: block_hash,
fetch_epoch_rewards: new_fetch_epoch_rewards,
fetch_validator_group_data: new_fetch_validator_group_data,
fetch_voter_votes: new_fetch_voter_votes
})
|> Repo.update()
end
end
76 changes: 76 additions & 0 deletions apps/explorer/lib/explorer/chain/celo_voter_votes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule Explorer.Chain.CeloVoterVotes do
@moduledoc """
Tracks individual active votes for every voter on each epoch.
"""

use Explorer.Schema

alias Explorer.Chain.{Block, Hash, Wei}
alias Explorer.Repo

@required_attrs ~w(account_hash block_hash block_number active_votes group_hash)a

@typedoc """
* `account_hash` - the hash of the voter's account.
* `active_votes` - the number of the voter's active votes for a specific group.
* `block_hash` - the hash of the block.
* `block_number` - the number of the block.
* `group_hash` - the hash of the group.
"""
@type t :: %__MODULE__{
account_hash: Hash.Address.t(),
active_votes: Wei.t(),
block_hash: Hash.Full.t(),
block_number: integer,
group_hash: Hash.Address.t()
}

@primary_key false
schema "celo_voter_votes" do
field(:active_votes, Wei)
field(:block_number, :integer)
field(:group_hash, Hash.Address)

timestamps()

belongs_to(:block, Block, foreign_key: :block_hash, primary_key: true, references: :hash, type: Hash.Full)

belongs_to(:addresses, Explorer.Chain.Address,
foreign_key: :account_hash,
references: :hash,
type: Hash.Address
)
end

def changeset(%__MODULE__{} = celo_voter_votes, attrs) do
celo_voter_votes
|> cast(attrs, @required_attrs)
|> validate_required(@required_attrs)
|> foreign_key_constraint(:block_hash)
|> foreign_key_constraint(:group_hash)
|> foreign_key_constraint(:account_hash)
|> unique_constraint(
[:account_hash, :block_hash, :group_hash],
name: :celo_voter_votes_account_hash_block_hash_group_hash_index
)
end

def previous_epoch_non_zero_voter_votes(epoch_block_number) do
zero_votes = %Explorer.Chain.Wei{value: Decimal.new(0)}
previous_epoch_block_number = epoch_block_number - 17_280

query =
from(
votes in __MODULE__,
where: votes.block_number == ^previous_epoch_block_number,
where: votes.active_votes != ^zero_votes,
select: %{
account_hash: votes.account_hash,
group_hash: votes.group_hash
}
)

query
|> Repo.all()
end
end
4 changes: 2 additions & 2 deletions apps/explorer/lib/explorer/chain/import/runner/blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,9 @@ defmodule Explorer.Chain.Import.Runner.Blocks do
|> Enum.map(& &1.number)
|> MapSet.new()
|> MapSet.to_list()
|> Enum.filter(&(rem(&1, 17280) == 0))
|> Enum.filter(&(rem(&1, 17_280) == 0))
|> Enum.map(&Enum.find(changes_list, fn block -> block.number == &1 end))
|> Enum.map(&%{block_hash: &1.hash, fetch_epoch_rewards: true})
|> Enum.map(&%{block_hash: &1.hash, fetch_epoch_rewards: true, fetch_voter_votes: true})

Import.insert_changes_list(
repo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ defmodule Explorer.Chain.Import.Runner.CeloEpochRewards do
end)

multi_chain
|> Multi.run(:delete_celo_pending, fn _, _ ->
|> Multi.run(:falsify_fetch_epoch_rewards, fn _, _ ->
changes_list
|> Enum.each(fn reward ->
CeloPendingEpochOperation.falsify_or_delete_celo_pending_epoch_operation(
CeloPendingEpochOperation.falsify_celo_pending_epoch_operation(
reward.block_hash,
:fetch_epoch_rewards
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ defmodule Explorer.Chain.Import.Runner.CeloValidatorGroupVotes do
end)

multi_chain
|> Multi.run(:delete_celo_pending, fn _, _ ->
|> Multi.run(:falsify_fetch_validator_group_data, fn _, _ ->
changes =
changes_list
|> Enum.each(fn reward ->
CeloPendingEpochOperation.falsify_or_delete_celo_pending_epoch_operation(
CeloPendingEpochOperation.falsify_celo_pending_epoch_operation(
reward.block_hash,
:fetch_validator_group_data
)
Expand Down
Loading

0 comments on commit 3393ebf

Please sign in to comment.