diff --git a/pkg/kine/logstructured/sqllog/sql.go b/pkg/kine/logstructured/sqllog/sql.go index 8b11e6ae..ad43d28a 100644 --- a/pkg/kine/logstructured/sqllog/sql.go +++ b/pkg/kine/logstructured/sqllog/sql.go @@ -399,23 +399,14 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { } } waitForMore = true - watchCtx, cancel := context.WithTimeout(s.ctx, s.d.GetWatchQueryTimeout()) - defer cancel() - - rows, err := s.d.After(watchCtx, last, 500) + events, err := s.getLatestEvents(s.ctx, last) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { - logrus.Errorf("fail to list latest changes: %v", err) + logrus.Errorf("fail to get latest events: %v", err) } continue } - events, err := RowsToEvents(rows) - if err != nil { - logrus.Errorf("fail to convert rows changes: %v", err) - continue - } - if len(events) == 0 { continue } @@ -485,6 +476,22 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { } } +func (s *SQLLog) getLatestEvents(ctx context.Context, last int64) ([]*server.Event, error) { + watchCtx, cancel := context.WithTimeout(ctx, s.d.GetWatchQueryTimeout()) + defer cancel() + + rows, err := s.d.After(watchCtx, last, 500) + if err != nil { + return nil, err + } + + events, err := RowsToEvents(rows) + if err != nil { + return nil, err + } + return events, nil +} + func canSkipRevision(rev, skip int64, skipTime time.Time) bool { return rev == skip && time.Since(skipTime) > time.Second }