diff --git a/cmd/abigen/main.go b/cmd/abigen/main.go index 2c65209b009..35d23bd1d15 100644 --- a/cmd/abigen/main.go +++ b/cmd/abigen/main.go @@ -19,13 +19,14 @@ package main import ( "encoding/json" "fmt" - "github.com/ledgerwatch/erigon-lib/common" "io" "os" "path/filepath" "regexp" "strings" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/log/v3" "github.com/urfave/cli/v2" diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go index 8010f269668..a9181e48c9d 100644 --- a/cmd/caplin-phase1/main.go +++ b/cmd/caplin-phase1/main.go @@ -60,7 +60,7 @@ func runCaplinNode(cliCtx *cli.Context) error { if err != nil { log.Error("[Phase1] Could not initialize caplin", "err", err) } - if _, err := debug.Setup(cliCtx, true /* root logger */); err != nil { + if _, _, err := debug.Setup(cliCtx, true /* root logger */); err != nil { return err } log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(cfg.LogLvl), log.StderrHandler)) diff --git a/cmd/devnet/devnet/node.go b/cmd/devnet/devnet/node.go index 5f4c62c4de6..91cc90271ee 100644 --- a/cmd/devnet/devnet/node.go +++ b/cmd/devnet/devnet/node.go @@ -4,12 +4,14 @@ import ( context "context" "fmt" "math/big" + "net/http" "sync" "github.com/c2h5oh/datasize" "github.com/ledgerwatch/erigon/cmd/devnet/accounts" "github.com/ledgerwatch/erigon/cmd/devnet/args" "github.com/ledgerwatch/erigon/cmd/devnet/requests" + "github.com/ledgerwatch/erigon/diagnostics" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/node/nodecfg" "github.com/ledgerwatch/erigon/params" @@ -129,6 +131,7 @@ func (n *node) ChainID() *big.Int { func (n *node) run(ctx *cli.Context) error { var logger log.Logger var err error + var metricsMux *http.ServeMux defer n.done() defer func() { @@ -141,7 +144,7 @@ func (n *node) run(ctx *cli.Context) error { n.Unlock() }() - if logger, err = debug.Setup(ctx, false /* rootLogger */); err != nil { + if logger, metricsMux, err = debug.Setup(ctx, false /* rootLogger */); err != nil { return err } @@ -166,6 +169,10 @@ func (n *node) run(ctx *cli.Context) error { n.ethNode, err = enode.New(n.nodeCfg, n.ethCfg, logger) + if metricsMux != nil { + diagnostics.Setup(ctx, metricsMux, n.ethNode) + } + n.Lock() if n.startErr != nil { n.startErr <- err diff --git a/cmd/devnet/main.go b/cmd/devnet/main.go index 0d6d482b8a2..f4479ac25bb 100644 --- a/cmd/devnet/main.go +++ b/cmd/devnet/main.go @@ -107,7 +107,7 @@ var ( } metricsURLsFlag = cli.StringSliceFlag{ - Name: "metrics.urls", + Name: "debug.urls", Usage: "internal flag", } @@ -199,7 +199,7 @@ func action(ctx *cli.Context) error { if metrics { // TODO should get this from the network as once we have multiple nodes we'll need to iterate the // nodes and create a series of urls - for the moment only one is supported - ctx.Set("metrics.urls", fmt.Sprintf("http://localhost:%d/debug/metrics/", ctx.Int("metrics.port"))) + ctx.Set("metrics.urls", fmt.Sprintf("http://localhost:%d/debug/", ctx.Int("metrics.port"))) } // start the network with each node in a go routine diff --git a/cmd/erigon/main.go b/cmd/erigon/main.go index a77ae422bed..aff45cbd504 100644 --- a/cmd/erigon/main.go +++ b/cmd/erigon/main.go @@ -3,12 +3,14 @@ package main import ( "errors" "fmt" + "net/http" "os" "path/filepath" "reflect" "strings" "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon/diagnostics" "github.com/ledgerwatch/erigon/metrics" "github.com/ledgerwatch/log/v3" "github.com/pelletier/go-toml" @@ -55,7 +57,9 @@ func runErigon(cliCtx *cli.Context) error { var logger log.Logger var err error - if logger, err = debug.Setup(cliCtx, true /* root logger */); err != nil { + var metricsMux *http.ServeMux + + if logger, metricsMux, err = debug.Setup(cliCtx, true /* root logger */); err != nil { return err } @@ -73,6 +77,11 @@ func runErigon(cliCtx *cli.Context) error { log.Error("Erigon startup", "err", err) return err } + + if metricsMux != nil { + diagnostics.Setup(cliCtx, metricsMux, ethNode) + } + err = ethNode.Serve() if err != nil { log.Error("error while serving an Erigon node", "err", err) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 95113ef77ea..814e5eaf693 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -813,10 +813,27 @@ var ( Usage: "Max allowed page size for search methods", Value: 25, } + + DiagnosticsURLFlag = cli.StringFlag{ + Name: "diagnostics.url", + Usage: "URL of the diagnostics system provided by the support team", + } + + DiagnosticsInsecureFlag = cli.BoolFlag{ + Name: "diagnostics.insecure", + Usage: "Allows communication with diagnostics system using self-signed TLS certificates", + } + + DiagnosticsSessionsFlag = cli.StringSliceFlag{ + Name: "diagnostics.ids", + Usage: "Comma separated list of support session ids to connect to", + } ) var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag} +var DiagnosticsFlags = []cli.Flag{&DiagnosticsURLFlag, &DiagnosticsURLFlag, &DiagnosticsSessionsFlag} + // setNodeKey loads a node key from command line flags if provided, // otherwise it tries to load it from datadir, // otherwise it generates a new key in datadir. diff --git a/cmd/utils/flags/flags.go b/cmd/utils/flags/flags.go index 58887f9f8e3..be0a8a396b7 100644 --- a/cmd/utils/flags/flags.go +++ b/cmd/utils/flags/flags.go @@ -305,6 +305,10 @@ func (b *bigValue) Set(s string) error { return nil } +func (b *bigValue) Get() any { + return b.String() +} + // GlobalBig returns the value of a BigFlag from the global flag set. func GlobalBig(ctx *cli.Context, name string) *big.Int { val := ctx.Generic(name) diff --git a/diagnostics/block_body_download.go b/diagnostics/block_body_download_stats.go similarity index 87% rename from diagnostics/block_body_download.go rename to diagnostics/block_body_download_stats.go index bdc2a03ed2e..4903e1a8c99 100644 --- a/diagnostics/block_body_download.go +++ b/diagnostics/block_body_download_stats.go @@ -10,7 +10,7 @@ import ( ) func SetupBlockBodyDownload(metricsMux *http.ServeMux) { - metricsMux.HandleFunc("/debug/metrics/block_body_download", func(w http.ResponseWriter, r *http.Request) { + metricsMux.HandleFunc("/block_body_download", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") writeBlockBodyDownload(w, r) }) diff --git a/diagnostics/cmd_line.go b/diagnostics/cmd_line.go index 419b365a823..db4d9dcfdf5 100644 --- a/diagnostics/cmd_line.go +++ b/diagnostics/cmd_line.go @@ -1,22 +1,29 @@ package diagnostics import ( - "fmt" - "io" "net/http" "os" + "strconv" + "strings" ) func SetupCmdLineAccess(metricsMux *http.ServeMux) { - metricsMux.HandleFunc("/debug/metrics/cmdline", func(w http.ResponseWriter, r *http.Request) { + metricsMux.HandleFunc("/cmdline", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - writeCmdLine(w) - }) -} + w.Header().Set("Content-Type", "application/json") + + var space []byte -func writeCmdLine(w io.Writer) { - fmt.Fprintf(w, "SUCCESS\n") - for _, arg := range os.Args { - fmt.Fprintf(w, "%s\n", arg) - } + w.Write([]byte{'"'}) + for _, arg := range os.Args { + if len(space) > 0 { + w.Write(space) + } else { + space = []byte(" ") + } + + w.Write([]byte(strings.Trim(strconv.Quote(arg), `"`))) + } + w.Write([]byte{'"'}) + }) } diff --git a/diagnostics/db.go b/diagnostics/db.go new file mode 100644 index 00000000000..6769b29425e --- /dev/null +++ b/diagnostics/db.go @@ -0,0 +1,258 @@ +package diagnostics + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "path/filepath" + "strings" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" + "github.com/ledgerwatch/erigon/common/paths" + "github.com/urfave/cli/v2" +) + +func SetupDbAccess(ctx *cli.Context, metricsMux *http.ServeMux) { + var dataDir string + if ctx.IsSet("datadir") { + dataDir = ctx.String("datadir") + } else { + dataDir = paths.DataDirForNetwork(paths.DefaultDataDir(), ctx.String("chain")) + } + metricsMux.HandleFunc("/dbs", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Content-Type", "application/json") + writeDbList(w, dataDir) + }) + metricsMux.HandleFunc("/dbs/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + + urlPath := r.URL.Path + + if !strings.HasPrefix(urlPath, "/dbs/") { + http.Error(w, fmt.Sprintf(`Unexpected path prefix: expected: "/dbs/..." got: "%s"`, urlPath), http.StatusNotFound) + return + } + + pathParts := strings.Split(urlPath[5:], "/") + + if len(pathParts) < 1 { + http.Error(w, fmt.Sprintf(`Unexpected path len: expected: "{db}/tables" got: "%s"`, urlPath), http.StatusNotFound) + return + } + + var dbname string + var sep string + + for len(pathParts) > 0 { + dbname += sep + pathParts[0] + + if sep == "" { + sep = "/" + } + + pathParts = pathParts[1:] + + if pathParts[0] == "tables" { + break + } + + if len(pathParts) < 2 { + http.Error(w, fmt.Sprintf(`Unexpected path part: expected: "tables" got: "%s"`, pathParts[0]), http.StatusNotFound) + return + } + } + + switch len(pathParts) { + case 1: + writeDbTables(w, r, dataDir, dbname) + case 2: + offset, err := offsetValue(r.URL.Query()) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + limit, err := limitValue(r.URL.Query(), 0) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + writeDbRead(w, r, dataDir, dbname, pathParts[1], nil, offset, limit) + case 3: + key, err := base64.URLEncoding.DecodeString(pathParts[2]) + + if err != nil { + http.Error(w, fmt.Sprintf(`key "%s" argument should be base64url encoded: %v`, pathParts[2], err), http.StatusBadRequest) + return + } + + offset, err := offsetValue(r.URL.Query()) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + limit, err := limitValue(r.URL.Query(), 0) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + writeDbRead(w, r, dataDir, dbname, pathParts[1], key, offset, limit) + + default: + http.Error(w, fmt.Sprintf(`Unexpected path parts: "%s"`, strings.Join(pathParts[2:], "/")), http.StatusNotFound) + } + }) +} + +func writeDbList(w http.ResponseWriter, dataDir string) { + w.Header().Set("Content-Type", "application/json") + m := mdbx.PathDbMap() + dbs := make([]string, 0, len(m)) + for path := range m { + dbs = append(dbs, strings.ReplaceAll(strings.TrimPrefix(path, dataDir)[1:], "\\", "/")) + } + + json.NewEncoder(w).Encode(dbs) +} + +func writeDbTables(w http.ResponseWriter, r *http.Request, dataDir string, dbname string) { + m := mdbx.PathDbMap() + db, ok := m[filepath.Join(dataDir, dbname)] + if !ok { + http.Error(w, fmt.Sprintf(`"%s" is not in the list of allowed dbs`, dbname), http.StatusNotFound) + return + } + type table struct { + Name string `json:"name"` + Count uint64 `json:"count"` + Size uint64 `json:"size"` + } + + var tables []table + + if err := db.View(context.Background(), func(tx kv.Tx) error { + var e error + buckets, e := tx.ListBuckets() + if e != nil { + return e + } + + for _, bucket := range buckets { + size, e := tx.BucketSize(bucket) + if e != nil { + return e + } + + var count uint64 + + if e := db.View(context.Background(), func(tx kv.Tx) error { + c, e := tx.Cursor(bucket) + if e != nil { + return e + } + defer c.Close() + count, e = c.Count() + if e != nil { + return e + } + + return nil + }); e != nil { + return e + } + + tables = append(tables, table{bucket, count, size}) + } + + return nil + }); err != nil { + http.Error(w, fmt.Sprintf(`failed to list tables in "%s": %v`, dbname, err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(tables) +} + +func writeDbRead(w http.ResponseWriter, r *http.Request, dataDir string, dbname string, table string, key []byte, offset int64, limit int64) { + m := mdbx.PathDbMap() + db, ok := m[filepath.Join(dataDir, dbname)] + if !ok { + fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", dbname) + return + } + + var results [][2][]byte + var count uint64 + + if err := db.View(context.Background(), func(tx kv.Tx) error { + c, e := tx.Cursor(table) + if e != nil { + return e + } + defer c.Close() + + count, e = c.Count() + + if e != nil { + return e + } + + var k, v []byte + if key == nil { + if k, v, e = c.First(); e != nil { + return e + } + } else if k, v, e = c.Seek(key); e != nil { + return e + } + + var pos int64 + + for e == nil && k != nil && pos < offset { + //TODO - not sure if this is a good idea it may be slooooow + k, _, e = c.Next() + pos++ + } + + for e == nil && k != nil && (limit == 0 || int64(len(results)) < limit) { + results = append(results, [2][]byte{k, v}) + k, v, e = c.Next() + } + return nil + }); err != nil { + fmt.Fprintf(w, "ERROR: reading table %s in %s: %v\n", table, dbname, err) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write([]byte("{")) + fmt.Fprintf(w, `"offset":%d`, offset) + if limit > 0 { + fmt.Fprintf(w, `,"limit":%d`, limit) + } + fmt.Fprintf(w, `,"count":%d`, count) + if len(results) > 0 { + var comma string + w.Write([]byte(`,"results":{`)) + for _, result := range results { + fmt.Fprintf(w, `%s"%s":"%s"`, comma, base64.URLEncoding.EncodeToString(result[0]), base64.URLEncoding.EncodeToString(result[1])) + + if comma == "" { + comma = "," + } + } + w.Write([]byte("}")) + } + w.Write([]byte("}")) +} diff --git a/diagnostics/db_access.go b/diagnostics/db_access.go deleted file mode 100644 index b3c12cd57d0..00000000000 --- a/diagnostics/db_access.go +++ /dev/null @@ -1,139 +0,0 @@ -package diagnostics - -import ( - "context" - "encoding/hex" - "fmt" - "io" - "net/http" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon-lib/kv/mdbx" - "github.com/ledgerwatch/erigon/common/paths" - "github.com/urfave/cli/v2" -) - -func SetupDbAccess(ctx *cli.Context, metricsMux *http.ServeMux) { - var dataDir string - if ctx.IsSet("datadir") { - dataDir = ctx.String("datadir") - } else { - dataDir = paths.DataDirForNetwork(paths.DefaultDataDir(), ctx.String("chain")) - } - metricsMux.HandleFunc("/debug/metrics/db/list", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - writeDbList(w, dataDir) - }) - metricsMux.HandleFunc("/debug/metrics/db/tables", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - writeDbTables(w, r, dataDir) - }) - metricsMux.HandleFunc("/debug/metrics/db/read", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - writeDbRead(w, r, dataDir) - }) -} - -func writeDbList(w io.Writer, dataDir string) { - fmt.Fprintf(w, "SUCCESS\n") - m := mdbx.PathDbMap() - for path := range m { - fmt.Fprintf(w, "%s\n", path) - } -} - -func writeDbTables(w io.Writer, r *http.Request, dataDir string) { - if err := r.ParseForm(); err != nil { - fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err) - return - } - path := r.Form.Get("path") - if path == "" { - fmt.Fprintf(w, "ERROR: path argument is required - specify the relative path to an MDBX database directory") - return - } - m := mdbx.PathDbMap() - db, ok := m[path] - if !ok { - fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", path) - return - } - var tables []string - if err := db.View(context.Background(), func(tx kv.Tx) error { - var e error - tables, e = tx.ListBuckets() - if e != nil { - return e - } - return nil - }); err != nil { - fmt.Fprintf(w, "ERROR: listing tables in %s: %v\n", path, err) - return - } - fmt.Fprintf(w, "SUCCESS\n") - for _, table := range tables { - fmt.Fprintf(w, "%s\n", table) - } -} - -func writeDbRead(w io.Writer, r *http.Request, dataDir string) { - if err := r.ParseForm(); err != nil { - fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err) - return - } - path := r.Form.Get("path") - if path == "" { - fmt.Fprintf(w, "ERROR: path argument is required - specify the relative path to an MDBX database directory") - return - } - m := mdbx.PathDbMap() - db, ok := m[path] - if !ok { - fmt.Fprintf(w, "ERROR: path %s is not in the list of allowed paths", path) - return - } - table := r.Form.Get("table") - if table == "" { - fmt.Fprintf(w, "ERROR: table argument is required - specify the table to read from") - return - } - var key []byte - var err error - keyHex := r.Form.Get("key") - if keyHex != "" { - if key, err = hex.DecodeString(keyHex); err != nil { - fmt.Fprintf(w, "ERROR: key [%s] argument may only contain hexadecimal digits: %v\n", keyHex, err) - return - } - } - var results []string - if err := db.View(context.Background(), func(tx kv.Tx) error { - c, e := tx.Cursor(table) - if e != nil { - return e - } - defer c.Close() - var k, v []byte - if key == nil { - if k, v, e = c.First(); err != nil { - return e - } - } else if k, v, e = c.Seek(key); e != nil { - return e - } - count := 0 - for e == nil && k != nil && count < 256 { - results = append(results, fmt.Sprintf("%x | %x", k, v)) - count++ - k, v, e = c.Next() - } - return nil - }); err != nil { - fmt.Fprintf(w, "ERROR: reading table %s in %s: %v\n", table, path, err) - return - } - fmt.Fprintf(w, "SUCCESS\n") - for _, result := range results { - fmt.Fprintf(w, "%s\n", result) - } -} diff --git a/diagnostics/flags.go b/diagnostics/flags.go index c737b02b702..9cdf0267031 100644 --- a/diagnostics/flags.go +++ b/diagnostics/flags.go @@ -1,23 +1,55 @@ package diagnostics import ( - "fmt" - "io" + "encoding/json" "net/http" "github.com/urfave/cli/v2" ) func SetupFlagsAccess(ctx *cli.Context, metricsMux *http.ServeMux) { - metricsMux.HandleFunc("/debug/metrics/flags", func(w http.ResponseWriter, r *http.Request) { + metricsMux.HandleFunc("/flags", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - writeFlags(w, ctx) - }) -} + w.Header().Set("Content-Type", "application/json") + flags := map[string]interface{}{} + + ctxFlags := map[string]struct{}{} + + for _, flagName := range ctx.FlagNames() { + ctxFlags[flagName] = struct{}{} + } + + for _, flag := range ctx.App.Flags { + name := flag.Names()[0] + value := ctx.Value(name) -func writeFlags(w io.Writer, ctx *cli.Context) { - fmt.Fprintf(w, "SUCCESS\n") - for _, flagName := range ctx.FlagNames() { - fmt.Fprintf(w, "%s=%v\n", flagName, ctx.Value(flagName)) - } + switch typed := value.(type) { + case string: + if typed == "" { + continue + } + case cli.UintSlice: + value = typed.Value() + } + + var usage string + + if docFlag, ok := flag.(cli.DocGenerationFlag); ok { + usage = docFlag.GetUsage() + } + + _, inCtx := ctxFlags[name] + + flags[name] = struct { + Value interface{} `json:"value,omitempty"` + Usage string `json:"usage,omitempty"` + Default bool `json:"default"` + }{ + Value: value, + Usage: usage, + Default: !inCtx, + } + } + json.NewEncoder(w).Encode(flags) + }) } diff --git a/diagnostics/header_downloader_stats.go b/diagnostics/header_downloader_stats.go index 222f92fb261..a388d6fb4ae 100644 --- a/diagnostics/header_downloader_stats.go +++ b/diagnostics/header_downloader_stats.go @@ -10,7 +10,7 @@ import ( ) func SetupHeaderDownloadStats(metricsMux *http.ServeMux) { - metricsMux.HandleFunc("/debug/metrics/headers_download", func(w http.ResponseWriter, r *http.Request) { + metricsMux.HandleFunc("/headers_download", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") writeHeaderDownload(w, r) }) diff --git a/diagnostics/logs.go b/diagnostics/logs.go new file mode 100644 index 00000000000..72196aa79a4 --- /dev/null +++ b/diagnostics/logs.go @@ -0,0 +1,190 @@ +package diagnostics + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strconv" + + "github.com/urfave/cli/v2" + + "github.com/ledgerwatch/erigon/turbo/logging" +) + +func SetupLogsAccess(ctx *cli.Context, metricsMux *http.ServeMux) { + dirPath := ctx.String(logging.LogDirPathFlag.Name) + if dirPath == "" { + datadir := ctx.String("datadir") + if datadir != "" { + dirPath = filepath.Join(datadir, "logs") + } + } + if dirPath == "" { + return + } + metricsMux.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + writeLogsList(w, dirPath) + }) + metricsMux.HandleFunc("/logs/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + writeLogsRead(w, r, dirPath) + }) +} + +func writeLogsList(w http.ResponseWriter, dirPath string) { + entries, err := os.ReadDir(dirPath) + if err != nil { + http.Error(w, fmt.Sprintf("Failed to list directory %s: %v", dirPath, err), http.StatusInternalServerError) + return + } + + infos := make([]fs.FileInfo, 0, len(entries)) + + for _, entry := range entries { + fileInfo, err := os.Stat(filepath.Join(dirPath, entry.Name())) + if err != nil { + http.Error(w, fmt.Sprintf("Can't stat file %s: %v", entry.Name(), err), http.StatusInternalServerError) + return + } + if fileInfo.IsDir() { + continue + } + infos = append(infos, fileInfo) + } + + type file struct { + Name string `json:"name"` + Size int64 `json:"size"` + } + + files := make([]file, len(infos)) + + for _, fileInfo := range infos { + files = append(files, file{Name: fileInfo.Name(), Size: fileInfo.Size()}) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(files) +} + +func writeLogsRead(w http.ResponseWriter, r *http.Request, dirPath string) { + file := path.Base(r.URL.Path) + + if file == "/" || file == "." { + http.Error(w, "file is required - specify the name of log file to read", http.StatusBadRequest) + return + } + + offset, err := offsetValue(r.URL.Query()) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + fileInfo, err := os.Stat(filepath.Join(dirPath, file)) + + if err != nil { + http.Error(w, fmt.Sprintf("Can't stat file %s: %v", file, err), http.StatusInternalServerError) + return + } + + if fileInfo.IsDir() { + http.Error(w, fmt.Sprintf("%s is a directory, needs to be a file", file), http.StatusInternalServerError) + return + } + + if offset > fileInfo.Size() { + http.Error(w, fmt.Sprintf("offset %d must not be greater than this file size %d", offset, fileInfo.Size()), http.StatusBadRequest) + return + } + + f, err := os.Open(filepath.Join(dirPath, file)) + + if err != nil { + http.Error(w, fmt.Sprintf("Can't opening file %s: %v\n", file, err), http.StatusInternalServerError) + return + } + + limit, err := limitValue(r.URL.Query(), fileInfo.Size()) + + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + buf := make([]byte, limit) + + if _, err := f.Seek(offset, 0); err != nil { + http.Error(w, fmt.Sprintf("seek failed for file: %s to %d: %v", file, offset, err), http.StatusInternalServerError) + return + } + + var n int + var readTotal int + + for n, err = f.Read(buf[readTotal:]); err == nil && readTotal < len(buf); n, err = f.Read(buf[readTotal:]) { + readTotal += n + } + + if err != nil && !errors.Is(err, io.EOF) { + http.Error(w, fmt.Sprintf("Reading failed for: %s at %d: %v\n", file, readTotal, err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", strconv.FormatInt(int64(readTotal), 10)) + w.Header().Set("X-Offset", strconv.FormatInt(offset, 10)) + w.Header().Set("X-Limit", strconv.FormatInt(limit, 10)) + w.Header().Set("X-Size", strconv.FormatInt(fileInfo.Size(), 10)) + w.Write(buf[:readTotal]) +} + +func limitValue(values url.Values, def int64) (int64, error) { + limitStr := values.Get("limit") + + var limit int64 + var err error + + if limitStr == "" { + limit = def + } else { + limit, err = strconv.ParseInt(limitStr, 10, 64) + } + + if err != nil { + return 0, fmt.Errorf("limit %s is not a int64 number: %v", limitStr, err) + } + + return limit, nil +} + +func offsetValue(values url.Values) (int64, error) { + + offsetStr := values.Get("offset") + + var offset int64 + var err error + + if offsetStr != "" { + offset, err = strconv.ParseInt(offsetStr, 10, 64) + + if err != nil { + return 0, fmt.Errorf("offset %s is not a int64 number: %v", offsetStr, err) + } + } + + if offset < 0 { + return 0, fmt.Errorf("offset %d must be non-negative", offset) + } + + return offset, nil +} diff --git a/diagnostics/logs_access.go b/diagnostics/logs_access.go deleted file mode 100644 index 5380daf4abb..00000000000 --- a/diagnostics/logs_access.go +++ /dev/null @@ -1,122 +0,0 @@ -package diagnostics - -import ( - "errors" - "fmt" - "io" - "io/fs" - "net/http" - "os" - "path/filepath" - "strconv" - - "github.com/urfave/cli/v2" - - "github.com/ledgerwatch/erigon/turbo/logging" -) - -func SetupLogsAccess(ctx *cli.Context, metricsMux *http.ServeMux) { - dirPath := ctx.String(logging.LogDirPathFlag.Name) - if dirPath == "" { - datadir := ctx.String("datadir") - if datadir != "" { - dirPath = filepath.Join(datadir, "logs") - } - } - if dirPath == "" { - return - } - metricsMux.HandleFunc("/debug/metrics/logs/list", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - writeLogsList(w, dirPath) - }) - metricsMux.HandleFunc("/debug/metrics/logs/read", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - writeLogsRead(w, r, dirPath) - }) -} - -func writeLogsList(w io.Writer, dirPath string) { - entries, err := os.ReadDir(dirPath) - if err != nil { - fmt.Fprintf(w, "ERROR: listing directory %s: %v\n", dirPath, err) - return - } - //nolint: prealloc - var infos []fs.FileInfo - for _, entry := range entries { - fileInfo, err := os.Stat(filepath.Join(dirPath, entry.Name())) - if err != nil { - fmt.Fprintf(w, "ERROR: stat file %s: %v\n", entry.Name(), err) - return - } - if fileInfo.IsDir() { - continue - } - infos = append(infos, fileInfo) - } - fmt.Fprintf(w, "SUCCESS\n") - for _, fileInfo := range infos { - fmt.Fprintf(w, "%s | %d\n", fileInfo.Name(), fileInfo.Size()) - } -} - -func writeLogsRead(w io.Writer, r *http.Request, dirPath string) { - if err := r.ParseForm(); err != nil { - fmt.Fprintf(w, "ERROR: parsing arguments: %v\n", err) - return - } - file := r.Form.Get("file") - if file == "" { - fmt.Fprintf(w, "ERROR: file argument is required - specify the name of log file to read") - return - } - fileInfo, err := os.Stat(filepath.Join(dirPath, file)) - if err != nil { - fmt.Fprintf(w, "ERROR: stat file %s: %v\n", file, err) - return - } - if fileInfo.IsDir() { - fmt.Fprintf(w, "ERROR: %s is a directory, needs to be a file", file) - return - } - offsetStr := r.Form.Get("offset") - if offsetStr == "" { - fmt.Fprintf(w, "ERROR: offset argument is required - specify where to start reading in the file") - return - } - offset, err := strconv.ParseInt(offsetStr, 10, 64) - if err != nil { - fmt.Fprintf(w, "ERROR: offset %s is not a Uint64 number: %v\n", offsetStr, err) - return - } - if offset < 0 { - fmt.Fprintf(w, "ERROR: offset %d must be non-negative\n", offset) - return - } - if offset > fileInfo.Size() { - fmt.Fprintf(w, "ERROR: offset %d must be no greater than file size %d\n", offset, fileInfo.Size()) - return - } - f, err := os.Open(filepath.Join(dirPath, file)) - if err != nil { - fmt.Fprintf(w, "ERROR: opening file %s: %v\n", file, err) - return - } - var buf [16 * 1024]byte - if _, err := f.Seek(offset, 0); err != nil { - fmt.Fprintf(w, "ERROR: seeking in file: %s to %d: %v\n", file, offset, err) - return - } - var n int - var readTotal int - for n, err = f.Read(buf[readTotal:]); err == nil && readTotal < len(buf); n, err = f.Read(buf[readTotal:]) { - readTotal += n - } - if err != nil && !errors.Is(err, io.EOF) { - fmt.Fprintf(w, "ERROR: reading in file: %s at %d: %v\n", file, readTotal, err) - return - } - fmt.Fprintf(w, "SUCCESS: %d-%d/%d\n", offset, offset+int64(readTotal), fileInfo.Size()) - w.Write(buf[:readTotal]) -} diff --git a/diagnostics/nodeinfo.go b/diagnostics/nodeinfo.go new file mode 100644 index 00000000000..198aa77d7d2 --- /dev/null +++ b/diagnostics/nodeinfo.go @@ -0,0 +1,26 @@ +package diagnostics + +import ( + "encoding/json" + "net/http" + + "github.com/ledgerwatch/erigon/turbo/node" +) + +func SetupNodeInfoAccess(metricsMux *http.ServeMux, node *node.ErigonNode) { + metricsMux.HandleFunc("/nodeinfo", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + writeNodeInfo(w, node) + }) +} + +func writeNodeInfo(w http.ResponseWriter, node *node.ErigonNode) { + reply, err := node.Backend().NodesInfo(0) + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(reply) +} diff --git a/diagnostics/setup.go b/diagnostics/setup.go new file mode 100644 index 00000000000..97b40933286 --- /dev/null +++ b/diagnostics/setup.go @@ -0,0 +1,28 @@ +package diagnostics + +import ( + "net/http" + "strings" + + "github.com/ledgerwatch/erigon/turbo/node" + "github.com/urfave/cli/v2" +) + +func Setup(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) { + debugMux := http.NewServeMux() + + metricsMux.HandleFunc("/debug/", func(w http.ResponseWriter, r *http.Request) { + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/debug") + r.URL.RawPath = strings.TrimPrefix(r.URL.RawPath, "/debug") + debugMux.ServeHTTP(w, r) + }) + + SetupLogsAccess(ctx, debugMux) + SetupDbAccess(ctx, debugMux) + SetupCmdLineAccess(debugMux) + SetupFlagsAccess(ctx, debugMux) + SetupVersionAccess(debugMux) + SetupBlockBodyDownload(debugMux) + SetupHeaderDownloadStats(debugMux) + SetupNodeInfoAccess(debugMux, node) +} diff --git a/diagnostics/version.go b/diagnostics/version.go index c67c83d8e69..f54bfa73b64 100644 --- a/diagnostics/version.go +++ b/diagnostics/version.go @@ -1,8 +1,7 @@ package diagnostics import ( - "fmt" - "io" + "encoding/json" "net/http" "github.com/ledgerwatch/erigon/params" @@ -11,15 +10,17 @@ import ( const Version = 3 func SetupVersionAccess(metricsMux *http.ServeMux) { - metricsMux.HandleFunc("/debug/metrics/version", func(w http.ResponseWriter, r *http.Request) { + metricsMux.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - writeVersion(w) + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(struct { + Node int `json:"nodeVersion"` + Code string `json:"codeVersion"` + Git string `json:"gitCommit"` + }{ + Node: Version, + Code: params.VersionWithMeta, + Git: params.GitCommit, + }) }) } - -func writeVersion(w io.Writer) { - fmt.Fprintf(w, "SUCCESS\n") - fmt.Fprintf(w, "%d\n", Version) - fmt.Fprintf(w, "%s\n", params.VersionWithMeta) - fmt.Fprintf(w, "%s\n", params.GitCommit) -} diff --git a/rpc/client.go b/rpc/client.go index 06b0bb6eac3..3e61b8a393a 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -119,7 +119,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { func (cc *clientConn) close(err error, inflightReq *requestOp) { cc.handler.close(err, inflightReq) - cc.codec.close() + cc.codec.Close() } type readOp struct { @@ -526,7 +526,7 @@ func (c *Client) reconnect(ctx context.Context) error { c.writeConn = newconn return nil case <-c.didClose: - newconn.close() + newconn.Close() return ErrClientQuit } } @@ -627,7 +627,7 @@ func (c *Client) drainRead() { // read decodes RPC messages from a codec, feeding them into dispatch. func (c *Client) read(codec ServerCodec) { for { - msgs, batch, err := codec.readBatch() + msgs, batch, err := codec.ReadBatch() if _, ok := err.(*json.SyntaxError); ok { codec.writeJSON(context.Background(), errorMessage(&parseError{err.Error()})) } diff --git a/rpc/http.go b/rpc/http.go index df33c5d2ea0..f6812a6cfc3 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -63,12 +63,12 @@ func (hc *httpConn) remoteAddr() string { return hc.url } -func (hc *httpConn) readBatch() ([]*jsonrpcMessage, bool, error) { +func (hc *httpConn) ReadBatch() ([]*jsonrpcMessage, bool, error) { <-hc.closeCh return nil, false, io.EOF } -func (hc *httpConn) close() { +func (hc *httpConn) Close() { hc.closeOnce.Do(func() { close(hc.closeCh) }) } @@ -250,7 +250,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", contentType) codec := newHTTPServerConn(r, w) - defer codec.close() + defer codec.Close() var stream *jsoniter.Stream if !s.disableStreaming { stream = jsoniter.NewStream(jsoniter.ConfigDefault, w, 4096) diff --git a/rpc/json.go b/rpc/json.go index 0893ab549b5..abe2a02facb 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -202,7 +202,7 @@ func (c *jsonCodec) remoteAddr() string { return c.remote } -func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err error) { +func (c *jsonCodec) ReadBatch() (messages []*jsonrpcMessage, batch bool, err error) { // Decode the next JSON object in the input stream. // This verifies basic syntax, etc. var rawmsg json.RawMessage @@ -232,7 +232,7 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}) error { return c.encode(v) } -func (c *jsonCodec) close() { +func (c *jsonCodec) Close() { c.closer.Do(func() { close(c.closeCh) c.conn.Close() diff --git a/rpc/server.go b/rpc/server.go index c43c2788fce..34fba30c225 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -92,7 +92,7 @@ func (s *Server) RegisterName(name string, receiver interface{}) error { // // Note that codec options are no longer supported. func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { - defer codec.close() + defer codec.Close() // Don't serve if server is stopped. if atomic.LoadInt32(&s.run) == 0 { @@ -121,7 +121,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec, stre h.allowSubscribe = false defer h.close(io.EOF, nil) - reqs, batch, err := codec.readBatch() + reqs, batch, err := codec.ReadBatch() if err != nil { if err != io.EOF { codec.writeJSON(ctx, errorMessage(&invalidMessageError{"parse error"})) @@ -146,7 +146,7 @@ func (s *Server) Stop() { if atomic.CompareAndSwapInt32(&s.run, 1, 0) { s.logger.Info("RPC server shutting down") s.codecs.Each(func(c interface{}) bool { - c.(ServerCodec).close() + c.(ServerCodec).Close() return true }) } diff --git a/rpc/types.go b/rpc/types.go index 8ed915a21df..58683829c97 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -54,8 +54,8 @@ type DataError interface { // a RPC session. Implementations must be go-routine safe since the codec can be called in // multiple go-routines concurrently. type ServerCodec interface { - readBatch() (msgs []*jsonrpcMessage, isBatch bool, err error) - close() + ReadBatch() (msgs []*jsonrpcMessage, isBatch bool, err error) + Close() jsonWriter } diff --git a/rpc/websocket.go b/rpc/websocket.go index 6042aff7d6f..1901a9f655a 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -256,8 +256,8 @@ func newWebsocketCodec(conn *websocket.Conn) ServerCodec { return wc } -func (wc *websocketCodec) close() { - wc.jsonCodec.close() +func (wc *websocketCodec) Close() { + wc.jsonCodec.Close() wc.wg.Wait() } diff --git a/tests/state_test.go b/tests/state_test.go index e90f4513201..711294ef653 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -39,7 +39,7 @@ func TestState(t *testing.T) { defer log.Root().SetHandler(log.Root().GetHandler()) log.Root().SetHandler(log.LvlFilterHandler(log.LvlError, log.StderrHandler)) if runtime.GOOS == "windows" || runtime.GOOS == "darwin" { - t.Skip("fix me on win please") // it's too slow on win, need generally improve speed of this tests + t.Skip("fix me on win please") // it's too slow on win and stops on macos, need generally improve speed of this tests } //t.Parallel() diff --git a/turbo/app/backup_cmd.go b/turbo/app/backup_cmd.go index dc7aecda097..92def80a850 100644 --- a/turbo/app/backup_cmd.go +++ b/turbo/app/backup_cmd.go @@ -76,7 +76,7 @@ CloudDrives (and ssd) have bad-latency and good-parallel-throughput - then havin ) func doBackup(cliCtx *cli.Context) error { - logger, err := debug.Setup(cliCtx, true /* rootLogger */) + logger, _, err := debug.Setup(cliCtx, true /* rootLogger */) if err != nil { return err } diff --git a/turbo/app/import_cmd.go b/turbo/app/import_cmd.go index d948e456254..257c262e5bd 100644 --- a/turbo/app/import_cmd.go +++ b/turbo/app/import_cmd.go @@ -54,7 +54,7 @@ func importChain(cliCtx *cli.Context) error { utils.Fatalf("This command requires an argument.") } - logger, err := debug.Setup(cliCtx, true /* rootLogger */) + logger, _, err := debug.Setup(cliCtx, true /* rootLogger */) if err != nil { return err } diff --git a/turbo/app/init_cmd.go b/turbo/app/init_cmd.go index ed4c01ac898..bb48e4fd4fe 100644 --- a/turbo/app/init_cmd.go +++ b/turbo/app/init_cmd.go @@ -37,7 +37,7 @@ It expects the genesis file as argument.`, func initGenesis(ctx *cli.Context) error { var logger log.Logger var err error - if logger, err = debug.Setup(ctx, true /* rootLogger */); err != nil { + if logger, _, err = debug.Setup(ctx, true /* rootLogger */); err != nil { return err } // Make sure we have a valid genesis JSON diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 260cead9ad5..7c7f557cc66 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -169,7 +169,7 @@ func doDiff(cliCtx *cli.Context) error { } func doDecompressSpeed(cliCtx *cli.Context) error { - logger, err := debug.Setup(cliCtx, true /* rootLogger */) + logger, _, err := debug.Setup(cliCtx, true /* rootLogger */) if err != nil { return err } @@ -210,7 +210,7 @@ func doDecompressSpeed(cliCtx *cli.Context) error { func doRam(cliCtx *cli.Context) error { var logger log.Logger var err error - if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { + if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { return err } defer logger.Info("Done") @@ -235,7 +235,7 @@ func doRam(cliCtx *cli.Context) error { } func doIndicesCommand(cliCtx *cli.Context) error { - logger, err := debug.Setup(cliCtx, true /* rootLogger */) + logger, _, err := debug.Setup(cliCtx, true /* rootLogger */) if err != nil { return err } @@ -285,7 +285,7 @@ func doIndicesCommand(cliCtx *cli.Context) error { func doUncompress(cliCtx *cli.Context) error { var logger log.Logger var err error - if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { + if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { return err } ctx := cliCtx.Context @@ -338,7 +338,7 @@ func doUncompress(cliCtx *cli.Context) error { func doCompress(cliCtx *cli.Context) error { var err error var logger log.Logger - if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { + if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { return err } ctx := cliCtx.Context @@ -388,7 +388,7 @@ func doCompress(cliCtx *cli.Context) error { func doRetireCommand(cliCtx *cli.Context) error { var logger log.Logger var err error - if logger, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { + if logger, _, err = debug.Setup(cliCtx, true /* rootLogger */); err != nil { return err } defer logger.Info("Done") diff --git a/turbo/app/support_cmd.go b/turbo/app/support_cmd.go index bf5fb09aec8..4a3d54fd17e 100644 --- a/turbo/app/support_cmd.go +++ b/turbo/app/support_cmd.go @@ -1,19 +1,23 @@ package app import ( - "bufio" "bytes" "context" "crypto/tls" - "encoding/binary" + "encoding/json" "fmt" "io" "net/http" + "net/url" "os" "os/signal" + "strconv" "syscall" "time" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" + "github.com/ledgerwatch/erigon-lib/gointerfaces/types" + "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/log/v3" "github.com/urfave/cli/v2" "golang.org/x/net/http2" @@ -24,30 +28,36 @@ var ( Name: "diagnostics.url", Usage: "URL of the diagnostics system provided by the support team, include unique session PIN", } - metricsURLsFlag = cli.StringSliceFlag{ - Name: "metrics.urls", - Usage: "Comma separated list of URLs to the metrics endpoints thats are being diagnosed", + + debugURLsFlag = cli.StringSliceFlag{ + Name: "debug.urls", + Usage: "Comma separated list of URLs to the debug endpoints thats are being diagnosed", } + insecureFlag = cli.BoolFlag{ Name: "insecure", Usage: "Allows communication with diagnostics system using self-signed TLS certificates", } + + sessionsFlag = cli.StringSliceFlag{ + Name: "diagnostics.sessions", + Usage: "Comma separated list of support session ids to connect to", + } ) var supportCommand = cli.Command{ Action: MigrateFlags(connectDiagnostics), Name: "support", Usage: "Connect Erigon instance to a diagnostics system for support", - ArgsUsage: "--diagnostics.url --metrics.urls ", + ArgsUsage: "--diagnostics.url --ids --metrics.urls ", Flags: []cli.Flag{ - &metricsURLsFlag, + &debugURLsFlag, &diagnosticsURLFlag, + &sessionsFlag, &insecureFlag, }, //Category: "SUPPORT COMMANDS", - Description: ` -The support command connects a running Erigon instances to a diagnostics system specified -by the URL.`, + Description: `The support command connects a running Erigon instances to a diagnostics system specified by the URL.`, } const Version = 1 @@ -63,10 +73,9 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - metricsURLs := cliCtx.StringSlice(metricsURLsFlag.Name) - metricsURL := metricsURLs[0] // TODO: Generalise + debugURLs := cliCtx.StringSlice(debugURLsFlag.Name) - diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + diagnosticsUrl := cliCtx.String(diagnosticsURLFlag.Name) + "/bridge" // Create TLS configuration with the certificate of the server insecure := cliCtx.Bool(insecureFlag.Name) @@ -74,9 +83,11 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error { InsecureSkipVerify: insecure, //nolint:gosec } + sessionIds := cliCtx.StringSlice(sessionsFlag.Name) + // Perform the requests in a loop (reconnect) for { - if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, metricsURL, logger); err != nil { + if err := tunnel(ctx, cancel, sigs, tlsConfig, diagnosticsUrl, sessionIds, debugURLs, logger); err != nil { return err } select { @@ -91,19 +102,35 @@ func ConnectDiagnostics(cliCtx *cli.Context, logger log.Logger) error { } } -var successLine = []byte("SUCCESS") +type conn struct { + io.ReadCloser + *io.PipeWriter +} + +func (c *conn) Close() error { + c.ReadCloser.Close() + c.PipeWriter.Close() + return nil +} + +func (c *conn) SetWriteDeadline(time time.Time) error { + return nil +} // tunnel operates the tunnel from diagnostics system to the metrics URL for one http/2 request // needs to be called repeatedly to implement re-connect logic -func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, metricsURL string, logger log.Logger) error { +func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, tlsConfig *tls.Config, diagnosticsUrl string, sessionIds []string, debugURLs []string, logger log.Logger) error { diagnosticsClient := &http.Client{Transport: &http2.Transport{TLSClientConfig: tlsConfig}} defer diagnosticsClient.CloseIdleConnections() metricsClient := &http.Client{} defer metricsClient.CloseIdleConnections() - // Create a request object to send to the server - reader, writer := io.Pipe() + ctx1, cancel1 := context.WithCancel(ctx) defer cancel1() + + // Create a request object to send to the server + reader, writer := io.Pipe() + go func() { select { case <-sigs: @@ -113,7 +140,74 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, reader.Close() writer.Close() }() + + type enode struct { + Enode string `json:"enode,omitempty"` + Enr string `json:"enr,omitempty"` + Ports *types.NodeInfoPorts `json:"ports,omitempty"` + ListenerAddr string `json:"listener_addr,omitempty"` + } + + type info struct { + Id string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Protocols json.RawMessage `json:"protocols,omitempty"` + Enodes []enode `json:"enodes,omitempty"` + } + + type node struct { + debugURL string + info *info + } + + nodes := map[string]*node{} + + for _, debugURL := range debugURLs { + debugResponse, err := metricsClient.Get(debugURL + "/debug/nodeinfo") + + if err != nil { + return err + } + + if debugResponse.StatusCode != http.StatusOK { + return fmt.Errorf("debug request to %s failed: %s", debugURL, debugResponse.Status) + } + + var reply remote.NodesInfoReply + + err = json.NewDecoder(debugResponse.Body).Decode(&reply) + + debugResponse.Body.Close() + + if err != nil { + return err + } + + for _, ni := range reply.NodesInfo { + if n, ok := nodes[ni.Id]; ok { + n.info.Enodes = append(n.info.Enodes, enode{ + Enode: ni.Enode, + Enr: ni.Enr, + Ports: ni.Ports, + ListenerAddr: ni.ListenerAddr, + }) + } else { + nodes[ni.Id] = &node{debugURL, &info{ + Id: ni.Id, + Name: ni.Name, + Protocols: ni.Protocols, + Enodes: []enode{{ + Enode: ni.Enode, + Enr: ni.Enr, + Ports: ni.Ports, + ListenerAddr: ni.ListenerAddr, + }}}} + } + } + } + req, err := http.NewRequestWithContext(ctx1, http.MethodPost, diagnosticsUrl, reader) + if err != nil { return err } @@ -121,66 +215,198 @@ func tunnel(ctx context.Context, cancel context.CancelFunc, sigs chan os.Signal, // Create a connection // Apply given context to the sent request resp, err := diagnosticsClient.Do(req) + if err != nil { return err } - defer resp.Body.Close() - defer writer.Close() - // Apply the connection context on the request context - var metricsBuf bytes.Buffer - r := bufio.NewReaderSize(resp.Body, 4096) - line, isPrefix, err := r.ReadLine() - if err != nil { - return fmt.Errorf("reading first line: %v", err) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("support request to %s failed: %s", diagnosticsUrl, resp.Status) } - if isPrefix { - return fmt.Errorf("request too long") - } - if !bytes.Equal(line, successLine) { - return fmt.Errorf("connecting to diagnostics system, first line [%s]", line) + + type connectionInfo struct { + Version uint64 `json:"version"` + Sessions []string `json:"sessions"` + Nodes []*info `json:"nodes"` } - var versionBytes [8]byte - binary.BigEndian.PutUint64(versionBytes[:], Version) - if _, err = writer.Write(versionBytes[:]); err != nil { - return fmt.Errorf("sending version: %v", err) + + err = json.NewEncoder(writer).Encode(&connectionInfo{ + Version: Version, + Sessions: sessionIds, + Nodes: func() (replies []*info) { + for _, node := range nodes { + replies = append(replies, node.info) + } + + return replies + }(), + }) + + if err != nil { + return err } logger.Info("Connected") - for line, isPrefix, err = r.ReadLine(); err == nil && !isPrefix; line, isPrefix, err = r.ReadLine() { - metricsBuf.Reset() - metricsResponse, err := metricsClient.Get(metricsURL + string(line)) - if err != nil { - fmt.Fprintf(&metricsBuf, "ERROR: Requesting metrics url [%s], query [%s], err: %v", metricsURL, line, err) - } else { - // Buffer the metrics response, and relay it back to the diagnostics system, prepending with the size - if _, err := io.Copy(&metricsBuf, metricsResponse.Body); err != nil { - metricsBuf.Reset() - fmt.Fprintf(&metricsBuf, "ERROR: Extracting metrics url [%s], query [%s], err: %v", metricsURL, line, err) + codec := rpc.NewCodec(&conn{ + ReadCloser: resp.Body, + PipeWriter: writer, + }) + defer codec.Close() + + for { + requests, _, err := codec.ReadBatch() + + select { + case <-ctx.Done(): + return nil + default: + if err != nil { + logger.Info("Breaking connection", "err", err) + return nil } - metricsResponse.Body.Close() } - var sizeBuf [4]byte - binary.BigEndian.PutUint32(sizeBuf[:], uint32(metricsBuf.Len())) - if _, err = writer.Write(sizeBuf[:]); err != nil { - logger.Error("Problem relaying metrics prefix len", "url", metricsURL, "query", line, "err", err) - break + + var requestId string + + if err = json.Unmarshal(requests[0].ID, &requestId); err != nil { + logger.Error("Invalid request id", "err", err) + continue } - if _, err = writer.Write(metricsBuf.Bytes()); err != nil { - logger.Error("Problem relaying", "url", metricsURL, "query", line, "err", err) - break + + nodeRequest := struct { + NodeId string `json:"nodeId"` + QueryParams url.Values `json:"queryParams"` + }{} + + if err = json.Unmarshal(requests[0].Params, &nodeRequest); err != nil { + logger.Error("Invalid node request", "err", err, "id", requestId) + continue } - } - if err != nil { - select { - case <-ctx.Done(): - default: - logger.Error("Breaking connection", "err", err) + + type responseError struct { + Code int64 `json:"code"` + Message string `json:"message"` + Data *json.RawMessage `json:"data,omitempty"` + } + + type nodeResponse struct { + Id string `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *responseError `json:"error,omitempty"` + Last bool `json:"last,omitempty"` + } + + if node, ok := nodes[nodeRequest.NodeId]; ok { + err := func() error { + var queryString string + + if len(nodeRequest.QueryParams) > 0 { + queryString = "?" + nodeRequest.QueryParams.Encode() + } + + debugURL := node.debugURL + "/debug/" + requests[0].Method + queryString + + debugResponse, err := metricsClient.Get(debugURL) + + if err != nil { + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: http.StatusFailedDependency, + Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err), + }, + Last: true, + }) + } + + defer debugResponse.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(debugResponse.Body) + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: int64(resp.StatusCode), + Message: fmt.Sprintf("Request for metrics method [%s] failed: %s", debugURL, string(body)), + }, + Last: true, + }) + } + + buffer := &bytes.Buffer{} + + switch debugResponse.Header.Get("Content-Type") { + case "application/json": + if _, err := io.Copy(buffer, debugResponse.Body); err != nil { + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: http.StatusInternalServerError, + Message: fmt.Sprintf("Request for metrics method [%s] failed: %v", debugURL, err), + }, + Last: true, + }) + } + case "application/octet-stream": + if _, err := io.Copy(buffer, debugResponse.Body); err != nil { + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: int64(http.StatusInternalServerError), + Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err), + }, + Last: true, + }) + } + + offset, _ := strconv.ParseInt(debugResponse.Header.Get("X-Offset"), 10, 64) + size, _ := strconv.ParseInt(debugResponse.Header.Get("X-Size"), 10, 64) + + data, err := json.Marshal(struct { + Offset int64 `json:"offset"` + Size int64 `json:"size"` + Data []byte `json:"chunk"` + }{ + Offset: offset, + Size: size, + Data: buffer.Bytes(), + }) + + buffer = bytes.NewBuffer(data) + + if err != nil { + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: int64(http.StatusInternalServerError), + Message: fmt.Sprintf("Can't copy metrics response for [%s]: %s", debugURL, err), + }, + Last: true, + }) + } + + default: + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Error: &responseError{ + Code: int64(http.StatusInternalServerError), + Message: fmt.Sprintf("Unhandled content type: %s, from: %s", debugResponse.Header.Get("Content-Type"), debugURL), + }, + Last: true, + }) + } + + return json.NewEncoder(writer).Encode(&nodeResponse{ + Id: requestId, + Result: json.RawMessage(buffer.Bytes()), + Last: true, + }) + }() + + if err != nil { + return err + } } } - if isPrefix { - logger.Error("Request too long, circuit breaker") - } - return nil } diff --git a/turbo/debug/flags.go b/turbo/debug/flags.go index 7e534c88a5f..e3ee9818720 100644 --- a/turbo/debug/flags.go +++ b/turbo/debug/flags.go @@ -31,7 +31,6 @@ import ( "gopkg.in/yaml.v2" "github.com/ledgerwatch/erigon/common/fdlimit" - "github.com/ledgerwatch/erigon/diagnostics" "github.com/ledgerwatch/erigon/metrics" "github.com/ledgerwatch/erigon/turbo/logging" ) @@ -176,7 +175,7 @@ func SetupCobra(cmd *cobra.Command, filePrefix string) log.Logger { // Setup initializes profiling and logging based on the CLI flags. // It should be called as early as possible in the program. -func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) { +func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, *http.ServeMux, error) { // ensure we've read in config file details before setting up metrics etc. if err := SetFlagsFromConfigFile(ctx); err != nil { log.Warn("failed setting config flags from yaml/toml file", "err", err) @@ -188,13 +187,13 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) { if traceFile := ctx.String(traceFlag.Name); traceFile != "" { if err := Handler.StartGoTrace(traceFile); err != nil { - return logger, err + return logger, nil, err } } if cpuFile := ctx.String(cpuprofileFlag.Name); cpuFile != "" { if err := Handler.StartCPUProfile(cpuFile); err != nil { - return logger, err + return logger, nil, err } } pprofEnabled := ctx.Bool(pprofFlag.Name) @@ -208,13 +207,6 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) { metricsPort := ctx.Int(metricsPortFlag.Name) metricsAddress = fmt.Sprintf("%s:%d", metricsAddr, metricsPort) metricsMux = metrics.Setup(metricsAddress, logger) - diagnostics.SetupLogsAccess(ctx, metricsMux) - diagnostics.SetupDbAccess(ctx, metricsMux) - diagnostics.SetupCmdLineAccess(metricsMux) - diagnostics.SetupFlagsAccess(ctx, metricsMux) - diagnostics.SetupVersionAccess(metricsMux) - diagnostics.SetupBlockBodyDownload(metricsMux) - diagnostics.SetupHeaderDownloadStats(metricsMux) } // pprof server @@ -228,7 +220,8 @@ func Setup(ctx *cli.Context, rootLogger bool) (log.Logger, error) { StartPProf(address, nil) } } - return logger, nil + + return logger, metricsMux, nil } func StartPProf(address string, metricsMux *http.ServeMux) { diff --git a/turbo/node/node.go b/turbo/node/node.go index e33061915d7..ed7d1d5db98 100644 --- a/turbo/node/node.go +++ b/turbo/node/node.go @@ -34,6 +34,10 @@ func (eri *ErigonNode) Serve() error { return nil } +func (eri *ErigonNode) Backend() *eth.Ethereum { + return eri.backend +} + func (eri *ErigonNode) Close() { eri.stack.Close() }