diff --git a/drainer/util.go b/drainer/util.go index 9543cdf3c..50c0a67c9 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -33,7 +33,14 @@ import ( ) const ( +<<<<<<< HEAD maxMsgSize = 1024 * 1024 * 1024 +======= + maxKafkaMsgSize = 1 << 30 + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + maxGrpcMsgSize = int(^uint(0)>>1) - 4*1024*1024 +>>>>>>> 9992784c (*: fix drainer can't send request correctly to pump when set compressor=gzip (#1186)) ) // taskGroup is a wrapper of `sync.WaitGroup`. diff --git a/pump/config.go b/pump/config.go index d9d35d9c1..ba52c8be9 100644 --- a/pump/config.go +++ b/pump/config.go @@ -32,10 +32,19 @@ import ( ) const ( +<<<<<<< HEAD defaultEtcdDialTimeout = 5 * time.Second defaultEtcdURLs = "http://127.0.0.1:2379" defaultListenAddr = "127.0.0.1:8250" defautMaxKafkaSize = 1024 * 1024 * 1024 +======= + defaultEtcdDialTimeout = 5 * time.Second + defaultEtcdURLs = "http://127.0.0.1:2379" + defaultListenAddr = "127.0.0.1:8250" + // max grpc message size, leave 4MB as buffer. Because when grpc decompresses messages, it will leave a few buffer + // for this, which overflows the int64: https://github.com/grpc/grpc-go/blob/v1.44.0/rpc_util.go#L742 + defaultMaxMsgSize = int(^uint(0)>>1) - 4*1024*1024 +>>>>>>> 9992784c (*: fix drainer can't send request correctly to pump when set compressor=gzip (#1186)) defaultHeartbeatInterval = 2 defaultGC = "7" defaultDataDir = "data.pump" diff --git a/tests/cache_table/run.sh b/tests/cache_table/run.sh new file mode 100644 index 000000000..48ae47eda --- /dev/null +++ b/tests/cache_table/run.sh @@ -0,0 +1,28 @@ +#!/bin/sh + +set -e + +cd "$(dirname "$0")" + +run_drainer --compressor gzip & + +sleep 3 + +run_sql 'CREATE DATABASE cache_test;' +run_sql 'CREATE TABLE cache_test.t1( a int, b varchar(128));' +run_sql "INSERT INTO cache_test.t1 (a,b) VALUES(1,'a'),(2,'b'),(3,'c'),(4,'d');" +run_sql "ALTER TABLE cache_test.t1 CACHE;" +run_sql "INSERT INTO cache_test.t1 (a,b) VALUES(5,'e'),(6,'f'),(7,'g'),(8,'h');" +run_sql "ALTER TABLE cache_test.t1 NOCACHE" + +sleep 3 + +down_run_sql 'SELECT a, b FROM cache_test.t1 order by a' +check_contains 'a: 7' +check_contains 'b: g' +check_contains 'a: 8' +check_contains 'b: h' + +run_sql 'DROP DATABASE cache_test' + +killall drainer