Skip to content

Commit

Permalink
server: Move all functions needed for storage bootstrap to storage pa…
Browse files Browse the repository at this point in the history
…ckage

This is prerequestite to move storage bootstrap, splitted to separate PR
to make it easier to review.
  • Loading branch information
serathius committed Aug 3, 2021
1 parent 23b742c commit 83a325a
Show file tree
Hide file tree
Showing 15 changed files with 298 additions and 212 deletions.
3 changes: 2 additions & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2v3"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
"go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/verify"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -303,7 +304,7 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized

quota := ec.QuotaBackendBytes
if quota == 0 {
quota = etcdserver.DefaultQuotaBytes
quota = storage.DefaultQuotaBytes
}

lg.Info(
Expand Down
7 changes: 4 additions & 3 deletions server/etcdserver/api/v3rpc/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage"
)

type quotaKVServer struct {
Expand All @@ -29,7 +30,7 @@ type quotaKVServer struct {
}

type quotaAlarmer struct {
q etcdserver.Quota
q storage.Quota
a Alarmer
id types.ID
}
Expand All @@ -52,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error {
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &quotaKVServer{
NewKVServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()},
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()},
}
}

Expand Down Expand Up @@ -85,6 +86,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ
func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &quotaLeaseServer{
NewLeaseServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()},
quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()},
}
}
7 changes: 4 additions & 3 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/lease"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/mvcc"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -770,7 +771,7 @@ func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error)

type applierV3Capped struct {
applierV3
q backendQuota
q serverstorage.BackendQuota
}

// newApplierV3Capped creates an applyV3 that will reject Puts and transactions
Expand Down Expand Up @@ -949,11 +950,11 @@ func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequ

type quotaApplierV3 struct {
applierV3
q Quota
q serverstorage.Quota
}

func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
return &quotaApplierV3{app, NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")}
return &quotaApplierV3{app, serverstorage.NewBackendQuota(s.Cfg, s.Backend(), "v3-applier")}
}

func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
Expand Down
19 changes: 10 additions & 9 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
Expand Down Expand Up @@ -117,7 +118,7 @@ type bootstrappedServer struct {
st v2store.Store
be backend.Backend
ss *snap.Snapshotter
beHooks *backendHooks
beHooks *serverstorage.BackendHooks
}

func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
Expand All @@ -141,11 +142,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
return snap.New(cfg.Logger, cfg.SnapDir())
}

func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *backendHooks, err error) {
func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) {
beExist = fileutil.Exist(cfg.BackendPath())
ci = cindex.NewConsistentIndex(nil)
beHooks = &backendHooks{lg: cfg.Logger, indexer: ci}
be = openBackend(cfg, beHooks)
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
be = serverstorage.OpenBackend(cfg, beHooks)
ci.SetBackend(be)
schema.CreateMetaBucket(be.BatchTx())
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
Expand Down Expand Up @@ -249,7 +250,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
}, nil
}

func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *backendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
Expand Down Expand Up @@ -282,7 +283,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
}

if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
cfg.Logger.Error("illegal v2store content", zap.Error(err))
return nil, err
}
Expand All @@ -293,7 +294,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
)

if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
s1, s2 := be.Size(), be.SizeInUse()
Expand Down Expand Up @@ -578,9 +579,9 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
}

func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
return createConfigChangeEnts(
return serverstorage.CreateConfigChangeEnts(
wal.lg,
getIDs(wal.lg, wal.snapshot, wal.ents),
serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
uint64(wal.id),
wal.st.Term,
wal.st.Commit,
Expand Down
8 changes: 1 addition & 7 deletions server/etcdserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,7 @@ var (
Name: "lease_expired_total",
Help: "The total number of expired leases.",
})
quotaBackendBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Name: "quota_backend_bytes",
Help: "Current backend storage quota size in bytes.",
})

currentVersion = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "server",
Expand Down Expand Up @@ -191,7 +186,6 @@ func init() {
prometheus.MustRegister(slowReadIndex)
prometheus.MustRegister(readIndexFailed)
prometheus.MustRegister(leaseExpired)
prometheus.MustRegister(quotaBackendBytes)
prometheus.MustRegister(currentVersion)
prometheus.MustRegister(currentGoVersion)
prometheus.MustRegister(serverID)
Expand Down
108 changes: 0 additions & 108 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@
package etcdserver

import (
"encoding/json"
"expvar"
"fmt"
"log"
"sort"
"sync"
"time"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/contention"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -415,106 +410,3 @@ func (r *raftNode) advanceTicks(ticks int) {
r.tick()
}
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
ids := make(map[uint64]bool)
if snap != nil {
for _, id := range snap.Metadata.ConfState.Voters {
ids[id] = true
}
}
for _, e := range ents {
if e.Type != raftpb.EntryConfChange {
continue
}
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
switch cc.Type {
case raftpb.ConfChangeAddLearnerNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeAddNode:
ids[cc.NodeID] = true
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.NodeID)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
}
}
sids := make(types.Uint64Slice, 0, len(ids))
for id := range ids {
sids = append(sids, id)
}
sort.Sort(sids)
return []uint64(sids)
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
found := false
for _, id := range ids {
if id == self {
found = true
}
}

var ents []raftpb.Entry
next := index + 1

// NB: always add self first, then remove other nodes. Raft will panic if the
// set of voters ever becomes empty.
if !found {
m := membership.Member{
ID: types.ID(self),
RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
}
ctx, err := json.Marshal(m)
if err != nil {
lg.Panic("failed to marshal member", zap.Error(err))
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: self,
Context: ctx,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}

for _, id := range ids {
if id == self {
continue
}
cc := &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}
e := raftpb.Entry{
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
Term: term,
Index: next,
}
ents = append(ents, e)
next++
}

return ents
}
5 changes: 3 additions & 2 deletions server/etcdserver/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mock/mockstorage"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ func TestGetIDs(t *testing.T) {
if tt.confState != nil {
snap.Metadata.ConfState = *tt.confState
}
idSet := getIDs(testLogger, &snap, tt.ents)
idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents)
if !reflect.DeepEqual(idSet, tt.widSet) {
t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestCreateConfigChangeEnts(t *testing.T) {
}

for i, tt := range tests {
gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
gents := serverstorage.CreateConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
if !reflect.DeepEqual(gents, tt.wents) {
t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
}
Expand Down
Loading

0 comments on commit 83a325a

Please sign in to comment.