This is an automated cherry-pick of pingcap#4125
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Rustin170506 authored and ti-chi-bot committed Dec 30, 2021
1 parent 2ed77d8 commit c9cc899
Showing 7 changed files with 405 additions and 1 deletion.
@@ -0,0 +1,44 @@
version: '2.1'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.12-2.4.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_MESSAGE_MAX_BYTES: 11534336
      KAFKA_REPLICA_FETCH_MAX_BYTES: 11534336
      KAFKA_CREATE_TOPICS: "big-message-test:1:1"
      KAFKA_BROKER_ID: 1
      RACK_COMMAND: "curl -sfL https://git.io/JJZXX -o /tmp/kafka.server.keystore.jks && curl -sfL https://git.io/JJZXM -o /tmp/kafka.server.truststore.jks"
      KAFKA_LISTENERS: "SSL://127.0.0.1:9093,PLAINTEXT://127.0.0.1:9092"
      KAFKA_ADVERTISED_LISTENERS: "SSL://127.0.0.1:9093,PLAINTEXT://127.0.0.1:9092"
      KAFKA_SSL_KEYSTORE_LOCATION: "/tmp/kafka.server.keystore.jks"
      KAFKA_SSL_KEYSTORE_PASSWORD: "test1234"
      KAFKA_SSL_KEY_PASSWORD: "test1234"
      KAFKA_SSL_TRUSTSTORE_LOCATION: "/tmp/kafka.server.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_PASSWORD: "test1234"
      ZK: "zk"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - "zookeeper"

  kafka_integration_test:
    build:
      context: ../../../
      dockerfile: ./deployments/ticdc/docker/integration-test.Dockerfile
    depends_on:
      - "zookeeper"
      - "kafka"
    command:
      - bash
      - -c
      - make integration_test_kafka CASE="${CASE}" & tail -f /dev/null
    network_mode: "service:kafka"
    volumes:
      - ./logs/tidb_cdc_test:/tmp/tidb_cdc_test
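
To bring this stack up by hand, point docker-compose at this file and export CASE for the test service. A minimal sketch (the compose file name is assumed here; it is not shown in this diff):

    # Start ZooKeeper and Kafka, then build and run the integration-test container.
    CASE=kafka_big_messages docker-compose -f docker-compose-kafka-integration.yml up --build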
145 changes: 145 additions & 0 deletions testing_utils/gen_kafka_big_messages/main.go
@@ -0,0 +1,145 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/pingcap/errors"
)

// See: https://docs.pingcap.com/tidb/stable/tidb-limitations/#limitations-on-string-types
const varcharColumnMaxLen = 16383

// Value of col. Defined as a variable for testing.
var colValue = strings.Repeat("a", varcharColumnMaxLen)

type options struct {
	// The size of each row.
	// The default is 1MiB.
	// FIXME: Currently it does not have precise control over the size of each row.
	// The overhead needs to be calculated and handled accurately.
	rowBytes int
	// Total number of rows.
	// The default is 1 row.
	rowCount int
	// SQL file path.
	// The default is `./test.sql`.
	sqlFilePath string
	// Database name.
	// The default is `kafka_big_messages`.
	databaseName string
	// Table name.
	// The default is `test`.
	tableName string
}

func (o *options) validate() error {
	if o.rowBytes <= 0 {
		return errors.New("rowBytes must be greater than zero")
	}

	if o.rowCount <= 0 {
		return errors.New("rowCount must be greater than zero")
	}

	if o.sqlFilePath == "" {
		return errors.New("please specify the correct file path")
	}

	if o.databaseName == "" {
		return errors.New("please specify the database name")
	}

	if o.tableName == "" {
		return errors.New("please specify the table name")
	}

	return nil
}

func gatherOptions() *options {
	o := &options{}

	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
	fs.IntVar(&o.rowBytes, "row-bytes", 1024*1024, "Number of bytes per row.")
	fs.IntVar(&o.rowCount, "row-count", 1, "Count of rows.")
	fs.StringVar(&o.sqlFilePath, "sql-file-path", "./test.sql", "SQL file path.")
	fs.StringVar(&o.databaseName, "database-name", "kafka_big_messages", "Database name.")
	fs.StringVar(&o.tableName, "table-name", "test", "Table name.")

	_ = fs.Parse(os.Args[1:])
	return o
}

func main() {
	o := gatherOptions()
	if err := o.validate(); err != nil {
		log.Panicf("Invalid options: %v", err)
	}

	file, err := os.OpenFile(o.sqlFilePath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, os.ModePerm)
	if err != nil {
		log.Panicf("Open sql file failed: %v", err)
	}
	defer file.Close()

	_, err = file.Write([]byte(genDatabaseSql(o.databaseName)))
	if err != nil {
		log.Panicf("Write create database sql failed: %v", err)
	}

	_, err = file.Write([]byte(genCreateTableSql(o.rowBytes, o.tableName)))
	if err != nil {
		log.Panicf("Write create table sql failed: %v", err)
	}

	for i := 0; i < o.rowCount; i++ {
		_, err = file.Write([]byte(genInsertSql(o.rowBytes, o.tableName, i)))
		if err != nil {
			log.Panicf("Write insert sql failed: %v", err)
		}
	}
}

func genDatabaseSql(databaseName string) string {
	return fmt.Sprintf(`DROP DATABASE IF EXISTS %s;
CREATE DATABASE %s;
USE %s;

`, databaseName, databaseName, databaseName)
}

func genCreateTableSql(rowBytes int, tableName string) string {
	var cols string

	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
		cols = fmt.Sprintf("%s, a%d VARCHAR(%d)", cols, i, varcharColumnMaxLen)
	}

	return fmt.Sprintf("CREATE TABLE %s(id int primary key %s);\n", tableName, cols)
}

func genInsertSql(rowBytes int, tableName string, id int) string {
	var values string

	for i := 0; i < rowBytes/varcharColumnMaxLen; i++ {
		values = fmt.Sprintf("%s, '%s'", values, colValue)
	}

	return fmt.Sprintf("INSERT INTO %s VALUES (%d%s);\n", tableName, id, values)
}
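
Together with run.sh further down, a typical build-and-run of this generator is sketched below (paths assumed relative to the repository root; flag values match the kafka_big_messages case):

    cd testing_utils/gen_kafka_big_messages
    GO111MODULE=on go build
    # Emit 15 rows of roughly 1 MiB each (the --row-bytes default) into test.sql.
    ./gen_kafka_big_messages --row-count=15 --sql-file-path=./test.sql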
121 changes: 121 additions & 0 deletions testing_utils/gen_kafka_big_messages/main_test.go
@@ -0,0 +1,121 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestValidateOptions(t *testing.T) {
	testCases := []struct {
		o           *options
		expectedErr string
	}{
		{
			&options{
				rowBytes: 0,
			},
			".*rowBytes must be greater than zero.*",
		},
		{
			&options{
				rowBytes: 1024 * 1024,
				rowCount: 0,
			},
			".*rowCount must be greater than zero.*",
		},
		{
			&options{
				rowBytes:    1024 * 1024,
				rowCount:    1,
				sqlFilePath: "",
			},
			".*please specify the correct file path.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     1,
				sqlFilePath:  "./test.sql",
				databaseName: "",
			},
			".*please specify the database name.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     1,
				sqlFilePath:  "./test.sql",
				databaseName: "kafka-big-messages",
				tableName:    "",
			},
			".*please specify the table name.*",
		},
		{
			&options{
				rowBytes:     1024 * 1024,
				rowCount:     10,
				sqlFilePath:  "./test.sql",
				databaseName: "kafka-big-messages",
				tableName:    "test",
			},
			"",
		},
	}

	for _, tc := range testCases {
		err := tc.o.validate()
		if tc.expectedErr != "" {
			require.Error(t, err)
			require.Regexp(t, tc.expectedErr, err.Error())
		} else {
			require.NoError(t, err)
		}
	}
}

func TestGenDatabaseSql(t *testing.T) {
	database := "test"

	sql := genDatabaseSql(database)

	require.Equal(t, "DROP DATABASE IF EXISTS test;\nCREATE DATABASE test;\nUSE test;\n\n", sql)
}

func TestGenCreateTableSql(t *testing.T) {
	rowBytes := varcharColumnMaxLen
	tableName := "test"

	sql := genCreateTableSql(rowBytes, tableName)
	require.Equal(t, "CREATE TABLE test(id int primary key , a0 VARCHAR(16383));\n", sql)
}

func TestGenInsertSql(t *testing.T) {
	// Override the col value to test.
	oldColValue := colValue
	colValue = "a"
	defer func() {
		colValue = oldColValue
	}()

	rowBytes := varcharColumnMaxLen
	tableName := "test"
	id := 1

	sql := genInsertSql(rowBytes, tableName, id)
	require.Equal(t, "INSERT INTO test VALUES (1, 'a');\n", sql)
}
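
These unit tests are plain go tests and can be run from the tool's directory, for example:

    cd testing_utils/gen_kafka_big_messages && go test ./...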
29 changes: 29 additions & 0 deletions tests/integration_tests/kafka_big_messages/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/kafka_big_messages/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["kafka_big_messages.test"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
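
The check_sync_diff helper in the test harness drives sync-diff-inspector with this configuration; a rough standalone equivalent, assuming the sync_diff_inspector binary is on PATH, would be:

    sync_diff_inspector --config=./conf/diff_config.toml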
55 changes: 55 additions & 0 deletions tests/integration_tests/kafka_big_messages/run.sh
@@ -0,0 +1,55 @@
#!/bin/bash

set -e

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
	# test kafka sink only in this case
	if [ "$SINK_TYPE" == "mysql" ]; then
		return
	fi
	rm -rf $WORK_DIR && mkdir -p $WORK_DIR

	start_tidb_cluster --workdir $WORK_DIR

	cd $WORK_DIR

	# record tso before we create tables to skip the system table DDLs
	start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)

	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

	# Use a max-message-bytes parameter that is larger than the Kafka topic's max message bytes.
	# This tests whether TiCDC automatically falls back to the topic's max-message-bytes.
	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L178
	# Use a topic that has already been created.
	# See: https://github.com/PingCAP-QE/ci/blob/ddde195ebf4364a0028d53405d1194aa37a4d853/jenkins/pipelines/ci/ticdc/cdc_ghpr_kafka_integration_test.groovy#L180
	SINK_URI="kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&kafka-version=${KAFKA_VERSION}&max-message-bytes=12582912"
	cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/big-message-test?protocol=open-protocol&partition-num=1&version=${KAFKA_VERSION}"

	echo "Starting to generate big Kafka messages..."
	cd $CUR/../../utils/gen_kafka_big_messages
	if [ ! -f ./gen_kafka_big_messages ]; then
		GO111MODULE=on go build
	fi
	# Generate rows larger than the Kafka broker's max.message.bytes; TiCDC should still deliver them correctly.
	./gen_kafka_big_messages --row-count=15 --sql-file-path=$CUR/test.sql

	run_sql_file $CUR/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
	table="kafka_big_messages.test"
	check_table_exists $table ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
	check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

	cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"