Skip to content

Commit

Permalink
Merge pull request #1 from rated-network/fix/minor-stuff
Browse files Browse the repository at this point in the history
Fix/minor stuff
  • Loading branch information
ariskk authored Mar 4, 2022
2 parents df868d3 + 875d8fe commit 469faa9
Show file tree
Hide file tree
Showing 19 changed files with 427 additions and 154 deletions.
3 changes: 3 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
max-line-length = 99
ignore = E731, W503, E203
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: CI

on:
pull_request:
branches:
- main

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up py
uses: actions/setup-python@v1
with:
python-version: 3.9

- name: Install dependencies
working-directory: ./
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Lint
working-directory: ./
run: |
black --check ./
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
*.sqlite
*.csv
venv
__pycache__/
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ installed.
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
pip install -r requirements-dev.txt
```

### k-NN Classifier
Expand Down Expand Up @@ -80,7 +81,18 @@ testdata_proc
└── 0x7fedb0da9699c93ce66966555c6719e1159ae7b3220c7053a08c8f50e2f3f56f.json
```

You can then use this directory as the datadir argument to `./knn_classifier.py`.
You can then use this directory as the datadir argument to `./knn_classifier.py`:

```
./knn_classifier.py testdata_proc --classify testdata
```

If you then want to use the classifier to build an sqlite database:

```
./build_db.py --db-path block_db.sqlite --classify-dir testdata --data-dir testdata_proc
```


### Running the API server

Expand Down
2 changes: 2 additions & 0 deletions api_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import requests


def post_block_rewards(url, block_rewards):
res = requests.post(f"{url}/classify", json=block_rewards)
res.raise_for_status()


def get_sync_gaps(url):
res = requests.get(f"{url}/sync/gaps")
res.raise_for_status()
Expand Down
62 changes: 45 additions & 17 deletions api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@
import falcon

from multi_classifier import MultiClassifier
from build_db import open_block_db, get_blocks_per_client, get_sync_status, get_sync_gaps, \
update_block_db, get_validator_blocks, get_blocks
from build_db import (
open_block_db,
get_blocks_per_client,
get_sync_status,
get_sync_gaps,
update_block_db,
get_validator_blocks,
get_blocks,
)

DATA_DIR = "./data/mainnet/training"
BLOCK_DB = "./block_db.sqlite"
BN_URL = "http://localhost:5052"
SELF_URL = "http://localhost:8000"


class Classify:
def __init__(self, classifier, block_db):
self.classifier = classifier
Expand All @@ -23,21 +31,24 @@ def on_post(self, req, resp):
resp.code = falcon.HTTP_400
return

results = []

# Check required fields
for block_reward in block_rewards:
if ("block_root" not in block_reward or
"attestation_rewards" not in block_reward or
"per_attestation_rewards" not in block_reward["attestation_rewards"]):
resp.text = json.dumps({"error": "input JSON is not a block reward"})
resp.code = falcon.HTTP_400
return
if (
"block_root" not in block_reward
or "attestation_rewards" not in block_reward
or "per_attestation_rewards" not in block_reward["attestation_rewards"]
):
resp.text = json.dumps({"error": "input JSON is not a block reward"})
resp.code = falcon.HTTP_400
return

update_block_db(self.block_db, self.classifier, block_rewards)
print(f"Processed {len(block_rewards)} block{'' if block_rewards == [] else 's'}")
print(
f"Processed {len(block_rewards)} block{'' if block_rewards == [] else 's'}"
)
resp.text = "OK"


class BlocksPerClient:
def __init__(self, block_db):
self.block_db = block_db
Expand All @@ -50,6 +61,7 @@ def on_get(self, req, resp, start_epoch, end_epoch=None):
blocks_per_client = get_blocks_per_client(self.block_db, start_slot, end_slot)
resp.text = json.dumps(blocks_per_client, ensure_ascii=False)


class SyncStatus:
def __init__(self, block_db):
self.block_db = block_db
Expand All @@ -58,6 +70,7 @@ def on_get(self, req, resp):
sync_status = get_sync_status(self.block_db)
resp.text = json.dumps(sync_status, ensure_ascii=False)


class SyncGaps:
def __init__(self, block_db):
self.block_db = block_db
Expand All @@ -66,14 +79,18 @@ def on_get(self, req, resp):
gaps = get_sync_gaps(self.block_db)
resp.text = json.dumps(gaps, ensure_ascii=False)


class ValidatorBlocks:
def __init__(self, block_db):
self.block_db = block_db

def on_get(self, req, resp, validator_index, since_slot=None):
validator_blocks = get_validator_blocks(self.block_db, validator_index, since_slot)
validator_blocks = get_validator_blocks(
self.block_db, validator_index, since_slot
)
resp.text = json.dumps(validator_blocks, ensure_ascii=False)


