Skip to content

Commit

Permalink
Merge pull request #313 from ethstorage/integratiointest
Browse files Browse the repository at this point in the history
Create action to run Integration test regularly
ping-ke authored Sep 30, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 85774d3 + 8d709fa commit 396dd13
Showing 8 changed files with 583 additions and 5 deletions.
65 changes: 65 additions & 0 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
name: Integration Test
on:
schedule:
- cron: '1 8 * * 0,2,4'

jobs:
es-node-integration-test:
runs-on: self-hosted
timeout-minutes: 2880

steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.21'

- name: Use Node.js
uses: actions/setup-node@v3
with:
node-version: '20.x'

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1
with:
version: nightly

- name: Deploy Contracts
run: |
cd ../storage-contracts-v1
git checkout main
git pull
npm install
git submodule init && git submodule update
npx hardhat run scripts/deployL2-it.js --network qkc_testnet >> deploy.log
echo ES_NODE_CONTRACT_ADDRESS=`cat .caddr` >> "$GITHUB_ENV"
- name: Build and Run Bootnode Node
run: |
cd ../es-node
make
./run-l2-it-rpc.sh > es-node-it-bootnode.log&
- name: Set ENV Parameters
run: |
echo ES_NODE_UPLOADER_PRIVATE_KEY=`cat ../uploader.key` >> "$GITHUB_ENV"
echo ES_NODE_SIGNER_PRIVATE_KEY=`cat ../private.key` >> "$GITHUB_ENV"
echo ES_NODE_STORAGE_MINER=0x5C935469C5592Aeeac3372e922d9bCEabDF8830d >> "$GITHUB_ENV"
- name: Upload Blobs
run: |
cd ./integration_tests/scripts
npm install --force
node ituploader.js 10800 true > upload.log
cp .data ../../cmd/integration-test-server/.data
- name: Test
run: |
./run-l2-it.sh > es-node-it.log&
cd ./integration_tests/scripts
node ituploader.js 12288 false > upload2.log&
cd ../../cmd/integration-test-server
go build
./integration-test-server --contract_addr $ES_NODE_CONTRACT_ADDRESS > itserver.log
317 changes: 317 additions & 0 deletions cmd/integration-test-server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
// Copyright 2022-2023, EthStorage.
// For license information, see https://github.com/ethstorage/es-node/blob/main/LICENSE

package main

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"math"
"net/http"
"os"
"time"

"github.com/crate-crypto/go-proto-danksharding-crypto/eth"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethstorage/go-ethstorage/cmd/es-utils/utils"
es "github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/node"
prv "github.com/ethstorage/go-ethstorage/ethstorage/prover"
)

const (
expectedSaidHelloTime = 10 * time.Minute
expectedStateRefreshTime = 5 * time.Minute
executionTime = 2 * time.Hour

kvEntries = 8192
kvSize = 32 * 4096
dataSize = 31 * 4096

rpcEndpoint = "http://127.0.0.1:9595"
uploadedDataFile = ".data"
shardFile0 = "../../es-data-it/shard-0.dat"
shardFile1 = "../../es-data-it/shard-1.dat"
)

var (
portFlag = flag.Int("port", 9096, "Listener port for the es-node to report node status")
contractAddr = flag.String("contract_addr", "", "EthStorage contract address")
)

var (
errorMessages = make([]string, 0)
lastQueryTime = time.Now()
lastRecord *node.NodeState
hasConnectedPeer = false
testLog = log.New("IntegrationTest")
prover = prv.NewKZGProver(testLog)
contractAddress = common.Address{}
)

func HelloHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
addErrorMessage(fmt.Sprintf("Read Hello body failed with error %s", err.Error()))
return
}
log.Info("Get hello from node", "id", string(body))

if time.Since(lastQueryTime) > expectedSaidHelloTime {
addErrorMessage(fmt.Sprintf("Get Hello message later then expect time %v real value %v", expectedSaidHelloTime, time.Since(lastQueryTime)))
}
lastQueryTime = time.Now()
log.Info("Get Hello request from es-node", "id", string(body))

answer := `{"status":"ok"}`
w.Write([]byte(answer))
}

