Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#8266 from tinyspeck/am_vtctld_trace_getwo…
Browse files Browse the repository at this point in the history
…rkflows

[workflow] Add tracing to `GetWorkflows` endpoint
  • Loading branch information
ajm188 authored Jun 6, 2021
2 parents ec484e1 + 114d18e commit 405b4ba
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
7 changes: 7 additions & 0 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -921,6 +922,12 @@ func (s *VtctldServer) GetVSchema(ctx context.Context, req *vtctldatapb.GetVSche

// GetWorkflows is part of the vtctlservicepb.VtctldServer interface.
func (s *VtctldServer) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.GetWorkflows")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("active_only", req.ActiveOnly)

return s.ws.GetWorkflows(ctx, req)
}

Expand Down
26 changes: 26 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -262,6 +263,12 @@ func (s *Server) GetCellsWithTableReadsSwitched(
// It has the same signature as the vtctlservicepb.VtctldServer's GetWorkflows
// rpc, and grpcvtctldserver delegates to this function.
func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflowsRequest) (*vtctldatapb.GetWorkflowsResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.GetWorkflows")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("active_only", req.ActiveOnly)

where := ""
if req.ActiveOnly {
where = "WHERE state <> 'Stopped'"
Expand Down Expand Up @@ -307,6 +314,15 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
// - targetShardsByWorkflow[workflow.Name] != nil
// - workflow.ShardStatuses != nil
scanWorkflow := func(ctx context.Context, workflow *vtctldatapb.Workflow, row []sqltypes.Value, tablet *topo.TabletInfo) error {
span, ctx := trace.NewSpan(ctx, "workflow.Server.scanWorkflow")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("shard", tablet.Shard)
span.Annotate("active_only", req.ActiveOnly)
span.Annotate("workflow", workflow.Name)
span.Annotate("tablet_alias", tablet.AliasString())

id, err := evalengine.ToInt64(row[0])
if err != nil {
return err
Expand Down Expand Up @@ -357,6 +373,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return err
}

span.Annotate("num_copy_states", len(stream.CopyStates))

switch {
case strings.Contains(strings.ToLower(stream.Message), "error"):
stream.State = "Error"
Expand Down Expand Up @@ -499,6 +517,14 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
}

func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates")
defer span.Finish()

span.Annotate("keyspace", tablet.Keyspace)
span.Annotate("shard", tablet.Shard)
span.Annotate("tablet_alias", tablet.AliasString())
span.Annotate("vrepl_id", id)

query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d", id)
qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
Expand Down

0 comments on commit 405b4ba

Please sign in to comment.