Skip to content
This repository has been archived by the owner on Dec 13, 2022. It is now read-only.

feat(ETH): Loading Ethereum Data #48

Merged
merged 8 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

[x] Check that an update mutation won't cause duplicate XIDs.
[x] Deal with nested objects and their upserts.
[ ] NumUids in deletion operation doesn't return any value.
[x] NumUids in deletion operation doesn't return any value.
[ ] Without upsert, we seem to be adding duplicate records.

#### Inverse
Expand All @@ -33,3 +33,11 @@

[ ] Separate out values from UIDs. Posting shouldn't be storing both.
[ ] Don't use value hashes as UIDs for postings.

### Deletions

[ ] This doesn't work right now. Fix it: `<subject:"0x2466c53" predicate:"Account.Outgoing" object_value:"\t_STAR_ALL" op:DEL >`

### Badger

[ ] Have a debug endpoint to inspect Badger table overlaps.
1 change: 0 additions & 1 deletion edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,6 @@ func doMutate(ctx context.Context, qc *queryContext, resp *pb.Response) error {

qc.span.Annotatef(nil, "Applying mutations: %+v", mu)
resp.Txn, err = query.ApplyMutations(ctx, mu)
glog.Infof("ApplyMutations returned: %+v %v\n", resp.Txn, err)
qc.span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err)

// calculateMutationMetrics calculate cost for the mutation.
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/outcaste-io/outserv
go 1.16

// replace github.com/outcaste-io/ristretto => /home/mrjn/source/ristretto
// replace github.com/outcaste-io/badger/v3 => /home/mrjn/source/badger

