Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Bus integration #259

Merged
merged 39 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ff32ee1
Initial commit
hweawer Aug 27, 2024
ab00045
Change pauser parsing
hweawer Aug 27, 2024
da2cfe6
Merge branch 'develop' into data-bus-integration
hweawer Aug 27, 2024
937dcc3
Fix import
hweawer Aug 27, 2024
c69e7de
Fix import
hweawer Aug 27, 2024
4de6787
New mark for manual testing
hweawer Aug 27, 2024
f5655e4
small fixes
hweawer Aug 28, 2024
3615e8f
Support multiple topics
hweawer Aug 28, 2024
5e1276a
Splitting the signature, fixing the schema
hweawer Aug 28, 2024
5bfc39b
Fix all the type conversions in schema
hweawer Aug 28, 2024
566a4aa
V3 pause
hweawer Aug 28, 2024
90155aa
Deq
hweawer Aug 29, 2024
9b60f2b
Message types
hweawer Aug 29, 2024
55c4a55
Fix a bug with multi-topic parsing
hweawer Aug 29, 2024
bb4dd3e
Test with mocks
hweawer Aug 29, 2024
1114ddb
Fix the ordering
hweawer Aug 30, 2024
f525f67
Remove PingMessageDataBusSchema
hweawer Aug 30, 2024
4a9ff6f
Partially fix comments
hweawer Aug 30, 2024
3b9d862
Fixes
hweawer Sep 2, 2024
7e59516
Remove field
hweawer Sep 2, 2024
650a08b
Signature split
hweawer Sep 2, 2024
4731925
Buffer in the abstract class
hweawer Sep 2, 2024
8db867a
Fix fake
hweawer Sep 2, 2024
c6cb01b
Change signature parsing
hweawer Sep 3, 2024
2d86a87
Integration test
hweawer Sep 3, 2024
54ba420
Setup integration tests
hweawer Sep 3, 2024
f9796fc
Fix a bug with looping on a single block
hweawer Sep 4, 2024
9bf3920
Fix sender
hweawer Sep 4, 2024
9341074
Rework buffer for providers
hweawer Sep 5, 2024
dcce2f9
Comment fixes
hweawer Sep 5, 2024
2d4a2f4
Fix comments
hweawer Sep 6, 2024
ef2d312
Change data bus parsing
hweawer Sep 16, 2024
ed1449b
Fix types in signatures
hweawer Sep 16, 2024
2883936
Upgrade curl version
hweawer Sep 23, 2024
32d107a
Update src/transport/msg_providers/onchain_transport.py
hweawer Sep 30, 2024
ed60189
Update src/transport/msg_providers/onchain_transport.py
hweawer Sep 30, 2024
114b884
Update src/transport/msg_providers/onchain_transport.py
hweawer Sep 30, 2024
306e3c5
Fix comments
hweawer Sep 30, 2024
0d06f6b
Rewrite
hweawer Sep 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ LIDO_LOCATOR=0x1eDf09b5023DC86737b59dE68a8130De878984f5
# Görli: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b
DEPOSIT_CONTRACT=0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b

# rabbit / kafka / rabbit,kafka
# rabbit / onchain_transport
MESSAGE_TRANSPORTS=rabbit

# rabbit secrets
Expand Down
12 changes: 12 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ jobs:
MELLOW_CONTRACT_ADDRESS: "0x182Cb3A76B0EFaCb25255F9594B5807460882fa4"
ANVIL_PATH: ""

- name: Integration tests with pytest, chiado testnet
run: |
poetry run pytest tests -m integration_chiado
env:
WEB3_RPC_ENDPOINTS: "https://gnosis-chiado-rpc.publicnode.com"
ONCHAIN_TRANSPORT_RPC_ENDPOINTS: "https://gnosis-chiado-rpc.publicnode.com"
ONCHAIN_TRANSPORT_ADDRESS: "0x42E1DEfC18388E3AA1fCADa851499A11405cf37f"
DEPOSIT_CONTRACT: "0x4242424242424242424242424242424242424242"
LIDO_LOCATOR: "0x28FAB2059C713A7F9D8c86Db49f9bb0e96Af1ef8"
MELLOW_CONTRACT_ADDRESS: "0x182Cb3A76B0EFaCb25255F9594B5807460882fa4"
ANVIL_PATH: ""

- name: Integration tests with pytest, mainnet fork
if: success() || failure()
run: |
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends -qq \
gcc=4:12.2.0-3 \
libffi-dev=3.4.4-1 \
g++=4:12.2.0-3 \
curl=7.88.1-10+deb12u6 \
curl=7.88.1-10+deb12u7 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