func ReportStateHandler(w http.ResponseWriter, r *http.Request) {
if time.Since(lastQueryTime) > expectedStateRefreshTime*2 {
addErrorMessage(fmt.Sprintf("Get Hello message later then expect time %v real value %v",
expectedStateRefreshTime*2, time.Since(lastQueryTime)))
}
lastQueryTime = time.Now()

body, err := io.ReadAll(r.Body)
if err != nil {
addErrorMessage(fmt.Sprintf("Read ReportState body failed with error %s", err.Error()))
return
}

state := &node.NodeState{}
err = json.Unmarshal(body, state)
if err != nil {
addErrorMessage(fmt.Sprintf("Parse node state failed with error %s", err.Error()))
w.Write([]byte(fmt.Sprintf(`{"status":"error", "err message":"%s"}`, err.Error())))
return
}

// If no state updated
log.Info("Get state from peer", "peer id", state.Id, "Version", state.Version)
if lastRecord != nil {
checkState(lastRecord, state)
}

lastRecord = state

w.Write([]byte(`{"status":"ok"}`))
}

func checkState(oldState, newState *node.NodeState) {
if len(oldState.Shards) != len(newState.Shards) {
addErrorMessage(fmt.Sprintf("shards count mismatch between two state, new %d, old %d", len(newState.Shards), len(oldState.Shards)))
return
}

for _, shardState := range newState.Shards {
check := false
for _, oldShardState := range oldState.Shards {
if shardState.ShardId != oldShardState.ShardId {
continue
}
check = true
if shardState.SyncState.PeerCount > 0 {
hasConnectedPeer = true
}

if oldShardState.SyncState.SyncProgress < 10000 &&
(shardState.SyncState.BlobsSynced <= oldShardState.SyncState.BlobsSynced ||
shardState.SyncState.SyncProgress <= oldShardState.SyncState.SyncProgress) {
addErrorMessage(fmt.Sprintf("es-node sync progress do not increase in %f minutes, "+
"old synced: %d, new synced %d; old progress: %d, new progress: %d", expectedStateRefreshTime.Minutes(), oldShardState.SyncState.BlobsSynced,
shardState.SyncState.BlobsSynced, oldShardState.SyncState.SyncProgress, shardState.SyncState.SyncProgress))
}
if oldShardState.SyncState.FillEmptyProgress < 10000 &&
(shardState.SyncState.EmptyFilled < oldShardState.SyncState.EmptyFilled ||
shardState.SyncState.FillEmptyProgress < oldShardState.SyncState.FillEmptyProgress) {
addErrorMessage(fmt.Sprintf("es-node fill empty progress do not increase in %f minutes, "+
"old filled: %d, new filled %d; old progress: %d, new progress: %d", expectedStateRefreshTime.Minutes(), oldShardState.SyncState.EmptyFilled,
shardState.SyncState.EmptyFilled, oldShardState.SyncState.FillEmptyProgress, shardState.SyncState.FillEmptyProgress))
}

if oldShardState.SyncState.FillEmptyProgress == 10000 && oldShardState.SyncState.SyncProgress == 10000 &&
(shardState.MiningState.MiningPower == 0 || shardState.MiningState.SamplingTime == 0) {
addErrorMessage("Mining should be start after sync done.")
}
}
if !check {
addErrorMessage(fmt.Sprintf("Shard %d in the new state do not exist in the old state", shardState.ShardId))
}
}
}

func checkFinalState(state *node.NodeState) {
if state == nil {
addErrorMessage("No state submitted during the test")
return
}
if !hasConnectedPeer {
addErrorMessage("es-node peer count should larger than 0")
}

log.Info("Final state", "id", state.Id, "version", state.Version)
for _, shardState := range state.Shards {
if shardState.SyncState.SyncProgress != 10000 {
addErrorMessage("Sync should be finished during the test")
}
if shardState.SyncState.FillEmptyProgress != 10000 {
addErrorMessage("Fill should be finished during the test")
}
if shardState.MiningState.SamplingTime == 0 || shardState.MiningState.MiningPower == 0 {
addErrorMessage("Mining should be start after sync done.")
}
if shardState.SubmissionState.LastSucceededTime == 0 || shardState.SubmissionState.Succeeded == 0 {
addErrorMessage("At lease one block should be mined successfully during the test.")
}
if shardState.SubmissionState.Failed > 0 {
addErrorMessage(fmt.Sprintf("%d submission failed during the test.", shardState.SubmissionState.Failed))
}
log.Info("Final state", "id", state.Id, "shard", shardState.ShardId, "miner", shardState.Miner, "sync progress",
shardState.SyncState.SyncProgress, "fill progress", shardState.SyncState.FillEmptyProgress, "mining power",
shardState.MiningState.MiningPower, "sampling time", shardState.MiningState.SamplingTime, "succeeded submission",
shardState.SubmissionState.Succeeded, "failed submission", shardState.SubmissionState.Failed, "dropped submission",
shardState.SubmissionState.Dropped, "last succeeded time", shardState.SubmissionState.LastSucceededTime)
}
}

