diff --git a/arbnode/node.go b/arbnode/node.go index f1fc8e9e63..02b9857877 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode/execution" + "github.com/offchainlabs/nitro/arbnode/resourcemanager" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" "github.com/offchainlabs/nitro/broadcastclients" @@ -329,6 +330,7 @@ type Config struct { TxLookupLimit uint64 `koanf:"tx-lookup-limit"` TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"` Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"` + ResourceManagement resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"` } func (c *Config) Validate() error { @@ -402,6 +404,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)") TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f) MaintenanceConfigAddOptions(prefix+".maintenance", f) + resourcemanager.ConfigAddOptions(prefix+".resource-mgmt", f) archiveMsg := fmt.Sprintf("retain past block state (deprecated, please use %v.caching.archive)", prefix) f.Bool(prefix+".archive", ConfigDefault.Archive, archiveMsg) @@ -428,6 +431,7 @@ var ConfigDefault = Config{ TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second Caching: execution.DefaultCachingConfig, TransactionStreamer: DefaultTransactionStreamerConfig, + ResourceManagement: resourcemanager.DefaultConfig, } func ConfigDefaultL1Test() *Config { diff --git a/arbnode/resourcemanager/resource_management.go b/arbnode/resourcemanager/resource_management.go new file mode 100644 index 0000000000..acb5355987 --- /dev/null +++ b/arbnode/resourcemanager/resource_management.go @@ -0,0 +1,207 @@ +// Copyright 2023, Offchain Labs, Inc. 
+// For license information, see https://github.com/nitro/blob/master/LICENSE
+
+// Package resourcemanager throttles RPC requests served over HTTP when a
+// configured system resource limit is exceeded.
+package resourcemanager
+
+import (
+	"bufio"
+	"errors"
+	"fmt"
+	"net/http"
+	"os"
+	"regexp"
+	"strconv"
+	"time"
+
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
+	"github.com/ethereum/go-ethereum/node"
+	"github.com/spf13/pflag"
+)
+
+// Metrics describing the resource limit check made for each HTTP request.
+var (
+	// limitCheckDurationHistogram records how long each limit check took, in nanoseconds.
+	limitCheckDurationHistogram = metrics.NewRegisteredHistogram("arb/rpc/limitcheck/duration", nil, metrics.NewBoundedHistogramSample())
+	// limitCheckSuccessCounter counts requests handed on to the wrapped handler.
+	limitCheckSuccessCounter = metrics.NewRegisteredCounter("arb/rpc/limitcheck/success", nil)
+	// limitCheckFailureCounter counts requests rejected because a limit was exceeded.
+	limitCheckFailureCounter = metrics.NewRegisteredCounter("arb/rpc/limitcheck/failure", nil)
+)
+
+// Init registers the resource manager's httpServer through the custom
+// node.WrapHTTPHandler hook in geth, which places it in the stack of
+// http.Handlers so that it runs before RPC request handling.
+//
+// Nothing is registered unless conf.MemoryLimitPercent is positive.
+//
+// Must be run before the go-ethereum stack is set up (ethereum/go-ethereum/node.New).
+func Init(conf *Config) {
+	if conf.MemoryLimitPercent <= 0 {
+		// Memory limiting is disabled; leave the hook untouched.
+		return
+	}
+	node.WrapHTTPHandler = func(inner http.Handler) (http.Handler, error) {
+		checker := newLimitChecker(conf)
+		return newHttpServer(inner, checker), nil
+	}
+}
+
+// Config contains the configuration for resourcemanager functionality.
+// A memory limit is the only limit supported so far; further limits may be
+// added later.
+type Config struct {
+	// MemoryLimitPercent is the percentage of the memory limit above which
+	// RPC calls are throttled; zero disables the check.
+	MemoryLimitPercent int `koanf:"mem-limit-percent" reload:"hot"`
+}
+
+// DefaultConfig is the default resourcemanager configuration,
+// in which all limits are disabled.
+var DefaultConfig = Config{
+	MemoryLimitPercent: 0,
+}
+
+// ConfigAddOptions registers the resourcemanager command line flags on f,
+// each named with the given prefix.
+func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
+	const memLimitUsage = "RPC calls are throttled if system memory utilization exceeds this percent value, zero (default) is disabled"
+	f.Int(prefix+".mem-limit-percent", DefaultConfig.MemoryLimitPercent, memLimitUsage)
+}
+
+// httpServer implements http.Handler and wraps calls to inner with a resource
+// limit check.
+type httpServer struct {
+	// inner is the handler that serves every request that is not throttled.
+	inner http.Handler
+	// c decides whether a resource limit is currently exceeded.
+	c limitChecker
+}
+
+// newHttpServer returns an httpServer that guards inner with the checks of c.
+func newHttpServer(inner http.Handler, c limitChecker) *httpServer {
+	srv := httpServer{inner: inner, c: c}
+	return &srv
+}
+
+// ServeHTTP passes req to inner unless a configured system resource limit is
+// exceeded, in which case it replies with an HTTP 429 error instead.
+//
+// If the check itself fails, the error is logged and the request is still
+// served.
+func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	checkStarted := time.Now()
+	overLimit, checkErr := s.c.isLimitExceeded()
+	limitCheckDurationHistogram.Update(time.Since(checkStarted).Nanoseconds())
+
+	switch {
+	case checkErr != nil:
+		// Fail open: a broken check must not block RPC traffic.
+		log.Error("Error checking memory limit", "err", checkErr, "checker", s.c)
+	case overLimit:
+		http.Error(w, "Too many requests", http.StatusTooManyRequests)
+		limitCheckFailureCounter.Inc(1)
+		return
+	}
+
+	limitCheckSuccessCounter.Inc(1)
+	s.inner.ServeHTTP(w, req)
+}
+
+// limitChecker reports whether a system resource limit is exceeded.
+type limitChecker interface {
+	// isLimitExceeded returns true when the limit is exceeded, or an error
+	// when the check could not be carried out.
+	isLimitExceeded() (bool, error)
+	// String names the checker, for use in log messages.
+	String() string
+}
+
+// newLimitChecker attempts to auto-discover the mechanism by which it
+// can check system limits. Currently Cgroups V1 is supported,
+// with Cgroups V2 likely to be implemented next. If no supported
+// mechanism is discovered, it logs an error and fails open, i.e.
+// it returns a trivialLimitChecker that does no checks.
+func newLimitChecker(conf *Config) limitChecker {
+	v1Checker := newCgroupsV1MemoryLimitChecker(DefaultCgroupsV1MemoryDirectory, conf.MemoryLimitPercent)
+	if !isSupported(v1Checker) {
+		log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
+		return &trivialLimitChecker{}
+	}
+
+	log.Info("Cgroups v1 detected, enabling memory limit RPC throttling")
+	return v1Checker
+}
+
+// trivialLimitChecker checks no limits, so its limits are never exceeded.
+type trivialLimitChecker struct{}
+
+// isLimitExceeded always reports that no limit is exceeded.
+func (trivialLimitChecker) isLimitExceeded() (bool, error) {
+	return false, nil
+}
+
+// String returns the name of the checker, for use in log messages.
+func (trivialLimitChecker) String() string { return "trivial" }
+
+// DefaultCgroupsV1MemoryDirectory is the directory in which the cgroups v1
+// memory files are looked up by default.
+const DefaultCgroupsV1MemoryDirectory = "/sys/fs/cgroup/memory/"
+
+// cgroupsV1MemoryLimitChecker checks memory usage against the memory limit
+// using the files found in a cgroups v1 memory directory.
+type cgroupsV1MemoryLimitChecker struct {
+	cgroupDir          string
+	memoryLimitPercent int
+
+	// Paths of the files read on every check, derived from cgroupDir.
+	limitFile, usageFile, statsFile string
+}
+
+// newCgroupsV1MemoryLimitChecker returns a checker that reads the memory
+// files in cgroupDir and reports the limit as exceeded once the memory in use
+// reaches memoryLimitPercent percent of the limit.
+func newCgroupsV1MemoryLimitChecker(cgroupDir string, memoryLimitPercent int) *cgroupsV1MemoryLimitChecker {
+	return &cgroupsV1MemoryLimitChecker{
+		cgroupDir:          cgroupDir,
+		memoryLimitPercent: memoryLimitPercent,
+		limitFile:          cgroupDir + "/memory.limit_in_bytes",
+		usageFile:          cgroupDir + "/memory.usage_in_bytes",
+		statsFile:          cgroupDir + "/memory.stat",
+	}
+}
+
+// isSupported reports whether c is usable, which it determines by making one
+// probe call to isLimitExceeded and checking that it returns no error.
+func isSupported(c limitChecker) bool {
+	_, err := c.isLimitExceeded()
+	return err == nil
+}
+
+// isLimitExceeded checks if the system memory used exceeds the limit
+// scaled by the configured memoryLimitPercent.
+//
+// The memory used is the usage minus total_inactive_file. It returns an error
+// if any of the three files cannot be read or parsed.
+//
+// See the following page for details of calculating the memory used,
+// which is reported as container_memory_working_set_bytes in prometheus:
+// https://mihai-albert.com/2022/02/13/out-of-memory-oom-in-kubernetes-part-3-memory-metrics-sources-and-tools-to-collect-them/
+func (c *cgroupsV1MemoryLimitChecker) isLimitExceeded() (bool, error) {
+	limit, err := readIntFromFile(c.limitFile)
+	if err != nil {
+		return false, err
+	}
+	usage, err := readIntFromFile(c.usageFile)
+	if err != nil {
+		return false, err
+	}
+	inactive, err := readInactive(c.statsFile)
+	if err != nil {
+		return false, err
+	}
+
+	// Equal to (limit * percent) / 100, but computed from the quotient and
+	// remainder of limit/100 so that the intermediate product cannot overflow
+	// when the limit is close to the maximum int, as it is for a cgroup that
+	// has no memory limit set.
+	percent := c.memoryLimitPercent
+	threshold := (limit/100)*percent + ((limit%100)*percent)/100
+	return usage-inactive >= threshold, nil
+}
+
+// readIntFromFile returns the integer found at the start of the named file.
+func readIntFromFile(fileName string) (int, error) {
+	file, err := os.Open(fileName)
+	if err != nil {
+		return 0, err
+	}
+	// Called for every request, so the descriptor must not be left open.
+	defer file.Close()
+
+	var value int
+	if _, err = fmt.Fscanf(file, "%d", &value); err != nil {
+		return 0, fmt.Errorf("parsing %s: %w", fileName, err)
+	}
+	return value, nil
+}
+
+// re matches the total_inactive_file line of a memory.stat file and captures
+// its value.
+var re = regexp.MustCompile(`total_inactive_file (\d+)`)
+
+// readInactive returns the total_inactive_file value from the named
+// memory.stat file, or an error if the file cannot be read or has no such line.
+func readInactive(fileName string) (int, error) {
+	file, err := os.Open(fileName)
+	if err != nil {
+		return 0, err
+	}
+	// Called for every request, so the descriptor must not be left open.
+	defer file.Close()
+
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		matches := re.FindStringSubmatch(scanner.Text())
+		if len(matches) < 2 {
+			continue
+		}
+		inactive, err := strconv.Atoi(matches[1])
+		if err != nil {
+			return 0, fmt.Errorf("parsing total_inactive_file in %s: %w", fileName, err)
+		}
+		return inactive, nil
+	}
+	// Tell a failed read apart from a file that lacks the line.
+	if err := scanner.Err(); err != nil {
+		return 0, fmt.Errorf("reading %s: %w", fileName, err)
+	}
+
+	return 0, errors.New("total_inactive_file not found in " + fileName)
+}
+
+// String returns the name of the checker, for use in log messages.
+func (c *cgroupsV1MemoryLimitChecker) String() string {
+	return "CgroupsV1MemoryLimitChecker"
+}
diff --git a/arbnode/resourcemanager/resource_management_test.go b/arbnode/resourcemanager/resource_management_test.go
new file mode 100644
index 0000000000..fe470e706b
--- /dev/null
+++ b/arbnode/resourcemanager/resource_management_test.go
@@ -0,0 +1,104 @@
+// Copyright 2023, Offchain Labs, Inc.
+// For license information, see https://github.com/nitro/blob/master/LICENSE
+
+package resourcemanager
+
+import (
+	"fmt"
+	"os"
+	"testing"
+)
+
+// updateFakeCgroupv1Files writes limit, usage and inactive into the fake
+// cgroup v1 files that c reads, replacing any previous contents.
+func updateFakeCgroupv1Files(c *cgroupsV1MemoryLimitChecker, limit, usage, inactive int) error {
+	limitFile, err := os.Create(c.limitFile)
+	if err != nil {
+		return err
+	}
+	defer limitFile.Close()
+	_, err = fmt.Fprintf(limitFile, "%d\n", limit)
+	if err != nil {
+		return err
+	}
+
+	usageFile, err := os.Create(c.usageFile)
+	if err != nil {
+		return err
+	}
+	defer usageFile.Close()
+	_, err = fmt.Fprintf(usageFile, "%d\n", usage)
+	if err != nil {
+		return err
+	}
+
+	statsFile, err := os.Create(c.statsFile)
+	if err != nil {
+		return err
+	}
+	defer statsFile.Close()
+	_, err = fmt.Fprintf(statsFile, `total_cache 1029980160
+total_rss 1016209408
+total_inactive_file %d
+total_active_file 321544192
+`, inactive)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// TestCgroupsv1MemoryLimit checks that the check fails while the cgroup files
+// are missing and that the limit trips exactly at the configured percentage.
+func TestCgroupsv1MemoryLimit(t *testing.T) {
+	cgroupDir := t.TempDir()
+	c := newCgroupsV1MemoryLimitChecker(cgroupDir, 95)
+	_, err := c.isLimitExceeded()
+	if err == nil {
+		t.Error("Should fail open if can't read files")
+	}
+
+	// 1000 - 51 = 949, just below 95% of 1000.
+	err = updateFakeCgroupv1Files(c, 1000, 1000, 51)
+	if err != nil {
+		t.Error(err)
+	}
+	exceeded, err := c.isLimitExceeded()
+	if err != nil {
+		t.Error(err)
+	}
+	if exceeded {
+		t.Error("Expected under limit")
+	}
+
+	// 1000 - 50 = 950, exactly 95% of 1000, which counts as exceeded.
+	err = updateFakeCgroupv1Files(c, 1000, 1000, 50)
+	if err != nil {
+		t.Error(err)
+	}
+	exceeded, err = c.isLimitExceeded()
+	if err != nil {
+		t.Error(err)
+	}
+	if !exceeded {
+		t.Error("Expected over limit")
+	}
+}
+
+// TestCgroupsv1MemoryLimitNoOverflow checks that a limit near the maximum int,
+// as reported by a cgroup without a memory limit, is not seen as exceeded;
+// limit*percent overflows for such a value.
+func TestCgroupsv1MemoryLimitNoOverflow(t *testing.T) {
+	const maxInt = int(^uint(0) >> 1)
+	c := newCgroupsV1MemoryLimitChecker(t.TempDir(), 90)
+	if err := updateFakeCgroupv1Files(c, maxInt-4095, 1000, 50); err != nil {
+		t.Fatal(err)
+	}
+	exceeded, err := c.isLimitExceeded()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if exceeded {
+		t.Error("Expected under limit for an effectively unlimited cgroup")
+	}
+}
diff --git a/cmd/nitro/nitro.go b/cmd/nitro/nitro.go
index 0035171078..3074ca7f87 100644
--- a/cmd/nitro/nitro.go
+++ b/cmd/nitro/nitro.go
@@ -38,6 +38,7 @@ import (
 
 	"github.com/offchainlabs/nitro/arbnode"
 	"github.com/offchainlabs/nitro/arbnode/execution"
+	"github.com/offchainlabs/nitro/arbnode/resourcemanager"
 	"github.com/offchainlabs/nitro/cmd/chaininfo"
 	"github.com/offchainlabs/nitro/cmd/conf"
 	"github.com/offchainlabs/nitro/cmd/genericconf"
@@ -321,6 +322,8 @@ func mainImpl() int {
 		nodeConfig.Node.TxLookupLimit = 0
 	}
 
+	resourcemanager.Init(&nodeConfig.Node.ResourceManagement)
+
 	stack, err := node.New(&stackConf)
 	if err != nil {
 		flag.Usage()
diff --git a/go-ethereum b/go-ethereum
index 8e6a8ad494..28127f5941 160000
--- a/go-ethereum
+++ b/go-ethereum
@@ -1 +1 @@
-Subproject commit 8e6a8ad4942591011e833e6ebceca6bd668f3db0
+Subproject commit 28127f5941faec6fe5227c29443d2074639495d0