require (
cloud.google.com/go/storage v1.15.0
Expand Down Expand Up @@ -38,7 +39,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v1.3.0
github.com/minio/minio-go/v6 v6.0.55
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220405110642-ade7785339cb
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220426173331-b25bc764af0d
github.com/outcaste-io/dgo/v210 v210.0.0-20220225180226-43bd1b427e86
github.com/outcaste-io/gqlgen v0.13.3
github.com/outcaste-io/gqlparser/v2 v2.2.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,8 @@ github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsq
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/ory/dockertest v3.3.4+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs=
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220405110642-ade7785339cb h1:m3ERC2rjewWi0U3SHecqTnNaEhjaQtuNwsiWD5NHD8o=
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220405110642-ade7785339cb/go.mod h1:P5q0708InZMN6Axrw1ZcwepiiCoPeCqRqIyUGKx2aoI=
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220426173331-b25bc764af0d h1:dOoaepdC2hnmjrkSm4aWVF/pMncuAyNqYKgZbDsQE5A=
github.com/outcaste-io/badger/v3 v3.2202.1-0.20220426173331-b25bc764af0d/go.mod h1:P5q0708InZMN6Axrw1ZcwepiiCoPeCqRqIyUGKx2aoI=
github.com/outcaste-io/dgo/v210 v210.0.0-20220225180226-43bd1b427e86 h1:sUjjQrypjiT1ii7opdvvr4oOmWyfgFNjPaqC7WwQRD8=
github.com/outcaste-io/dgo/v210 v210.0.0-20220225180226-43bd1b427e86/go.mod h1:Zcox8pu5QAEuA7uZAnMw4ekyM768IkQnU+msW5mJ88g=
github.com/outcaste-io/gqlgen v0.13.3 h1:dUhBs+vlNmnbMwd9L/wYG4ZXjRXgP94zPbRNjHYQU+s=
Expand Down
8 changes: 8 additions & 0 deletions graphql/resolve/mu.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ func handleDelete(ctx context.Context, m *schema.Field) ([]uint64, error) {
for _, uid := range uids {
uidHex := x.ToHexString(uid)
for _, f := range m.MutatedType().Fields() {
if strings.HasSuffix(f.DgraphAlias(), "Aggregate") {
// TODO(mrjn): This is a hack. We should figure out how to deal
// with this properly.
continue
}
if f.IsID() {
continue
}
accountForInverse(uidHex, f)
mu.Edges = append(mu.Edges, &pb.Edge{
Subject: uidHex,
Expand Down
1 change: 1 addition & 0 deletions importers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The code in this module is licensed under Apache v2.0.
269 changes: 269 additions & 0 deletions importers/eth/eth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Copyright 2022 Outcaste LLC. Licensed under the Apache License v2.0.

package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"log"
"math/big"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"
)

var path = flag.String("geth", "", "Geth IPC path")
var graphql = flag.String("gql", "http://localhost:8080", "GraphQL endpoint")
var dryRun = flag.Bool("dry", false, "If true, don't send txns to GraphQL endpoint")
var numGo = flag.Int("gor", 2, "Number of goroutines to use")
var startBlock = flag.Int64("start", 50000, "Start at block")

var txnMu = `
mutation($Txns: [AddTxnInput!]!) {
addTxn(input: $Txns, upsert: true) {
numUids
}
}
`

type Account struct {
Hash string
}
type Txn struct {
Hash string
Value int64
Block int64
To Account
From Account
}
type Batch struct {
Txns []Txn `json:"Txns"`
}
type GQL struct {
Query string `json:"query"`
Variables Batch `json:"variables"`
}
type Resp struct {
NumUids int `json:"numUids"`
}
type DataResp struct {
Resp Resp `json:"addTxn"`
}
type ErrorResp struct {
Message string `json:"message"`
}
type WResp struct {
Data DataResp `json:"data"`
Errors []ErrorResp `json:"errors"`
}
type Block struct {
Wg sync.WaitGroup
Id int64
txns []Txn
}

func (b *Block) Fill() {
defer b.Wg.Done()
blockNumber := big.NewInt(b.Id)
block, err := client.BlockByNumber(context.Background(), blockNumber)
check(err)
for _, tx := range block.Transactions() {
var to, from Account
if msg, err := tx.AsMessage(types.NewEIP155Signer(chainID), nil); err == nil {
from.Hash = msg.From().Hex()
}

if tx.To() != nil {
to.Hash = tx.To().Hex()
}
txn := Txn{
Hash: tx.Hash().Hex(),
Value: tx.Value().Int64(),
Block: blockNumber.Int64(),
To: to,
From: from,
}
if txn.Value == 0 || len(txn.To.Hash) == 0 || len(txn.From.Hash) == 0 {
continue
}
b.txns = append(b.txns, txn)
}
}

func (b *Block) Txns() []Txn {
b.Wg.Wait()
return b.txns
}

type Chain struct {
blockId int64
numProcessed int64
blockCh chan *Block
}

func (c *Chain) BlockingFill() {
defer close(c.blockCh)

c.blockId = *startBlock - 1
for {
blockId := atomic.AddInt64(&c.blockId, 1)
if blockId >= 14e6 {
return
}
b := &Block{Id: blockId}
b.Wg.Add(1)
go b.Fill()
c.blockCh <- b
}
}

func (c *Chain) processTxns(wg *sync.WaitGroup) {
defer wg.Done()

var txns []Txn
sendTxns := func(txns []Txn) error {
if len(txns) == 0 {
return nil
}
q := GQL{
Query: txnMu,
Variables: Batch{Txns: txns},
}
// fmt.Printf("----------> Txns %d. Sending...\n", len(txns))
data, err := json.Marshal(q)
if err != nil {
return err
}
return sendRequest(data)
}

for block := range c.blockCh {
txns = append(txns, block.Txns()...)
if len(txns) >= 1000 {
check(sendTxns(txns))
txns = txns[:0]
}
atomic.AddInt64(&c.numProcessed, 1)
}
check(sendTxns(txns))
}

func (c *Chain) printMetrics() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

start := time.Now()
for range ticker.C {
maxBlockId := atomic.LoadInt64(&c.blockId)
num := atomic.LoadInt64(&c.numProcessed)
dur := time.Since(start)
fmt.Printf("BlockId: %d Processed: %d [ %s @ %6.1f blocks/min ]\n",
maxBlockId, num,
dur.Round(time.Second), float64(num)/dur.Minutes())
}
}
func accountsFrom(out map[string]Account, blockId int64) {
blockNumber := big.NewInt(blockId)
block, err := client.BlockByNumber(context.Background(), blockNumber)
if err != nil {
log.Fatal(err)
}

for _, tx := range block.Transactions() {
var to, from Account
if msg, err := tx.AsMessage(types.NewEIP155Signer(chainID), nil); err == nil {
from.Hash = msg.From().Hex()
}
if _, has := out[from.Hash]; !has {
out[from.Hash] = from
}

if tx.To() != nil {
to.Hash = tx.To().Hex()
if _, has := out[to.Hash]; !has {
out[to.Hash] = to
}
}
}
}

func sendRequest(data []byte) error {
if *dryRun {
return nil
}
// TODO: Check that the schema is correctly set.
var wr WResp
resp, err := http.Post(fmt.Sprintf("%s/graphql", *graphql),
"application/json", bytes.NewBuffer(data))
if err != nil {
return errors.Wrapf(err, "while posting request")
}
out, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()

if err := json.Unmarshal(out, &wr); err != nil {
return errors.Wrapf(err, "response: %s\n", out)
}
for _, werr := range wr.Errors {
if len(werr.Message) > 0 {
return fmt.Errorf("Got error from GraphQL: %s\n", werr.Message)
}
}

return nil
}

const numBlocks int64 = 64

func check(err error) {
if err != nil {
log.Fatalf("Got error: %v", err)
}
}

var client *ethclient.Client
var chainID *big.Int

func main() {
flag.Parse()

var err error
client, err = ethclient.Dial(*path)
if err != nil {
log.Fatal(err)
}

chainID, err = client.NetworkID(context.Background())
if err != nil {
log.Fatal(err)
}

header, err := client.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Latest: %08d\n", header.Number)
fmt.Printf("Using %d goroutines.\n", *numGo)

chain := Chain{
blockCh: make(chan *Block, 16),
}
var wg sync.WaitGroup
for i := 0; i < *numGo; i++ {
wg.Add(1)
go chain.processTxns(&wg)
}
go chain.printMetrics()
chain.BlockingFill()
wg.Wait()
fmt.Println("DONE")
}
24 changes: 24 additions & 0 deletions importers/eth/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module eth

go 1.17

require (
github.com/ethereum/go-ethereum v1.10.17
github.com/pkg/errors v0.9.1
)

require (
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
)
Loading