func createShardManager() (*es.ShardManager, error) {
sm := es.NewShardManager(contractAddress, kvSize, kvEntries, kvSize)
df0, err := es.OpenDataFile(shardFile0)
if err != nil {
return nil, err
}
err = sm.AddDataFileAndShard(df0)
if err != nil {
return nil, err
}

df1, err := es.OpenDataFile(shardFile1)
if err != nil {
return nil, err
}
err = sm.AddDataFileAndShard(df1)
if err != nil {
return nil, err
}

return sm, nil
}

func verifyData() error {
file, err := os.OpenFile(uploadedDataFile, os.O_RDONLY, 0755)
if err != nil {
return err
}
defer file.Close()

fileScanner := bufio.NewScanner(file)
fileScanner.Buffer(make([]byte, dataSize*2), kvSize*2)
fileScanner.Split(bufio.ScanLines)

sm, err := createShardManager()
if err != nil {
return err
}

client, err := rpc.DialHTTP(rpcEndpoint)
if err != nil {
return err
}
defer client.Close()

i := uint64(0)
for fileScanner.Scan() {
expectedData := common.Hex2Bytes(fileScanner.Text())
blob := utils.EncodeBlobs(expectedData)[0]
commit, _, _ := sm.TryReadMeta(i)
data, _, err := sm.TryRead(i, kvSize, common.BytesToHash(commit))
if err != nil {
return errors.New(fmt.Sprintf("read %d from shard fail with err: %s", i, err.Error()))
}
if bytes.Compare(blob[:], data) != 0 {
return errors.New(fmt.Sprintf("compare shard data %d fail, expected data %s; data: %s",
i, common.Bytes2Hex(blob[:64]), common.Bytes2Hex(data[:64])))
}

rpcdata, err := downloadBlobFromRPC(client, i, common.BytesToHash(commit))
if err != nil {
return errors.New(fmt.Sprintf("get data %d from rpc fail with err: %s", i, err.Error()))
}
if bytes.Compare(blob[:], rpcdata) != 0 {
return errors.New(fmt.Sprintf("compare rpc data %d fail, expected data %s; data: %s",
i, common.Bytes2Hex(blob[:64]), common.Bytes2Hex(rpcdata[:64])))
}
i++
}
return nil
}

func downloadBlobFromRPC(client *rpc.Client, kvIndex uint64, hash common.Hash) ([]byte, error) {
var result hexutil.Bytes
err := client.Call(&result, "es_getBlob", kvIndex, hash, 0, 0, 4096*32)
if err != nil {
return nil, err
}

var blob kzg4844.Blob
copy(blob[:], result)
commit, err := kzg4844.BlobToCommitment(blob)
if err != nil {
return nil, fmt.Errorf("blobToCommitment failed: %w", err)
}
cmt := common.Hash(eth.KZGToVersionedHash(commit))
if bytes.Compare(cmt[:es.HashSizeInContract], hash[:es.HashSizeInContract]) != 0 {
return nil, fmt.Errorf("invalid blob for %d hash: %s, commit: %s", kvIndex, hash, cmt)
}

return result, nil
}

func addErrorMessage(errMessage string) {
log.Warn("Add error message", "msg", errMessage)
errorMessages = append(errorMessages, errMessage+"\n")
}

func listenAndServe(port int) error {
http.HandleFunc("/hello", HelloHandler)
http.HandleFunc("/reportstate", ReportStateHandler)
return http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}

func main() {
// Parse the flags and set up the logger to print everything requested
flag.Parse()
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(3), log.StreamHandler(os.Stderr, log.TerminalFormat(true))))

if *portFlag < 0 || *portFlag > math.MaxUint16 {
log.Crit("Invalid port")
}
if *contractAddr == "" {
log.Crit("Invalid contract address")
} else {
contractAddress = common.HexToAddress(*contractAddr)
}

go listenAndServe(*portFlag)

