Skip to content

Commit

Permalink
add decode_key option to hive partitioning in R and related tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jedi18 committed Dec 6, 2021
1 parent 7d5ffe5 commit f044b75
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ Result<util::optional<KeyValuePartitioning::Key>> HivePartitioning::ParseKey(
if (options.decode_key) {
auto raw_key = util::string_view(segment).substr(0, name_end);
ARROW_ASSIGN_OR_RAISE(name, SafeUriUnescape(raw_key));
}else{
} else {
name = segment.substr(0, name_end);
}

Expand Down
51 changes: 31 additions & 20 deletions cpp/src/arrow/dataset/partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,38 +659,49 @@ TEST_F(TestPartitioning, UrlEncodedHive) {
TEST_F(TestPartitioning, UrlEncodedHiveWithKeyEncoded) {
HivePartitioningFactoryOptions options;
auto ts = timestamp(TimeUnit::type::SECOND);
options.schema = schema({field("test'; date", ts), field("test'; time", ts), field("str", utf8())});
options.schema =
schema({field("test'; date", ts), field("test'; time", ts), field("str", utf8())});
options.decode_key = true;
options.null_fallback = "$";
factory_ = HivePartitioning::MakeFactory(options);

AssertInspect(
{"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 07:27:00/str=$",
"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
"/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24"},
options.schema->fields());
AssertInspect({"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
"07:27:00/str=$",
"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
"07:27:00/str=%E3%81%8F%E3%81%BE",
"/test%27%3B%20date=2021-05-04 "
"00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24"},
options.schema->fields());

auto date = std::make_shared<TimestampScalar>(1620086400, ts);
auto time = std::make_shared<TimestampScalar>(1620113220, ts);
partitioning_ = std::make_shared<HivePartitioning>(options.schema, ArrayVector(),
options.AsHivePartitioningOptions());
AssertParse("/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 07:27:00/str=$",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
AssertParse("/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 07:27:00/str=%E3%81%8F%E3%81%BE",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)),
equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
AssertParse(
"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
"07:27:00/str=$",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
AssertParse(
"/test%27%3B%20date=2021-05-04 00:00:00/test%27%3B%20time=2021-05-04 "
"07:27:00/str=%E3%81%8F%E3%81%BE",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)),
equal(field_ref("str"), literal("\xE3\x81\x8F\xE3\x81\xBE"))}));
// URL-encoded null fallback value
AssertParse("/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 07%3A27%3A00/str=%24",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));
AssertParse(
"/test%27%3B%20date=2021-05-04 00%3A00%3A00/test%27%3B%20time=2021-05-04 "
"07%3A27%3A00/str=%24",
and_({equal(field_ref("test'; date"), literal(date)),
equal(field_ref("test'; time"), literal(time)), is_null(field_ref("str"))}));

// Invalid UTF-8
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("was not valid UTF-8"),
partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr("was not valid UTF-8"),
factory_->Inspect({"/%AF=2021-05-04/time=2021-05-04 07%3A27%3A00/str=%24"}));
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr("was not valid UTF-8"),
partitioning_->Parse({"/%AF=2021-05-04/%BF=2021-05-04 07%3A27%3A00/str=%24"}));
}

