@@ -36,39 +36,39 @@ suite("test_packed_file_concurrent_load", "p0, nonConcurrent") {
}
}

// Get merge file total small file count metric from all backends
def get_merge_file_total_small_file_count = {
// Get packed file total small file count metric from all backends
def get_packed_file_total_small_file_count = {
long total_count = 0
for (String backend_id: backendId_to_backendIP.keySet()) {
def ip = backendId_to_backendIP.get(backend_id)
def brpc_port = backendId_to_backendBrpcPort.get(backend_id)
try {
def count = getBrpcMetrics(ip, brpc_port, "merge_file_total_small_file_num")
def count = getBrpcMetrics(ip, brpc_port, "packed_file_total_small_file_num")
if (count > 0) {
total_count += count
logger.info("BE ${ip}:${brpc_port} merge_file_total_small_file_num = ${count}")
logger.info("BE ${ip}:${brpc_port} packed_file_total_small_file_num = ${count}")
}
} catch (Exception e) {
logger.warn("Failed to get metrics from BE ${ip}:${brpc_port}: ${e.getMessage()}")
}
}
logger.info("Total merge_file_total_small_file_num across all backends: ${total_count}")
logger.info("Total packed_file_total_small_file_num across all backends: ${total_count}")
return total_count
}


// Enable merge file feature and set small file threshold using framework's temporary config function
// Enable packed file feature and set small file threshold using framework's temporary config function
// This will automatically restore configs after test completes
setBeConfigTemporary([
"enable_merge_file": "true",
"enable_packed_file": "true",
"small_file_threshold_bytes": "102400"
]) {
// Get initial merge file count
def initial_merge_file_count = get_merge_file_total_small_file_count()
logger.info("Initial merge_file_total_small_file_count: ${initial_merge_file_count}")
// Get initial packed file count
def initial_packed_file_count = get_packed_file_total_small_file_count()
logger.info("Initial packed_file_total_small_file_count: ${initial_packed_file_count}")

// Test case 1: Multiple small concurrent loads to the same tablet
def tableName1 = "test_merge_file_same_tablet"
def tableName1 = "test_packed_file_same_tablet"
sql """ DROP TABLE IF EXISTS ${tableName1} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName1} (
@@ -132,15 +132,15 @@ suite("test_packed_file_concurrent_load", "p0, nonConcurrent") {
assertEquals(expected_rows1, result1[0][0] as int,
"Expected exactly ${expected_rows1} rows for DUPLICATE KEY table, got ${result1[0][0]}")

def count_after_test1 = get_merge_file_total_small_file_count()
logger.info("merge_file_total_small_file_count after test case 1: ${count_after_test1} (initial: ${initial_merge_file_count})")
def count_after_test1 = get_packed_file_total_small_file_count()
logger.info("packed_file_total_small_file_count after test case 1: ${count_after_test1} (initial: ${initial_packed_file_count})")
// The count must increase after test case 1
assertTrue(count_after_test1 > initial_merge_file_count,
"merge_file_total_small_file_count must increase after test case 1. " +
"Initial: ${initial_merge_file_count}, After test1: ${count_after_test1}")
assertTrue(count_after_test1 > initial_packed_file_count,
"packed_file_total_small_file_count must increase after test case 1. " +
"Initial: ${initial_packed_file_count}, After test1: ${count_after_test1}")

// Test case 2: Multiple small concurrent loads to different partitions
def tableName2 = "test_merge_file_different_partitions"
def tableName2 = "test_packed_file_different_partitions"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (
@@ -234,19 +234,19 @@ suite("test_packed_file_concurrent_load", "p0, nonConcurrent") {
"Some rows may have been filtered if they were out of partition range.")
logger.info("Loaded ${actual_rows2} rows out of expected ${expected_rows2} rows")

def count_after_test2 = get_merge_file_total_small_file_count()
logger.info("merge_file_total_small_file_count after test case 2: ${count_after_test2} (after test1: ${count_after_test1})")
def count_after_test2 = get_packed_file_total_small_file_count()
logger.info("packed_file_total_small_file_count after test case 2: ${count_after_test2} (after test1: ${count_after_test1})")
// The count must increase after test case 2
assertTrue(count_after_test2 > count_after_test1,
"merge_file_total_small_file_count must increase after test case 2. " +
"packed_file_total_small_file_count must increase after test case 2. " +
"After test1: ${count_after_test1}, After test2: ${count_after_test2}")

// Test case 3: Multiple small concurrent loads to different tables
def load_threads3 = []
def load_count3 = 6
def load_data_different_tables = { table_id, thread_id ->
try {
def table_name = "test_merge_file_table_${table_id}"
def table_name = "test_packed_file_table_${table_id}"
sql """ DROP TABLE IF EXISTS ${table_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
@@ -305,20 +305,20 @@ suite("test_packed_file_concurrent_load", "p0, nonConcurrent") {
// Verify data in all tables
def expected_rows_per_table = 5 * 100 // 5 batches * 100 rows = 500 rows per table
for (int i = 0; i < load_count3; i++) {
def table_name = "test_merge_file_table_${i}"
def table_name = "test_packed_file_table_${i}"
def result = sql "select count(*) from ${table_name}"
logger.info("Table ${table_name} row count: ${result[0][0]}, expected: ${expected_rows_per_table}")
// For DUPLICATE KEY table, all rows should be preserved exactly
assertEquals(expected_rows_per_table, result[0][0] as int,
"Expected exactly ${expected_rows_per_table} rows for DUPLICATE KEY table ${table_name}, got ${result[0][0]}")
}

def count_after_test3 = get_merge_file_total_small_file_count()
logger.info("merge_file_total_small_file_count after test case 3: ${count_after_test3} (after test2: ${count_after_test2}, initial: ${initial_merge_file_count})")
def count_after_test3 = get_packed_file_total_small_file_count()
logger.info("packed_file_total_small_file_count after test case 3: ${count_after_test3} (after test2: ${count_after_test2}, initial: ${initial_packed_file_count})")

// The count must increase after test case 3
assertTrue(count_after_test3 > count_after_test2,
"merge_file_total_small_file_count must increase after test case 3. " +
"packed_file_total_small_file_count must increase after test case 3. " +
"After test2: ${count_after_test2}, After test3: ${count_after_test3}")
}
}
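
The metric helper these suites call, getBrpcMetrics, belongs to the regression framework and is not shown in this diff. A minimal sketch of what such a helper might do, assuming the backend exposes a plain-text brpc /vars endpoint whose lines look like "packed_file_total_small_file_num : 42" (both the endpoint path and the line format are assumptions), is:

// Sketch only: fetch the brpc /vars page from one backend and pull out a
// single named counter. Endpoint path and line format are assumed, not
// taken from this PR.
def getBrpcMetricsSketch = { String ip, String brpcPort, String metricName ->
    def text = new URL("http://${ip}:${brpcPort}/vars").getText()
    def line = text.readLines().find { it.startsWith(metricName) }
    if (line == null) {
        return 0L
    }
    return Long.parseLong(line.split(":")[-1].trim())
}

// Usage mirroring the per-backend loop in the closure above:
// def count = getBrpcMetricsSketch(ip, brpc_port, "packed_file_total_small_file_num")
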
@@ -44,23 +44,23 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
}
}

// Get merge file total small file count metric from all backends
def get_merge_file_total_small_file_count = {
// Get packed file total small file count metric from all backends
def get_packed_file_total_small_file_count = {
long total_count = 0
for (String backend_id: backendId_to_backendIP.keySet()) {
def ip = backendId_to_backendIP.get(backend_id)
def brpc_port = backendId_to_backendBrpcPort.get(backend_id)
try {
def count = getBrpcMetrics(ip, brpc_port, "merge_file_total_small_file_num")
def count = getBrpcMetrics(ip, brpc_port, "packed_file_total_small_file_num")
if (count > 0) {
total_count += count
logger.info("BE ${ip}:${brpc_port} merge_file_total_small_file_num = ${count}")
logger.info("BE ${ip}:${brpc_port} packed_file_total_small_file_num = ${count}")
}
} catch (Exception e) {
logger.warn("Failed to get metrics from BE ${ip}:${brpc_port}: ${e.getMessage()}")
}
}
logger.info("Total merge_file_total_small_file_num across all backends: ${total_count}")
logger.info("Total packed_file_total_small_file_num across all backends: ${total_count}")
return total_count
}

@@ -199,14 +199,14 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
}
}

// Enable merge file feature and set small file threshold using framework's temporary config function
// Enable packed file feature and set small file threshold using framework's temporary config function
// This will automatically restore configs after test completes
setBeConfigTemporary([
"enable_merge_file": "true",
"enable_packed_file": "true",
"small_file_threshold_bytes": "102400" // 100KB threshold
]) {
// Test case 1: Mixed load (small and large files) - check query results
def tableName1 = "test_merge_file_mixed_load_query"
def tableName1 = "test_packed_file_mixed_load_query"
sql """ DROP TABLE IF EXISTS ${tableName1} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName1} (
@@ -220,10 +220,10 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
"""

def load_threads = []
def small_load_count = 5 // Small loads that will trigger merge
def large_load_count = 3 // Large loads that won't trigger merge
def small_load_count = 5 // Small loads that will trigger packing
def large_load_count = 3 // Large loads that won't trigger packing

// Small load function - generates files smaller than threshold (will be merged)
// Small load function - generates files smaller than threshold (will be packed)
def small_load = { table_name, thread_id ->
try {
for (int i = 0; i < 3; i++) {
@@ -255,7 +255,7 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
}
}

// Large load function - generates files larger than threshold (won't be merged)
// Large load function - generates files larger than threshold (won't be packed)
def large_load = { table_name, thread_id ->
try {
for (int i = 0; i < 2; i++) {
@@ -308,7 +308,7 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
t.join(120000) // 2 minutes timeout
}

// Wait a bit for merge operations to complete
// Wait a bit for packing operations to complete
sleep(5000)

// Verify query results - should include data from both small and large loads
@@ -343,7 +343,7 @@ suite("test_packed_file_mixed_load", "p0, nonConcurrent") {
logger.info("✓ Test case 1.1 passed: Query results are correct after clearing file cache")

// Test case 2: Mixed load - check index and delete bitmap
def tableName2 = "test_merge_file_mixed_load_index"
def tableName2 = "test_packed_file_mixed_load_index"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName2} (
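
The mixed-load suite above splits its loads around the small_file_threshold_bytes value of 102400 bytes set in setBeConfigTemporary: loads whose resulting files fall under the threshold should be packed, larger ones should not. A rough, illustrative way to size the two kinds of CSV payloads — the row counts and the per-row byte estimate are assumptions, and payload size is only a proxy for the size of the files a load actually produces — is:

// Sketch only: build a CSV that is clearly under or over the 100KB threshold
// used by the suite. Row counts are illustrative.
def buildCsv = { int rows ->
    def sb = new StringBuilder()
    for (int i = 0; i < rows; i++) {
        sb.append("${i},name_${i},${i * 10}\n")   // roughly 20 bytes per row
    }
    return sb.toString()
}

def smallPayload = buildCsv(100)      // a few KB, well under 102400 bytes
def largePayload = buildCsv(20000)    // several hundred KB, well over 102400 bytes
assert smallPayload.bytes.length < 102400
assert largePayload.bytes.length > 102400
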
@@ -25,11 +25,11 @@ suite("test_packed_file_query_with_table_drop", "p0, nonConcurrent") {
getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort)

setBeConfigTemporary([
"enable_merge_file": "true",
"enable_packed_file": "true",
"small_file_threshold_bytes": "102400" // 100KB threshold
]) {
// Create main table that will be queried
def mainTableName = "test_merge_file_main_table"
def mainTableName = "test_packed_file_main_table"
sql """ DROP TABLE IF EXISTS ${mainTableName} """
sql """
CREATE TABLE IF NOT EXISTS ${mainTableName} (
@@ -49,7 +49,7 @@ suite("test_packed_file_query_with_table_drop", "p0, nonConcurrent") {
def otherTableNames = []
def otherTableCount = 5
for (int i = 0; i < otherTableCount; i++) {
def tableName = "test_merge_file_other_table_${i}"
def tableName = "test_packed_file_other_table_${i}"
otherTableNames.add(tableName)
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -67,7 +67,7 @@ suite("test_packed_file_query_with_table_drop", "p0, nonConcurrent") {
"""
}

// Load data into main table (small files that will trigger merge)
// Load data into main table (small files that will trigger packing)
def mainTableLoadCount = 8
def mainTableLoadThreads = []
def loadMainTable = { table_name, thread_id ->
@@ -101,7 +101,7 @@ suite("test_packed_file_query_with_table_drop", "p0, nonConcurrent") {
}
}

// Load data into other tables (small files that will trigger merge)
// Load data into other tables (small files that will trigger packing)
def otherTableLoadThreads = []
def loadOtherTable = { table_name, thread_id ->
try {
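
The hunks above for test_packed_file_query_with_table_drop only cover the load closures; the drop-while-querying phase that the suite name implies is not visible in this diff. Purely as an assumed outline — every detail below is illustrative, not taken from the PR — that phase might look like:

// Assumed sketch: drop the other tables on a background thread while the
// main table, whose small files may have been packed, keeps being queried.
def dropThread = Thread.start {
    otherTableNames.each { name ->
        sql """ DROP TABLE IF EXISTS ${name} """
        sleep(500)   // space the drops out while queries are in flight
    }
}
for (int i = 0; i < 10; i++) {
    def rows = sql "select count(*) from ${mainTableName}"
    logger.info("query ${i} on ${mainTableName} returned ${rows[0][0]} rows")
}
dropThread.join(60000)   // bound the wait on the background drops
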
@@ -36,39 +36,39 @@ suite("test_packed_file_with_group_commit", "p0, nonConcurrent") {
}
}

// Get merge file total small file count metric from all backends
def get_merge_file_total_small_file_count = {
// Get packed file total small file count metric from all backends
def get_packed_file_total_small_file_count = {
long total_count = 0
for (String backend_id: backendId_to_backendIP.keySet()) {
def ip = backendId_to_backendIP.get(backend_id)
def brpc_port = backendId_to_backendBrpcPort.get(backend_id)
try {
def count = getBrpcMetrics(ip, brpc_port, "merge_file_total_small_file_num")
def count = getBrpcMetrics(ip, brpc_port, "packed_file_total_small_file_num")
if (count > 0) {
total_count += count
logger.info("BE ${ip}:${brpc_port} merge_file_total_small_file_num = ${count}")
logger.info("BE ${ip}:${brpc_port} packed_file_total_small_file_num = ${count}")
}
} catch (Exception e) {
logger.warn("Failed to get metrics from BE ${ip}:${brpc_port}: ${e.getMessage()}")
}
}
logger.info("Total merge_file_total_small_file_num across all backends: ${total_count}")
logger.info("Total packed_file_total_small_file_num across all backends: ${total_count}")
return total_count
}

// Enable merge file feature and set small file threshold using framework's temporary config function
// Enable packed file feature and set small file threshold using framework's temporary config function
// This will automatically restore configs after test completes
setBeConfigTemporary([
"enable_merge_file": "true",
"enable_packed_file": "true",
"small_file_threshold_bytes": "102400"
]) {
// Get initial merge file count
def initial_merge_file_count = get_merge_file_total_small_file_count()
logger.info("Initial merge_file_total_small_file_count: ${initial_merge_file_count}")
// Get initial packed file count
def initial_packed_file_count = get_packed_file_total_small_file_count()
logger.info("Initial packed_file_total_small_file_count: ${initial_packed_file_count}")

// Test case: Merge file with group commit enabled
// This test verifies that merge file logic works correctly when group commit is enabled
def tableName = "test_merge_file_with_group_commit"
// Test case: Packed file with group commit enabled
// This test verifies that packed file logic works correctly when group commit is enabled
def tableName = "test_packed_file_with_group_commit"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -84,8 +84,8 @@ suite("test_packed_file_with_group_commit", "p0, nonConcurrent") {
);
"""

def count_before_test = get_merge_file_total_small_file_count()
logger.info("merge_file_total_small_file_count before test (with group commit): ${count_before_test}")
def count_before_test = get_packed_file_total_small_file_count()
logger.info("packed_file_total_small_file_count before test (with group commit): ${count_before_test}")

def load_threads = []
def load_count = 8
@@ -140,21 +140,21 @@ suite("test_packed_file_with_group_commit", "p0, nonConcurrent") {
t.join(60000)
}

// Wait a bit for group commit to finish and merge operations to complete
// Wait a bit for group commit to finish and packing operations to complete
sleep(5000)

def result = sql "select count(*) from ${tableName}"
def expected_rows = load_count * 5 * 100 // 8 threads * 5 batches * 100 rows = 4000
assertEquals(expected_rows, result[0][0] as int,
"Expected exactly ${expected_rows} rows for DUPLICATE KEY table with group commit, got ${result[0][0]}")

def count_after_test = get_merge_file_total_small_file_count()
logger.info("merge_file_total_small_file_count after test (with group commit): ${count_after_test} (before: ${count_before_test})")
def count_after_test = get_packed_file_total_small_file_count()
logger.info("packed_file_total_small_file_count after test (with group commit): ${count_after_test} (before: ${count_before_test})")

// The count must increase after test, verifying that merge file works with group commit
// The count must increase after test, verifying that packed file works with group commit
assertTrue(count_after_test > count_before_test,
"merge_file_total_small_file_count must increase after test (with group commit). " +
"packed_file_total_small_file_count must increase after test (with group commit). " +
"Before: ${count_before_test}, After: ${count_after_test}. " +
"This verifies that merge file logic works correctly when group commit is enabled.")
"This verifies that packed file logic works correctly when group commit is enabled.")
}
}
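
The visible hunks for test_packed_file_with_group_commit cover the table setup and the metric assertions but not how the loads opt into group commit. A hedged sketch of one such small load — the streamLoad DSL options and the 'group_commit' header value 'async_mode' are assumptions about the framework and the backend, not something this diff shows — could be:

// Assumed sketch: a small CSV batch loaded with group commit enabled, so the
// resulting small files can still exercise the packed file path.
def data = new StringBuilder()
for (int i = 0; i < 100; i++) {
    data.append("${i},name_${i},${i}\n")
}
streamLoad {
    table tableName
    set 'column_separator', ','
    set 'group_commit', 'async_mode'
    inputStream new ByteArrayInputStream(data.toString().getBytes())
    time 10000   // assumed request timeout in ms
}
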