From 2f1ff01c0083d0a6ecb1166e71bf014b51aa16b7 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Mon, 7 Aug 2023 18:39:08 +0800 Subject: [PATCH] ttl: make ttl manually trigger stable --- ttl/client/command.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/ttl/client/command.go b/ttl/client/command.go index 3ec5d0c9be376..6168d540ada3c 100644 --- a/ttl/client/command.go +++ b/ttl/client/command.go @@ -29,7 +29,7 @@ import ( ) const ( - ttlCmdKeyLeaseSeconds int64 = 60 + ttlCmdKeyLeaseSeconds int64 = 180 ttlCmdKeyRequestPrefix = "/tidb/ttl/cmd/req/" ttlCmdKeyResponsePrefix = "/tidb/ttl/cmd/resp/" ttlCmdTypeTriggerTTLJob = "trigger_ttl_job" @@ -161,13 +161,15 @@ func (c *etcdClient) waitCmdResponse(ctx context.Context, reqID string, obj inte key := ttlCmdKeyResponsePrefix + reqID ch := c.etcdCli.Watch(ctx, key) - ticker := time.NewTimer(time.Second) + ticker := time.NewTicker(time.Second) defer ticker.Stop() var respData []byte loop: for { select { + case <-ctx.Done(): + return ctx.Err() case <-ticker.C: response, err := c.etcdCli.Get(ctx, key) if err != nil { @@ -178,7 +180,15 @@ loop: respData = response.Kvs[0].Value break loop } - case resp := <-ch: + case resp, ok := <-ch: + if !ok { + logutil.BgLogger().Info("watcher is closed") + // to make ch always block + ch = make(chan clientv3.WatchResponse) + time.Sleep(time.Second) + continue loop + } + for _, event := range resp.Events { if event.Type == clientv3.EventTypePut { respData = event.Kv.Value