time.Sleep(executionTime)
checkFinalState(lastRecord)
if err := verifyData(); err != nil {
addErrorMessage(err.Error())
}

if len(errorMessages) > 0 {
log.Crit(fmt.Sprintf("integration test fail %v", errorMessages))
}
}
3 changes: 0 additions & 3 deletions ethstorage/p2p/protocol/syncclient.go
Original file line number Diff line number Diff line change
@@ -895,12 +895,9 @@ func (s *SyncClient) assignFillEmptyBlobTasks() {
s.notifyUpdate()
s.wg.Done()
}()
t := time.Now()
next, err := s.FillFileWithEmptyBlob(start, limit)
if err != nil {
log.Warn("Fill in empty fail", "err", err.Error())
} else {
log.Debug("Fill in empty done", "time", time.Now().Sub(t).Seconds())
}
filled := next - start

6 changes: 4 additions & 2 deletions init.sh
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ zkp_impl=1
# ZK prover mode, 1: one proof per sample, 2: one proof for multiple samples.
# Note: currently only zk prover mode 2 is supported
zkp_mode=2
data_dir="./es-data"

remaining_args=""
shards="--shard_index 0"
@@ -24,6 +25,9 @@ while [ $# -gt 0 ]; do
elif [[ $1 == --miner.zk-prover-mode ]]; then
zkp_mode=$2
shift 2
elif [[ $1 == --datadir ]]; then
data_dir=$2
shift 2
else
if [[ $1 == --shard_index ]]; then
shards=""
@@ -103,8 +107,6 @@ if [ "$zkp_impl" = 1 ]; then

fi

data_dir="./es-data"

es_node_init="$executable init $shards \
--datadir $data_dir \
--l1.rpc http://88.99.30.186:8545 \
96 changes: 96 additions & 0 deletions integration_tests/scripts/ituploader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
const {ethers, Contract} = require("ethers");
const crypto = require('crypto');
const {EthStorage} = require("ethstorage-sdk");
const core = require('@actions/core');
const fs = require('fs');

const dotenv = require("dotenv")
dotenv.config()
const privateKey = process.env.ES_NODE_UPLOADER_PRIVATE_KEY;
const contractAddr = process.env.ES_NODE_CONTRACT_ADDRESS;
const RPC = 'http://65.109.20.29:8545';
const contractABI = [
"function lastKvIdx() public view returns (uint40)"
]

const provider = new ethers.JsonRpcProvider(RPC);
const contract = new Contract(contractAddr, contractABI, provider);
const MAX_BLOB = BigInt(process.argv[2]);
const BATCH_SIZE = 6n;
const NEED_WAIT = (process.argv[3] === 'true');

async function UploadBlobsForIntegrationTest() {
// put blobs
console.log(contractAddr)
const es = await EthStorage.create({
rpc: RPC,
privateKey,
address: contractAddr
})
while (true) {
const currentIndex = await contract.lastKvIdx();
const totalCount = MAX_BLOB - currentIndex;
console.log("Current Number:", currentIndex, " Total Number:", totalCount, "at", new Date().toLocaleTimeString([], { hour: "2-digit", minute: "2-digit", second: "2-digit" }));
if (totalCount <= 0) {
break;
}

let keys = [];
let blobs = [];
for (let i = 0; i < BATCH_SIZE && i < totalCount; i++) {
const buf = crypto.randomBytes(126976);
keys[i] = buf.subarray(0,32).toString('hex')
blobs[i] = buf
}

// write blobs
try {
let status = await es.writeBlobs(keys, blobs);
if (status == false) {
continue
}
console.log(status);
} catch (e) {
console.log("upload blob error:", e.message);
continue
}
for (let i = 0; i < blobs.length; i++) {
fs.writeFileSync(".data", blobs[i].toString('hex')+'\n', { flag: 'a+' });
}
}

if (!NEED_WAIT) {
return
}

let latestBlock
try {
latestBlock = await provider.getBlock();
console.log("latest block number is", latestBlock.number);
} catch (e) {
core.setFailed(`EthStorage: get latest block failed with message: ${e.message}`);
return
}

// wait for blobs finalized
var intervalId = setInterval(async function (){
try {
let finalizedBlock = await provider.getBlock("finalized");
console.log(
"finalized block number is",
finalizedBlock.number,
"at",
new Date().toLocaleTimeString([], { hour: "2-digit", minute: "2-digit", second: "2-digit" })
);
if (latestBlock.number < finalizedBlock.number) {
setTimeout(() => console.log("Upload done!"), 300000)
clearInterval(intervalId);
}
} catch (e) {
console.error(`EthStorage: get finalized block failed!`, e.message);
}
}, 120000);
}

UploadBlobsForIntegrationTest();

21 changes: 21 additions & 0 deletions integration_tests/scripts/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "integration-test",
"version": "1.0.0",
"main": "ituploader.js",
"scripts": {
"build": "rollup -c"
},
"dependencies": {
"dotenv": "^16.4.5",
"ethers": "^6.13.1",
"ethstorage-sdk": "^2.1.1",
"@actions/core": "^1.10.1"
},
"repository": {
"type": "git",
"url": "git+https://github.com/ethstorage/es-node.git"
},
"author": "ethsorage",
"license": "ISC",
"description": ""
}
41 changes: 41 additions & 0 deletions run-l2-it-rpc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/bin/sh

