From be2e3103bcbb283710304a447906a1adb3e12bdd Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 18 Apr 2024 12:13:37 +0800
Subject: [PATCH] br: Enable checkpoint advancer to pause tasks lagged too
 large (#51441) (#52554)

close pingcap/tidb#50803
---
 br/pkg/streamhelper/BUILD.bazel             |  5 +-
 br/pkg/streamhelper/advancer.go             | 41 +++++++++-
 br/pkg/streamhelper/advancer_cliext.go      | 88 +++++++++++++++------
 br/pkg/streamhelper/advancer_env.go         |  7 ++
 br/pkg/streamhelper/advancer_test.go        | 78 ++++++++++++++++++
 br/pkg/streamhelper/basic_lib_for_test.go   | 45 +++++++++++
 br/pkg/streamhelper/config/advancer_conf.go |  9 +++
 br/pkg/streamhelper/models.go               |  6 ++
 br/pkg/streamhelper/regioniter.go           |  2 +
 br/pkg/streamhelper/regioniter_test.go      |  7 ++
 10 files changed, 264 insertions(+), 24 deletions(-)

diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel
index 9a2e0b91a26b6..2d42a61ff369b 100644
--- a/br/pkg/streamhelper/BUILD.bazel
+++ b/br/pkg/streamhelper/BUILD.bazel
@@ -67,7 +67,8 @@ go_test(
         "subscription_test.go",
     ],
     flaky = True,
-    shard_count = 22,
+    race = "on",
+    shard_count = 25,
     deps = [
         ":streamhelper",
         "//br/pkg/errors",
@@ -88,8 +89,10 @@ go_test(
         "@com_github_pingcap_kvproto//pkg/logbackuppb",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_log//:log",
+        "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//kv",
+        "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_client_go_v2//tikv",
         "@com_github_tikv_client_go_v2//tikvrpc",
         "@com_github_tikv_client_go_v2//txnkv/txnlock",
diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go
index b3dba99b13292..f28c5655eed9a 100644
--- a/br/pkg/streamhelper/advancer.go
+++ b/br/pkg/streamhelper/advancer.go
@@ -71,6 +71,7 @@ type CheckpointAdvancer struct {
 	lastCheckpoint   *checkpoint
 	lastCheckpointMu sync.Mutex
 	inResolvingLock  atomic.Bool
+	isPaused         atomic.Bool
 
 	checkpoints   *spans.ValueSortedFull
 	checkpointsMu sync.Mutex
@@ -446,6 +447,14 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 			log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
 		}
 		metrics.LastCheckpoint.DeleteLabelValues(e.Name)
+	case EventPause:
+		if c.task.GetName() == e.Name {
+			c.isPaused.CompareAndSwap(false, true)
+		}
+	case EventResume:
+		if c.task.GetName() == e.Name {
+			c.isPaused.CompareAndSwap(true, false)
+		}
 	case EventErr:
 		return e.Err
 	}
@@ -543,6 +552,25 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
 	return c.subscriber.PendingErrors()
 }
 
+func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
+	if c.cfg.CheckPointLagLimit <= 0 {
+		return false, nil
+	}
+
+	now, err := c.env.FetchCurrentTS(ctx)
+	if err != nil {
+		return false, err
+	}
+
+	lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
+	if lagDuration > c.cfg.CheckPointLagLimit {
+		log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
+			zap.Stringer("lag", lagDuration))
+		return true, nil
+	}
+	return false, nil
+}
+
 func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
 	c.checkpointsMu.Lock()
 	c.setCheckpoint(ctx, c.checkpoints.Min())
@@ -550,6 +578,17 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
 	if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
 		return errors.Annotate(err, "failed to upload global checkpoint")
 	}
