Skip to content

Commit

Permalink
pkg/tz(ticdc): use correct ctx to get timezone (#8799) (#8830)
Browse files Browse the repository at this point in the history
ref #8798
  • Loading branch information
ti-chi-bot authored May 5, 2023
1 parent 4d633bf commit 3b9d4d8
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 41 deletions.
15 changes: 10 additions & 5 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/gctuner"
"github.com/pingcap/tiflow/cdc"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -239,23 +240,23 @@ func (s *server) createSortEngineFactory() error {
}

// Run runs the server.
func (s *server) Run(ctx context.Context) error {
if err := s.prepare(ctx); err != nil {
func (s *server) Run(serverCtx context.Context) error {
if err := s.prepare(serverCtx); err != nil {
return err
}

err := s.startStatusHTTP(s.tcpServer.HTTP1Listener())
err := s.startStatusHTTP(serverCtx, s.tcpServer.HTTP1Listener())
if err != nil {
return err
}

return s.run(ctx)
return s.run(serverCtx)
}

// startStatusHTTP starts the HTTP server.
// `lis` is a listener that gives us plain-text HTTP requests.
// TODO: can we decouple the HTTP server from the capture server?
func (s *server) startStatusHTTP(lis net.Listener) error {
func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) error {
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener. Connections that exceed the
// limit will wait in a queue and no new goroutines will be created until
Expand All @@ -279,6 +280,10 @@ func (s *server) startStatusHTTP(lis net.Listener) error {
Handler: router,
ReadTimeout: httpConnectionTimeout,
WriteTimeout: httpConnectionTimeout,
BaseContext: func(listener net.Listener) context.Context {
return contextutil.PutTimezoneInCtx(context.Background(),
contextutil.TimezoneFromCtx(serverCtx))
},
}

go func() {
Expand Down
4 changes: 2 additions & 2 deletions cdc/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) {
cp.EtcdClient = etcdClient
server.capture = cp
require.Nil(t, err)
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener())
require.Nil(t, err)
defer func() {
require.Nil(t, server.statusServer.Close())
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestServerTLSWithCommonNameAndRotate(t *testing.T) {
cp.EtcdClient = etcdClient
server.capture = cp
require.Nil(t, err)
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener())
require.Nil(t, err)
defer func() {
require.Nil(t, server.statusServer.Close())
Expand Down
37 changes: 29 additions & 8 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -330,28 +331,48 @@ func getSafeMode(values url.Values, safeMode *bool) error {
return nil
}

func getTimezone(ctx context.Context, values url.Values, timezone *string) error {
func getTimezone(ctxWithTimezone context.Context, values url.Values, timezone *string) error {
const pleaseSpecifyTimezone = "We recommend that you specify the time-zone explicitly. " +
"Please make sure that the timezone of the TiCDC server, " +
"sink-uri and the downstream database are consistent. " +
"If the downstream database does not load the timezone information, " +
"you can refer to https://dev.mysql.com/doc/refman/8.0/en/mysql-tzinfo-to-sql.html."
serverTimezone := contextutil.TimezoneFromCtx(ctxWithTimezone)
if _, ok := values["time-zone"]; !ok {
tz := contextutil.TimezoneFromCtx(ctx)
*timezone = fmt.Sprintf(`"%s"`, tz.String())
// If time-zone is not specified, use the timezone of the server.
log.Warn("Because time-zone is not specified, "+
"the timezone of the TiCDC server will be used. "+
pleaseSpecifyTimezone,
zap.String("timezone", serverTimezone.String()))
*timezone = fmt.Sprintf(`"%s"`, serverTimezone.String())
return nil
}

s := values.Get("time-zone")
if len(s) == 0 {
*timezone = ""
log.Warn("Because time-zone is empty, " +
"the timezone of the downstream database will be used. " +
pleaseSpecifyTimezone)
return nil
}

value, err := url.QueryUnescape(s)
changefeedTimezone, err := util.GetTimezone(s)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
_, err = time.LoadLocation(value)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
*timezone = fmt.Sprintf(`"%s"`, changefeedTimezone.String())
// We need to check whether the timezone of the TiCDC server and the sink-uri are consistent.
// If they are inconsistent, it may cause the data to be inconsistent.
if changefeedTimezone.String() != serverTimezone.String() {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, errors.Errorf(
"the timezone of the TiCDC server and the sink-uri are inconsistent. "+
"TiCDC server timezone: %s, sink-uri timezone: %s. "+
"Please make sure that the timezone of the TiCDC server, "+
"sink-uri and the downstream database are consistent.",
serverTimezone.String(), changefeedTimezone.String()))
}
*timezone = fmt.Sprintf(`"%s"`, s)

return nil
}

Expand Down
117 changes: 91 additions & 26 deletions pkg/sink/mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -202,32 +205,6 @@ func TestApplySinkURIParamsToConfig(t *testing.T) {
require.Equal(t, expected, cfg)
}

func TestParseSinkURITimezone(t *testing.T) {
t.Parallel()

uris := []string{
"mysql://127.0.0.1:3306/?time-zone=Asia/Shanghai&worker-count=32",
"mysql://127.0.0.1:3306/?time-zone=&worker-count=32",
"mysql://127.0.0.1:3306/?worker-count=32",
}
expected := []string{
"\"Asia/Shanghai\"",
"",
"\"UTC\"",
}
ctx := context.TODO()
for i, uriStr := range uris {
uri, err := url.Parse(uriStr)
require.Nil(t, err)
cfg := NewConfig()
err = cfg.Apply(ctx,
model.DefaultChangeFeedID("cf"),
uri, config.GetDefaultReplicaConfig())
require.Nil(t, err)
require.Equal(t, expected[i], cfg.Timezone)
}
}

func TestParseSinkURIOverride(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -345,3 +322,91 @@ func TestCheckTiDBVariable(t *testing.T) {
require.NotNil(t, err)
require.Regexp(t, ".*"+sql.ErrConnDone.Error(), err.Error())
}

func TestApplyTimezone(t *testing.T) {
t.Parallel()

localTimezone, err := util.GetTimezone("Local")
require.Nil(t, err)

for _, test := range []struct {
name string
noChangefeedTimezone bool
changefeedTimezone string
serverTimezone *time.Location
expected string
expectedHasErr bool
expectedErr string
}{
{
name: "no changefeed timezone",
noChangefeedTimezone: true,
serverTimezone: time.UTC,
expected: "\"UTC\"",
expectedHasErr: false,
},
{
name: "empty changefeed timezone",
noChangefeedTimezone: false,
changefeedTimezone: "",
serverTimezone: time.UTC,
expected: "",
expectedHasErr: false,
},
{
name: "normal changefeed timezone",
noChangefeedTimezone: false,
changefeedTimezone: "UTC",
serverTimezone: time.UTC,
expected: "\"UTC\"",
expectedHasErr: false,
},
{
name: "local timezone",
noChangefeedTimezone: false,
changefeedTimezone: "Local",
serverTimezone: localTimezone,
expected: "\"" + localTimezone.String() + "\"",
expectedHasErr: false,
},
{
name: "sink-uri timezone different from server timezone",
noChangefeedTimezone: false,
changefeedTimezone: "UTC",
serverTimezone: localTimezone,
expectedHasErr: true,
expectedErr: "Please make sure that the timezone of the TiCDC server",
},
{
name: "unsupported timezone format",
noChangefeedTimezone: false,
changefeedTimezone: "%2B08%3A00", // +08:00
serverTimezone: time.UTC,
expectedHasErr: true,
expectedErr: "unknown time zone +08:00",
},
} {
tc := test
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

cfg := NewConfig()
ctx := contextutil.PutTimezoneInCtx(context.Background(), tc.serverTimezone)
sinkURI := "mysql://127.0.0.1:3306"
if !tc.noChangefeedTimezone {
sinkURI = sinkURI + "?time-zone=" + tc.changefeedTimezone
}
uri, err := url.Parse(sinkURI)
require.Nil(t, err)
err = cfg.Apply(ctx,
model.DefaultChangeFeedID("changefeed-01"), uri, config.GetDefaultReplicaConfig())
if tc.expectedHasErr {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.expectedErr)
} else {
require.Nil(t, err)
require.Equal(t, tc.expected, cfg.Timezone)
}
})
}
}
12 changes: 12 additions & 0 deletions pkg/util/tz.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/util/timeutil"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// GetTimezone returns the timezone specified by the name
Expand All @@ -28,9 +30,19 @@ func GetTimezone(name string) (tz *time.Location, err error) {
case "", "system", "local":
tz, err = GetLocalTimezone()
err = cerror.WrapError(cerror.ErrLoadTimezone, err)
if err == nil {
log.Info("Use the timezone of the TiCDC server machine",
zap.String("timezoneName", name),
zap.String("timezone", tz.String()))
}
default:
tz, err = time.LoadLocation(name)
err = cerror.WrapError(cerror.ErrLoadTimezone, err)
if err == nil {
log.Info("Load the timezone specified by the user",
zap.String("timezoneName", name),
zap.String("timezone", tz.String()))
}
}
return
}
Expand Down

0 comments on commit 3b9d4d8

Please sign in to comment.