Commit 4f19f29

tests(ticdc): add tests for kafka max-message-bytes (#4125) (#4156)

close #4124

ti-chi-bot authored Feb 9, 2022
1 parent 3522f86 commit 4f19f29
Showing 6 changed files with 356 additions and 3 deletions.
145 changes: 145 additions & 0 deletions testing_utils/gen_kafka_big_messages/main.go
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/pingcap/errors"
)

// See: https://docs.pingcap.com/tidb/stable/tidb-limitations/#limitations-on-string-types
const varcharColumnMaxLen = 16383

// Value of each column. Defined as a variable so tests can override it.
var colValue = strings.Repeat("a", varcharColumnMaxLen)

type options struct {
	// The size of each row.
	// The default is 1 MiB.
	// FIXME: Currently we do not have precise control over the size of each row;
	// the per-row overhead needs to be calculated and accounted for accurately.
	rowBytes int
	// Total number of rows.
	// The default is 1 row.
	rowCount int
	// SQL file path.
	// The default is `./test.sql`.
	sqlFilePath string
	// Database name.
	// The default is `kafka_big_messages`.
	databaseName string
	// Table name.
	// The default is `test`.
	tableName string
}

func (o *options) validate() error {
	if o.rowBytes <= 0 {
		return errors.New("rowBytes must be greater than zero")
	}

	if o.rowCount <= 0 {
		return errors.New("rowCount must be greater than zero")
	}

	if o.sqlFilePath == "" {
		return errors.New("please specify the correct file path")
	}

	if o.databaseName == "" {
		return errors.New("please specify the database name")
	}

	if o.tableName == "" {
		return errors.New("please specify the table name")
	}

	return nil
}

func gatherOptions() *options {
	o := &options{}

	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	fs.IntVar(&o.rowBytes, "row-bytes", 1024*1024, "Number of bytes per row.")
	fs.IntVar(&o.rowCount, "row-count", 1, "Count of rows.")
	fs.StringVar(&o.sqlFilePath, "sql-file-path", "./test.sql", "SQL file path.")
	fs.StringVar(&o.databaseName, "database-name", "kafka_big_messages", "Database name.")
	fs.StringVar(&o.tableName, "table-name", "test", "Table name.")

	_ = fs.Parse(os.Args[1:])
	return o
}

func main() {
	o := gatherOptions()
	if err := o.validate(); err != nil {
		log.Panicf("Invalid options: %v", err)
	}

	file, err := os.OpenFile(o.sqlFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
	if err != nil {
		log.Panicf("Open sql file failed: %v", err)
	}

	_, err = file.Write([]byte(genDatabaseSQL(o.databaseName)))
	if err != nil {
		log.Panicf("Write create database sql failed: %v", err)
	}

	_, err = file.Write([]byte(genCreateTableSQL(o.rowBytes, o.tableName)))
	if err != nil {
		log.Panicf("Write create table sql failed: %v", err)
	}

	for i := 0; i < o.rowCount; i++ {
		_, err = file.Write([]byte(genInsertSQL(o.rowBytes, o.tableName, i)))
		if err != nil {
			log.Panicf("Write insert sql failed: %v", err)
		}
	}
}

func genDatabaseSQL(databaseName string) string {
	return fmt.Sprintf(`DROP DATABASE IF EXISTS %s;
CREATE DATABASE %s;
USE %s;

`, databaseName, databaseName, databaseName)
}

func genCreateTableSQL(rowBytes int, tableName string) string {
	var cols string

	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
		cols = fmt.Sprintf("%s, a%d VARCHAR(%d)", cols, i, varcharColumnMaxLen)
	}

	return fmt.Sprintf("CREATE TABLE %s(id int primary key %s);\n", tableName, cols)
}

func genInsertSQL(rowBytes int, tableName string, id int) string {
	var values string

	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
		values = fmt.Sprintf("%s, '%s'", values, colValue)
	}

	return fmt.Sprintf("INSERT INTO %s VALUES (%d%s);\n", tableName, id, values)
}
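
For reference, a sketch of what this generator emits (hypothetical invocation; the 16383-character 'a' payload in each column is elided as 'aa...' below):

    ./gen_kafka_big_messages --row-bytes=32766 --row-count=2 --sql-file-path=./test.sql

    -- resulting test.sql, with 32766/16383 = 2 VARCHAR columns per row:
    DROP DATABASE IF EXISTS kafka_big_messages;
    CREATE DATABASE kafka_big_messages;
    USE kafka_big_messages;
    CREATE TABLE test(id int primary key , a0 VARCHAR(16383), a1 VARCHAR(16383));
    INSERT INTO test VALUES (0, 'aa...', 'aa...');
    INSERT INTO test VALUES (1, 'aa...', 'aa...');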
121 changes: 121 additions & 0 deletions testing_utils/gen_kafka_big_messages/main_test.go
@@ -0,0 +1,121 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestValidateOptions(t *testing.T) {
	testCases := []struct {
		o           *options
		expectedErr string
	}{
		{
			&options{
				rowBytes: 0,
			},
			".*rowBytes must be greater than zero.*",
		},
		{
			&options{
				rowBytes: 1024 * 1024,
				rowCount: 0,
			},
			".*rowCount must be greater than zero.*",
		},
		{
			&options{
				rowBytes:    1024 * 1024,
				rowCount:    1,
				sqlFilePath: "",
			},
			".*please specify the correct file path.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     1,
				sqlFilePath:  "./test.sql",
				databaseName: "",
			},
			".*please specify the database name.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     1,
				sqlFilePath:  "./test.sql",
				databaseName: "kafka-big-messages",
				tableName:    "",
			},
			".*please specify the table name.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     10,
				sqlFilePath:  "./test.sql",
				databaseName: "kafka-big-messages",
				tableName:    "test",
			},
			"",
		},
	}

	for _, tc := range testCases {
		err := tc.o.validate()
		if tc.expectedErr != "" {
			require.Error(t, err)
			require.Regexp(t, tc.expectedErr, err.Error())
		} else {
			require.Nil(t, err)
		}
	}
}

func TestGenDatabaseSQL(t *testing.T) {
	database := "test"

	sql := genDatabaseSQL(database)

	require.Equal(t, "DROP DATABASE IF EXISTS test;\nCREATE DATABASE test;\nUSE test;\n\n", sql)
}

func TestGenCreateTableSQL(t *testing.T) {
	rowBytes := varcharColumnMaxLen
	tableName := "test"

	sql := genCreateTableSQL(rowBytes, tableName)
	require.Equal(t, "CREATE TABLE test(id int primary key , a0 VARCHAR(16383));\n", sql)
}

func TestGenInsertSQL(t *testing.T) {
	// Override the column value for this test.
	oldColValue := colValue
	colValue = "a"
	defer func() {
		colValue = oldColValue
	}()

	rowBytes := varcharColumnMaxLen
	tableName := "test"
	id := 1

	sql := genInsertSQL(rowBytes, tableName, id)
	require.Equal(t, "INSERT INTO test VALUES (1, 'a');\n", sql)
}
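
These unit tests run with the standard Go tooling, e.g. from the repository root:

    go test ./testing_utils/gen_kafka_big_messages/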
29 changes: 29 additions & 0 deletions tests/integration_tests/kafka_big_messages/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/kafka_big_messages/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["kafka_big_messages.test"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
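
This configuration drives the framework's check_sync_diff helper, which delegates the upstream/downstream comparison to sync-diff-inspector; conceptually the check amounts to something like (a sketch, assuming the sync_diff_inspector binary is available on PATH):

    sync_diff_inspector --config=./conf/diff_config.toml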
55 changes: 55 additions & 0 deletions tests/integration_tests/kafka_big_messages/run.sh
@@ -0,0 +1,55 @@
#!/bin/bash

set -e

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	# test kafka sink only in this case
	if [ "$SINK_TYPE" == "mysql" ]; then
		return
	fi
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	# record tso before we create tables to skip the system table DDLs
	start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

	# Use a max-message-bytes parameter that is larger than the kafka topic max message bytes.
	# Test if TiCDC automatically uses the max-message-bytes of the topic.
	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
	# Use a topic that has already been created.
	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L180
	SINK_URI="kafka://127.0.0.1:9092/big-message-test?partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/big-message-test?partition-num=1&version=${KAFKA_VERSION}"

	echo "Starting to generate kafka big messages..."
	cd $CUR/../../../testing_utils/gen_kafka_big_messages
	if [ ! -f ./gen_kafka_big_messages ]; then
		GO111MODULE=on go build
	fi
	# Generate data larger than the kafka broker's max.message.bytes; TiCDC should still deliver it correctly.
	./gen_kafka_big_messages --row-count=15 --sql-file-path=$CUR/test.sql

	run_sql_file $CUR/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	table="kafka_big_messages.test"
	check_table_exists $table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
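
The test relies on the CI environment having pre-created the big-message-test topic with a max.message.bytes smaller than the 12582912 (12 MiB) passed in the sink URI, so the changefeed must shrink its limit to the topic's. To inspect the topic's effective setting with stock Kafka tooling (script name and flags assume a standard Apache Kafka distribution):

    kafka-configs.sh --bootstrap-server 127.0.0.1:9092 \
        --entity-type topics --entity-name big-message-test --describe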
2 changes: 1 addition & 1 deletion tests/integration_tests/kafka_messages/conf/diff_config.toml
@@ -7,7 +7,7 @@ export-fix-sql = true
check-struct-only = false

[task]
-output-dir = "/tmp/tidb_cdc_test/kafka_message/sync_diff/output"
+output-dir = "/tmp/tidb_cdc_test/kafka_messages/sync_diff/output"

source-instances = ["mysql1"]

7 changes: 5 additions & 2 deletions tests/integration_tests/kafka_messages/run.sh
@@ -31,10 +31,13 @@ function run_length_limit() {
	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "info"

	TOPIC_NAME="ticdc-kafka-message-test-$RANDOM"
-	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760"
+	# Use a max-message-bytes parameter that is larger than the kafka broker max message bytes.
+	# Test if TiCDC automatically uses the max-message-bytes of the broker.
+	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
+	SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
	fi

	# Add a check table to reduce check time, or if we check data with sync diff
