Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix VReplication logging to file and db #8521

Merged
merged 1 commit into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,10 +696,7 @@ func DeleteVReplication(uid uint32) string {
// MessageTruncate truncates the message string to a safe length.
func MessageTruncate(msg string) string {
// message length is 1000 bytes.
if len(msg) > 950 {
return msg[:950] + "..."
}
return msg
return LimitString(msg, 950)
}

func encodeString(in string) string {
Expand Down
20 changes: 16 additions & 4 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (dc *dbClientImpl) Connect() error {
func (dc *dbClientImpl) Begin() error {
_, err := dc.dbConn.ExecuteFetch("begin", 1, false)
if err != nil {
log.Errorf("BEGIN failed w/ error %v", err)
LogError("BEGIN failed w/ error", err)
dc.handleError(err)
}
return err
Expand All @@ -83,7 +83,7 @@ func (dc *dbClientImpl) Begin() error {
func (dc *dbClientImpl) Commit() error {
_, err := dc.dbConn.ExecuteFetch("commit", 1, false)
if err != nil {
log.Errorf("COMMIT failed w/ error %v", err)
LogError("COMMIT failed w/ error", err)
dc.dbConn.Close()
}
return err
Expand All @@ -92,7 +92,7 @@ func (dc *dbClientImpl) Commit() error {
func (dc *dbClientImpl) Rollback() error {
_, err := dc.dbConn.ExecuteFetch("rollback", 1, false)
if err != nil {
log.Errorf("ROLLBACK failed w/ error %v", err)
LogError("ROLLBACK failed w/ error", err)
dc.dbConn.Close()
}
return err
Expand All @@ -102,10 +102,22 @@ func (dc *dbClientImpl) Close() {
dc.dbConn.Close()
}

// LogError logs a message after truncating it to avoid spamming logs
func LogError(msg string, err error) {
log.Errorf("%s: %s", msg, MessageTruncate(err.Error()))
}

// LimitString truncates string to specified size
func LimitString(s string, limit int) string {
if len(s) > limit {
return s[:limit]
}
return s
}

func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
mqr, err := dc.dbConn.ExecuteFetch(query, maxrows, true)
if err != nil {
log.Errorf("ExecuteFetch failed w/ error %v", err)
dc.handleError(err)
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
if strings.Contains(query, "where") {
return sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|state|source",
"int64|varchar|varchar",
"id|state|source|message",
"int64|varchar|varchar|varchar",
),
`1|Running|keyspace:"ks" shard:"0" key_range:<end:"\200" > `,
`1|Running|keyspace:"ks" shard:"0" key_range:<end:"\200" > |`,
), nil
}
return &sqltypes.Result{}, nil
Expand Down
4 changes: 2 additions & 2 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
tp.incNoTabletFoundStat()
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %d seconds",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, int(GetTabletPickerRetryDelay()/1e9))
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (ct *controller) run(ctx context.Context) {
return
default:
}
log.Errorf("stream %v: %v, retrying after %v", ct.id, err, *retryDelay)
binlogplayer.LogError(fmt.Sprintf("error in stream %v, retrying after %v", ct.id, *retryDelay), err)
ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1)
timer := time.NewTimer(*retryDelay)
select {
Expand Down
37 changes: 21 additions & 16 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"

"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/evalengine"
Expand Down Expand Up @@ -68,41 +71,43 @@ const (
LogError = "Error"
)

func getLastLog(dbClient *vdbClient, vreplID uint32) (int64, string, string, string, error) {
func getLastLog(dbClient *vdbClient, vreplID uint32) (id int64, typ, state, message string, err error) {
var qr *sqltypes.Result
var err error
query := fmt.Sprintf("select id, type, state, message from _vt.vreplication_log where vrepl_id = %d order by id desc limit 1", vreplID)
if qr, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch); err != nil {
return 0, "", "", "", err
}
if len(qr.Rows) != 1 || len(qr.Rows[0]) != 4 {
if len(qr.Rows) != 1 {
return 0, "", "", "", nil
}
row := qr.Rows[0]
id, _ := evalengine.ToInt64(row[0])
typ := row[1].ToString()
state := row[2].ToString()
message := row[3].ToString()
id, _ = evalengine.ToInt64(row[0])
typ = row[1].ToString()
state = row[2].ToString()
message = row[3].ToString()
return id, typ, state, message, nil
}

func insertLog(dbClient *vdbClient, typ string, vreplID uint32, state, message string) error {
var query string

// getLastLog returns the last log for a stream. During insertion, if the id/type/state/message match we do not insert
// getLastLog returns the last log for a stream. During insertion, if the type/state/message match we do not insert
// a new log but increment the count. This prevents spamming of the log table in case the same message is logged continuously.
id, currentType, currentState, currentMessage, err := getLastLog(dbClient, vreplID)
id, _, lastLogState, lastLogMessage, err := getLastLog(dbClient, vreplID)
if err != nil {
return err
}

if id > 0 && typ == currentType && state == currentState && message == currentMessage {
if typ == LogStateChange && state == lastLogState {
// handles case where current state is Running, controller restarts after an error and initializes the state Running
return nil
}
var query string
if id > 0 && message == lastLogMessage {
query = fmt.Sprintf("update _vt.vreplication_log set count = count + 1 where id = %d", id)
} else {
query = `insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%d, '%s', '%s', %s)`
query = fmt.Sprintf(query, vreplID, typ, state, encodeString(message))
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("insert into _vt.vreplication_log(vrepl_id, type, state, message) values(%s, %s, %s, %s)",
strconv.Itoa(int(vreplID)), encodeString(typ), encodeString(state), encodeString(message))
query = buf.ParsedQuery().Query
}

if _, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch); err != nil {
return fmt.Errorf("could not insert into log table: %v: %v", query, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestPlayerStatementModeWithFilter(t *testing.T) {
output := []string{
"begin",
"rollback",
"/update _vt.vreplication set message='Error: filter rules are not supported for SBR",
"/update _vt.vreplication set message='filter rules are not supported for SBR",
}

execStatements(t, input)
Expand Down Expand Up @@ -1568,7 +1568,7 @@ func TestPlayerDDL(t *testing.T) {
execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
expectDBClientQueries(t, []string{
"alter table t1 add column val2 varchar(128)",
"/update _vt.vreplication set message='Error: Duplicate",
"/update _vt.vreplication set message='Duplicate",
})
cancel()

Expand Down Expand Up @@ -2362,7 +2362,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {

streamerEngine.Close()
expectDBClientQueries(t, []string{
"/update _vt.vreplication set message='Error: vstream ended'",
"/update _vt.vreplication set message='vstream ended'",
})
streamerEngine.Open()

Expand Down
13 changes: 9 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/vt/sqlparser"

querypb "vitess.io/vitess/go/vt/proto/query"

"vitess.io/vitess/go/vt/vtgate/evalengine"
Expand Down Expand Up @@ -147,9 +150,8 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame
func (vr *vreplicator) Replicate(ctx context.Context) error {
err := vr.replicate(ctx)
if err != nil {
log.Errorf("Replicate error: %s", err.Error())
if err := vr.setMessage(fmt.Sprintf("Error: %s", err.Error())); err != nil {
log.Errorf("Failed to set error state: %v", err)
if err := vr.setMessage(err.Error()); err != nil {
binlogplayer.LogError("Failed to set error state", err)
}
}
return err
Expand Down Expand Up @@ -334,11 +336,14 @@ func (vr *vreplicator) readSettings(ctx context.Context) (settings binlogplayer.
}

func (vr *vreplicator) setMessage(message string) error {
message = binlogplayer.MessageTruncate(message)
vr.stats.History.Add(&binlogplayer.StatsHistoryRecord{
Time: time.Now(),
Message: message,
})
query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(binlogplayer.MessageTruncate(message)), vr.id)
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("update _vt.vreplication set message=%s where id=%s", encodeString(message), strconv.Itoa(int(vr.id)))
query := buf.ParsedQuery().Query
if _, err := vr.dbClient.Execute(query); err != nil {
return fmt.Errorf("could not set message: %v: %v", query, err)
}
Expand Down