ARROW-14737: [C++][Dataset] Support URI-decoding partition keys
Support URI decoding of keys in Hive partitioning.

Closes #11858 from Jedi18/support_uri_decode_partition_key

Authored-by: Jedi18 <akhiljnair.188@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
Jedi18 authored and lidavidm committed Dec 13, 2021
1 parent dee4ba3 commit 11be9c5
Showing 4 changed files with 183 additions and 3 deletions.
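
Before the file-by-file diffs, here is a minimal usage sketch of what this change enables, distilled from the new Python test below. It assumes a pyarrow build that includes this commit; the temporary path and field names are illustrative only, not part of the change.

import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.feather as feather

root = tempfile.mkdtemp()
# The key "test%27%3B%20date" percent-decodes to the field name "test'; date";
# the value "00%3A00%3A00" decodes to "00:00:00".
part_dir = os.path.join(root, "test%27%3B%20date=2021-05-04 00%3A00%3A00")
os.makedirs(part_dir)
feather.write_feather(pa.table({"i64": list(range(3))}),
                      os.path.join(part_dir, "0.feather"))

# With segment_encoding="uri", partition keys are now URI-decoded, not just values.
dataset = ds.dataset(root, format="feather",
                     partitioning=ds.HivePartitioning.discover(
                         segment_encoding="uri"))
print(dataset.schema)  # contains a field named "test'; date" (inferred as string here)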
9 changes: 6 additions & 3 deletions cpp/src/arrow/dataset/partition.cc
@@ -580,19 +580,22 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
   // Static method, so we have no better place for it
   util::InitializeUTF8();
 
-  auto name = segment.substr(0, name_end);
+  std::string name;
   std::string value;
   switch (options.segment_encoding) {
     case SegmentEncoding::None: {
+      name = segment.substr(0, name_end);
       value = segment.substr(name_end + 1);
-      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(value))) {
-        return Status::Invalid("Partition segment was not valid UTF-8: ", value);
+      if (ARROW_PREDICT_FALSE(!util::ValidateUTF8(segment))) {
+        return Status::Invalid("Partition segment was not valid UTF-8: ", segment);
       }
       break;
     }
     case SegmentEncoding::Uri: {
       auto raw_value = util::string_view(segment).substr(name_end + 1);
       ARROW_ASSIGN_OR_RAISE(value, SafeUriUnescape(raw_value));
+      auto raw_key = util::string_view(segment).substr(0, name_end);
+      ARROW_ASSIGN_OR_RAISE(name, SafeUriUnescape(raw_key));
       break;
     }
     default:
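
The Uri case now routes the key through the same path as the value: the substring before the first "=" is unescaped with SafeUriUnescape too, so a directory name like "test%27%3B%20date" yields the field name "test'; date". The decoding is ordinary percent-decoding, sketched here with Python's urllib (SafeUriUnescape additionally rejects results that are not valid UTF-8):

from urllib.parse import unquote

segment = "test%27%3B%20date=2021-05-04 00%3A00%3A00"
name, _, value = segment.partition("=")
print(unquote(name))   # test'; date
print(unquote(value))  # 2021-05-04 00:00:00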
47 changes: 47 additions & 0 deletions cpp/src/arrow/dataset/partition_test.cc
@@ -656,6 +656,53 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
                                   partitioning_->Parse({"/date=\xAF/time=\xBF/str=\xCF"}));
 }
 
+TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
+  HivePartitioningFactoryOptions options;
+  auto ts = timestamp(TimeUnit::type::SECOND);
+  options.schema =
+      schema({field("test'; date", ts), field("test'; time", ts), field("str", utf8())});
+  options.null_fallback = "$";
+  factory_ = HivePartitioning::MakeFactory(options);
+
+  AssertInspect({"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+                 "07:27:00/str=$",
+                 "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+                 "07:27:00/str=%E3%81%8F%E3%81%BE",
+                 "/test%27%3B%20date=2021-05-04 "
+                 "00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24"},
+                options.schema->fields());
+
+  auto date = std::make_shared<TimestampScalar>(1620086400, ts);
+  auto time = std::make_shared<TimestampScalar>(1620113220, ts);
+  partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
+                                                     options.AsHivePartitioningOptions());
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+      "07:27:00/str=$",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
+      "07:27:00/str=%E3%81%8F%E3%81%BE",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)),
+            equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
+  // URL-encoded null fallback value
+  AssertParse(
+      "/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 "
+      "07%3A27%3A00/str=%24",
+      and_({equal(field_ref("test'; date"), literal(date)),
+            equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
+
+  // Invalid UTF-8
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
+  EXPECT_RAISES_WITH_MESSAGE_THAT(
+      Invalid, ::testing::HasSubstr("was not valid UTF-8"),
+      partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
79 changes: 79 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -1593,6 +1593,85 @@ def test_partitioning_factory_segment_encoding():
     inferred_schema = factory.inspect()
 
 
+def test_partitioning_factory_hive_segment_encoding_key_encoded():
+    mockfs = fs._MockFileSystem()
+    format = ds.IpcFileFormat()
+    schema = pa.schema([("i64", pa.int64())])
+    table = pa.table([pa.array(range(10))], schema=schema)
+    partition_schema = pa.schema(
+        [("test'; date", pa.timestamp("s")), ("test';[ string'", pa.string())])
+    string_partition_schema = pa.schema(
+        [("test'; date", pa.string()), ("test';[ string'", pa.string())])
+    full_schema = pa.schema(list(schema) + list(partition_schema))
+
+    partition_schema_en = pa.schema(
+        [("test%27%3B%20date", pa.timestamp("s")),
+         ("test%27%3B%5B%20string%27", pa.string())])
+    string_partition_schema_en = pa.schema(
+        [("test%27%3B%20date", pa.string()),
+         ("test%27%3B%5B%20string%27", pa.string())])
+
+    directory = ("hive/test%27%3B%20date=2021-05-04 00%3A00%3A00/"
+                 "test%27%3B%5B%20string%27=%24")
+    mockfs.create_dir(directory)
+    with mockfs.open_output_stream(directory + "/0.feather") as sink:
+        with pa.ipc.new_file(sink, schema) as writer:
+            writer.write_table(table)
+            writer.close()
+
+    # Hive
+    selector = fs.FileSelector("hive", recursive=True)
+    options = ds.FileSystemFactoryOptions("hive")
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema)
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    inferred_schema = factory.inspect()
+    assert inferred_schema == full_schema
+    actual = factory.finish().to_table(columns={
+        "date_int": ds.field("test'; date").cast(pa.int64()),
+    })
+    assert actual[0][0].as_py() == 1620086400
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        segment_encoding="uri")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test'; date") == "2021-05-04 00:00:00") &
+        (ds.field("test';[ string'") == "$"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema, segment_encoding="uri")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test'; date") == "2021-05-04 00:00:00") &
+        (ds.field("test';[ string'") == "$"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("test%27%3B%5B%20string%27") == "%24"))
+
+    options.partitioning = ds.HivePartitioning(
+        string_partition_schema_en, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    fragments = list(factory.finish().get_fragments())
+    assert fragments[0].partition_expression.equals(
+        (ds.field("test%27%3B%20date") == "2021-05-04 00%3A00%3A00") &
+        (ds.field("test%27%3B%5B%20string%27") == "%24"))
+
+    options.partitioning_factory = ds.HivePartitioning.discover(
+        schema=partition_schema_en, segment_encoding="none")
+    factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
+    with pytest.raises(pa.ArrowInvalid,
+                       match="Could not cast segments for partition field"):
+        inferred_schema = factory.inspect()
+
+
 def test_dictionary_partitioning_outer_nulls_raises(tempdir):
     table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
     part = ds.partitioning(
51 changes: 51 additions & 0 deletions r/tests/testthat/test-dataset.R
@@ -215,6 +215,57 @@ test_that("URI-decoding with hive partitioning", {
   )
 })
 
+test_that("URI-decoding with hive partitioning with key encoded", {
+  root <- make_temp_dir()
+  fmt <- FileFormat$create("feather")
+  fs <- LocalFileSystem$create()
+  selector <- FileSelector$create(root, recursive = TRUE)
+  dir1 <- file.path(root, "test%20key=2021-05-04 00%3A00%3A00", "test%20key1=%24")
+  dir.create(dir1, recursive = TRUE)
+  write_feather(df1, file.path(dir1, "data.feather"))
+
+  partitioning <- hive_partition(
+    `test key` = timestamp(unit = "s"), `test key1` = utf8(), segment_encoding = "uri"
+  )
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt,
+    partitioning = partitioning
+  )
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_scan_result(ds, schm)
+
+  # segment encoding for both keys and values
+  partitioning_factory <- hive_partition(segment_encoding = "uri")
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory
+  )
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_equal(
+    ds %>%
+      filter(`test key` == "2021-05-04 00:00:00", `test key1` == "$") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+
+  # no segment encoding
+  partitioning_factory <- hive_partition(segment_encoding = "none")
+  factory <- FileSystemDatasetFactory$create(
+    fs, selector, NULL, fmt, partitioning_factory
+  )
+  schm <- factory$Inspect()
+  ds <- factory$Finish(schm)
+  expect_equal(
+    ds %>%
+      filter(`test%20key` == "2021-05-04 00%3A00%3A00", `test%20key1` == "%24") %>%
+      select(int) %>%
+      collect(),
+    df1 %>% select(int) %>% collect()
+  )
+})
+
 # Everything else below here is using parquet files
 skip_if_not_available("parquet")