data_dir="./es-data-it-bootnode"
storage_file_0="$data_dir/shard-0.dat"
storage_file_1="$data_dir/shard-1.dat"
zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey"

if test -d ${data_dir} ; then
rm -r ${data_dir}
fi
mkdir ${data_dir}
echo "8714eb2672bb7ab01089a1060150b30bc374a3b00e18926460f169256d126339" > "${data_dir}/esnode_p2p_priv.txt"

./init-l2.sh \
--shard_index 0 \
--shard_index 1 \
--encoding_type=0 \
--datadir $data_dir \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS


exec ./build/bin/es-node \
--network integration \
--datadir $data_dir \
--storage.files $storage_file_0 \
--storage.files $storage_file_1 \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS \
--miner.enabled=false \
--miner.zkey $zkey_file \
--l1.block_time 2 \
--l1.rpc http://65.109.20.29:8545 \
--da.url http://65.109.20.29:8888 \
--randao.url http://88.99.30.186:8545 \
--rpc.port 9595 \
--p2p.listen.udp 30395 \
--p2p.listen.tcp 9295 \
--p2p.priv.path $data_dir/esnode_p2p_priv.txt \
--p2p.peerstore.path $data_dir/esnode_peerstore_db \
--p2p.discovery.path $data_dir/esnode_discovery_db \
--rpc.addr 0.0.0.0

39 changes: 39 additions & 0 deletions run-l2-it.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/bin/bash

data_dir="./es-data-it"
storage_file_0="$data_dir/shard-0.dat"
storage_file_1="$data_dir/shard-1.dat"
zkey_file="./build/bin/snark_lib/zkey/blob_poseidon2.zkey"

if test -d ${data_dir} ; then
rm -r ${data_dir}
fi
mkdir ${data_dir}

./init-l2.sh \
--shard_index 0 \
--shard_index 1 \
--datadir $data_dir \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS


exec ./build/bin/es-node \
--network integration \
--datadir $data_dir \
--storage.files $storage_file_0 \
--storage.files $storage_file_1 \
--storage.l1contract $ES_NODE_CONTRACT_ADDRESS \
--miner.enabled \
--miner.zkey $zkey_file \
--l1.block_time 2 \
--l1.rpc http://65.109.20.29:8545 \
--da.url http://65.109.20.29:8888 \
--randao.url http://88.99.30.186:8545 \
--state.upload.url http://127.0.0.1:9096 \
--rpc.port 9596 \
--p2p.listen.udp 30396 \
--p2p.listen.tcp 9296 \
--p2p.priv.path $data_dir/esnode_p2p_priv.txt \
--p2p.peerstore.path $data_dir/esnode_peerstore_db \
--p2p.discovery.path $data_dir/esnode_discovery_db \
--p2p.bootnodes enr:-Li4QBp6QW2ji7JF-3yijZrQ54PqPZ-Io_xEtMUslxxcmGS5TAXiiU6hypBZbB_atxh2Pc72-MgonzU5_R-_qd_PBXyGAZDucmwzimV0aHN0b3JhZ2XbAYDY15SXhtonBXvE13WNGfkk7Nj9Y4_Qr8GAgmlkgnY0gmlwhFhjHrqJc2VjcDI1NmsxoQJ8KIsZjyfFPHZOR66JORtqr5ax0QU6QmvT6QE0QllVZIN0Y3CCJE-DdWRwgna7

0 comments on commit 396dd13

Please sign in to comment.