+	isLagged, err := c.isCheckpointLagged(ctx)
+	if err != nil {
+		return errors.Annotate(err, "failed to check timestamp")
+	}
+	if isLagged {
+		err := c.env.PauseTask(ctx, c.task.Name)
+		if err != nil {
+			return errors.Annotate(err, "failed to pause task")
+		}
+		return errors.Errorf("checkpoint lagged too large")
+	}
 	p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
 	if err != nil {
 		return errors.Annotatef(err,
@@ -605,7 +644,7 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
 func (c *CheckpointAdvancer) tick(ctx context.Context) error {
 	c.taskMu.Lock()
 	defer c.taskMu.Unlock()
-	if c.task == nil {
+	if c.task == nil || c.isPaused.Load() {
 		log.Debug("No tasks yet, skipping advancing.")
 		return nil
 	}
diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go
index b2f0c00ab4da9..8b17753977ffb 100644
--- a/br/pkg/streamhelper/advancer_cliext.go
+++ b/br/pkg/streamhelper/advancer_cliext.go
@@ -29,6 +29,8 @@ const (
 	EventAdd EventType = iota
 	EventDel
 	EventErr
+	EventPause
+	EventResume
 )
 
 func (t EventType) String() string {
@@ -39,6 +41,10 @@ func (t EventType) String() string {
 		return "Del"
 	case EventErr:
 		return "Err"
+	case EventPause:
+		return "Pause"
+	case EventResume:
+		return "Resume"
 	}
 	return "Unknown"
 }
@@ -70,29 +76,47 @@ func errorEvent(err error) TaskEvent {
 }
 
 func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
-	if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
-		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument,
-			"the path isn't a task path (%s)", string(event.Kv.Key))
+	te := TaskEvent{}
+	var prefix string
+
+	if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
+		prefix = PrefixOfTask()
+		te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+	} else if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfPause())) {
+		prefix = PrefixOfPause()
+		te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
+	} else {
+		return TaskEvent{},
+			errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task/pause path (%s)",
+				string(event.Kv.Key))
 	}
 
-	te := TaskEvent{}
-	te.Name = strings.TrimPrefix(string(event.Kv.Key), PrefixOfTask())
-	if event.Type == clientv3.EventTypeDelete {
-		te.Type = EventDel
-	} else if event.Type == clientv3.EventTypePut {
+	switch {
+	case event.Type == clientv3.EventTypePut && prefix == PrefixOfTask():
 		te.Type = EventAdd
-	} else {
-		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
+	case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfTask():
+		te.Type = EventDel
+	case event.Type == clientv3.EventTypePut && prefix == PrefixOfPause():
+		te.Type = EventPause
+	case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfPause():
+		te.Type = EventResume
+	default:
+		return TaskEvent{},
+			errors.Annotatef(berrors.ErrInvalidArgument,
+				"invalid event type or prefix: type=%s, prefix=%s", event.Type, prefix)
 	}
+
 	te.Info = new(backuppb.StreamBackupTaskInfo)
 	if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
 		return TaskEvent{}, err
 	}
+
 	var err error
 	te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
 	if err != nil {
 		return TaskEvent{}, err
 	}
+
 	return te, nil
 }
 
@@ -113,7 +137,10 @@ func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResp
 }
 
 func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
-	c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+	taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+	pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev))
+
+	// handleResponse converts a watch response into task events and forwards them to ch.
 	handleResponse := func(resp clientv3.WatchResponse) bool {
 		events, err := t.eventFromWatch(ctx, resp)
 		if err != nil {
@@ -126,21 +153,26 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 		}
 		return true
 	}