class MultipleValidatorsBlocks:
def __init__(self, block_db):
self.block_db = block_db
Expand All @@ -88,18 +105,23 @@ def on_post(self, req, resp, since_slot=None):
return

# I love type checking.
if type(validator_indices) != list or any(type(x) != int for x in validator_indices):
resp.text = json.dumps({"error": f"request must be a list of integers"})
if type(validator_indices) != list or any(
type(x) != int for x in validator_indices
):
resp.text = json.dumps({"error": "request must be a list of integers"})
resp.code = falcon.HTTP_400
return

all_blocks = {}
for validator_index in validator_indices:
validator_blocks = get_validator_blocks(self.block_db, validator_index, since_slot)
validator_blocks = get_validator_blocks(
self.block_db, validator_index, since_slot
)
all_blocks[validator_index] = validator_blocks

resp.text = json.dumps(all_blocks, ensure_ascii=False)


class Blocks:
def __init__(self, block_db):
self.block_db = block_db
Expand All @@ -108,6 +130,7 @@ def on_get(self, req, resp, start_slot, end_slot=None):
blocks = get_blocks(self.block_db, start_slot, end_slot)
resp.text = json.dumps(blocks, ensure_ascii=False)


app = application = falcon.App()

print("Initialising classifier, this could take a moment...")
Expand All @@ -117,10 +140,15 @@ def on_get(self, req, resp, start_slot, end_slot=None):
block_db = open_block_db(BLOCK_DB)

app.add_route("/classify", Classify(classifier, block_db))
app.add_route("/blocks_per_client/{start_epoch:int}/{end_epoch:int}", BlocksPerClient(block_db))
app.add_route(
"/blocks_per_client/{start_epoch:int}/{end_epoch:int}", BlocksPerClient(block_db)
)
app.add_route("/blocks_per_client/{start_epoch:int}", BlocksPerClient(block_db))
app.add_route("/validator/{validator_index:int}/blocks", ValidatorBlocks(block_db))
app.add_route("/validator/{validator_index:int}/blocks/{since_slot:int}", ValidatorBlocks(block_db))
app.add_route(
"/validator/{validator_index:int}/blocks/{since_slot:int}",
ValidatorBlocks(block_db),
)
app.add_route("/validator/blocks", MultipleValidatorsBlocks(block_db))
app.add_route("/validator/blocks/{since_slot:int}", MultipleValidatorsBlocks(block_db))
app.add_route("/blocks/{start_slot:int}", Blocks(block_db))
Expand Down
15 changes: 12 additions & 3 deletions background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
BLOCKPRINT_URL = "http://localhost:8000"

EVENT_URL_PATH = "eth/v1/events?topics=block_reward"
HEADERS = { "Accept": "text/event-stream" }
HEADERS = {"Accept": "text/event-stream"}

BACKFILL_WAIT_SECONDS = 60
FAIL_WAIT_SECONDS = 5


class BlockRewardListener:
def __init__(self, bn_url, blockprint_url):
self.bn_url = bn_url
Expand All @@ -43,14 +44,18 @@ def run(self):
print(f"Block listener failed with: {e}")
time.sleep(FAIL_WAIT_SECONDS)


def explode_gap(start_slot, end_slot, sprp):
next_boundary = (start_slot // sprp + 1) * sprp

if end_slot > next_boundary:
return [(start_slot, next_boundary)] + explode_gap(next_boundary + 1, end_slot, sprp)
return [(start_slot, next_boundary)] + explode_gap(
next_boundary + 1, end_slot, sprp
)
else:
return [(start_slot, end_slot)]


def explode_gaps(gaps, sprp=2048):
"Divide sync gaps into manageable chunks aligned to Lighthouse's restore points"
exploded = []
Expand All @@ -62,6 +67,7 @@ def explode_gaps(gaps, sprp=2048):

return exploded


class Backfiller:
def __init__(self, bn_url, blockprint_url):
self.bn_url = bn_url
Expand All @@ -75,7 +81,9 @@ def run(self):

for (start_slot, end_slot) in chunks:
print(f"Downloading backfill blocks {start_slot}..={end_slot}")
block_rewards = download_block_rewards(start_slot, end_slot, beacon_node=self.bn_url)
block_rewards = download_block_rewards(
start_slot, end_slot, beacon_node=self.bn_url
)

print(f"Classifying backfill blocks {start_slot}..={end_slot}")
post_block_rewards(self.blockprint_url, block_rewards)
Expand All @@ -88,6 +96,7 @@ def run(self):
print(f"Backfiller failed with: {e}")
time.sleep(FAIL_WAIT_SECONDS)


if __name__ == "__main__":
listener_task = lambda: BlockRewardListener(BN_URL, BLOCKPRINT_URL).run()
multiprocessing.Process(target=listener_task, name="block_listener").start()
Expand Down
Loading

0 comments on commit 469faa9

Please sign in to comment.