Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/tz(ticdc): use correct ctx to get timezone #8799

Merged
merged 11 commits into from
Apr 21, 2023
15 changes: 10 additions & 5 deletions cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/gctuner"
"github.com/pingcap/tiflow/cdc"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -239,23 +240,23 @@ func (s *server) createSortEngineFactory() error {
}

// Run runs the server.
func (s *server) Run(ctx context.Context) error {
if err := s.prepare(ctx); err != nil {
func (s *server) Run(serverCtx context.Context) error {
if err := s.prepare(serverCtx); err != nil {
return err
}

err := s.startStatusHTTP(s.tcpServer.HTTP1Listener())
err := s.startStatusHTTP(serverCtx, s.tcpServer.HTTP1Listener())
if err != nil {
return err
}

return s.run(ctx)
return s.run(serverCtx)
}

// startStatusHTTP starts the HTTP server.
// `lis` is a listener that gives us plain-text HTTP requests.
// TODO: can we decouple the HTTP server from the capture server?
func (s *server) startStatusHTTP(lis net.Listener) error {
func (s *server) startStatusHTTP(serverCtx context.Context, lis net.Listener) error {
// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener. Connections that exceed the
// limit will wait in a queue and no new goroutines will be created until
Expand All @@ -279,6 +280,10 @@ func (s *server) startStatusHTTP(lis net.Listener) error {
Handler: router,
ReadTimeout: httpConnectionTimeout,
WriteTimeout: httpConnectionTimeout,
BaseContext: func(listener net.Listener) context.Context {
return contextutil.PutTimezoneInCtx(context.Background(),
contextutil.TimezoneFromCtx(serverCtx))
},
}

go func() {
Expand Down
4 changes: 2 additions & 2 deletions cdc/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) {
cp.EtcdClient = etcdClient
server.capture = cp
require.Nil(t, err)
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener())
require.Nil(t, err)
defer func() {
require.Nil(t, server.statusServer.Close())
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestServerTLSWithCommonNameAndRotate(t *testing.T) {
cp.EtcdClient = etcdClient
server.capture = cp
require.Nil(t, err)
err = server.startStatusHTTP(server.tcpServer.HTTP1Listener())
err = server.startStatusHTTP(context.TODO(), server.tcpServer.HTTP1Listener())
require.Nil(t, err)
defer func() {
require.Nil(t, server.statusServer.Close())
Expand Down
37 changes: 29 additions & 8 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -330,28 +331,48 @@ func getSafeMode(values url.Values, safeMode *bool) error {
return nil
}

func getTimezone(ctx context.Context, values url.Values, timezone *string) error {
func getTimezone(ctxWithTimezone context.Context, values url.Values, timezone *string) error {
const pleaseSpecifyTimezone = "We recommend that you specify the time-zone explicitly. " +
"Please make sure that the timezone of the TiCDC server, " +
"sink-uri and the downstream database are consistent. " +
"If the downstream database does not load the timezone information, " +
"you can refer to https://dev.mysql.com/doc/refman/8.0/en/mysql-tzinfo-to-sql.html."
serverTimezone := contextutil.TimezoneFromCtx(ctxWithTimezone)
if _, ok := values["time-zone"]; !ok {
tz := contextutil.TimezoneFromCtx(ctx)
*timezone = fmt.Sprintf(`"%s"`, tz.String())
// If time-zone is not specified, use the timezone of the server.
log.Warn("Because time-zone is not specified, "+
"the timezone of the TiCDC server will be used. "+
pleaseSpecifyTimezone,
zap.String("timezone", serverTimezone.String()))
*timezone = fmt.Sprintf(`"%s"`, serverTimezone.String())
return nil
}

s := values.Get("time-zone")
if len(s) == 0 {
*timezone = ""
log.Warn("Because time-zone is empty, " +
"the timezone of the downstream database will be used. " +
pleaseSpecifyTimezone)
return nil
}

value, err := url.QueryUnescape(s)
changefeedTimezone, err := util.GetTimezone(s)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
_, err = time.LoadLocation(value)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
*timezone = fmt.Sprintf(`"%s"`, changefeedTimezone.String())
// We need to check whether the timezone of the TiCDC server and the sink-uri are consistent.
// If they are inconsistent, it may cause the data to be inconsistent.
if changefeedTimezone.String() != serverTimezone.String() {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, errors.Errorf(
"the timezone of the TiCDC server and the sink-uri are inconsistent. "+
"TiCDC server timezone: %s, sink-uri timezone: %s. "+
"Please make sure that the timezone of the TiCDC server, "+
"sink-uri and the downstream database are consistent.",
serverTimezone.String(), changefeedTimezone.String()))
}
*timezone = fmt.Sprintf(`"%s"`, s)

return nil
}

Expand Down
117 changes: 91 additions & 26 deletions pkg/sink/mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"net/url"
"strings"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -202,32 +205,6 @@ func TestApplySinkURIParamsToConfig(t *testing.T) {
require.Equal(t, expected, cfg)
}

func TestParseSinkURITimezone(t *testing.T) {
t.Parallel()

uris := []string{
"mysql://127.0.0.1:3306/?time-zone=Asia/Shanghai&worker-count=32",
"mysql://127.0.0.1:3306/?time-zone=&worker-count=32",
"mysql://127.0.0.1:3306/?worker-count=32",
}
expected := []string{
"\"Asia/Shanghai\"",
"",
"\"UTC\"",
}
ctx := context.TODO()
for i, uriStr := range uris {
uri, err := url.Parse(uriStr)
require.Nil(t, err)
cfg := NewConfig()
err = cfg.Apply(ctx,
model.DefaultChangeFeedID("cf"),
uri, config.GetDefaultReplicaConfig())
require.Nil(t, err)
require.Equal(t, expected[i], cfg.Timezone)
}
}

func TestParseSinkURIOverride(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -345,3 +322,91 @@ func TestCheckTiDBVariable(t *testing.T) {
require.NotNil(t, err)
require.Regexp(t, ".*"+sql.ErrConnDone.Error(), err.Error())
}

func TestApplyTimezone(t *testing.T) {
t.Parallel()

localTimezone, err := util.GetTimezone("Local")
require.Nil(t, err)

for _, test := range []struct {
name string
noChangefeedTimezone bool
changefeedTimezone string
serverTimezone *time.Location
expected string
expectedHasErr bool
expectedErr string
}{
{
name: "no changefeed timezone",
noChangefeedTimezone: true,
serverTimezone: time.UTC,
expected: "\"UTC\"",
expectedHasErr: false,
},
{
name: "empty changefeed timezone",
noChangefeedTimezone: false,
changefeedTimezone: "",
serverTimezone: time.UTC,
expected: "",
expectedHasErr: false,
},
{
name: "normal changefeed timezone",
noChangefeedTimezone: false,
changefeedTimezone: "UTC",
serverTimezone: time.UTC,
expected: "\"UTC\"",
expectedHasErr: false,
},
{
name: "local timezone",
noChangefeedTimezone: false,
changefeedTimezone: "Local",
serverTimezone: localTimezone,
expected: "\"" + localTimezone.String() + "\"",
expectedHasErr: false,
},
{
name: "sink-uri timezone different from server timezone",
noChangefeedTimezone: false,
changefeedTimezone: "UTC",
serverTimezone: localTimezone,
expectedHasErr: true,
expectedErr: "Please make sure that the timezone of the TiCDC server",
},
{
name: "unsupported timezone format",
noChangefeedTimezone: false,
changefeedTimezone: "%2B08%3A00", // +08:00
serverTimezone: time.UTC,
expectedHasErr: true,
expectedErr: "unknown time zone +08:00",
},
} {
tc := test
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

cfg := NewConfig()
ctx := contextutil.PutTimezoneInCtx(context.Background(), tc.serverTimezone)
sinkURI := "mysql://127.0.0.1:3306"
if !tc.noChangefeedTimezone {
sinkURI = sinkURI + "?time-zone=" + tc.changefeedTimezone
}
uri, err := url.Parse(sinkURI)
require.Nil(t, err)
err = cfg.Apply(ctx,
model.DefaultChangeFeedID("changefeed-01"), uri, config.GetDefaultReplicaConfig())
if tc.expectedHasErr {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.expectedErr)
} else {
require.Nil(t, err)
require.Equal(t, tc.expected, cfg.Timezone)
}
})
}
}
12 changes: 12 additions & 0 deletions pkg/util/tz.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/util/timeutil"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// GetTimezone returns the timezone specified by the name
Expand All @@ -28,9 +30,19 @@ func GetTimezone(name string) (tz *time.Location, err error) {
case "", "system", "local":
tz, err = GetLocalTimezone()
err = cerror.WrapError(cerror.ErrLoadTimezone, err)
if err == nil {
log.Info("Use the timezone of the TiCDC server machine",
zap.String("timezoneName", name),
zap.String("timezone", tz.String()))
}
default:
tz, err = time.LoadLocation(name)
err = cerror.WrapError(cerror.ErrLoadTimezone, err)
if err == nil {
log.Info("Load the timezone specified by the user",
zap.String("timezoneName", name),
zap.String("timezone", tz.String()))
}
}
return
}
Expand Down