+
+	// collectRemaining drains both watch channels before the listener exits.
 	collectRemaining := func() {
-		log.Info("[log backup advancer] Start collecting remaining events in the channel.",
-			zap.Int("remained", len(c)))
-		defer log.Info("[log backup advancer] Finish collecting remaining events in the channel.")
+		log.Info("Start collecting remaining events in the channel.", zap.String("category", "log backup advancer"),
+			zap.Int("remained", len(taskCh)))
+		defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
 		for {
+			if taskCh == nil && pauseCh == nil {
+				return
+			}
+
 			select {
-			case resp, ok := <-c:
-				if !ok {
-					return
+			case resp, ok := <-taskCh:
+				if !ok || !handleResponse(resp) {
+					taskCh = nil
 				}
-				if !handleResponse(resp) {
-					return
+			case resp, ok := <-pauseCh:
+				if !ok || !handleResponse(resp) {
+					pauseCh = nil
 				}
-			default:
-				return
 			}
 		}
 	}
@@ -149,7 +181,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 	defer close(ch)
 	for {
 		select {
-		case resp, ok := <-c:
+		case resp, ok := <-taskCh:
 			failpoint.Inject("advancer_close_channel", func() {
 				// We cannot really close the channel, just simulating it.
 				ok = false
@@ -161,6 +193,18 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
 			if !handleResponse(resp) {
 				return
 			}
+		case resp, ok := <-pauseCh:
+			failpoint.Inject("advancer_close_pause_channel", func() {
+				// We cannot really close the channel, just simulating it.
+				ok = false
+			})
+			if !ok {
+				ch <- errorEvent(io.EOF)
+				return
+			}
+			if !handleResponse(resp) {
+				return
+			}
 		case <-ctx.Done():
 			collectRemaining()
 			ch <- errorEvent(ctx.Err())
diff --git a/br/pkg/streamhelper/advancer_env.go b/br/pkg/streamhelper/advancer_env.go
index 5b29651aa45c8..1ba16c008d2cc 100644
--- a/br/pkg/streamhelper/advancer_env.go
+++ b/br/pkg/streamhelper/advancer_env.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/util/engine"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv/txnlock"
 	pd "github.com/tikv/pd/client"
@@ -48,6 +49,11 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
 	return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
 }
 
+// TODO: It should be able to synchronize the current TS with the PD.
+func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
+	return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
+}
+
 // RegionScan gets a list of regions, starts from the region that contains key.
 // Limit limits the maximum number of regions returned.
 func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) {
@@ -152,6 +158,7 @@ type StreamMeta interface {
 	UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
 	// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
 	ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
+	PauseTask(ctx context.Context, taskName string) error
 }
 
 var _ tikv.RegionLockResolver = &AdvancerLockResolver{}
diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go
index 2e6afe4ea1c68..7b7e31f24ae07 100644
--- a/br/pkg/streamhelper/advancer_test.go
+++ b/br/pkg/streamhelper/advancer_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/streamhelper/config"
 	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
 	"github.com/pingcap/tidb/kv"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -463,3 +464,80 @@ func TestRemoveTaskAndFlush(t *testing.T) {
 		return !adv.HasSubscribion()
 	}, 10*time.Second, 100*time.Millisecond)
 }
+
+func TestEnableCheckPointLimit(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	for i := 0; i < 5; i++ {
+		c.advanceClusterTimeBy(30 * time.Second)
+		c.advanceCheckpointBy(20 * time.Second)
+		require.NoError(t, adv.OnTick(ctx))
+	}
+}
+
+func TestCheckPointLagged(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	// after some time, isPaused will be set and ticks are skipped
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+}
+
+func TestCheckPointResume(t *testing.T) {
+	c := createFakeCluster(t, 4, false)
+	defer func() {
+		fmt.Println(c)
+	}()
+	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	env := &testEnv{fakeCluster: c, testCtx: t}
+	adv := streamhelper.NewCheckpointAdvancer(env)
+	adv.UpdateConfigWith(func(c *config.Config) {
+		c.CheckPointLagLimit = 1 * time.Minute
+	})
+	adv.StartTaskListener(ctx)
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.NoError(t, adv.OnTick(ctx))
+	c.advanceClusterTimeBy(1 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// now the checkpoint issue is fixed and the task is resumed
+	c.advanceCheckpointBy(1 * time.Minute)
+	env.ResumeTask(ctx)
+	require.Eventually(t, func() bool {
+		return assert.NoError(t, adv.OnTick(ctx))
+	}, 5*time.Second, 100*time.Millisecond)
+	// as time passes, the checkpoint lag will exceed the limit again
+	c.advanceClusterTimeBy(2 * time.Minute)
+	require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
+}
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index fc0435b47015d..1b6e01a145d04 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/util/codec"
+	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 	"github.com/tikv/client-go/v2/txnkv/txnlock"
@@ -101,6 +102,7 @@ type fakeCluster struct {
 
 	onGetClient        func(uint64) error
 	serviceGCSafePoint uint64
+	currentTS          uint64
 }
 
 func (r *region) splitAt(newID uint64, k string) *region {
@@ -272,6 +274,10 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro
 	return at, nil
 }
 
+func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
+	return f.currentTS, nil
+}
+
 // RegionScan gets a list of regions, starts from the region that contains key.
 // Limit limits the maximum number of regions returned.
 func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
@@ -489,6 +495,29 @@ func (f *fakeCluster) advanceCheckpoints() uint64 {
 	return minCheckpoint
 }
 
+func (f *fakeCluster) advanceCheckpointBy(duration time.Duration) uint64 {
+	minCheckpoint := uint64(math.MaxUint64)
+	for _, r := range f.regions {
+		f.updateRegion(r.id, func(r *region) {
+			newCheckpointTime := oracle.GetTimeFromTS(r.checkpoint.Load()).Add(duration)
+			newCheckpoint := oracle.GoTimeToTS(newCheckpointTime)
+			r.checkpoint.Store(newCheckpoint)
+			if newCheckpoint < minCheckpoint {
+				minCheckpoint = newCheckpoint
+			}
+			r.fsim.flushedEpoch.Store(0)
+		})
+	}
+	log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
+	return minCheckpoint
+}
+
+func (f *fakeCluster) advanceClusterTimeBy(duration time.Duration) uint64 {
+	newTime := oracle.GoTimeToTS(oracle.GetTimeFromTS(f.currentTS).Add(duration))
+	f.currentTS = newTime
+	return newTime
+}
+
 func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
 	c := &fakeCluster{
 		stores: map[uint64]*fakeStore{},
@@ -653,6 +682,22 @@ func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName s
 	return nil
 }
 
+func (t *testEnv) PauseTask(ctx context.Context, taskName string) error {
+	t.taskCh <- streamhelper.TaskEvent{
+		Type: streamhelper.EventPause,
+		Name: taskName,
+	}
+	return nil
+}
+
+func (t *testEnv) ResumeTask(ctx context.Context) error {
+	t.taskCh <- streamhelper.TaskEvent{
+		Type: streamhelper.EventResume,
+		Name: "whole",
+	}
+	return nil
+}
+
 func (t *testEnv) getCheckpoint() uint64 {
 	t.mu.Lock()
 	defer t.mu.Unlock()
diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go
index 35fbb14f354f3..b8fd1a03569a5 100644
--- a/br/pkg/streamhelper/config/advancer_conf.go
+++ b/br/pkg/streamhelper/config/advancer_conf.go
@@ -17,6 +17,7 @@ const (
 
 	DefaultConsistencyCheckTick = 5
 	DefaultTryAdvanceThreshold  = 4 * time.Minute
+	DefaultCheckPointLagLimit   = 0
 	DefaultBackOffTime          = 5 * time.Second
 	DefaultTickInterval         = 12 * time.Second
 	DefaultFullScanTick         = 4
@@ -34,6 +35,8 @@ type Config struct {
 	TickDuration time.Duration `toml:"tick-interval" json:"tick-interval"`
 	// The threshold for polling TiKV for checkpoint of some range.
 	TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
+	// The maximum lag that can be tolerated for the checkpoint.
+	CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`
 }
 
 func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
@@ -50,6 +53,7 @@ func Default() Config {
 		BackoffTime:         DefaultBackOffTime,
 		TickDuration:        DefaultTickInterval,
 		TryAdvanceThreshold: DefaultTryAdvanceThreshold,
+		CheckPointLagLimit:  DefaultCheckPointLagLimit,
 	}
 }
 
@@ -76,6 +80,11 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
 	return conf.TryAdvanceThreshold
 }
 
+// GetCheckPointLagLimit returns the maximum lag that can be tolerated for the checkpoint.
+func (conf Config) GetCheckPointLagLimit() time.Duration {
+	return conf.CheckPointLagLimit
+}
+
 // GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint
 // when the subscriber meets error.
 func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {
diff --git a/br/pkg/streamhelper/models.go b/br/pkg/streamhelper/models.go
index 327704b3e1db5..70ef6849ebaf6 100644
--- a/br/pkg/streamhelper/models.go
+++ b/br/pkg/streamhelper/models.go
@@ -94,6 +94,12 @@ func Pause(task string) string {
 	return path.Join(streamKeyPrefix, taskPausePath, task)
 }
 
+// PrefixOfPause returns the prefix for pausing the task.
+// Normally it would be <prefix>/pause/
+func PrefixOfPause() string {
+	return path.Join(streamKeyPrefix, taskPausePath) + "/"
+}
+
 // LastErrorPrefixOf make the prefix for searching last error by some task.
 func LastErrorPrefixOf(task string) string {
 	return strings.TrimSuffix(path.Join(streamKeyPrefix, taskLastErrorPath, task), "/") + "/"
diff --git a/br/pkg/streamhelper/regioniter.go b/br/pkg/streamhelper/regioniter.go
index 02d8ca9d94e94..d6aa6f800f22a 100644
--- a/br/pkg/streamhelper/regioniter.go
+++ b/br/pkg/streamhelper/regioniter.go
@@ -42,6 +42,8 @@ type TiKVClusterMeta interface {
 	// NOTE: once we support multi tasks, perhaps we need to allow the caller to provide a namespace.
 	// For now, all tasks (exactly one task in fact) use the same checkpoint.
 	BlockGCUntil(ctx context.Context, at uint64) (uint64, error)
+
+	FetchCurrentTS(ctx context.Context) (uint64, error)
 }
 
 type Store struct {
diff --git a/br/pkg/streamhelper/regioniter_test.go b/br/pkg/streamhelper/regioniter_test.go
index c2ad92c9a58da..6ef65a5c86987 100644
--- a/br/pkg/streamhelper/regioniter_test.go
+++ b/br/pkg/streamhelper/regioniter_test.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/br/pkg/logutil"
@@ -16,6 +17,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
 	"github.com/pingcap/tidb/kv"
 	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/oracle"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -81,6 +83,11 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
 	return 0, status.Error(codes.Unimplemented, "Unsupported operation")
 }
 
+// TODO: It should be able to synchronize the current TS with the PD.
+func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
+	return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
+}
+
 func makeSubrangeRegions(keys ...string) constantRegions {
 	if len(keys) == 0 {
 		return nil