Skip to content

Commit

Permalink
Start of changes for roundtrip testing of alltypes in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
jduo committed Oct 6, 2023
1 parent 02de3c1 commit c13003c
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 14 deletions.
70 changes: 70 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,76 @@ TEST_F(TestParquetFileSystemDataset, WriteWithEmptyPartitioningSchema) {
TestWriteWithEmptyPartitioningSchema();
}

class TestParquetFileSystemDatasetAllTypes : public WriteFileSystemDatasetMixin,
public testing::Test {
public:
void SetUp() override {
MakeSourceDataset();
check_metadata_ = false;
auto parquet_format = std::make_shared<ParquetFileFormat>();
format_ = parquet_format;
SetWriteOptions(parquet_format->DefaultWriteOptions());
}

std::shared_ptr<Schema> GetSourceSchema() const override {
return std::shared_ptr<Schema>(schema({
field("null", null()),
field("int8", int8()),
field("int16", int16()),
field("int32", int32()),
field("int64", int64()),
field("uint8", uint8()),
field("uint16", uint16()),
field("uint32", uint32()),
field("uint64", uint64()),
field("float16", float16()),
field("float32", float32()),
field("float64", float64()),
field("utf8", utf8()),
field("large_utf8", large_utf8()),
field("binary", binary()),
field("large_binary", large_binary()),
field("date32", date32()),
field("date64", date64()),
field("fixed_size_binary", fixed_size_binary(10)),
field("decimal128", decimal128(5, 1)),
field("decimal256", decimal256(40, 10)),
field("list", list(int32())),
field("large_list", large_list(int32())),
field("map", map(int32(), int64())),
field("fixed_size_list", fixed_size_list(int32(), 10)),
field("duration", duration(TimeUnit::MILLI)),
field("day_time_interval", day_time_interval()),
field("month_interval", month_interval()),
field("month_day_nano_interval", month_day_nano_interval()),
field("timestamp", timestamp(TimeUnit::MILLI)),
field("timestamp", timestamp(TimeUnit::MILLI, "UTC")),

}));
}

PathAndContent GetSourceFiles() const override {

};

};

TEST_F(TestParquetFileSystemDataset, WriteWithIdenticalPartitioningSchema) {
TestWriteWithIdenticalPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithUnrelatedPartitioningSchema) {
TestWriteWithUnrelatedPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithSupersetPartitioningSchema) {
TestWriteWithSupersetPartitioningSchema();
}

TEST_F(TestParquetFileSystemDataset, WriteWithEmptyPartitioningSchema) {
TestWriteWithEmptyPartitioningSchema();
}

class TestParquetFileFormatScan : public FileFormatScanMixin<ParquetFormatHelper> {
public:
std::shared_ptr<RecordBatch> SingleBatch(std::shared_ptr<Fragment> fragment) {
Expand Down
35 changes: 21 additions & 14 deletions cpp/src/arrow/dataset/test_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
public:
using PathAndContent = std::unordered_map<std::string, std::string>;

void MakeSourceDataset() {
virtual PathAndContent GetSourceFiles() const {
PathAndContent source_files;

source_files["/dataset/year=2018/month=01/dat0.json"] = R"([
Expand Down Expand Up @@ -1896,22 +1896,29 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
{"region": "QC", "model": "Y", "sales": 37, "country": "CA"}
])";
source_files["/dataset/.pesky"] = "garbage content";
return source_files;
}

auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
for (const auto& f : source_files) {
ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
}
fs_ = mock_fs;

/// schema for the whole dataset (both source and destination)
source_schema_ = schema({
virtual std::shared_ptr<Schema> GetSourceSchema() const {
return std::shared_ptr<Schema>(schema({
field("region", utf8()),
field("model", utf8()),
field("sales", float64()),
field("year", int32()),
field("month", int32()),
field("country", utf8()),
});
}));
}

void MakeSourceDataset() {
auto mock_fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
for (const auto& f : GetSourceFiles()) {
ARROW_EXPECT_OK(mock_fs->CreateFile(f.first, f.second, /* recursive */ true));
}
fs_ = mock_fs;

/// schema for the whole dataset (both source and destination)
source_schema_ = GetSourceSchema();

/// Dummy file format for source dataset. Note that it isn't partitioned on country
auto source_format = std::make_shared<JSONRecordBatchFileFormat>(
Expand Down Expand Up @@ -1966,7 +1973,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
ASSERT_OK_AND_ASSIGN(written_, factory->Finish());
}

void TestWriteWithIdenticalPartitioningSchema() {
virtual void TestWriteWithIdenticalPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month"})));

Expand Down Expand Up @@ -1996,7 +2003,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
AssertWrittenAsExpected();
}

void TestWriteWithUnrelatedPartitioningSchema() {
virtual void TestWriteWithUnrelatedPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"country", "region"})));

Expand Down Expand Up @@ -2030,7 +2037,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
AssertWrittenAsExpected();
}

void TestWriteWithSupersetPartitioningSchema() {
virtual void TestWriteWithSupersetPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {"year", "month", "country", "region"})));

Expand Down Expand Up @@ -2065,7 +2072,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
AssertWrittenAsExpected();
}

void TestWriteWithEmptyPartitioningSchema() {
virtual void TestWriteWithEmptyPartitioningSchema() {
DoWrite(std::make_shared<DirectoryPartitioning>(
SchemaFromColumnNames(source_schema_, {})));

Expand Down

0 comments on commit c13003c

Please sign in to comment.