Skip to content

Commit

Permalink
Merge pull request #1738 from OffchainLabs/rpc-memory-throttle
Browse files Browse the repository at this point in the history
Reject RPC requests if memory limit is exceeded
  • Loading branch information
Tristan-Wilson authored Jul 7, 2023
2 parents ca73cbc + 95aeec7 commit ae00aa6
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 1 deletion.
4 changes: 4 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/execution"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
Expand Down Expand Up @@ -329,6 +330,7 @@ type Config struct {
TxLookupLimit uint64 `koanf:"tx-lookup-limit"`
TransactionStreamer TransactionStreamerConfig `koanf:"transaction-streamer" reload:"hot"`
Maintenance MaintenanceConfig `koanf:"maintenance" reload:"hot"`
ResourceManagement resourcemanager.Config `koanf:"resource-mgmt" reload:"hot"`
}

func (c *Config) Validate() error {
Expand Down Expand Up @@ -402,6 +404,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet, feedInputEnable bool, feed
f.Uint64(prefix+".tx-lookup-limit", ConfigDefault.TxLookupLimit, "retain the ability to lookup transactions by hash for the past N blocks (0 = all blocks)")
TransactionStreamerConfigAddOptions(prefix+".transaction-streamer", f)
MaintenanceConfigAddOptions(prefix+".maintenance", f)
resourcemanager.ConfigAddOptions(prefix+".resource-mgmt", f)

archiveMsg := fmt.Sprintf("retain past block state (deprecated, please use %v.caching.archive)", prefix)
f.Bool(prefix+".archive", ConfigDefault.Archive, archiveMsg)
Expand All @@ -428,6 +431,7 @@ var ConfigDefault = Config{
TxLookupLimit: 126_230_400, // 1 year at 4 blocks per second
Caching: execution.DefaultCachingConfig,
TransactionStreamer: DefaultTransactionStreamerConfig,
ResourceManagement: resourcemanager.DefaultConfig,
}

func ConfigDefaultL1Test() *Config {
Expand Down
207 changes: 207 additions & 0 deletions arbnode/resourcemanager/resource_management.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2023, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE

package resourcemanager

import (
"bufio"
"errors"
"fmt"
"net/http"
"os"
"regexp"
"strconv"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/spf13/pflag"
)

// Metrics updated by httpServer.ServeHTTP on every request: the time spent
// in the limit check (in nanoseconds), the number of requests forwarded to
// the inner handler (success) and the number rejected because the limit
// was exceeded (failure).
var (
limitCheckDurationHistogram = metrics.NewRegisteredHistogram("arb/rpc/limitcheck/duration", nil, metrics.NewBoundedHistogramSample())
limitCheckSuccessCounter = metrics.NewRegisteredCounter("arb/rpc/limitcheck/success", nil)
limitCheckFailureCounter = metrics.NewRegisteredCounter("arb/rpc/limitcheck/failure", nil)
)

// Init installs the resource manager's httpServer into geth's custom
// node.WrapHTTPHandler hook, so that the limit check wraps the handler
// geth passes to the hook and runs before RPC request handling.
//
// Nothing is installed when conf.MemoryLimitPercent is zero or negative.
//
// Must be run before the go-ethereum stack is set up (ethereum/go-ethereum/node.New).
func Init(conf *Config) {
	if conf.MemoryLimitPercent <= 0 {
		// Limit checking is disabled; leave the hook untouched.
		return
	}

	node.WrapHTTPHandler = func(inner http.Handler) (http.Handler, error) {
		checker := newLimitChecker(conf)
		return newHttpServer(inner, checker), nil
	}
}

// Config contains the configuration for resourcemanager functionality.
// Currently only a memory limit is supported, other limits may be added
// in the future.
type Config struct {
// MemoryLimitPercent is the percentage of the memory limit at which RPC
// requests start being rejected. Init installs no check unless it is
// greater than zero.
MemoryLimitPercent int `koanf:"mem-limit-percent" reload:"hot"`
}

// DefaultConfig has the default resourcemanager configuration,
// all limits are disabled.
var DefaultConfig = Config{
MemoryLimitPercent: 0,
}

// ConfigAddOptions registers the resourcemanager command line flags on f,
// each flag name being prefix followed by the option's own suffix.
func ConfigAddOptions(prefix string, f *pflag.FlagSet) {
	const usage = "RPC calls are throttled if system memory utilization exceeds this percent value, zero (default) is disabled"

	flagName := prefix + ".mem-limit-percent"
	f.Int(flagName, DefaultConfig.MemoryLimitPercent, usage)
}

// httpServer is an http.Handler that performs a resource limit check
// before handing each request over to the wrapped inner handler.
type httpServer struct {
	inner http.Handler // handler that serves the request when no limit is exceeded
	c     limitChecker // reports whether a configured resource limit is exceeded
}

// newHttpServer returns an httpServer that guards inner with the checker c.
func newHttpServer(inner http.Handler, c limitChecker) *httpServer {
	srv := new(httpServer)
	srv.inner = inner
	srv.c = c
	return srv
}

// ServeHTTP runs the limit check, records how long it took, and then
// either rejects the request with HTTP 429 (limit exceeded) or forwards
// it to the inner handler. If the check itself fails the error is logged
// and the request is still forwarded.
func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	checkStarted := time.Now()
	exceeded, err := s.c.isLimitExceeded()
	limitCheckDurationHistogram.Update(time.Since(checkStarted).Nanoseconds())

	switch {
	case err != nil:
		// Fail open: an unusable check does not block the request.
		log.Error("Error checking memory limit", "err", err, "checker", s.c)
	case exceeded:
		http.Error(w, "Too many requests", http.StatusTooManyRequests)
		limitCheckFailureCounter.Inc(1)
		return
	}

	limitCheckSuccessCounter.Inc(1)
	s.inner.ServeHTTP(w, req)
}

// limitChecker is implemented by the mechanisms that can tell whether a
// system resource limit is currently exceeded.
type limitChecker interface {
// isLimitExceeded reports whether the limit is exceeded, or an error if
// that could not be determined.
isLimitExceeded() (bool, error)
// String names the checker; ServeHTTP passes the checker to the logger
// when a check fails.
String() string
}

// newLimitChecker tries to auto-discover a mechanism for checking system
// limits. Currently only Cgroups V1 is supported, with Cgroups V2 likely
// to be implemented next. When no supported mechanism is found it logs an
// error and fails open, i.e. it returns a trivialLimitChecker that
// performs no checks.
func newLimitChecker(conf *Config) limitChecker {
	cgroupsV1 := newCgroupsV1MemoryLimitChecker(DefaultCgroupsV1MemoryDirectory, conf.MemoryLimitPercent)
	if !isSupported(cgroupsV1) {
		log.Error("No method for determining memory usage and limits was discovered, disabled memory limit RPC throttling")
		return &trivialLimitChecker{}
	}

	log.Info("Cgroups v1 detected, enabling memory limit RPC throttling")
	return cgroupsV1
}

// trivialLimitChecker is the no-op limitChecker: it checks nothing, so
// its limit is never reported as exceeded.
type trivialLimitChecker struct{}

// isLimitExceeded always reports that no limit is exceeded and never fails.
func (trivialLimitChecker) isLimitExceeded() (exceeded bool, err error) {
	return exceeded, err
}

// String returns the fixed name of this checker.
func (trivialLimitChecker) String() string {
	const name = "trivial"
	return name
}

// DefaultCgroupsV1MemoryDirectory is the directory newLimitChecker reads
// the cgroups v1 memory files from.
const DefaultCgroupsV1MemoryDirectory = "/sys/fs/cgroup/memory/"

// cgroupsV1MemoryLimitChecker is a limitChecker backed by the files of a
// cgroups v1 memory directory.
type cgroupsV1MemoryLimitChecker struct {
	cgroupDir          string // directory holding the memory files
	memoryLimitPercent int    // percentage of the limit at which the check trips

	// Paths of the individual files read by isLimitExceeded.
	limitFile, usageFile, statsFile string
}

// newCgroupsV1MemoryLimitChecker builds a checker that reads
// memory.limit_in_bytes, memory.usage_in_bytes and memory.stat from
// cgroupDir and trips at memoryLimitPercent percent of the limit.
func newCgroupsV1MemoryLimitChecker(cgroupDir string, memoryLimitPercent int) *cgroupsV1MemoryLimitChecker {
	checker := new(cgroupsV1MemoryLimitChecker)
	checker.cgroupDir = cgroupDir
	checker.memoryLimitPercent = memoryLimitPercent
	checker.limitFile = cgroupDir + "/memory.limit_in_bytes"
	checker.usageFile = cgroupDir + "/memory.usage_in_bytes"
	checker.statsFile = cgroupDir + "/memory.stat"
	return checker
}

// isSupported reports whether c is usable on this system, which is taken
// to be the case when a trial limit check completes without an error.
// The outcome of the trial check itself is ignored.
func isSupported(c limitChecker) bool {
	if _, err := c.isLimitExceeded(); err != nil {
		return false
	}
	return true
}

// isLimitExceeded checks if the system memory used exceeds the limit
// scaled by the configured memoryLimitPercent.
//
// The memory used is the cgroup's usage minus its total_inactive_file
// value. It returns an error, and false, if any of the three cgroup files
// cannot be read or parsed.
//
// See the following page for details of calculating the memory used,
// which is reported as container_memory_working_set_bytes in prometheus:
// https://mihai-albert.com/2022/02/13/out-of-memory-oom-in-kubernetes-part-3-memory-metrics-sources-and-tools-to-collect-them/
func (c *cgroupsV1MemoryLimitChecker) isLimitExceeded() (bool, error) {
	limit, err := readIntFromFile(c.limitFile)
	if err != nil {
		return false, err
	}
	usage, err := readIntFromFile(c.usageFile)
	if err != nil {
		return false, err
	}
	inactive, err := readInactive(c.statsFile)
	if err != nil {
		return false, err
	}

	// Equivalent to (limit * percent) / 100, but without forming the
	// product limit * percent. A cgroup without a memory limit reports a
	// limit close to the maximum int64, for which that product overflows
	// and can wrap to a negative threshold that rejects every request.
	percent := c.memoryLimitPercent
	threshold := (limit/100)*percent + ((limit%100)*percent)/100

	return usage-inactive >= threshold, nil
}

// readIntFromFile returns the decimal integer found at the start of the
// file fileName. It returns an error if the file cannot be opened or does
// not start with an integer.
func readIntFromFile(fileName string) (int, error) {
	file, err := os.Open(fileName)
	if err != nil {
		return 0, err
	}
	// This runs for every RPC request, so the descriptor must be released
	// right away rather than whenever the garbage collector gets to it.
	defer file.Close()

	var value int
	if _, err := fmt.Fscanf(file, "%d", &value); err != nil {
		return 0, err
	}
	return value, nil
}

// re extracts the value of the total_inactive_file entry from a line of a
// cgroups v1 memory.stat file.
var re = regexp.MustCompile(`total_inactive_file (\d+)`)

// readInactive scans the file fileName line by line and returns the value
// of its first total_inactive_file entry. It returns an error if the file
// cannot be opened or read, if the value does not fit in an int, or if no
// such entry exists.
func readInactive(fileName string) (int, error) {
	file, err := os.Open(fileName)
	if err != nil {
		return 0, err
	}
	// This runs for every RPC request, so the descriptor must be released
	// right away rather than whenever the garbage collector gets to it.
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		matches := re.FindStringSubmatch(scanner.Text())
		if len(matches) < 2 {
			continue
		}
		inactive, err := strconv.Atoi(matches[1])
		if err != nil {
			return 0, err
		}
		return inactive, nil
	}
	// Scan also stops on a read error; report that instead of claiming
	// that the entry is missing.
	if err := scanner.Err(); err != nil {
		return 0, err
	}

	return 0, errors.New("total_inactive_file not found in " + fileName)
}

