7 changes: 5 additions & 2 deletions be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -46,8 +46,11 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m

// validate magic
uint8_t* magic_ptr = footer.data() + bytes_read - 4;
if (bytes_read < PARQUET_FOOTER_SIZE ||
memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
if (bytes_read < PARQUET_FOOTER_SIZE) {
return Status::Corruption(
"Read parquet file footer fail, bytes read: {}, file size: {}, path: {}",
bytes_read, file_size, file->path().native());
} else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
return Status::Corruption(
"Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, "
"read magic: {}",
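Note on the hunk above: the old code folded the short-read and bad-magic checks into a single condition, so both failure modes shared one error path; the change splits them so a truncated footer and a corrupted magic each report their own cause. Below is a minimal, self-contained sketch of the resulting control flow; the constant values reflect the standard Parquet footer layout (4-byte metadata length plus 4-byte "PAR1" magic) and the plain-string return type stands in for the engine's Status, so treat the names and values as illustrative assumptions rather than the engine's actual definitions.

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Assumed values for illustration; the engine defines its own constants.
constexpr size_t PARQUET_FOOTER_SIZE = 8;                        // 4-byte metadata length + 4-byte magic
constexpr char PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};

// Restructured check: a short read and a bad magic now fail on separate branches,
// so the returned message names the condition that actually tripped.
std::string validate_footer(const uint8_t* footer, size_t bytes_read, size_t file_size,
                            const std::string& path) {
    if (bytes_read < PARQUET_FOOTER_SIZE) {
        return "Read parquet file footer fail, bytes read: " + std::to_string(bytes_read) +
               ", file size: " + std::to_string(file_size) + ", path: " + path;
    }
    const uint8_t* magic_ptr = footer + bytes_read - 4;
    if (std::memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
        return "Invalid magic number in parquet file, path: " + path;
    }
    return "";  // empty string means the footer looks valid in this sketch
}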
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
2024-09-01 5
2024-09-02 1
2024-09-03 1
2024-09-04 3

@@ -21,7 +21,7 @@ import java.nio.file.StandardCopyOption

suite('test_ingestion_load', 'p0,external') {

def testIngestLoadJob = { testTable, loadLabel, String dataFile ->
def testIngestLoadJob = { testTable, loadLabel, String dataFile , filesize ->

sql "TRUNCATE TABLE ${testTable}"

@@ -85,7 +85,7 @@ suite('test_ingestion_load', 'p0,external') {
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 81758}",
"filePathToSize": "{\\"${etlResultFilePath}\\": ${filesize}}",
"hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}"
}
}"""
@@ -156,7 +156,7 @@ suite('test_ingestion_load', 'p0,external') {

def label = "test_ingestion_load"

testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet')
testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet',5745)

tableName = 'tbl_test_spark_load_unique_mor'

@@ -189,7 +189,7 @@ suite('test_ingestion_load', 'p0,external') {

label = "test_ingestion_load_unique_mor"

testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet')
testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet',5745)

tableName = 'tbl_test_spark_load_agg'

@@ -215,7 +215,7 @@ suite('test_ingestion_load', 'p0,external') {

label = "test_ingestion_load_agg"

testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data1.parquet')
testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data1.parquet',4057)

}

@@ -85,7 +85,7 @@ suite('test_ingestion_load_alter_column', 'p0,external') {
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 81758}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 5745}",
"hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}"
}
}"""
@@ -112,7 +112,7 @@ suite('test_ingestion_load_alter_column', 'p0,external') {
while (max_try_milli_secs) {
def result = sql "show load where label = '${loadLabel}'"
if (result[0][2] == "CANCELLED") {
msg = result[0][7]
def msg = result[0][7]
logger.info("err msg: " + msg)
assertTrue((result[0][7] =~ /schema of index \[\d+\] has changed/).find())
break
@@ -134,6 +134,8 @@ suite('test_ingestion_load_alter_column', 'p0,external') {

try {

sql "DROP TABLE if exists ${tableName1}"
sql "DROP TABLE if exists ${tableName2}"
sql """
CREATE TABLE IF NOT EXISTS ${tableName1} (
c_int int(11) NULL,
@@ -199,10 +201,8 @@ suite('test_ingestion_load_alter_column', 'p0,external') {
})

} finally {
//sql "DROP TABLE ${tableName1}"
//sql "DROP TABLE ${tableName2}"
}

}

}
}
@@ -123,8 +123,8 @@ suite('test_ingestion_load_alter_partition', 'p0,external') {
qt_select "select c1, count(*) from ${testTable} group by c1 order by c1"
break
} else if (result[0][2] == "CANCELLED") {
msg = result[0][7]
logger.info("err msg: " + msg)
def msg2 = result[0][7]
logger.info("err msg: " + msg2)
assertTrue((result[0][7] =~ /partition does not exist/).find())
break
} else {
@@ -146,6 +146,10 @@ suite('test_ingestion_load_alter_partition', 'p0,external') {

try {

sql "DROP TABLE if exists ${tableName1}"
sql "DROP TABLE if exists ${tableName2}"
sql "DROP TABLE if exists ${tableName3}"

sql """
CREATE TABLE IF NOT EXISTS ${tableName1} (
c0 int not null,
@@ -214,9 +218,6 @@ suite('test_ingestion_load_alter_partition', 'p0,external') {
})

} finally {
// sql "DROP TABLE ${tableName1}"
// sql "DROP TABLE ${tableName2}"
// sql "DROP TABLE ${tableName3}"
}

}
@@ -85,7 +85,7 @@ suite('test_ingestion_load_drop_table', 'p0,external') {
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 81758}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 5745}",
"hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}"
}
}"""
@@ -188,7 +188,6 @@ suite('test_ingestion_load_drop_table', 'p0,external') {
})

} finally {
sql "DROP TABLE ${tableName}"
}

}
@@ -103,7 +103,7 @@ suite('test_ingestion_load_multi_table', 'p0,external') {
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath1}\\": 81758, \\"${etlResultFilePath2}\\": 81758}",
"filePathToSize": "{\\"${etlResultFilePath1}\\": 5745, \\"${etlResultFilePath2}\\": 5745}",
"hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}"
}
}"""
@@ -85,7 +85,7 @@ suite('test_ingestion_load_with_inverted_index', 'p0,external') {
"msg": "",
"appId": "",
"dppResult": "${dppResult}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 81758}",
"filePathToSize": "{\\"${etlResultFilePath}\\": 5745}",
"hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}"
}
}"""
@@ -71,7 +71,7 @@ suite('test_ingestion_load_with_partition', 'p0,external') {
}
}

etlResultFilePaths = []
def etlResultFilePaths = []
for(int i=0; i < dataFiles.size(); i++) {
Files.copy(Paths.get(dataFiles[i]),
Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileNames[i]}"), StandardCopyOption.REPLACE_EXISTING)
@@ -115,7 +115,7 @@ suite('test_ingestion_load_with_partition', 'p0,external') {

def max_try_milli_secs = 120000
while (max_try_milli_secs) {
result = sql "show load where label = '${loadLabel}'"
def result = sql "show load where label = '${loadLabel}'"
if (result[0][2] == "FINISHED") {
sql "sync"
qt_select "select c1, count(*) from ${testTable} group by c1 order by c1"
@@ -133,8 +133,8 @@ suite('test_ingestion_load_with_partition', 'p0,external') {

if (enableHdfs()) {

def tableName = 'tbl_test_spark_load_partition'

def tableName = 'tbl_test_spark_load_with_partition'
sql "DROP TABLE if exists ${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
c0 int not null,
@@ -151,7 +151,7 @@ suite('test_ingestion_load_with_partition', 'p0,external') {
)
"""

def label = "test_ingestion_load_partition"
def label = "test_ingestion_load_with_partition__"

testIngestLoadJob.call(tableName, label, [context.config.dataPath + '/load_p0/ingestion_load/data2-0.parquet', context.config.dataPath + '/load_p0/ingestion_load/data2-1.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-2.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-3.parquet'])