Expand Down
33 changes: 15 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

## Description

Depositor and pauser bots are parts of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability).
Depositor and pauser bots are parts
of [Deposit Security Module](https://github.com/lidofinance/lido-improvement-proposals/blob/develop/LIPS/lip-5.md#mitigations-for-deposit-front-running-vulnerability).

**The Depositor Bot** obtains signed deposit messages from Council Daemons.
Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking module.
**The Depositor Bot** obtains signed deposit messages from Council Daemons.
Once a sufficient number of messages is collected to constitute a quorum, the bot proceeds to initiate a deposit into the designated staking
module.
This deposit is executed using the depositBufferedEther function within the "DepositSecurityModule" smart contract.

Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue.
Direct deposit is a mechanism that allows depositors to use side vault facilities for deposits. This process transfers ETH from the vault
and facilitates the deposit to specified in side vault staking module, preventing funds from being stuck in the withdrawal queue.

**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects stealing.
**The Pauser Bot** obtains pause message from Council Daemon and enacts pause deposits on protocol. Pause can occurs when Lido detects
stealing.

**The Unvetting Bot** obtains unvet message from Council Daemon and enacts unvet on the specified node operator.
Unvetting is the proces of decreasing approved depositable signing keys.
Expand All @@ -19,13 +23,13 @@ Unvetting is the proces of decreasing approved depositable signing keys.

- [Running Daemon](#running-daemon)
- [Variables](#variables)
- [Required variables](#required-variables)
- [Additional variables](#additional-variables)
- [Required variables](#required-variables)
- [Additional variables](#additional-variables)
- [Metrics and logs](#metrics-and-logs)
- [Development](#development)
- [Install](#install)
- [Tests](#tests)
- [Release flow](#release-flow)
- [Install](#install)
- [Tests](#tests)
- [Release flow](#release-flow)
- [Annotations to code](#annotations-to-code)

## Running Daemon
Expand Down Expand Up @@ -54,17 +58,10 @@ Unvetting is the proces of decreasing approved depositable signing keys.
| DEPOSIT_CONTRACT | 0x00000000219ab540356cBB839Cbe05303d7705Fa | Ethereum deposit contract address |
| DEPOSIT_MODULES_WHITELIST | 1 | List of staking module's ids in which the depositor bot will make deposits |
| --- | --- | --- |
| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/kafka |
| MESSAGE_TRANSPORTS | - | Transports used in bot. One of/or both: rabbit/onchain_transport |
| RABBIT_MQ_URL | - | RabbitMQ url |
| RABBIT_MQ_USERNAME | - | RabbitMQ username for virtualhost |
| RABBIT_MQ_PASSWORD | - | RabbitMQ password for virtualhost |
| --- | --- _kafka is not used at the moment_ --- | --- |
| KAFKA_BROKER_ADDRESS_1 | - | Kafka servers url and port |
| KAFKA_USERNAME | - | Kafka username |
| KAFKA_PASSWORD | - | Password for kafka |
| KAFKA_NETWORK | - | Network type (mainnet or goerli) |
| KAFKA_TOPIC | - | Kafka topic name (for msg receiving) |
| KAFKA_GROUP_PREFIX | - | Just for staging (staging-) |

### Additional variables

Expand Down
45 changes: 45 additions & 0 deletions interfaces/DataBusContract.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[
{
"anonymous": true,
"inputs": [
{
"indexed": true,
"internalType": "bytes32",
"name": "eventId",
"type": "bytes32"
},
{
"indexed": true,
"internalType": "address",
"name": "sender",
"type": "address"
},
{
"indexed": false,
"internalType": "bytes",
"name": "data",
"type": "bytes"
}
],
"name": "Message",
"type": "event"
},
{
"inputs": [
{
"internalType": "bytes32",
"name": "_eventId",
"type": "bytes32"
},
{
"internalType": "bytes",
"name": "_data",
"type": "bytes"
}
],
"name": "sendMessage",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
}
]
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ pythonpath = [
"tests",
]
markers = [
"unit", # offline
"integration", # hardhat fork
"integration_holesky" # holesky fork
"unit", # offline
"integration", # hardhat fork
"integration_holesky", # holesky fork
"integration_chiado", # chiado(gnosis testnet) run only
]

[tool.ruff]
Expand Down
18 changes: 18 additions & 0 deletions src/blockchain/contracts/data_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from blockchain.contracts.base_interface import ContractInterface
from web3.contract.contract import ContractFunction

logger = logging.getLogger(__name__)


class DataBusContract(ContractInterface):
abi_path = './interfaces/DataBusContract.json'

def send_message(self, event_id: bytes, mes: bytes) -> ContractFunction:
"""
Build send message transaction to Data Bus contract
"""
tx = self.functions.send_message(event_id, mes)
logger.info({'msg': f'Build `send_message({event_id.hex()}, {mes.hex()})` tx.'})
return tx
17 changes: 8 additions & 9 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
)
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import DepositParser, OnchainTransportProvider, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
from transport.msg_types.deposit import DepositMessage, DepositMessageSchema
from transport.msg_types.ping import PingMessageSchema, to_check_sum_address
from transport.types import TransportType
from web3.types import BlockData
from web3_multi_provider import FallbackProvider

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,17 +80,18 @@ def __init__(
if TransportType.RABBIT in variables.MESSAGE_TRANSPORTS:
transports.append(
RabbitProvider(
client='depositor',
routing_keys=[MessageType.PING, MessageType.DEPOSIT],
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}deposit',
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
)
)

Expand Down Expand Up @@ -123,10 +125,7 @@ def execute(self, block: BlockData) -> bool:

return True

def _is_mellow_depositable(
self,
module_id: int
) -> bool:
def _is_mellow_depositable(self, module_id: int) -> bool:
if not variables.MELLOW_CONTRACT_ADDRESS:
return False
try:
Expand Down
12 changes: 7 additions & 5 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PauseV2Parser, PauseV3Parser, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
from transport.msg_types.pause import PauseMessage, PauseMessageSchema
from transport.msg_types.ping import PingMessageSchema, to_check_sum_address
from transport.types import TransportType
from web3.types import BlockData
from web3_multi_provider import FallbackProvider

logger = logging.getLogger(__name__)

Expand All @@ -42,17 +43,18 @@ def __init__(self, w3: Web3):
if TransportType.RABBIT in variables.MESSAGE_TRANSPORTS:
transports.append(
RabbitProvider(
client='pauser',
routing_keys=[MessageType.PING, MessageType.PAUSE],
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}pause',
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)),
parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser],
)
)

Expand Down
12 changes: 7 additions & 5 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.kafka import KafkaMessageProvider
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PingParser, UnvetParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
from transport.msg_storage import MessageStorage
from transport.msg_types.common import get_messages_sign_filter
Expand All @@ -16,6 +16,7 @@
from transport.types import TransportType
from utils.bytes import from_hex_string_to_bytes
from web3.types import BlockData
from web3_multi_provider import FallbackProvider

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,17 +48,18 @@ def prepare_transport_bus(self):
if TransportType.RABBIT in variables.MESSAGE_TRANSPORTS:
transports.append(
RabbitProvider(
client='unvetter',
routing_keys=[MessageType.UNVET, MessageType.PING],
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
)
)

if TransportType.KAFKA in variables.MESSAGE_TRANSPORTS:
if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS:
transports.append(
KafkaMessageProvider(
client=f'{variables.KAFKA_GROUP_PREFIX}unvet',
OnchainTransportProvider(
w3=Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)),
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)),
parsers_providers=[UnvetParser, PingParser],
)
)

Expand Down
31 changes: 12 additions & 19 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
import logging
from typing import Any, List, Optional
from typing import Any

from schema import Schema, SchemaError

Expand All @@ -12,31 +12,24 @@ class BaseMessageProvider(abc.ABC):

MAX_MESSAGES_RECEIVE = 1000

def __init__(self, client: str, message_schema: Schema):
def __init__(self, message_schema: Schema):
self.message_schema = message_schema
self.client = client

def get_messages(self) -> List[dict]:
messages = []
def get_messages(self) -> list[dict]:
"""
Fetches new messages, processes them, and filters out only the valid ones.

for _ in range(self.MAX_MESSAGES_RECEIVE):
msg = self._receive_message()

if msg is None:
break

value = self._process_msg(msg)

if value and self._is_valid(value):
messages.append(value)

return messages
Returns:
List[Dict]: A list of processed and valid messages.
"""
processed = [self._process_msg(m) for m in self._fetch_messages()]
return [msg for msg in processed if msg and self._is_valid(msg)]

@abc.abstractmethod
def _receive_message(self) -> Any:
def _fetch_messages(self) -> list:
raise NotImplementedError('Receive message from transport.')

def _process_msg(self, msg: Any) -> Optional[dict]:
def _process_msg(self, msg: Any) -> dict | None:
# Overwrite this method to add msg serialization.
# Return None if message is not serializable
return msg
Expand Down
Loading
Loading