Skip to content

Commit

Permalink
sink/(ticdc): Restructure codec files (#6796)
Browse files Browse the repository at this point in the history
ref #6797
  • Loading branch information
zhaoxinyu authored Aug 22, 2022
1 parent ff78bf9 commit f8f1df2
Show file tree
Hide file tree
Showing 75 changed files with 1,372 additions and 1,272 deletions.
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestFixSinkProtocolIncompatible(t *testing.T) {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), "ErrMQSinkUnknownProtocol")
require.Contains(t, err.Error(), "ErrSinkUnknownProtocol")
}
}
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestFixSinkProtocol(t *testing.T) {
require.Equal(t, tc.expectedProtocol, protocol)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), "ErrMQSinkUnknownProtocol")
require.Contains(t, err.Error(), "ErrSinkUnknownProtocol")
}
}

Expand Down
87 changes: 45 additions & 42 deletions cdc/sink/mq/codec/avro.go → cdc/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package codec
package avro

import (
"bytes"
Expand All @@ -30,18 +30,20 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

// AvroEventBatchEncoder converts the events to binary Avro data
type AvroEventBatchEncoder struct {
// BatchEncoder converts the events to binary Avro data
type BatchEncoder struct {
namespace string
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
resultBuf []*MQMessage
keySchemaManager *schemaManager
valueSchemaManager *schemaManager
resultBuf []*common.Message
maxMessageBytes int

enableTiDBExtension bool
Expand All @@ -56,14 +58,14 @@ type avroEncodeResult struct {

// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
func (a *BatchEncoder) AppendRowChangedEvent(
ctx context.Context,
topic string,
e *model.RowChangedEvent,
_ func(),
) error {
log.Debug("AppendRowChangedEvent", zap.Any("rowChangedEvent", e))
mqMessage := newMsg(
message := common.NewMsg(
config.ProtocolAvro,
nil,
nil,
Expand All @@ -87,9 +89,9 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
return errors.Trace(err)
}

mqMessage.Value = evlp
message.Value = evlp
} else {
mqMessage.Value = nil
message.Value = nil
}

res, err := a.avroEncode(ctx, e, topic, true)
Expand All @@ -104,42 +106,42 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
log.Error("AppendRowChangedEvent: could not construct Avro envelope", zap.Error(err))
return errors.Trace(err)
}
mqMessage.Key = evlp
message.Key = evlp
} else {
mqMessage.Key = nil
message.Key = nil
}
mqMessage.IncRowsCount()
message.IncRowsCount()

if mqMessage.Length() > a.maxMessageBytes {
if message.Length() > a.maxMessageBytes {
log.Error(
"Single message too large",
zap.Int(
"maxMessageBytes",
a.maxMessageBytes,
),
zap.Int("length", mqMessage.Length()),
zap.Int("length", message.Length()),
zap.Any("table", e.Table),
)
return cerror.ErrAvroEncodeFailed.GenWithStackByArgs()
}

a.resultBuf = append(a.resultBuf, mqMessage)
a.resultBuf = append(a.resultBuf, message)

return nil
}

// EncodeCheckpointEvent is no-op for now
func (a *AvroEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
func (a *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error) {
return nil, nil
}

// EncodeDDLEvent is no-op now
func (a *AvroEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {
func (a *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
return nil, nil
}

// Build MQ Messages
func (a *AvroEventBatchEncoder) Build() (mqMessages []*MQMessage) {
// Build Messages
func (a *BatchEncoder) Build() (messages []*common.Message) {
old := a.resultBuf
a.resultBuf = nil
return old
Expand All @@ -150,7 +152,7 @@ const (
updateOperation = "u"
)

func (a *AvroEventBatchEncoder) avroEncode(
func (a *BatchEncoder) avroEncode(
ctx context.Context,
e *model.RowChangedEvent,
topic string,
Expand All @@ -160,7 +162,7 @@ func (a *AvroEventBatchEncoder) avroEncode(
cols []*model.Column
colInfos []rowcodec.ColInfo
enableTiDBExtension bool
schemaManager *AvroSchemaManager
schemaManager *schemaManager
operation string
)
if isKey {
Expand Down Expand Up @@ -526,7 +528,7 @@ func columnToAvroSchema(
Parameters: map[string]string{tidbType: tt},
}, nil
case mysql.TypeLonglong: // BIGINT
if col.Flag.IsUnsigned() && bigintUnsignedHandlingMode == bigintUnsignedHandlingModeString {
if col.Flag.IsUnsigned() && bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
return avroSchema{
Type: "string",
Parameters: map[string]string{tidbType: tt},
Expand Down Expand Up @@ -554,7 +556,7 @@ func columnToAvroSchema(
},
}, nil
case mysql.TypeNewDecimal:
if decimalHandlingMode == decimalHandlingModePrecise {
if decimalHandlingMode == common.DecimalHandlingModePrecise {
defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
displayFlen, displayDecimal := ft.GetFlen(), ft.GetDecimal()
// length not specified, set it to system type default
Expand Down Expand Up @@ -673,7 +675,7 @@ func columnToAvroData(
case mysql.TypeLonglong:
if v, ok := col.Value.(string); ok {
if col.Flag.IsUnsigned() {
if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeString {
if bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString {
return v, "string", nil
}
n, err := strconv.ParseUint(v, 10, 64)
Expand All @@ -689,7 +691,7 @@ func columnToAvroData(
return n, "long", nil
}
if col.Flag.IsUnsigned() {
if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeLong {
if bigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeLong {
return int64(col.Value.(uint64)), "long", nil
}
// bigintUnsignedHandlingMode == "string"
Expand All @@ -711,7 +713,7 @@ func columnToAvroData(
}
return []byte(types.NewBinaryLiteralFromUint(col.Value.(uint64), -1)), "bytes", nil
case mysql.TypeNewDecimal:
if decimalHandlingMode == decimalHandlingModePrecise {
if decimalHandlingMode == common.DecimalHandlingModePrecise {
v, succ := new(big.Rat).SetString(col.Value.(string))
if !succ {
return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack(
Expand Down Expand Up @@ -793,23 +795,24 @@ func (r *avroEncodeResult) toEnvelope() ([]byte, error) {
return buf.Bytes(), nil
}

type avroEventBatchEncoderBuilder struct {
type batchEncoderBuilder struct {
namespace string
config *Config
keySchemaManager *AvroSchemaManager
valueSchemaManager *AvroSchemaManager
config *common.Config
keySchemaManager *schemaManager
valueSchemaManager *schemaManager
}

const (
keySchemaSuffix = "-key"
valueSchemaSuffix = "-value"
)

func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (EncoderBuilder, error) {
// NewBatchEncoderBuilder creates a avro batchEncoderBuilder.
func NewBatchEncoderBuilder(ctx context.Context, config *common.Config) (codec.EncoderBuilder, error) {
keySchemaManager, err := NewAvroSchemaManager(
ctx,
nil,
config.avroSchemaRegistry,
config.AvroSchemaRegistry,
keySchemaSuffix,
)
if err != nil {
Expand All @@ -819,14 +822,14 @@ func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (Encod
valueSchemaManager, err := NewAvroSchemaManager(
ctx,
nil,
config.avroSchemaRegistry,
config.AvroSchemaRegistry,
valueSchemaSuffix,
)
if err != nil {
return nil, errors.Trace(err)
}

return &avroEventBatchEncoderBuilder{
return &batchEncoderBuilder{
namespace: contextutil.ChangefeedIDFromCtx(ctx).Namespace,
config: config,
keySchemaManager: keySchemaManager,
Expand All @@ -835,16 +838,16 @@ func newAvroEventBatchEncoderBuilder(ctx context.Context, config *Config) (Encod
}

// Build an AvroEventBatchEncoder.
func (b *avroEventBatchEncoderBuilder) Build() EventBatchEncoder {
encoder := &AvroEventBatchEncoder{}
func (b *batchEncoderBuilder) Build() codec.EventBatchEncoder {
encoder := &BatchEncoder{}
encoder.namespace = b.namespace
encoder.keySchemaManager = b.keySchemaManager
encoder.valueSchemaManager = b.valueSchemaManager
encoder.resultBuf = make([]*MQMessage, 0, 4096)
encoder.maxMessageBytes = b.config.maxMessageBytes
encoder.enableTiDBExtension = b.config.enableTiDBExtension
encoder.decimalHandlingMode = b.config.avroDecimalHandlingMode
encoder.bigintUnsignedHandlingMode = b.config.avroBigintUnsignedHandlingMode
encoder.resultBuf = make([]*common.Message, 0, 4096)
encoder.maxMessageBytes = b.config.MaxMessageBytes
encoder.enableTiDBExtension = b.config.EnableTiDBExtension
encoder.decimalHandlingMode = b.config.AvroDecimalHandlingMode
encoder.bigintUnsignedHandlingMode = b.config.AvroBigintUnsignedHandlingMode

return encoder
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package codec
package avro

import (
"bytes"
Expand All @@ -26,14 +26,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/stretchr/testify/require"
)

func setupEncoderAndSchemaRegistry(
enableTiDBExtension bool,
decimalHandlingMode string,
bigintUnsignedHandlingMode string,
) (*AvroEventBatchEncoder, error) {
) (*BatchEncoder, error) {
startHTTPInterceptForTestingRegistry()

keyManager, err := NewAvroSchemaManager(
Expand All @@ -56,11 +57,11 @@ func setupEncoderAndSchemaRegistry(
return nil, err
}

return &AvroEventBatchEncoder{
return &BatchEncoder{
namespace: model.DefaultNamespace,
valueSchemaManager: valueManager,
keySchemaManager: keyManager,
resultBuf: make([]*MQMessage, 0, 4096),
resultBuf: make([]*common.Message, 0, 4096),
maxMessageBytes: math.MaxInt,
enableTiDBExtension: enableTiDBExtension,
decimalHandlingMode: decimalHandlingMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package codec
package avro

var expectedSchemaWithoutExtension = `{
"type": "record",
Expand Down
Loading

0 comments on commit f8f1df2

Please sign in to comment.