Skip to content

Commit

Permalink
feat(cpp-client): Honor Arrow TimeUnit when converting Timestamp and …
Browse files Browse the repository at this point in the history
…Time64 to ColumnSource (#6609)

This PR adds two tests but leaves one disabled. The disabled one
will fail on the main branch; that is why it is disabled.

To run all the tests, execute this command.
```
./gradlew :cpp-client:testCppClient
```

Then, if you rebase this branch onto your `nate/nightly/barrage_types`
branch, you can re-enable the disabled test by editing the file
`cpp-client/deephaven/tests/src/time_unit_test.cc` and removing the tag
`[.hidden]`. Currently that tag is at line 75.

You should then be able to run all the unit tests with the same gradlew
command above and they should all pass.
  • Loading branch information
kosak authored Feb 8, 2025
1 parent 1feacf4 commit 6252118
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cpp-client/deephaven/dhclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ set(ALL_FILES
include/private/deephaven/client/impl/util.h

src/arrowutil/arrow_client_table.cc
src/arrowutil/arrow_column_source.cc
include/private/deephaven/client/arrowutil/arrow_client_table.h
include/private/deephaven/client/arrowutil/arrow_column_source.h
include/private/deephaven/client/arrowutil/arrow_value_converter.h
Expand Down Expand Up @@ -107,6 +108,7 @@ set(ALL_FILES
src/utility/table_maker.cc

include/public/deephaven/client/utility/arrow_util.h
include/public/deephaven/client/utility/internal_types.h
include/public/deephaven/client/utility/misc_types.h
include/public/deephaven/client/utility/table_maker.h
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,22 @@ namespace internal {
// null-ness by determining whether the optional has a value.
// kTimestamp is its own special case, where nullness is determined by the underlying nanos
// being equal to Deephaven's NULL_LONG.
// kLocalDate and kLocalTime are like kTimestamp except they resolve to different data types.
// kLocalDate and kLocalTime are similar to kTimestamp in nullness except they resolve to different
// data types.
enum class ArrowProcessingStyle { kNormal, kBooleanOrString, kTimestamp, kLocalDate, kLocalTime };

/**
 * Calculates the factor by which the raw values of 'array' must be multiplied to express them
 * in nanoseconds.
 *
 * When 'array' has dynamic type arrow::TimestampArray or arrow::Time64Array, look at the
 * underlying time resolution of the arrow type and calculate a conversion factor from that unit
 * to nanoseconds. For example if the underlying time unit is arrow::TimeUnit::MILLI, then the
 * conversion factor would be 1_000_000, meaning that one needs to multiply incoming millisecond
 * values by one million to convert them to nanoseconds. If 'array' is not one of those types,
 * return 1.
 *
 * NOTE(review): "not one of those types" presumably means the other Arrow types that the
 * implementation explicitly handles; an Arrow type the implementation does not recognize may be
 * reported as an error instead of yielding 1 -- confirm against the implementation.
 *
 * @param array The Arrow array
 * @return For supported time types, the conversion factor to nanoseconds. Otherwise, 1.
 * @throws std::runtime_error if the array's time unit is not one of SECOND, MILLI, MICRO, NANO.
 */
size_t CalcTimeNanoScaleFactor(const arrow::Array &array);

template<ArrowProcessingStyle Style, typename TColumnSourceBase, typename TArrowArray, typename TChunk>
class GenericArrowColumnSource final : public TColumnSourceBase {
using BooleanChunk = deephaven::dhcore::chunk::BooleanChunk;
Expand All @@ -37,7 +50,8 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
using UInt64Chunk = deephaven::dhcore::chunk::UInt64Chunk;

public:
static std::shared_ptr<GenericArrowColumnSource> OfArrowArray(std::shared_ptr<TArrowArray> array) {
static std::shared_ptr<GenericArrowColumnSource>
OfArrowArray(std::shared_ptr<TArrowArray> array) {
std::vector<std::shared_ptr<TArrowArray>> arrays{std::move(array)};
return OfArrowArrayVec(std::move(arrays));
}
Expand All @@ -48,7 +62,9 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
}

explicit GenericArrowColumnSource(std::vector<std::shared_ptr<TArrowArray>> arrays) :
arrays_(std::move(arrays)) {}
arrays_(std::move(arrays)) {
time_nano_scale_factor_ = arrays_.empty() ? 1 : CalcTimeNanoScaleFactor(*arrays_.front());
}

~GenericArrowColumnSource() final = default;

Expand All @@ -67,13 +83,14 @@ class GenericArrowColumnSource final : public TColumnSourceBase {

// This algorithm is a little tricky because the source data and RowSequence are both
// segmented, perhaps in different ways.
auto *typed_dest = VerboseCast<TChunk*>(DEEPHAVEN_LOCATION_EXPR(dest_data));
auto *typed_dest = VerboseCast<TChunk *>(DEEPHAVEN_LOCATION_EXPR(dest_data));
auto *destp = typed_dest->data();
auto outerp = arrays_.begin();
size_t src_segment_begin = 0;
size_t src_segment_end = (*outerp)->length();

auto *null_destp = optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;
auto *null_destp =
optional_dest_null_flags != nullptr ? optional_dest_null_flags->data() : nullptr;

rows.ForEachInterval([&](uint64_t requested_segment_begin, uint64_t requested_segment_end) {
while (true) {
Expand Down Expand Up @@ -147,11 +164,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = DateTime::FromNanos(*ip);
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
*destp = DateTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
*null_destp = is_null;
++null_destp;
}
}
Expand All @@ -175,11 +193,12 @@ class GenericArrowColumnSource final : public TColumnSourceBase {
const auto *src_endp = innerp->raw_values() + relative_end;

for (const auto *ip = src_beginp; ip != src_endp; ++ip) {
*destp = LocalTime::FromNanos(*ip);
auto is_null = *ip == DeephavenTraits<int64_t>::kNullValue;
*destp = LocalTime::FromNanos(is_null ? *ip : (*ip * time_nano_scale_factor_));
++destp;

if (null_destp != nullptr) {
*null_destp = *ip == DeephavenTraits<int64_t>::kNullValue;
*null_destp = is_null;
++null_destp;
}
}
Expand All @@ -200,6 +219,18 @@ class GenericArrowColumnSource final : public TColumnSourceBase {

private:
std::vector<std::shared_ptr<TArrowArray>> arrays_;
/**
* This value is valid for Style == ArrowProcessingStyle::kTimestamp and
* ArrowProcessingStyle::kLocalTime, and ignored for other ArrowProcessingStyle enumeration
* values.
*
* These ArrowProcessingStyles come into play when processing the arrow types
* arrow::TimestampType and arrow::Time64Type respectively.
*
* The value stores a conversion factor from whatever the input scale is to nanoseconds.
* For example, if the input timescale is milliseconds, this value will be 1_000_000.
*/
size_t time_nano_scale_factor_ = 1;
};
} // namespace internal

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
#pragma once

#include <arrow/flight/types.h>

namespace deephaven::client::utility {
/**
* For Deephaven use only
*/
namespace internal {
/**
 * Test-support type: a timestamp whose raw value is expressed in the Arrow time unit UNIT
 * instead of the nanosecond resolution used natively by our normal DateTime class.
 *
 * It exists only for the benefit of the unit tests, which use it to upload timestamps having a
 * different time unit so we can confirm that the client and server both handle them correctly.
 *
 * @tparam UNIT The Arrow time unit in which value_ is expressed.
 */
template<arrow::TimeUnit::type UNIT>
struct InternalDateTime {
  /**
   * The raw timestamp value, counted in UNIT units.
   */
  int64_t value_ = 0;

  /**
   * @param value The raw timestamp value, counted in UNIT units. Stored as-is.
   */
  explicit InternalDateTime(int64_t value) : value_{value} {}
};

/**
 * Test-support type: a time-of-day whose raw value is expressed in the Arrow time unit UNIT
 * instead of the nanosecond resolution used natively by our normal LocalTime class.
 *
 * It exists only for the benefit of the unit tests, which use it to upload local times having a
 * different time unit so we can confirm that the client and server both handle them correctly.
 *
 * @tparam UNIT The Arrow time unit in which value_ is expressed. Must be MICRO or NANO.
 */
template<arrow::TimeUnit::type UNIT>
struct InternalLocalTime {
  // Arrow Time64 only supports micro and nano units, so reject anything else at compile time.
  static_assert(UNIT == arrow::TimeUnit::MICRO || UNIT == arrow::TimeUnit::NANO);

  /**
   * The raw time value, counted in UNIT units.
   */
  int64_t value_ = 0;

  /**
   * @param value The raw time value, counted in UNIT units. Stored as-is.
   */
  explicit InternalLocalTime(int64_t value) : value_{value} {}
};
} // namespace internal
} // namespace deephaven::client::utility
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include "deephaven/client/client.h"
#include "deephaven/client/utility/arrow_util.h"
#include "deephaven/client/utility/internal_types.h"
#include "deephaven/client/utility/misc_types.h"
#include "deephaven/dhcore/types.h"
#include "deephaven/dhcore/utility/utility.h"
#include "deephaven/third_party/fmt/format.h"

Expand Down Expand Up @@ -356,6 +359,38 @@ struct TypeConverterTraits<std::optional<T>> {
}
};

/**
 * Conversion traits for the test-only InternalDateTime<UNIT> wrapper. Values are stored in an
 * Arrow timestamp column whose resolution is UNIT, and the column is advertised to Deephaven as
 * a java.time.ZonedDateTime.
 */
template<arrow::TimeUnit::type UNIT>
struct TypeConverterTraits<deephaven::client::utility::internal::InternalDateTime<UNIT>> {
  /**
   * @return The Arrow timestamp type with resolution UNIT and time zone "UTC".
   */
  static std::shared_ptr<arrow::DataType> GetDataType() {
    return arrow::timestamp(UNIT, "UTC");
  }

  /**
   * @return A TimestampBuilder for GetDataType(), using Arrow's default memory pool.
   */
  static arrow::TimestampBuilder GetBuilder() {
    auto data_type = GetDataType();
    auto *pool = arrow::default_memory_pool();
    return arrow::TimestampBuilder(data_type, pool);
  }

  /**
   * @param o The wrapper to unpack.
   * @return The wrapper's raw value, unchanged (no unit conversion is performed here).
   */
  static int64_t Reinterpret(
      const deephaven::client::utility::internal::InternalDateTime<UNIT> &o) {
    return o.value_;
  }

  /**
   * @return The Deephaven (Java) type name associated with this column type.
   */
  static std::string_view GetDeephavenTypeName() {
    return "java.time.ZonedDateTime";
  }
};

/**
 * Conversion traits for the test-only InternalLocalTime<UNIT> wrapper. Values are stored in an
 * Arrow time64 column whose resolution is UNIT, and the column is advertised to Deephaven as a
 * java.time.LocalTime.
 */
template<arrow::TimeUnit::type UNIT>
struct TypeConverterTraits<deephaven::client::utility::internal::InternalLocalTime<UNIT>> {
  /**
   * @return The Arrow time64 type with resolution UNIT.
   */
  static std::shared_ptr<arrow::DataType> GetDataType() {
    return arrow::time64(UNIT);
  }

  /**
   * @return A Time64Builder for GetDataType(), using Arrow's default memory pool.
   */
  static arrow::Time64Builder GetBuilder() {
    auto data_type = GetDataType();
    auto *pool = arrow::default_memory_pool();
    return arrow::Time64Builder(data_type, pool);
  }

  /**
   * @param o The wrapper to unpack.
   * @return The wrapper's raw value, unchanged (no unit conversion is performed here).
   */
  static int64_t Reinterpret(
      const deephaven::client::utility::internal::InternalLocalTime<UNIT> &o) {
    return o.value_;
  }

  /**
   * @return The Deephaven (Java) type name associated with this column type.
   */
  static std::string_view GetDeephavenTypeName() {
    return "java.time.LocalTime";
  }
};

template<typename T>
TypeConverter TypeConverter::CreateNew(const std::vector<T> &values) {
using deephaven::client::utility::OkOrThrow;
Expand Down
87 changes: 87 additions & 0 deletions cpp-client/deephaven/dhclient/src/arrowutil/arrow_column_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
#include "deephaven/client/arrowutil/arrow_column_source.h"
#include "deephaven/client/utility/arrow_util.h"

using deephaven::client::utility::OkOrThrow;

namespace deephaven::client::arrowutil {
namespace internal {

namespace {
struct NanoScaleFactorVisitor final : public arrow::TypeVisitor {
size_t result_ = 1;

arrow::Status Visit(const arrow::Int8Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int16Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int32Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Int64Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::FloatType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::DoubleType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::BooleanType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::UInt16Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::StringType &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::TimestampType &type) final {
result_ = ScaleFromUnit(type.unit());
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Date64Type &/*type*/) final {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &type) final {
result_ = ScaleFromUnit(type.unit());
return arrow::Status::OK();
}

static size_t ScaleFromUnit(arrow::TimeUnit::type unit) {
switch (unit) {
case arrow::TimeUnit::SECOND: return 1'000'000'000;
case arrow::TimeUnit::MILLI: return 1'000'000;
case arrow::TimeUnit::MICRO: return 1'000;
case arrow::TimeUnit::NANO: return 1;
default: {
auto message = fmt::format("Unhandled arrow::TimeUnit {}", static_cast<size_t>(unit));
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
}
}
};
} // namespace

/**
 * Runs a NanoScaleFactorVisitor over the data type of 'array' and reports the scale factor the
 * visitor ended up with.
 * @param array The Arrow array whose type is inspected.
 * @return The visitor's resulting conversion factor to nanoseconds.
 */
size_t CalcTimeNanoScaleFactor(const arrow::Array &array) {
  auto visitor = NanoScaleFactorVisitor{};
  // NOTE(review): the expression handed to DEEPHAVEN_LOCATION_EXPR is kept exactly as written,
  // since the macro may capture the expression text for diagnostics -- confirm before renaming.
  OkOrThrow(DEEPHAVEN_LOCATION_EXPR(array.type()->Accept(&visitor)));
  const size_t scale_factor = visitor.result_;
  return scale_factor;
}
} // namespace internal
} // namespace deephaven::client::arrowutil
14 changes: 2 additions & 12 deletions cpp-client/deephaven/dhclient/src/utility/arrow_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::TimestampType &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected TimestampType with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
arrow::Status Visit(const arrow::TimestampType &/*type*/) final {
type_id_ = ElementTypeId::kTimestamp;
return arrow::Status::OK();
}
Expand All @@ -96,12 +91,7 @@ struct ArrowToElementTypeId final : public arrow::TypeVisitor {
return arrow::Status::OK();
}

arrow::Status Visit(const arrow::Time64Type &type) final {
if (type.unit() != arrow::TimeUnit::NANO) {
auto message = fmt::format("Expected Time64Type with nano units, got {}",
type.ToString());
throw std::runtime_error(DEEPHAVEN_LOCATION_STR(message));
}
arrow::Status Visit(const arrow::Time64Type &/*type*/) final {
type_id_ = ElementTypeId::kLocalTime;
return arrow::Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,6 @@ class LocalTime {
/**
* Converts nanoseconds-since-start-of-day to LocalTime. The Deephaven null value sentinel is
* turned into LocalTime(0).
* TODO(kosak): find out null convention
* @param nanos Nanoseconds since the start of the day.
* @return The corresponding LocalTime.
*/
Expand Down Expand Up @@ -579,7 +578,6 @@ class LocalTime {
return !(lhs == rhs);
}
};

} // namespace deephaven::dhcore

template<> struct fmt::formatter<deephaven::dhcore::DateTime> : ostream_formatter {};
Expand Down
1 change: 1 addition & 0 deletions cpp-client/deephaven/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ add_executable(dhclient_tests
src/table_test.cc
src/test_util.cc
src/ticking_test.cc
src/time_unit_test.cc
src/ungroup_test.cc
src/update_by_test.cc
src/utility_test.cc
Expand Down
Loading

0 comments on commit 6252118

Please sign in to comment.