Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Oct 3, 2024
1 parent 80b472c commit a4456a4
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 7 deletions.
47 changes: 47 additions & 0 deletions go/vt/events/eventer/eventer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package eventer

import (
"fmt"

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/events"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
)

var eventerName string

func RegisterFlags(fs *pflag.FlagSet) {
fs.StringVar(&eventerName, "eventer", eventerName, "the eventer to be used to broadcast internal events")
}

type Eventer interface {
DeleteTablet(ev *events.DeleteTabletEvent)
EmergencyReparentShard(ev *events.EmergencyReparentShardEvent)
PlannedReparentShard(ev *events.PlannedReparentShardEvent)
}

type NewEventer func() (Eventer, error)

var eventers = make(map[string]NewEventer)

func RegisterEventer(name string, eventerFunc NewEventer) {
if eventers[name] != nil {
log.Fatalf("eventer %v already registered", name)
}
eventers[name] = eventerFunc
}

func Get() (Eventer, error) {
if eventerFunc, ok := eventers[eventerName]; ok {
return eventerFunc()
}
return nil, fmt.Errorf("no eventer %v registered", eventerName)
}

func init() {
servenv.OnParse(func(fs *pflag.FlagSet) {
RegisterFlags(fs)
})
}
28 changes: 28 additions & 0 deletions go/vt/events/eventer/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package eventer

import (
"vitess.io/vitess/go/vt/events"
"vitess.io/vitess/go/vt/log"
)

type LogEventer struct{}

func NewLogEventer() (Eventer, error) {
return &LogEventer{}, nil
}

func (le *LogEventer) DeleteTablet(ev *events.DeleteTabletEvent) {
log.Infof("Received DeleteTabletEvent: %v", ev)
}

func (le *LogEventer) EmergencyReparentShard(ev *events.EmergencyReparentShardEvent) {
log.Infof("Received EmergencyReparentShardEvent: %v", ev)
}

func (le *LogEventer) PlannedReparentShard(ev *events.PlannedReparentShardEvent) {
log.Infof("Received PlannedReparentShardEvent: %v", ev)
}

func init() {
RegisterEventer("log", NewLogEventer)
}
20 changes: 20 additions & 0 deletions go/vt/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package events

type SourceType string

const (
SourceVtctld SourceType = "vtctld"
SourceVtorc SourceType = "vtorc"
)

type Source struct {
Type SourceType `json:"type"`
Hostname string `json:"hostname"`
}

func NewSourceVtctld(hostname string) Source {
return Source{
Type: SourceVtctld,
Hostname: hostname,
}
}
26 changes: 26 additions & 0 deletions go/vt/events/reparents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package events

import (
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
)

type EmergencyReparentShardEvent struct {
Source Source `json:"source"`
Time time.Time `json:"time"`
ShardInfo topo.ShardInfo `json:"shard_info"`
OldPrimary *topodatapb.Tablet `json:"old_primary"`
NewPrimary *topodatapb.Tablet `json:"new_primary"`
Error error `json:"error"`
}

type PlannedReparentShardEvent struct {
Source Source `json:"source"`
Time time.Time `json:"time"`
ShardInfo topo.ShardInfo `json:"shard_info"`
OldPrimary *topodatapb.Tablet `json:"old_primary"`
NewPrimary *topodatapb.Tablet `json:"new_primary"`
Error error `json:"error"`
}
15 changes: 15 additions & 0 deletions go/vt/events/tablets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package events

import (
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// DeleteTabletEvent represents a DeleteTablet event from vtctl.
type DeleteTabletEvent struct {
Source Source `json:"source"`
Time time.Time `json:"time"`
Tablet *topodatapb.Tablet `json:"tablet"`
Error error `json:"error"`
}
14 changes: 9 additions & 5 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/events/eventer"
hk "vitess.io/vitess/go/vt/hook"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -90,26 +91,29 @@ type VtctldServer struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
ws *workflow.Server
ev eventer.Eventer
}

// NewVtctldServer returns a new VtctldServer for the given topo server.
func NewVtctldServer(env *vtenv.Environment, ts *topo.Server) *VtctldServer {
func NewVtctldServer(env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) *VtctldServer {
tmc := tmclient.NewTabletManagerClient()

return &VtctldServer{
ts: ts,
tmc: tmc,
ws: workflow.NewServer(env, ts, tmc),
ev: ev,
}
}

// NewTestVtctldServer returns a new VtctldServer for the given topo server
// AND tmclient for use in tests. This should NOT be used in production.
func NewTestVtctldServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *VtctldServer {
func NewTestVtctldServer(ts *topo.Server, tmc tmclient.TabletManagerClient, ev eventer.Eventer) *VtctldServer {
return &VtctldServer{
ts: ts,
tmc: tmc,
ws: workflow.NewServer(vtenv.NewTestEnv(), ts, tmc),
ev: ev,
}
}

Expand Down Expand Up @@ -1122,7 +1126,7 @@ func (s *VtctldServer) DeleteTablets(ctx context.Context, req *vtctldatapb.Delet
span.Annotate("allow_primary", req.AllowPrimary)

for _, alias := range req.TabletAliases {
if err2 := deleteTablet(ctx, s.ts, alias, req.AllowPrimary); err2 != nil {
if err2 := deleteTablet(ctx, s.ts, s.ev, alias, req.AllowPrimary); err2 != nil {
err = err2
return nil, err
}
Expand Down Expand Up @@ -5029,8 +5033,8 @@ func (s *VtctldServer) WorkflowUpdate(ctx context.Context, req *vtctldatapb.Work
}

// StartServer registers a VtctldServer for RPCs on the given gRPC server.
func StartServer(s *grpc.Server, env *vtenv.Environment, ts *topo.Server) {
vtctlservicepb.RegisterVtctldServer(s, NewVtctldServer(env, ts))
func StartServer(s *grpc.Server, env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) {
vtctlservicepb.RegisterVtctldServer(s, NewVtctldServer(env, ts, ev))
}

// getTopologyCell is a helper method that returns a topology cell given its path.
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vtctl/grpcvtctldserver/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/events"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
Expand Down Expand Up @@ -205,7 +207,7 @@ func deleteShardCell(ctx context.Context, ts *topo.Server, keyspace string, shar
return nil
}

func deleteTablet(ctx context.Context, ts *topo.Server, alias *topodatapb.TabletAlias, allowPrimary bool) (err error) {
func deleteTablet(ctx context.Context, ts *topo.Server, ev eventer.Eventer, alias *topodatapb.TabletAlias, allowPrimary bool) (err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.deleteTablet")
defer span.Finish()

Expand Down Expand Up @@ -257,7 +259,16 @@ func deleteTablet(ctx context.Context, ts *topo.Server, alias *topodatapb.Tablet
}

// Remove the tablet record and its replication graph entry.
if err := topotools.DeleteTablet(ctx, ts, tablet.Tablet); err != nil {
err = topotools.DeleteTablet(ctx, ts, tablet.Tablet)
if ev != nil {
ev.DeleteTablet(&events.DeleteTabletEvent{
Source: events.NewSourceVtctld("TODO"),
Time: time.Now(),
Tablet: tablet.Tablet,
Error: err,
})
}
if err != nil {
return err
}

Expand Down

0 comments on commit a4456a4

Please sign in to comment.