Skip to content

Commit

Permalink
feat(admin): add a new flag replica-selector to the varlogadm
Browse files Browse the repository at this point in the history
It added a new flag, `replica-selector`, to the varlogadm. The flag sets the global default replica
selector.

Updates #393
  • Loading branch information
ijsong committed Apr 6, 2023
1 parent 1892ae9 commit 805f8de
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 21 deletions.
10 changes: 9 additions & 1 deletion cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func newStartCommand() *cli.Command {
flagLogStreamGCTimeout.DurationFlag(false, admin.DefaultLogStreamGCTimeout),
flagDisableAutoLogStreamSync.BoolFlag(),
flagAutoUnseal.BoolFlag(),
flagReplicaSelector,

flagMetadataRepository.StringSliceFlag(true, nil),
flagInitMRConnRetryCount.IntFlag(false, mrmanager.DefaultInitialMRConnectRetryCount),
Expand Down Expand Up @@ -95,10 +96,17 @@ func start(c *cli.Context) error {
return err
}

repfactor := c.Uint(flagReplicationFactor.Name)
repsel, err := admin.NewReplicaSelector(c.String(flagReplicaSelector.Name), mrMgr.ClusterMetadataView(), int(repfactor))
if err != nil {
return err
}

opts := []admin.Option{
admin.WithLogger(logger),
admin.WithListenAddress(c.String(flagListen.Name)),
admin.WithReplicationFactor(c.Uint(flagReplicationFactor.Name)),
admin.WithReplicationFactor(repfactor),
admin.WithReplicaSelector(repsel),
admin.WithLogStreamGCTimeout(c.Duration(flagLogStreamGCTimeout.Name)),
admin.WithMetadataRepositoryManager(mrMgr),
admin.WithStorageNodeManager(snMgr),
Expand Down
12 changes: 12 additions & 0 deletions cmd/varlogadm/flags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package main

import (
"strings"

"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/admin"
"github.com/kakao/varlog/internal/flags"
)

Expand Down Expand Up @@ -30,6 +35,13 @@ var (
Aliases: []string{"enable-auto-unseal", "with-auto-unseal"},
Envs: []string{"AUTO_UNSEAL", "ENABLE_AUTO_UNSEAL", "WITH_AUTO_UNSEAL"},
}
flagReplicaSelector = &cli.StringFlag{
Name: "replica-selector",
Aliases: []string{"repsel"},
EnvVars: []string{"REPLICA_SELECTOR"},
Value: admin.ReplicaSelectorNameLFU,
Usage: strings.Join([]string{admin.ReplicaSelectorNameRandom, admin.ReplicaSelectorNameLFU}, " | "),
}

flagInitMRConnRetryCount = flags.FlagDesc{
Name: "init-mr-conn-retry-count",
Expand Down
9 changes: 9 additions & 0 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ func (adm *Admin) Serve() error {
}
adm.mu.Unlock()

if ce := adm.logger.Check(zap.InfoLevel, "starting"); ce != nil {
ce.Write(
zap.String("address", adm.serverAddr),
zap.Uint("replicationFactor", adm.replicationFactor),
zap.String("replicationSelector", adm.snSelector.Name()),
zap.Duration("logStreamGCTimeout", adm.logStreamGCTimeout),
)
}

return adm.server.Serve(lis)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (cfg config) validate() error {

func (cfg *config) ensureDefault() error {
if cfg.snSelector == nil {
rs, err := newReplicaSelector(replicaSelectorNameLFU, cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor))
rs, err := NewReplicaSelector(ReplicaSelectorNameLFU, cfg.mrmgr.ClusterMetadataView(), int(cfg.replicationFactor))
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions internal/admin/replica_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type ReplicaSelector interface {
}

const (
replicaSelectorNameRandom = "random"
replicaSelectorNameLFU = "lfu" // least frequently used
ReplicaSelectorNameRandom = "random"
ReplicaSelectorNameLFU = "lfu" // least frequently used
)

func newReplicaSelector(selector string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) {
func NewReplicaSelector(selector string, cmview mrmanager.ClusterMetadataView, repfactor int) (ReplicaSelector, error) {
if repfactor < 1 {
return nil, errors.Wrap(verrors.ErrInvalid, "replica selector: negative replication factor")
}
Expand All @@ -45,9 +45,9 @@ func newReplicaSelector(selector string, cmview mrmanager.ClusterMetadataView, r
}

switch strings.ToLower(selector) {
case replicaSelectorNameRandom:
case ReplicaSelectorNameRandom:
return newRandomReplicaSelector(cmview, repfactor)
case replicaSelectorNameLFU:
case ReplicaSelectorNameLFU:
return newLFUSelector(cmview, repfactor)
default:
return nil, fmt.Errorf("unknown selector: %s", selector)
Expand All @@ -72,7 +72,7 @@ func newRandomReplicaSelector(cmview mrmanager.ClusterMetadataView, repfactor in
}

func (s *randomReplicaSelector) Name() string {
return replicaSelectorNameRandom
return ReplicaSelectorNameRandom
}

func (s *randomReplicaSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
Expand Down Expand Up @@ -131,7 +131,7 @@ func newLFUSelector(cmview mrmanager.ClusterMetadataView, repfactor int) (*lfuSe
}

func (s *lfuSelector) Name() string {
return replicaSelectorNameLFU
return ReplicaSelectorNameLFU
}

func (s *lfuSelector) Select(ctx context.Context) ([]*varlogpb.ReplicaDescriptor, error) {
Expand Down
24 changes: 12 additions & 12 deletions internal/admin/replica_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestReplicaSelector_NewUnknown(t *testing.T) {
cmview := mrmanager.NewMockClusterMetadataView(ctrl)
cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes()

_, err := newReplicaSelector("foo", cmview, 1)
_, err := NewReplicaSelector("foo", cmview, 1)
require.Error(t, err)

}
Expand All @@ -34,8 +34,8 @@ func TestReplicaSelector_New(t *testing.T) {
cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(&varlogpb.MetadataDescriptor{}, nil).AnyTimes()

selectors := []string{
replicaSelectorNameRandom,
replicaSelectorNameLFU,
ReplicaSelectorNameRandom,
ReplicaSelectorNameLFU,
}
tcs := []struct {
name string
Expand All @@ -57,7 +57,7 @@ func TestReplicaSelector_New(t *testing.T) {
for _, tc := range tcs {
for _, selector := range selectors {
t.Run(selector+tc.name, func(t *testing.T) {
_, err := newReplicaSelector(selector, tc.cmview, tc.repfactor)
_, err := NewReplicaSelector(selector, tc.cmview, tc.repfactor)
require.Error(t, err)
})
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestReplicaSelector(t *testing.T) {
tcs := []*testCase{
{
name: "NotEnoughStorageNodes",
selectors: []string{replicaSelectorNameRandom, replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameRandom, ReplicaSelectorNameLFU},
repfactor: 1,
md: &varlogpb.MetadataDescriptor{},
testf: func(t *testing.T, _ *testCase, s ReplicaSelector) {
Expand All @@ -112,7 +112,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "InvalidClusterMetadataInvalidStorageNodeID",
selectors: []string{replicaSelectorNameRandom, replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameRandom, ReplicaSelectorNameLFU},
repfactor: 1,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand All @@ -130,7 +130,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "InvalidClusterMetadataNoStoragePath",
selectors: []string{replicaSelectorNameRandom, replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameRandom, ReplicaSelectorNameLFU},
repfactor: 1,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand All @@ -148,7 +148,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "InvalidClusterMetadataNoStorageNode",
selectors: []string{replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameLFU},
repfactor: 1,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "InvalidClusterMetadataNoStorageNodePath",
selectors: []string{replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameLFU},
repfactor: 1,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "Select",
selectors: []string{replicaSelectorNameRandom},
selectors: []string{ReplicaSelectorNameRandom},
repfactor: 2,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand Down Expand Up @@ -246,7 +246,7 @@ func TestReplicaSelector(t *testing.T) {
},
{
name: "Select",
selectors: []string{replicaSelectorNameLFU},
selectors: []string{ReplicaSelectorNameLFU},
repfactor: 3,
md: &varlogpb.MetadataDescriptor{
StorageNodes: []*varlogpb.StorageNodeDescriptor{
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestReplicaSelector(t *testing.T) {
cmview := mrmanager.NewMockClusterMetadataView(ctrl)
cmview.EXPECT().ClusterMetadata(gomock.Any()).Return(tc.md, nil).AnyTimes()

s, err := newReplicaSelector(selector, cmview, tc.repfactor)
s, err := NewReplicaSelector(selector, cmview, tc.repfactor)
require.NoError(t, err)
require.Equal(t, selector, s.Name())
tc.testf(t, tc, s)
Expand Down

0 comments on commit 805f8de

Please sign in to comment.