test: add integration test of tidb/ticdc (risingwavelabs#8708)
fuyufjh authored Mar 23, 2023
1 parent 3a0ee3d commit ad61a71
Showing 14 changed files with 1,158 additions and 14 deletions.
2 changes: 1 addition & 1 deletion integration_tests/datagen/sink/mysql/mysql.go
@@ -24,7 +24,7 @@ type MysqlSink struct {
 func OpenMysqlSink(cfg MysqlConfig) (*MysqlSink, error) {
 	fmt.Printf("Opening MySQL sink: %+v\n", cfg)

-	db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s",
+	db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?multiStatements=true",
 		cfg.DbUser, cfg.DbPassword, cfg.DbHost, cfg.DbPort, cfg.Database))
 	if err != nil {
 		return nil, err
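The new ?multiStatements=true DSN parameter matters because the twitter generator (next file) now emits two semicolon-separated INSERT statements in a single string. A minimal sketch of the behavior, assuming the standard github.com/go-sql-driver/mysql driver and hypothetical connection details:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // driver used by the datagen sink
)

func main() {
	// Hypothetical credentials and address, for illustration only.
	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?multiStatements=true",
		"root", "", "tidb", 4000, "test")
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Two statements in one Exec call: the driver rejects this unless
	// multiStatements=true is set in the DSN.
	if _, err := db.Exec("INSERT INTO t VALUES (1); INSERT INTO t VALUES (2);"); err != nil {
		panic(err)
	}
}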
14 changes: 4 additions & 10 deletions integration_tests/datagen/twitter/twitter.go
@@ -37,16 +37,10 @@ type twitterUser struct {
 }

 func (r *twitterEvent) ToPostgresSql() string {
-	return fmt.Sprintf("INSERT INTO %s (data, author) values (%s, %s);",
-		"twitter", r.Data.objectString(), r.Author.objectString())
-}
-
-func (r *twitterUser) objectString() string {
-	return fmt.Sprintf("('%s'::TIMESTAMP, '%s', '%s', '%s')", r.CreatedAt, r.Id, r.Name, r.UserName)
-}
-
-func (r *tweetData) objectString() string {
-	return fmt.Sprintf("('%s'::TIMESTAMP, '%s', '%s', '%s')", r.CreatedAt, r.Id, r.Text, r.Lang)
+	return fmt.Sprintf("INSERT INTO tweet (created_at, id, text, lang, author_id) values ('%s', '%s', '%s', '%s', '%s'); INSERT INTO user (created_at, id, name, username, followers) values ('%s', '%s', '%s', '%s', %d);",
+		r.Data.CreatedAt, r.Data.Id, r.Data.Text, r.Data.Lang, r.Author.Id,
+		r.Author.CreatedAt, r.Author.Id, r.Author.Name, r.Author.UserName, r.Author.Followers,
+	)
 }

 func (r *twitterEvent) ToJson() (topic string, key string, data []byte) {
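For context, ToPostgresSql now produces one string containing two INSERTs (the tweet and its author), which is why the MySQL sink above needed multiStatements=true. A self-contained sketch that reconstructs the types from this diff and prints the generated SQL; the sample values are hypothetical:

package main

import "fmt"

// Minimal stand-ins for the datagen types, reconstructed from the diff above.
type tweetData struct {
	CreatedAt, Id, Text, Lang string
}

type twitterUser struct {
	CreatedAt, Id, Name, UserName string
	Followers                     int
}

type twitterEvent struct {
	Data   tweetData
	Author twitterUser
}

func (r *twitterEvent) ToPostgresSql() string {
	return fmt.Sprintf("INSERT INTO tweet (created_at, id, text, lang, author_id) values ('%s', '%s', '%s', '%s', '%s'); INSERT INTO user (created_at, id, name, username, followers) values ('%s', '%s', '%s', '%s', %d);",
		r.Data.CreatedAt, r.Data.Id, r.Data.Text, r.Data.Lang, r.Author.Id,
		r.Author.CreatedAt, r.Author.Id, r.Author.Name, r.Author.UserName, r.Author.Followers,
	)
}

func main() {
	e := twitterEvent{
		Data:   tweetData{CreatedAt: "2023-03-23 00:00:00", Id: "1", Text: "hello", Lang: "en"},
		Author: twitterUser{CreatedAt: "2020-01-01 00:00:00", Id: "42", Name: "Alice", UserName: "alice", Followers: 10},
	}
	fmt.Println(e.ToPostgresSql()) // two statements, run via a single Exec on the sink
}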
1 change: 1 addition & 0 deletions integration_tests/iceberg-sink/create_sink.sql
@@ -3,6 +3,7 @@ FROM
     bhv_mv WITH (
         connector = 'iceberg',
         type = 'upsert',
+        primary_key = 'user_id, target_id, event_timestamp',
         warehouse.path = 's3://hummock001/iceberg-data',
         s3.endpoint = 'http://minio-0:9301',
         s3.access.key = 'hummockadmin',
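A note on the added primary_key: with type = 'upsert', the sink needs to know which columns identify a row so that updates and deletes from bhv_mv can be applied to the Iceberg table, which is presumably why the demo adds it. A hedged sketch of issuing the statement from Go over the PostgreSQL wire protocol that RisingWave speaks; the frontend address, sink name, and the trailing options (elided by the diff) are assumptions:

package main

import (
	"database/sql"

	_ "github.com/lib/pq" // RisingWave is wire-compatible with PostgreSQL
)

func main() {
	// Assumed frontend address and user; adjust to your deployment.
	db, err := sql.Open("postgres",
		"host=localhost port=4566 user=root dbname=dev sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Abbreviated statement; the options after s3.access.key are elided in
	// the diff above, so complete them as in create_sink.sql.
	_, err = db.Exec(`CREATE SINK bhv_iceberg_sink FROM bhv_mv WITH (
		connector = 'iceberg',
		type = 'upsert',
		primary_key = 'user_id, target_id, event_timestamp',
		warehouse.path = 's3://hummock001/iceberg-data',
		s3.endpoint = 'http://minio-0:9301',
		s3.access.key = 'hummockadmin'
	)`)
	if err != nil {
		panic(err)
	}
}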
5 changes: 2 additions & 3 deletions integration_tests/scripts/run_demos.py
@@ -25,8 +25,7 @@ def run_demo(demo: str, format: str):
     demo_dir = os.path.join(project_dir, demo)
     print("Running demo: {}".format(demo))

-    subprocess.run(["docker", "compose", "up", "-d", "-e",
-                    "ENABLE_TELEMETRY=false"], cwd=demo_dir, check=True)
+    subprocess.run(["docker", "compose", "up", "-d"], cwd=demo_dir, check=True)
     sleep(40)

     sql_files = ['create_source.sql', 'create_mv.sql', 'query.sql']
@@ -50,7 +49,7 @@ def run_iceberg_demo():
     demo_dir = os.path.join(project_dir, demo)
     print("Running demo: iceberg-sink")

-    subprocess.run(["docker", "compose", "up", "-d", "-e", "ENABLE_TELEMETRY=false"], cwd=demo_dir, check=True)
+    subprocess.run(["docker", "compose", "up", "-d"], cwd=demo_dir, check=True)
     sleep(40)

     subprocess.run(["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "create-table"],
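The dropped arguments fix a real bug: docker compose up does not accept a -e flag (that flag belongs to docker compose run and exec), so the old invocation was rejected. If the telemetry toggle is still wanted, one supported route is the process environment, which compose-file interpolation can read; a sketch in Go, with the demo directory as an assumed example:

package main

import (
	"os"
	"os/exec"
)

func main() {
	cmd := exec.Command("docker", "compose", "up", "-d")
	cmd.Dir = "integration_tests/tidb-cdc-sink" // assumed demo directory
	// Unlike an `up -e` flag, environment variables are a supported way to
	// feed values into compose-file interpolation.
	cmd.Env = append(os.Environ(), "ENABLE_TELEMETRY=false")
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err := cmd.Run(); err != nil {
		panic(err)
	}
}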
4 changes: 4 additions & 0 deletions integration_tests/tidb-cdc-sink/config/changefeed.toml
@@ -0,0 +1,4 @@
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "ticdc_{schema}_{table}"},
]
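With this dispatcher rule, TiCDC routes each table's changelog to a Kafka topic named ticdc_{schema}_{table}; for example, changes to a tweet table in a twitter database would land in ticdc_twitter_tweet. A hedged Go sketch of tailing such a topic with github.com/segmentio/kafka-go; the broker address and topic name are assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // assumed broker address
		Topic:   "ticdc_twitter_tweet",      // per the dispatcher rule above
		GroupID: "tidb-cdc-demo",            // arbitrary consumer group
	})
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("key=%s value=%s\n", m.Key, m.Value)
	}
}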
86 changes: 86 additions & 0 deletions integration_tests/tidb-cdc-sink/config/pd.toml
@@ -0,0 +1,86 @@
# PD Configuration.

name = "pd"
data-dir = "default.pd"

client-urls = "http://127.0.0.1:2379"
# if not set, use ${client-urls}
advertise-client-urls = ""

peer-urls = "http://127.0.0.1:2380"
# if not set, use ${peer-urls}
advertise-peer-urls = ""

initial-cluster = "pd=http://127.0.0.1:2380"
initial-cluster-state = "new"

lease = 3
tso-save-interval = "3s"

[security]
# Path of the file that contains the list of trusted SSL CAs. If set, the following settings must not be empty.
cacert-path = ""
# Path of file that contains X509 certificate in PEM format.
cert-path = ""
# Path of file that contains X509 key in PEM format.
key-path = ""

[log]
level = "error"

# log format, one of json, text, console
#format = "text"

# disable automatic timestamps in output
#disable-timestamp = false

# file logging
[log.file]
#filename = ""
# max log file size in MB
#max-size = 300
# max log file keep days
#max-days = 28
# maximum number of old log files to retain
#max-backups = 7
# rotate log by day
#log-rotate = true

[metric]
# prometheus client push interval, set "0s" to disable prometheus.
interval = "15s"
# prometheus pushgateway address; leave it empty to disable prometheus.
address = "prometheus-0:9091"

[schedule]
max-merge-region-size = 0
split-merge-interval = "1h"
max-snapshot-count = 3
max-pending-peer-count = 16
max-store-down-time = "30m"
leader-schedule-limit = 4
region-schedule-limit = 4
replica-schedule-limit = 8
merge-schedule-limit = 8
tolerant-size-ratio = 5.0

# customized schedulers; the format is shown below
# if empty, balance-leader, balance-region and hot-region are used by default
# [[schedule.schedulers]]
# type = "evict-leader"
# args = ["1"]

[replication]
# The number of replicas for each region.
max-replicas = 3
# The label keys specify the location of a store.
# Placement priority is implied by the order of the label keys.
# For example, ["zone", "rack"] means that we should place replicas to
# different zones first, then to different racks if we don't have enough zones.
location-labels = []

[label-property]
# Do not assign region leaders to stores that have these tags.
# [[label-property.reject-leader]]
# key = "zone"
# value = "cn1"
239 changes: 239 additions & 0 deletions integration_tests/tidb-cdc-sink/config/tidb.toml
@@ -0,0 +1,239 @@
# TiDB Configuration.

# TiDB server host.
host = "0.0.0.0"

# TiDB server port.
port = 4000

# Registered store name, [tikv, mocktikv]
store = "mocktikv"

# TiDB storage path.
path = "/tmp/tidb"

# The socket file to use for connection.
socket = ""

# Run ddl worker on this tidb-server.
run-ddl = true

# Schema lease duration. Very dangerous to change; do so only if you know what you are doing.
lease = "0"

# When creating a table, split a separate region for it. It is recommended to
# turn this option off if a large number of tables will be created.
split-table = true

# The limit of concurrent executed sessions.
token-limit = 1000

# Only print a log when out of memory quota.
# Valid options: ["log", "cancel"]
oom-action = "log"

# Set the memory quota for a query in bytes. Default: 32GB
mem-quota-query = 34359738368

# Enable coprocessor streaming.
enable-streaming = false

# Set system variable 'lower_case_table_names'
lower-case-table-names = 2

[log]
# Log level: debug, info, warn, error, fatal.
level = "error"

# Log format, one of json, text, console.
format = "text"

# Disable automatic timestamps in output
disable-timestamp = false

# Store the slow query log in a separate file.
slow-query-file = ""

# Queries with execution time greater than this value will be logged. (Milliseconds)
slow-threshold = 300

# Queries with internal result greater than this value will be logged.
expensive-threshold = 10000

# Maximum query length recorded in log.
query-log-max-len = 2048

# File logging.
[log.file]
# Log file name.
filename = ""

# Max log file size in MB (upper limit to 4096MB).
max-size = 300

# Max log file keep days. No clean up by default.
max-days = 0

# Maximum number of old log files to retain. No clean up by default.
max-backups = 0

# Rotate log by day
log-rotate = true

[security]
# Path of file that contains list of trusted SSL CAs for connection with mysql client.
ssl-ca = ""

# Path of file that contains X509 certificate in PEM format for connection with mysql client.
ssl-cert = ""

# Path of file that contains X509 key in PEM format for connection with mysql client.
ssl-key = ""

# Path of file that contains list of trusted SSL CAs for connection with cluster components.
cluster-ssl-ca = ""

# Path of file that contains X509 certificate in PEM format for connection with cluster components.
cluster-ssl-cert = ""

# Path of file that contains X509 key in PEM format for connection with cluster components.
cluster-ssl-key = ""

[status]
# Whether to enable the status report HTTP service.
report-status = true

# TiDB status port.
status-port = 10080

# Prometheus pushgateway address; leave it empty to disable prometheus push.
metrics-addr = "prometheus-0:9091"

# Prometheus client push interval in seconds; set "0" to disable prometheus push.
metrics-interval = 15

[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0
# StmtCountLimit limits the max count of statement inside a transaction.
stmt-count-limit = 5000

# Set keep alive option for tcp connection.
tcp-keep-alive = true

# The maximum number of retries when committing a transaction.
retry-limit = 10

# Whether to support the Cartesian product.
cross-join = true

# Stats lease duration, which influences the time of analyze and stats load.
stats-lease = "3s"

# Run auto analyze worker on this tidb-server.
run-auto-analyze = true

# Probability to use the query feedback to update stats, 0 or 1 for always false/true.
feedback-probability = 0.0

# The max number of query feedback items cached in memory.
query-feedback-limit = 1024

# Pseudo stats will be used if the ratio between the modify count and
# row count in a table's statistics is greater than this value.
pseudo-estimate-ratio = 0.7

[proxy-protocol]
# PROXY protocol acceptable client networks.
# Empty string means disable PROXY protocol, * means all networks.
networks = ""

# PROXY protocol header read timeout, unit is second
header-timeout = 5

[plan-cache]
enabled = false
capacity = 2560
shards = 256

[prepared-plan-cache]
enabled = false
capacity = 100

[opentracing]
# Enable opentracing.
enable = false

# Whether to enable the rpc metrics.
rpc-metrics = false

[opentracing.sampler]
# Type specifies the type of the sampler: const, probabilistic, rateLimiting, or remote
type = "const"

# Param is a value passed to the sampler.
# Valid values for Param field are:
# - for "const" sampler, 0 or 1 for always false/true respectively
# - for "probabilistic" sampler, a probability between 0 and 1
# - for "rateLimiting" sampler, the number of spans per second
# - for "remote" sampler, param is the same as for "probabilistic"
# and indicates the initial sampling rate before the actual one
# is received from the mothership
param = 1.0

# SamplingServerURL is the address of jaeger-agent's HTTP sampling server
sampling-server-url = ""

# MaxOperations is the maximum number of operations that the sampler
# will keep track of. If an operation is not tracked, a default probabilistic
# sampler will be used rather than the per operation specific sampler.
max-operations = 0

# SamplingRefreshInterval controls how often the remotely controlled sampler will poll
# jaeger-agent for the appropriate sampling strategy.
sampling-refresh-interval = 0

[opentracing.reporter]
# QueueSize controls how many spans the reporter can keep in memory before it starts dropping
# new spans. The queue is continuously drained by a background go-routine, as fast as spans
# can be sent out of process.
queue-size = 0

# BufferFlushInterval controls how often the buffer is force-flushed, even if it's not full.
# It is generally not useful, as it only matters for very low traffic services.
buffer-flush-interval = 0

# LogSpans, when true, enables LoggingReporter that runs in parallel with the main reporter
# and logs all submitted spans. Main Configuration.Logger must be initialized in the code
# for this option to have any effect.
log-spans = false

# LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
local-agent-host-port = ""

[tikv-client]
# Max gRPC connections that will be established with each tikv-server.
grpc-connection-count = 16

# After a duration of this time in seconds if the client doesn't see any activity it pings
# the server to see if the transport is still alive.
grpc-keepalive-time = 10

# After having pinged for keepalive check, the client waits for a duration of Timeout in seconds
# and if no activity is seen even after that the connection is closed.
grpc-keepalive-timeout = 3

# max time for the commit command; must be at least twice the raft election timeout.
commit-timeout = "41s"

[binlog]

# Socket file to write binlog.
binlog-socket = ""

# WriteTimeout specifies how long it will wait for writing binlog to pump.
write-timeout = "15s"

# If IgnoreError is true, TiDB stops writing binlog when a write error occurs,
# but continues to provide service.
ignore-error = false