diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic.groovy index 3ad8c820f7d254..cc6609081b7e1f 100644 --- a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic.groovy +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic.groovy @@ -84,6 +84,23 @@ suite('test_warm_up_cluster_periodic', 'docker') { return getBrpcMetrics(ip, port, "ttl_cache_size") } + def getClusterTTLCacheSizeSum = { cluster -> + def backends = sql """SHOW BACKENDS""" + + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + + long sum = 0 + for (be in cluster_bes) { + def ip = be[1] + def port = be[5] + def size = getTTLCacheSize(ip, port) + sum += size + logger.info("be be ${ip}:${port} ttl cache size ${size}") + } + + return sum + } + def checkTTLCacheSizeSumEqual = { cluster1, cluster2 -> def backends = sql """SHOW BACKENDS""" @@ -113,6 +130,16 @@ suite('test_warm_up_cluster_periodic', 'docker') { assertEquals(srcSum, tgtSum) } + def waitUntil = { condition, timeoutMs -> + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeoutMs) { + if (condition()) { + return + } + sleep(1000) + } + } + docker(options) { def clusterName1 = "warmup_source" def clusterName2 = "warmup_target" @@ -164,12 +191,14 @@ suite('test_warm_up_cluster_periodic', 'docker') { sql """SELECT * FROM customer""" } - sleep(10000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) > 0 }, 30000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) == getClusterTTLCacheSizeSum(clusterName2) }, 60000) def hotspot = sql """select * from __internal_schema.cloud_cache_hotspot;""" logger.info("hotspot: {}", hotspot) logFileCacheDownloadMetrics(clusterName2) + assertTrue(getClusterTTLCacheSizeSum(clusterName1) > 0) checkTTLCacheSizeSumEqual(clusterName1, clusterName2) def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_add_new_be.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_add_new_be.groovy index 5c58da8a564410..1ab3a12ded19d1 100644 --- a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_add_new_be.groovy +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_add_new_be.groovy @@ -84,6 +84,23 @@ suite('test_warm_up_cluster_periodic_add_new_be', 'docker') { return getBrpcMetrics(ip, port, "ttl_cache_size") } + def getClusterTTLCacheSizeSum = { cluster -> + def backends = sql """SHOW BACKENDS""" + + def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") } + + long sum = 0 + for (be in cluster_bes) { + def ip = be[1] + def port = be[5] + def size = getTTLCacheSize(ip, port) + sum += size + logger.info("be be ${ip}:${port} ttl cache size ${size}") + } + + return sum + } + def checkTTLCacheSizeSumEqual = { cluster1, cluster2 -> def backends = sql """SHOW BACKENDS""" @@ -94,14 +111,18 @@ suite('test_warm_up_cluster_periodic_add_new_be', 'docker') { for (src in srcBes) { def ip = src[1] def port = src[5] - srcSum += getTTLCacheSize(ip, port) + def size = getTTLCacheSize(ip, port) + srcSum += size + logger.info("src be ${ip}:${port} ttl cache size ${size}") } long tgtSum = 0 for (tgt in tgtBes) { def ip = tgt[1] def port = tgt[5] - tgtSum += getTTLCacheSize(ip, port) + def size = getTTLCacheSize(ip, port) + tgtSum += size + logger.info("dst be ${ip}:${port} ttl cache size ${size}") } logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}") @@ -109,6 +130,16 @@ suite('test_warm_up_cluster_periodic_add_new_be', 'docker') { assertEquals(srcSum, tgtSum) } + def waitUntil = { condition, timeoutMs -> + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeoutMs) { + if (condition()) { + return + } + sleep(1000) + } + } + docker(options) { def clusterName1 = "warmup_source" def clusterName2 = "warmup_target" @@ -162,12 +193,14 @@ suite('test_warm_up_cluster_periodic_add_new_be', 'docker') { sql """SELECT * FROM customer""" } - sleep(15000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) > 0 }, 60000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) == getClusterTTLCacheSizeSum(clusterName2) }, 60000) def hotspot = sql """select * from __internal_schema.cloud_cache_hotspot;""" logger.info("hotspot: {}", hotspot) logFileCacheDownloadMetrics(clusterName2) + assertTrue(getClusterTTLCacheSizeSum(clusterName1) > 0) checkTTLCacheSizeSumEqual(clusterName1, clusterName2) def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}""" diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_rename.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_rename.groovy index 271fec8bbfc9b0..53f65605a439f9 100644 --- a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_rename.groovy +++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_periodic_rename.groovy @@ -77,6 +77,11 @@ suite('test_warm_up_cluster_periodic_rename', 'docker') { def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num") logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}" + ", finished=${finished}, failed=${failed}") + + def submitted_segment = getBrpcMetrics(ip, port, "file_cache_once_or_periodic_warm_up_submitted_segment_num") + def finished_segment = getBrpcMetrics(ip, port, "file_cache_once_or_periodic_warm_up_finished_segment_num") + logger.info("${cluster} be ${ip}:${port}, warmup submitted=${submitted_segment}" + + ", finished=${finished_segment}") } } @@ -130,6 +135,16 @@ suite('test_warm_up_cluster_periodic_rename', 'docker') { assertEquals(srcSum, tgtSum) } + def waitUntil = { condition, timeoutMs -> + long start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < timeoutMs) { + if (condition()) { + return + } + sleep(1000) + } + } + docker(options) { def clusterName1 = "warmup_source" def clusterName2 = "warmup_target" @@ -184,7 +199,7 @@ suite('test_warm_up_cluster_periodic_rename', 'docker') { sql """SELECT * FROM customer""" } - sleep(5000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) > 0 }, 60000) def hotspot = sql """select * from __internal_schema.cloud_cache_hotspot;""" logger.info("hotspot: {}", hotspot) @@ -194,7 +209,7 @@ suite('test_warm_up_cluster_periodic_rename', 'docker') { assertEquals(0, getClusterTTLCacheSizeSum(clusterName3)) sql """ALTER SYSTEM RENAME COMPUTE GROUP ${clusterName3} ${clusterName2}""" - sleep(5000) + waitUntil({ getClusterTTLCacheSizeSum(clusterName1) == getClusterTTLCacheSizeSum(clusterName2) }, 60000) logFileCacheDownloadMetrics(clusterName2) checkTTLCacheSizeSumEqual(clusterName1, clusterName2)