Skip to content

Commit

Permalink
Merge pull request #187 from ethstorage/2shards
Browse files Browse the repository at this point in the history
Mining multiple shards support
  • Loading branch information
syntrust authored Sep 9, 2024
2 parents 2eac50b + 2da0c0d commit 1e469eb
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 173 deletions.
21 changes: 8 additions & 13 deletions cmd/es-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"context"
"fmt"
"math/big"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -231,20 +230,16 @@ func EsNodeInit(ctx *cli.Context) error {
log.Info("Storage config loaded", "storageCfg", storageCfg)
var shardIdxList []uint64
if len(shardIndexes) > 0 {
// check existence of shard indexes but add shard 0 anyway
out:
for i := 0; i < len(shardIndexes); i++ {
shard := uint64(shardIndexes[i])
if shard > 0 {
diff, err := getDifficulty(cctx, client, l1Contract, shard)
if err != nil {
log.Error("Failed to get shard info from contract", "error", err)
return err
}
if diff != nil && diff.Cmp(big.NewInt(0)) == 0 {
return fmt.Errorf("Shard not exist: %d", shard)
new := uint64(shardIndexes[i])
// prevent duplicates
for _, s := range shardIdxList {
if s == new {
continue out
}
}
shardIdxList = append(shardIdxList, shard)
shardIdxList = append(shardIdxList, new)
}
} else {
// get shard indexes of length shardLen from contract
Expand All @@ -254,7 +249,7 @@ func EsNodeInit(ctx *cli.Context) error {
return err
}
if len(shardList) == 0 {
return fmt.Errorf("No shard indexes found")
return fmt.Errorf("no shard indexes found")
}
shardIdxList = shardList
}
Expand Down
16 changes: 12 additions & 4 deletions cmd/es-node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func getShardList(ctx context.Context, client *ethclient.Client, contract common
}

// getDifficulty returns the current mining difficulty of the given shard,
// taken from the second field of the contract's per-shard mining info.
func getDifficulty(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) (*big.Int, error) {
	res, err := getMiningInfo(ctx, client, contract, shardIdx)
	if err != nil {
		return nil, err
	}
	// Guard the type assertion so a malformed contract response surfaces
	// as an error instead of a runtime panic.
	diff, ok := res[1].(*big.Int)
	if !ok {
		return nil, fmt.Errorf("unexpected difficulty type in mining info: %T", res[1])
	}
	return diff, nil
}

func getMiningInfo(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) ([]interface{}, error) {
uint256Type, _ := abi.NewType("uint256", "", nil)
dataField, _ := abi.Arguments{{Type: uint256Type}}.Pack(new(big.Int).SetUint64(shardIdx))
h := crypto.Keccak256Hash([]byte(`infos(uint256)`))
Expand All @@ -136,10 +144,10 @@ func getDifficulty(ctx context.Context, client *ethclient.Client, contract commo
{Type: uint256Type},
}.UnpackValues(bs)
if res == nil || len(res) < 3 {
log.Error("Query difficulty by shard", "error", "invalid result", "result", res)
log.Error("Query mining info by shard", "error", "invalid result", "result", res)
return nil, fmt.Errorf("invalid result: %v", res)
}
return res[1].(*big.Int), nil
return res, nil
}

func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir string, encodingType int) ([]string, error) {
Expand All @@ -154,8 +162,8 @@ func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir s
for _, shardIdx := range shardIdxList {
dataFile := filepath.Join(datadir, fmt.Sprintf(fileName, shardIdx))
if _, err := os.Stat(dataFile); err == nil {
log.Error("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
return nil, err
log.Warn("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
continue
}
if cfg.ChunkSize == 0 {
return nil, fmt.Errorf("chunk size should not be 0")
Expand Down
61 changes: 52 additions & 9 deletions cmd/es-utils/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,24 @@ func SendBlobTx(
}
}

maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
var blobPrice *uint256.Int
if maxFeePerDataGas != "" {
maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
}
blobPrice = maxFeePerDataGas256
} else {
blobBaseFee, err := queryBlobBaseFee(client)
if err != nil {
log.Crit("Error getting blob base fee", "error", err)
}
log.Info("Query blob base fee done", "blobBaseFee", blobBaseFee)
blobBaseFee256, nok := uint256.FromBig(blobBaseFee)
if nok {
log.Crit("Error converting blob base fee to uint256", "blobBaseFee", blobBaseFee)
}
blobPrice = blobBaseFee256
}
var blobs []kzg4844.Blob
if needEncoding {
Expand Down Expand Up @@ -159,7 +174,7 @@ func SendBlobTx(
To: to,
Value: value256,
Data: calldataBytes,
BlobFeeCap: maxFeePerDataGas256,
BlobFeeCap: blobPrice,
BlobHashes: versionedHashes,
Sidecar: sideCar,
}
Expand Down Expand Up @@ -300,6 +315,8 @@ func UploadBlobs(
}
signer := crypto.PubkeyToAddress(key.PublicKey)
var keys []common.Hash
var blobIndex []*big.Int
var lengthes []*big.Int

var blobs []kzg4844.Blob
if needEncoding {
Expand All @@ -309,10 +326,23 @@ func UploadBlobs(
}
for i, blob := range blobs {
keys = append(keys, genKey(signer, i, blob[:]))
blobIndex = append(blobIndex, new(big.Int).SetUint64(uint64(i)))
lengthes = append(lengthes, new(big.Int).SetUint64(BlobSize))
}
log.Info("blobs", "keys", keys, "blobIndexes", blobIndex, "sizes", lengthes)
bytes32Array, _ := abi.NewType("bytes32[]", "", nil)
dataField, _ := abi.Arguments{{Type: bytes32Array}}.Pack(keys)
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[])"))
uint256Array, _ := abi.NewType("uint256[]", "", nil)
args := abi.Arguments{
{Type: bytes32Array},
{Type: uint256Array},
{Type: uint256Array},
}
dataField, err := args.Pack(keys, blobIndex, lengthes)
if err != nil {
log.Error("Failed to pack data", "err", err)
return nil, nil, err
}
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[],uint256[],uint256[])"))
calldata := "0x" + common.Bytes2Hex(append(h[0:4], dataField...))
tx := SendBlobTx(
rpc,
Expand All @@ -325,7 +355,7 @@ func UploadBlobs(
5000000,
"",
"",
"300000000",
"",
chainID,
calldata,
)
Expand Down Expand Up @@ -371,10 +401,23 @@ func UploadBlobs(
log.Info("Timed out for receipt, query contract for data hash...")
}
// if wait receipt timed out or failed, query contract for data hash
return getKvInfo(pc, contractAddr, len(blobs))
return getKvInfo(pc, len(blobs))
}

// queryBlobBaseFee asks the L1 node for the current blob base fee via the
// eth_blobBaseFee RPC method and returns it as a big.Int.
func queryBlobBaseFee(l1 *ethclient.Client) (*big.Int, error) {
	var feeHex string
	if err := l1.Client().CallContext(context.Background(), &feeHex, "eth_blobBaseFee"); err != nil {
		return nil, err
	}
	// The node returns a hex-quantity string; base 0 lets SetString
	// honor the 0x prefix.
	fee, ok := new(big.Int).SetString(feeHex, 0)
	if !ok {
		return nil, errors.New("invalid blob base fee")
	}
	return fee, nil
}

func getKvInfo(pc *eth.PollingClient, contractAddr common.Address, blobLen int) ([]uint64, []common.Hash, error) {
func getKvInfo(pc *eth.PollingClient, blobLen int) ([]uint64, []common.Hash, error) {
lastIdx, err := pc.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64())
if err != nil {
return nil, nil, err
Expand Down
11 changes: 7 additions & 4 deletions ethstorage/miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (w *worker) taskLoop(taskCh chan *taskItem) {
w.lg.Info("Mine task success", "shard", ti.shardIdx, "thread", ti.thread, "block", ti.blockNumber)
}
case <-w.exitCh:
w.lg.Warn("Worker is exiting from task loop...")
w.lg.Debug("Worker is exiting from task loop...")
return
}
}
Expand Down Expand Up @@ -459,6 +459,9 @@ func (w *worker) resultLoop() {
errorCache = append(errorCache, err)
case <-w.exitCh:
w.lg.Warn("Worker is exiting from result loop...")
for _, e := range errorCache {
w.lg.Error("Mining error since es-node launched", "err", e)
}
return
}
}
Expand Down Expand Up @@ -548,19 +551,19 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
return false, err
}
if t.requiredDiff.Cmp(new(big.Int).SetBytes(hash1.Bytes())) >= 0 {
w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "nonce", nonce)
w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "block", t.blockNumber, "timestamp", t.mineTime, "randao", t.mixHash, "nonce", nonce, "hash0", hash0, "hash1", hash1, "sampleIdxs", sampleIdxs)
dataSet, kvIdxs, sampleIdxsInKv, encodingKeys, encodedSamples, err := w.getMiningData(t.task, sampleIdxs)
if err != nil {
w.lg.Error("Get sample data failed", "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "err", err.Error())
return false, err
}
w.lg.Info("Got sample data", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
w.lg.Info("Got sample data", "shard", t.shardIdx, "block", t.blockNumber, "encodedSamples", encodedSamples)
masks, decodeProof, inclusiveProofs, err := w.prover.GetStorageProof(dataSet, encodingKeys, sampleIdxsInKv)
if err != nil {
w.lg.Error("Get storage proof error", "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "error", err.Error())
return false, fmt.Errorf("get proof err: %v", err)
}
w.lg.Info("Got storage proof", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
w.lg.Info("Got storage proof", "shard", t.shardIdx, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
newResult := &result{
blockNumber: t.blockNumber,
startShardId: t.shardIdx,
Expand Down
4 changes: 2 additions & 2 deletions ethstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *StorageManager) CommitBlobs(kvIndices []uint64, blobs [][]byte, commits
for i := 0; i < len(kvIndices); i++ {
encodedBlob, success, err := s.shardManager.TryEncodeKV(kvIndices[i], blobs[i], commits[i])
if !success || err != nil {
log.Warn("Blob encode failed", "index", kvIndices[i], "err", err.Error())
log.Warn("Blob encode failed", "index", kvIndices[i], "err", err)
continue
}
encodedBlobs[i] = encodedBlob
Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *StorageManager) CommitEmptyBlobs(start, limit uint64) (uint64, uint64,
for i := start; i <= limit; i++ {
encodedBlob, success, err := s.shardManager.TryEncodeKV(i, emptyBs, hash)
if !success || err != nil {
log.Warn("Blob encode failed", "index", i, "err", err.Error())
log.Warn("Blob encode failed", "index", i, "err", err)
break
}
encodedBlobs = append(encodedBlobs, encodedBlob)
Expand Down
2 changes: 1 addition & 1 deletion init-l2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
./init.sh \
--l1.rpc http://65.109.20.29:8545 \
--storage.l1contract 0x64003adbdf3014f7E38FC6BE752EB047b95da89A \
$@
$@
23 changes: 11 additions & 12 deletions init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ zkp_impl=1
zkp_mode=2

remaining_args=""
shards="--shard_index 0"

while [ $# -gt 0 ]; do
if [[ $1 == --miner.zk-prover-impl ]]; then
Expand All @@ -24,6 +25,9 @@ while [ $# -gt 0 ]; do
zkp_mode=$2
shift 2
else
if [[ $1 == --shard_index ]]; then
shards=""
fi
remaining_args="$remaining_args $1"
shift
fi
Expand Down Expand Up @@ -100,22 +104,17 @@ if [ "$zkp_impl" = 1 ]; then
fi

data_dir="./es-data"
storage_file_0="$data_dir/shard-0.dat"

es_node_init="$executable init --shard_index 0 \
es_node_init="$executable init $shards \
--datadir $data_dir \
--l1.rpc http://88.99.30.186:8545 \
--storage.l1contract 0x804C520d3c084C805E37A35E90057Ac32831F96f \
$remaining_args"

# create data file for shard 0 if not yet
if [ ! -e $storage_file_0 ]; then
if $es_node_init ; then
echo "√ Initialized ${storage_file_0} successfully"
else
echo "Error: failed to initialize ${storage_file_0}"
exit 1
fi
else
echo "Warning: storage file ${storage_file_0} already exists, skip initialization."
# es-node will skip init if data files already exist
if $es_node_init ; then
echo "√ Initialized data files successfully."
else
echo "Error: failed to initialize data files."
exit 1
fi
3 changes: 3 additions & 0 deletions integration_tests/gen_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ func generateRandomContent(sizeInKB int) []byte {
}
return []byte(content)
}
// generateRandomBlobs returns random content sized to fill blobLen blobs,
// accounting for the blob's usable capacity (31 of every 32 bytes).
func generateRandomBlobs(blobLen int) []byte {
	const usableKBPerBlob = 128 * 31 / 32
	return generateRandomContent(usableKBPerBlob * blobLen)
}
Loading

0 comments on commit 1e469eb

Please sign in to comment.