// String identifies this checker by a fixed name; it does not include
// any of the checker's state.
func (c cgroupsV1MemoryLimitChecker) String() string {
	const name = "CgroupsV1MemoryLimitChecker"
	return name
}
78 changes: 78 additions & 0 deletions arbnode/resourcemanager/resource_management_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE

package resourcemanager

import (
"fmt"
"os"
"testing"
)

// updateFakeCgroupv1Files (re)writes the three files read by c so that
// they report the given limit, usage and total_inactive_file values.
// Existing files are truncated. It returns the first write error, if any.
func updateFakeCgroupv1Files(c *cgroupsV1MemoryLimitChecker, limit, usage, inactive int) error {
	stats := fmt.Sprintf(`total_cache 1029980160
total_rss 1016209408
total_inactive_file %d
total_active_file 321544192
`, inactive)

	files := []struct {
		name, contents string
	}{
		{c.limitFile, fmt.Sprintf("%d\n", limit)},
		{c.usageFile, fmt.Sprintf("%d\n", usage)},
		{c.statsFile, stats},
	}
	for _, f := range files {
		// os.WriteFile closes the file itself, so no descriptor stays
		// open after each update.
		if err := os.WriteFile(f.name, []byte(f.contents), 0o666); err != nil {
			return err
		}
	}
	return nil
}

