From b01baf6552e55e513fc1de5791de7d209614038a Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 15 Aug 2023 11:53:05 +0400 Subject: [PATCH] node: Make Peapod's flush interval configurable There may be a need to tune time interval b/w batch writes to disk in Peapod component. Add storage node's config with `flush_interval` key of type duration defaulting to 10ms. Signed-off-by: Leonard Lyubich --- cmd/blobovnicza-to-peapod/main.go | 3 +- cmd/neofs-node/config.go | 16 ++++++--- cmd/neofs-node/config/engine/config_test.go | 4 ++- .../engine/shard/blobstor/peapod/config.go | 34 +++++++++++++++++++ config/example/node.env | 1 + config/example/node.json | 5 +-- config/example/node.yaml | 1 + docs/storage-node-configuration.md | 1 + .../blobstor/common/storage_test.go | 3 +- .../blobstor/peapod/peapod.go | 20 ++++++++--- .../blobstor/peapod/peapod_test.go | 9 ++--- 11 files changed, 77 insertions(+), 20 deletions(-) create mode 100644 cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go diff --git a/cmd/blobovnicza-to-peapod/main.go b/cmd/blobovnicza-to-peapod/main.go index 2380da2f4d..80b8ba5d33 100644 --- a/cmd/blobovnicza-to-peapod/main.go +++ b/cmd/blobovnicza-to-peapod/main.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" @@ -67,7 +68,7 @@ func main() { } ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db") - ppd := peapod.New(ppdPath, perm) + ppd := peapod.New(ppdPath, perm, 10*time.Millisecond) var compressCfg compression.Config compressCfg.Enabled = sc.Compress() diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 3d419ead7f..74d143f6f7 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -25,6 +25,7 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" @@ -159,9 +160,11 @@ func (c *shardCfg) id() string { type subStorageCfg struct { // common for all storages - typ string - path string - perm fs.FileMode + typ string + path string + perm fs.FileMode + + // tree-specific (FS and blobovnicza) depth uint64 noSync bool @@ -169,6 +172,9 @@ type subStorageCfg struct { size uint64 width uint64 openedCacheSize int + + // Peapod-specific + flushInterval time.Duration } // readConfig fills applicationConfiguration with raw configuration values @@ -266,6 +272,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() case peapod.Type: + peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i])) + sCfg.flushInterval = peapodCfg.FlushInterval() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -736,7 +744,7 @@ func (c *cfg) shardOpts() []shardOptsWithID { }) case peapod.Type: ss = append(ss, blobstor.SubStorage{ - Storage: peapod.New(sRead.path, sRead.perm), + Storage: peapod.New(sRead.path, sRead.perm, sRead.flushInterval), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < shCfg.smallSizeObjectLimit }, diff --git a/cmd/neofs-node/config/engine/config_test.go b/cmd/neofs-node/config/engine/config_test.go index 4080458370..dc49f9a946 100644 --- a/cmd/neofs-node/config/engine/config_test.go +++ b/cmd/neofs-node/config/engine/config_test.go @@ -10,6 +10,7 @@ import ( shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" + peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod" piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama" configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod" @@ -133,10 +134,11 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 102400, sc.SmallSizeLimit()) require.Equal(t, 2, len(ss)) - + ppd := peapodconfig.From((*config.Config)(ss[0])) require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path()) require.EqualValues(t, 0644, ss[0].Perm()) require.EqualValues(t, peapod.Type, ss[0].Type()) + require.EqualValues(t, 30*time.Millisecond, ppd.FlushInterval()) require.Equal(t, "tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0644, ss[1].Perm()) diff --git a/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go new file mode 100644 index 0000000000..cd01d21ca9 --- /dev/null +++ b/cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go @@ -0,0 +1,34 @@ +package peapodconfig + +import ( + "time" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" +) + +// Config is a wrapper over the config section +// which provides access to Peapod configurations. +type Config config.Config + +// Various Peapod config defaults. +const ( + // DefaultFlushInterval is a default time interval between Peapod's batch writes + // to disk. + DefaultFlushInterval = 10 * time.Millisecond +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// FlushInterval returns the value of "flush_interval" config parameter. +// +// Returns DefaultFlushInterval if the value is not a positive duration. +func (x *Config) FlushInterval() time.Duration { + d := config.DurationSafe((*config.Config)(x), "flush_interval") + if d > 0 { + return d + } + return DefaultFlushInterval +} diff --git a/config/example/node.env b/config/example/node.env index f9e1d49f84..5449b6567a 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -155,6 +155,7 @@ NEOFS_STORAGE_SHARD_1_SMALL_OBJECT_SIZE=102400 NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=peapod NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/peapod.db NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PERM=0644 +NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_FLUSH_INTERVAL=30ms ### FSTree config NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH=tmp/1/blob diff --git a/config/example/node.json b/config/example/node.json index 1e33668c90..fa11730f57 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -201,10 +201,7 @@ "type": "peapod", "path": "tmp/1/blob/peapod.db", "perm": "0644", - "size": 4194304, - "depth": 1, - "width": 4, - "opened_cache_capacity": 50 + "flush_interval": "30ms" }, { "type": "fstree", diff --git a/config/example/node.yaml b/config/example/node.yaml index f0823a172d..fde52a969f 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -196,6 +196,7 @@ storage: blobstor: - type: peapod path: tmp/1/blob/peapod.db # path to Peapod database + flush_interval: 30ms # time interval between batch writes to disk (defaults to 10ms) - type: fstree path: tmp/1/blob # blobstor path no_sync: true diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index ec30143bb0..6eccf54569 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -220,6 +220,7 @@ blobstor: |---------------------|-----------|---------------|-------------------------------------------------------| | `path` | `string` | | Path to the Peapod database file. | | `perm` | file mode | `0660` | Default permission for created files and directories. | +| `flush_interval` | `duration`| `10ms` | Time interval between batch writes to disk. | ### `gc` subsection diff --git a/pkg/local_object_storage/blobstor/common/storage_test.go b/pkg/local_object_storage/blobstor/common/storage_test.go index 793862cbb3..33f494f913 100644 --- a/pkg/local_object_storage/blobstor/common/storage_test.go +++ b/pkg/local_object_storage/blobstor/common/storage_test.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "path/filepath" "testing" + "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" @@ -43,7 +44,7 @@ func TestCopy(t *testing.T) { require.NoError(t, src.Close()) - dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600) + dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600, 10*time.Millisecond) err := common.Copy(dst, src) require.NoError(t, err) diff --git a/pkg/local_object_storage/blobstor/peapod/peapod.go b/pkg/local_object_storage/blobstor/peapod/peapod.go index 18d6569db8..20aa8c595e 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod.go @@ -43,6 +43,8 @@ type Peapod struct { path string perm fs.FileMode + flushInterval time.Duration + compress *compression.Config readOnly bool @@ -63,18 +65,24 @@ var errMissingRootBucket = errors.New("missing root bucket") // New creates new Peapod instance to be located at the given path with // specified permissions. -func New(path string, perm fs.FileMode) *Peapod { +// +// Specified flush interval MUST be positive (see Init). +func New(path string, perm fs.FileMode, flushInterval time.Duration) *Peapod { + if flushInterval <= 0 { + panic(fmt.Sprintf("non-positive flush interval %v", flushInterval)) + } return &Peapod{ path: path, perm: perm, + + flushInterval: flushInterval, } } func (x *Peapod) flushLoop() { defer close(x.chFlushDone) - const flushInterval = 10 * time.Millisecond - t := time.NewTimer(flushInterval) + t := time.NewTimer(x.flushInterval) defer t.Stop() for { @@ -88,7 +96,7 @@ func (x *Peapod) flushLoop() { x.flushCurrentBatch(true) - interval := flushInterval - time.Since(st) + interval := x.flushInterval - time.Since(st) if interval <= 0 { interval = time.Microsecond } @@ -207,7 +215,9 @@ func (x *Peapod) Open(readOnly bool) error { return nil } -// Init initializes internal structure of the underlying database. +// Init initializes internal structure of the underlying database and runs +// flushing routine. The routine writes data batches into disk once per time +// interval configured in New. func (x *Peapod) Init() error { if x.readOnly { // no extra actions needed in read-only mode diff --git a/pkg/local_object_storage/blobstor/peapod/peapod_test.go b/pkg/local_object_storage/blobstor/peapod/peapod_test.go index 199825e205..f428a72c41 100644 --- a/pkg/local_object_storage/blobstor/peapod/peapod_test.go +++ b/pkg/local_object_storage/blobstor/peapod/peapod_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest" @@ -23,20 +24,20 @@ func TestGeneric(t *testing.T) { } blobstortest.TestAll(t, func(t *testing.T) common.Storage { - return peapod.New(newPath(), 0600) + return peapod.New(newPath(), 0600, 10*time.Millisecond) }, 2048, 16*1024) t.Run("info", func(t *testing.T) { path := newPath() blobstortest.TestInfo(t, func(t *testing.T) common.Storage { - return peapod.New(path, 0600) + return peapod.New(path, 0600, 10*time.Millisecond) }, peapod.Type, path) }) } func TestControl(t *testing.T) { blobstortest.TestControl(t, func(t *testing.T) common.Storage { - return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600) + return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600, 10*time.Millisecond) }, 2048, 2048) } @@ -73,7 +74,7 @@ func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) { } func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod { - ppd := peapod.New(path, 0600) + ppd := peapod.New(path, 0600, 10*time.Millisecond) require.NoError(tb, ppd.Open(readOnly)) require.NoError(tb, ppd.Init())