Skip to content

Commit

Permalink
ethmonitor: streaming blocks support (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkieltyka authored Mar 18, 2024
1 parent a1fcf88 commit e38e491
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 50 deletions.
134 changes: 134 additions & 0 deletions cmd/chain-newheads/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/0xsequence/ethkit/go-ethereum/common/hexutil"
"github.com/0xsequence/ethkit/go-ethereum/rpc"
"github.com/0xsequence/ethkit/util"
)

var (
ETH_NODE_URL = ""
ETH_NODE_WSS_URL = ""
)

func init() {
testConfig, err := util.ReadTestConfig("../../ethkit-test.json")
if err != nil {
panic(err)
}

if testConfig["POLYGON_MAINNET_WSS_URL"] != "" {
ETH_NODE_URL = testConfig["POLYGON_MAINNET_URL"]
ETH_NODE_WSS_URL = testConfig["POLYGON_MAINNET_WSS_URL"]
}
// if testConfig["MAINNET_URL"] != "" {
// ETH_NODE_URL = testConfig["MAINNET_URL"]
// ETH_NODE_WSS_URL = testConfig["MAINNET_WSS_URL"]
// }
if testConfig["ARB_NOVA_WSS_URL"] != "" {
ETH_NODE_URL = testConfig["ARB_NOVA_URL"]
ETH_NODE_WSS_URL = testConfig["ARB_NOVA_WSS_URL"]
}
// if testConfig["AVAX_MAINNET_WSS_URL"] != "" {
// // ETH_NODE_URL = testConfig["ARB_NOVA_URL"]
// ETH_NODE_WSS_URL = testConfig["AVAX_MAINNET_WSS_URL"]
// }
}

func main() {
client, err := rpc.Dial(ETH_NODE_WSS_URL)
if err != nil {
log.Fatal(err)
}

ch := make(chan map[string]interface{})

sub, err := client.EthSubscribe(context.Background(), ch, "newHeads")
if err != nil {
log.Fatal(err)
}

var prevHash string
go func() {
for {
select {

case err := <-sub.Err():
fmt.Println("sub err!", err)
os.Exit(1)

case out := <-ch:
// fmt.Println("===> out:", out)
// spew.Dump(out)

hash, ok := out["hash"].(string)
if !ok {
panic(ok)
}
parentHash, ok := out["parentHash"].(string)
if !ok {
panic(ok)
}
if prevHash != "" {
if prevHash != parentHash {
fmt.Println("REORG!")
}
}
prevHash = hash
num, ok := out["number"].(string)
if !ok {
panic("hmm")
}
blockNumber := hexutil.MustDecodeBig(num)
fmt.Println("hash", hash, "num", blockNumber.String())
}
}
}()

time.Sleep(20 * time.Minute)
sub.Unsubscribe()

// os.Exit(1)

// filter := map[string]interface{}{
// "topics": []string{},
// // "fromBlock": "latest",
// }

// sub, err = client.EthSubscribe(context.Background(), ch, "logs", filter)
// if err != nil {
// log.Fatal(err)
// }

// go func() {
// for {
// select {

// case err := <-sub.Err():
// fmt.Println("sub err!", err)
// os.Exit(1)

// case out := <-ch:
// // fmt.Println("===> out:", out)
// spew.Dump(out)

// removed, ok := out["removed"].(bool)
// if !ok {
// panic("no")
// }
// if removed {
// panic("removed!!!")
// }
// }
// }
// }()

// time.Sleep(2 * time.Minute)
// sub.Unsubscribe()
}
5 changes: 4 additions & 1 deletion cmd/chain-receipts/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

var ETH_NODE_URL = "http://localhost:8545"
var ETH_NODE_WSS_URL = ""

func init() {
testConfig, err := util.ReadTestConfig("../../ethkit-test.json")
Expand All @@ -29,17 +30,19 @@ func init() {

if testConfig["POLYGON_MAINNET_URL"] != "" {
ETH_NODE_URL = testConfig["POLYGON_MAINNET_URL"]
ETH_NODE_WSS_URL = testConfig["POLYGON_MAINNET_WSS_URL"]
}
// if testConfig["MAINNET_URL"] != "" {
// ETH_NODE_URL = testConfig["MAINNET_URL"]
// ETH_NODE_WSS_URL = testConfig["MAINNET_WSS_URL"]
// }
}

func main() {
fmt.Println("chain-receipts start")

// Provider
provider, err := ethrpc.NewProvider(ETH_NODE_URL)
provider, err := ethrpc.NewProvider(ETH_NODE_URL, ethrpc.WithStreaming(ETH_NODE_WSS_URL))
if err != nil {
log.Fatal(err)
}
Expand Down
19 changes: 17 additions & 2 deletions cmd/chain-watch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"math/big"
"os"
"path/filepath"
"sync"
Expand All @@ -20,6 +21,7 @@ import (
)

var ETH_NODE_URL = "http://localhost:8887/polygon"
var ETH_NODE_WSS_URL = ""

const SNAPSHOT_ENABLED = false

Expand All @@ -33,17 +35,23 @@ func init() {

if testConfig["POLYGON_MAINNET_URL"] != "" {
ETH_NODE_URL = testConfig["POLYGON_MAINNET_URL"]
ETH_NODE_WSS_URL = testConfig["POLYGON_MAINNET_WSS_URL"]
}
// if testConfig["MAINNET_URL"] != "" {
// ETH_NODE_URL = testConfig["MAINNET_URL"]
// }
if testConfig["ARB_NOVA_URL"] != "" {
ETH_NODE_URL = testConfig["ARB_NOVA_URL"]
ETH_NODE_WSS_URL = testConfig["ARB_NOVA_WSS_URL"]
}
}

func main() {
fmt.Println("chain-watch start")

// Provider
provider, err := ethrpc.NewProvider(ETH_NODE_URL)
// provider, err := ethrpc.NewProvider(ETH_NODE_URL)
provider, err := ethrpc.NewProvider(ETH_NODE_URL, ethrpc.WithStreaming(ETH_NODE_WSS_URL))
if err != nil {
log.Fatal(err)
}
Expand All @@ -57,7 +65,14 @@ func main() {
monitorOptions.PollingInterval = time.Duration(2000 * time.Millisecond)
monitorOptions.WithLogs = true
monitorOptions.BlockRetentionLimit = 64
monitorOptions.StartBlockNumber = nil // track the head
// monitorOptions.StartBlockNumber = nil // track the head

latestBlock, err := provider.BlockByNumber(context.Background(), nil)
if err != nil {
panic(err)
}

monitorOptions.StartBlockNumber = big.NewInt(0).Sub(latestBlock.Number(), big.NewInt(10))
// monitorOptions.StartBlockNumber = big.NewInt(47496451)
// monitorOptions.Bootstrap = true

Expand Down
4 changes: 0 additions & 4 deletions ethmonitor/README.md

This file was deleted.

Loading

0 comments on commit e38e491

Please sign in to comment.