// TestCgroupsv1MemoryLimit exercises cgroupsV1MemoryLimitChecker against
// fake cgroup files in a temporary directory: the check must fail while
// the files are missing, stay under the 95% limit at a working set of
// 949/1000 and trip at 950/1000.
func TestCgroupsv1MemoryLimit(t *testing.T) {
	c := newCgroupsV1MemoryLimitChecker(t.TempDir(), 95)

	// No files have been written yet, so the check itself must error.
	if _, err := c.isLimitExceeded(); err == nil {
		t.Error("Should fail open if can't read files")
	}

	steps := []struct {
		inactive     int
		wantExceeded bool
		failMessage  string
	}{
		{inactive: 51, wantExceeded: false, failMessage: "Expected under limit"},
		{inactive: 50, wantExceeded: true, failMessage: "Expected over limit"},
	}
	for _, step := range steps {
		if err := updateFakeCgroupv1Files(c, 1000, 1000, step.inactive); err != nil {
			t.Error(err)
		}
		exceeded, err := c.isLimitExceeded()
		if err != nil {
			t.Error(err)
		}
		if exceeded != step.wantExceeded {
			t.Error(step.failMessage)
		}
	}
}
3 changes: 3 additions & 0 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"github.com/offchainlabs/nitro/arbnode"
"github.com/offchainlabs/nitro/arbnode/execution"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/conf"
"github.com/offchainlabs/nitro/cmd/genericconf"
Expand Down Expand Up @@ -321,6 +322,8 @@ func mainImpl() int {
nodeConfig.Node.TxLookupLimit = 0
}

resourcemanager.Init(&nodeConfig.Node.ResourceManagement)

stack, err := node.New(&stackConf)
if err != nil {
flag.Usage()
Expand Down
2 changes: 1 addition & 1 deletion go-ethereum
Submodule go-ethereum updated 1 files
+12 −1 node/rpcstack.go

0 comments on commit ae00aa6

Please sign in to comment.