TEST_F(TestPartitioning, EtlThenHive) {
Expand Down
8 changes: 4 additions & 4 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 10 additions & 7 deletions r/R/dataset-partition.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ DirectoryPartitioning$create <- function(schm, segment_encoding = "uri") {
#' @rdname Partitioning
#' @export
HivePartitioning <- R6Class("HivePartitioning", inherit = Partitioning)
HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "uri") {
HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding = "uri", decode_key = FALSE) {
dataset___HivePartitioning(schm,
null_fallback = null_fallback_or_default(null_fallback),
segment_encoding = segment_encoding
segment_encoding = segment_encoding,
decode_key = decode_key
)
}

Expand All @@ -93,17 +94,19 @@ HivePartitioning$create <- function(schm, null_fallback = NULL, segment_encoding
#' which is what Hive uses.
#' @param segment_encoding Decode partition segments after splitting paths.
#' Default is `"uri"` (URI-decode segments). May also be `"none"` (leave as-is).
#' @param decode_key Whether to also decode partition keys using the given
#' segment encoding scheme. Default is `FALSE` (do not decode keys).
#' @return A [HivePartitioning][Partitioning], or a `HivePartitioningFactory` if
#' calling `hive_partition()` with no arguments.
#' @examplesIf arrow_with_dataset()
#' hive_partition(year = int16(), month = int8())
#' @export
hive_partition <- function(..., null_fallback = NULL, segment_encoding = "uri") {
hive_partition <- function(..., null_fallback = NULL, segment_encoding = "uri", decode_key = FALSE) {
schm <- schema(...)
if (length(schm) == 0) {
HivePartitioningFactory$create(null_fallback, segment_encoding)
HivePartitioningFactory$create(null_fallback, segment_encoding, decode_key)
} else {
HivePartitioning$create(schm, null_fallback, segment_encoding)
HivePartitioning$create(schm, null_fallback, segment_encoding, decode_key)
}
}

Expand All @@ -123,8 +126,8 @@ DirectoryPartitioningFactory$create <- function(field_names, segment_encoding =
#' @rdname Partitioning
#' @export
HivePartitioningFactory <- R6Class("HivePartitioningFactory", inherit = PartitioningFactory)
HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "uri") {
dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), segment_encoding)
HivePartitioningFactory$create <- function(null_fallback = NULL, segment_encoding = "uri", decode_key = FALSE) {
dataset___HivePartitioning__MakeFactory(null_fallback_or_default(null_fallback), segment_encoding, decode_key)
}

null_fallback_or_default <- function(null_fallback) {
Expand Down
10 changes: 9 additions & 1 deletion r/man/hive_partition.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 12 additions & 10 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions r/src/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,20 +374,23 @@ std::shared_ptr<ds::PartitioningFactory> dataset___DirectoryPartitioning__MakeFa
// [[dataset::export]]
std::shared_ptr<ds::HivePartitioning> dataset___HivePartitioning(
const std::shared_ptr<arrow::Schema>& schm, const std::string& null_fallback,
const std::string& segment_encoding) {
const std::string& segment_encoding, bool decode_key) {
ds::HivePartitioningOptions options;
options.null_fallback = null_fallback;
options.segment_encoding = GetSegmentEncoding(segment_encoding);
options.decode_key = decode_key;
std::vector<std::shared_ptr<arrow::Array>> dictionaries;
return std::make_shared<ds::HivePartitioning>(schm, dictionaries, options);
}

// [[dataset::export]]
std::shared_ptr<ds::PartitioningFactory> dataset___HivePartitioning__MakeFactory(
const std::string& null_fallback, const std::string& segment_encoding) {
const std::string& null_fallback, const std::string& segment_encoding,
bool decode_key) {
ds::HivePartitioningFactoryOptions options;
options.null_fallback = null_fallback;
options.segment_encoding = GetSegmentEncoding(segment_encoding);
options.decode_key = decode_key;
return ds::HivePartitioning::MakeFactory(options);
}

Expand Down
66 changes: 66 additions & 0 deletions r/tests/testthat/test-dataset.R
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,72 @@ test_that("URI-decoding with hive partitioning", {
)
})

# Checks the `decode_key` option of hive_partition() against a directory layout
# whose partition keys AND values are percent-encoded on disk:
#   %C2%B5 -> "µ", %C3%9F -> "ß", %3A -> ":", %24 -> "$"
# NOTE(review): `df1`, `make_temp_dir()` and `expect_scan_result()` are defined
# outside this block; presumably shared test fixtures/helpers - confirm.
test_that("URI-decoding with hive partitioning with key encoded", {
root <- make_temp_dir()
fmt <- FileFormat$create("feather")
fs <- LocalFileSystem$create()
selector <- FileSelector$create(root, recursive = TRUE)
# Single data file stored under <root>/<encoded µ>=<encoded timestamp>/<encoded ß>=<encoded $>
dir1 <- file.path(root, "%C2%B5=2021-05-04 00%3A00%3A00", "%C3%9F=%24")
dir.create(dir1, recursive = TRUE)
write_feather(df1, file.path(dir1, "data.feather"))

# Explicit partitioning schema that uses the *decoded* key names; with
# decode_key = TRUE the encoded directory names are expected to match them.
partitioning <- hive_partition(
µ = timestamp(unit = "s"), ß = utf8(), segment_encoding = "uri", decode_key = TRUE
)
factory <- FileSystemDatasetFactory$create(
fs, selector, NULL, fmt,
partitioning = partitioning
)
schm <- factory$Inspect()
ds <- factory$Finish(schm)
expect_scan_result(ds, schm)

# segment encoding for both key and values
# (no schema given, so hive_partition() returns a factory that discovers the
# fields; the filter below refers to decoded key names and decoded values)
partitioning_factory <- hive_partition(segment_encoding = "uri", decode_key = TRUE)
factory <- FileSystemDatasetFactory$create(
fs, selector, NULL, fmt, partitioning_factory
)
schm <- factory$Inspect()
ds <- factory$Finish(schm)
expect_equal(
ds %>%
filter(µ == "2021-05-04 00:00:00", ß == "$") %>%
select(int) %>%
collect(),
df1 %>% select(int) %>% collect()
)

# segment encoding only for values
# (decode_key = FALSE: field names stay percent-encoded, values are decoded)
partitioning_factory <- hive_partition(segment_encoding = "uri", decode_key = FALSE)
factory <- FileSystemDatasetFactory$create(
fs, selector, NULL, fmt, partitioning_factory
)
schm <- factory$Inspect()
ds <- factory$Finish(schm)
expect_equal(
ds %>%
filter(`%C2%B5` == "2021-05-04 00:00:00", `%C3%9F` == "$") %>%
select(int) %>%
collect(),
df1 %>% select(int) %>% collect()
)

# no segment encoding
# (segment_encoding = "none": both field names and values stay percent-encoded;
# decode_key is left at its default here)
partitioning_factory <- hive_partition(segment_encoding = "none")
factory <- FileSystemDatasetFactory$create(
fs, selector, NULL, fmt, partitioning_factory
)
schm <- factory$Inspect()
ds <- factory$Finish(schm)
expect_equal(
ds %>%
filter(`%C2%B5` == "2021-05-04 00%3A00%3A00", `%C3%9F` == "%24") %>%
select(int) %>%
collect(),
df1 %>% select(int) %>% collect()
)
})

# Everything else below here is using parquet files
skip_if_not_available("parquet")

Expand Down

0 comments on commit f044b75

Please sign in to comment.