Skip to content

Commit

Permalink
flag truncate_timestamps added to fetcharrow functions
Browse files Browse the repository at this point in the history
Arrow only allows timestamps within years between 1400 to 9999.
This new flag will allow to truncate the dates preventing errors.
  • Loading branch information
JorgeGarciaIrazabal committed Apr 15, 2019
1 parent 5556625 commit c488cc5
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 16 deletions.
16 changes: 15 additions & 1 deletion cpp/turbodbc/Library/src/time_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <turbodbc/time_helpers.h>

#ifdef _WIN32
#include <windows.h>
#endif
Expand All @@ -26,6 +25,21 @@ int64_t timestamp_to_microseconds(char const * data_pointer)
return (ts - timestamp_epoch).total_microseconds();
}

int64_t timestamp_to_microseconds_truncated(char const * data_pointer)
{
auto & sql_ts = *reinterpret_cast<SQL_TIMESTAMP_STRUCT const *>(data_pointer);
if (sql_ts.year > 9999) {
SQL_TIMESTAMP_STRUCT new_sql_ts = {9999, 12, 31, 23, 59, 59, 999999000};
return timestamp_to_microseconds(reinterpret_cast<char const *>(&new_sql_ts));
}
if(sql_ts.year < 1400) {
SQL_TIMESTAMP_STRUCT new_sql_ts = {1400, 1, 1, 0, 0, 0, 0};
return timestamp_to_microseconds(reinterpret_cast<char const *>(&new_sql_ts));
}

return timestamp_to_microseconds(data_pointer);
}


void microseconds_to_timestamp(int64_t microseconds, char * data_pointer)
{
Expand Down
9 changes: 9 additions & 0 deletions cpp/turbodbc/Library/turbodbc/time_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ namespace turbodbc {
*/
int64_t timestamp_to_microseconds(char const * data_pointer);


/**
* @brief Convert an SQL_TIMESTAMP_STRUCT stored at data_pointer to an
* integer describing the elapsed microseconds since the POSIX epoch
* and truncated to arrow valid range (year 1400 to 9999)
*/
int64_t timestamp_to_microseconds_truncated(char const * data_pointer);


/**
* @brief Convert the number of microseconds since the POSIX epoch
* to a timestamp and store it in an SQL_TIMESTAMP_STRUCT located
Expand Down
19 changes: 19 additions & 0 deletions cpp/turbodbc/Test/tests/time_helpers_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <sql.h>

using turbodbc::timestamp_to_microseconds;
using turbodbc::timestamp_to_microseconds_truncated;
using turbodbc::microseconds_to_timestamp;
using turbodbc::nanoseconds_to_timestamp;
using turbodbc::date_to_days;
Expand Down Expand Up @@ -59,6 +60,24 @@ TEST(TimeHelpersTest, MicrosecondsToTimestampForYear4000)
}


TEST(TimeHelpersTest, TimestampToMicrosecondsTruncatedForYear10000)
{
SQL_TIMESTAMP_STRUCT data = {10000, 01, 02, 3, 4, 5, 123456000};
// expectation one microsecond before year 10000
std::int64_t expected = 253402300799999999;
EXPECT_EQ(expected, timestamp_to_microseconds_truncated(reinterpret_cast<char const *>(&data)));
}


TEST(TimeHelpersTest, TimestampToMicrosecondsTruncatedForYear100)
{
SQL_TIMESTAMP_STRUCT data = {100, 01, 02, 3, 4, 5, 0};
// expectation january first 1400
std::int64_t expected = -17987443200000000;
EXPECT_EQ(expected, timestamp_to_microseconds_truncated(reinterpret_cast<char const *>(&data)));
}


TEST(TimeHelpersTest, NanosecondsToTimestampForEpoch)
{
std::int64_t const nanoseconds = 0;
Expand Down
10 changes: 6 additions & 4 deletions cpp/turbodbc_arrow/Library/src/arrow_result_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ Status AppendIntsToBuilder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> c

}

arrow_result_set::arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary, bool adaptive_integers) :
base_result_(base), strings_as_dictionary_(strings_as_dictionary), adaptive_integers_(adaptive_integers)
arrow_result_set::arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary, bool adaptive_integers, bool truncate_timestamps) :
base_result_(base), strings_as_dictionary_(strings_as_dictionary), adaptive_integers_(adaptive_integers), truncate_timestamps_(truncate_timestamps)
{
}

Expand Down Expand Up @@ -213,12 +213,14 @@ Status append_to_bool_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder
return typed_builder->AppendValues(data_ptr, rows_in_batch, valid_bytes);
}

Status append_to_timestamp_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> const& builder, cpp_odbc::multi_value_buffer const& input_buffer, uint8_t*) {
Status append_to_timestamp_builder(size_t rows_in_batch, std::unique_ptr<ArrayBuilder> const& builder, cpp_odbc::multi_value_buffer const& input_buffer, uint8_t*, bool truncate_timestamp) {
auto typed_builder = static_cast<TimestampBuilder*>(builder.get());
for (std::size_t j = 0; j < rows_in_batch; ++j) {
auto element = input_buffer[j];
if (element.indicator == SQL_NULL_DATA) {
ARROW_RETURN_NOT_OK(typed_builder->AppendNull());
} else if (truncate_timestamp) {
ARROW_RETURN_NOT_OK(typed_builder->Append(turbodbc::timestamp_to_microseconds_truncated(element.data_pointer)));
} else {
ARROW_RETURN_NOT_OK(typed_builder->Append(turbodbc::timestamp_to_microseconds(element.data_pointer)));
}
Expand Down Expand Up @@ -288,7 +290,7 @@ Status arrow_result_set::process_batch(size_t rows_in_batch, std::vector<std::un
ARROW_RETURN_NOT_OK(append_to_bool_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));
break;
case turbodbc::type_code::timestamp:
ARROW_RETURN_NOT_OK(append_to_timestamp_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));
ARROW_RETURN_NOT_OK(append_to_timestamp_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data(), truncate_timestamps_));
break;
case turbodbc::type_code::date:
ARROW_RETURN_NOT_OK(append_to_date_builder(rows_in_batch, columns[i], buffers[i].get(), valid_bytes.data()));
Expand Down
4 changes: 2 additions & 2 deletions cpp/turbodbc_arrow/Library/src/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ using turbodbc_arrow::arrow_result_set;
namespace {

arrow_result_set make_arrow_result_set(std::shared_ptr<turbodbc::result_sets::result_set> result_set_pointer,
bool strings_as_dictionary, bool adaptive_integers)
bool strings_as_dictionary, bool adaptive_integers, bool truncate_timestamps)
{
return arrow_result_set(*result_set_pointer, strings_as_dictionary, adaptive_integers);
return arrow_result_set(*result_set_pointer, strings_as_dictionary, adaptive_integers, truncate_timestamps);
}

void set_arrow_parameters(turbodbc::cursor & cursor, pybind11::object const & pyarrow_table)
Expand Down
3 changes: 2 additions & 1 deletion cpp/turbodbc_arrow/Library/turbodbc_arrow/arrow_result_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PYBIND11_EXPORT arrow_result_set {
* in the base result set in a row-based fashion
*/
arrow_result_set(turbodbc::result_sets::result_set & base, bool strings_as_dictionary,
bool adaptive_integers);
bool adaptive_integers, bool truncate_timestamps);

/**
* @brief Retrieve a native (C++) Arrow Table which contains
Expand Down Expand Up @@ -58,6 +58,7 @@ class PYBIND11_EXPORT arrow_result_set {
turbodbc::result_sets::result_set & base_result_;
bool strings_as_dictionary_;
bool adaptive_integers_;
bool truncate_timestamps_;
};

}
9 changes: 5 additions & 4 deletions cpp/turbodbc_arrow/Test/tests/arrow_result_set_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace {

bool const strings_as_strings = false;
bool const strings_as_dictionaries = true;
bool const truncate_timestamps = true;

bool const plain_integers = false;
bool const compressed_integers = true;
Expand Down Expand Up @@ -129,7 +130,7 @@ class ArrowResultSetTest : public ::testing::Test {
auto schema = std::make_shared<arrow::Schema>(expected_fields);
std::shared_ptr<arrow::Table> expected_table = arrow::Table::Make(schema, expected_arrays);

turbodbc_arrow::arrow_result_set ars(rs, strings_as_dictionary, adaptive_integers);
turbodbc_arrow::arrow_result_set ars(rs, strings_as_dictionary, adaptive_integers, truncate_timestamps);
std::shared_ptr<arrow::Table> table;
ASSERT_OK(ars.fetch_all_native(&table, false));
ASSERT_TRUE(expected_table->Equals(*table));
Expand All @@ -151,7 +152,7 @@ TEST_F(ArrowResultSetTest, SimpleSchemaConversion)
"int_column", turbodbc::type_code::integer, size_unimportant, true}};
EXPECT_CALL(rs, do_get_column_info()).WillRepeatedly(testing::Return(expected));

turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
auto schema = ars.schema();
ASSERT_EQ(schema->num_fields(), 1);
auto field = schema->field(0);
Expand Down Expand Up @@ -191,7 +192,7 @@ TEST_F(ArrowResultSetTest, AllTypesSchemaConversion)
std::make_shared<arrow::Field>("nonnull_int_column", arrow::int64(), false)
};

turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
auto schema = ars.schema();

ASSERT_EQ(schema->num_fields(), 12);
Expand Down Expand Up @@ -225,7 +226,7 @@ TEST_F(ArrowResultSetTest, SingleBatchSingleColumnResultSetConversion)
EXPECT_CALL(rs, do_get_buffers()).WillOnce(testing::Return(expected_buffers));
EXPECT_CALL(rs, do_fetch_next_batch()).WillOnce(testing::Return(OUTPUT_SIZE)).WillOnce(testing::Return(0));

turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers);
turbodbc_arrow::arrow_result_set ars(rs, strings_as_strings, plain_integers, truncate_timestamps);
std::shared_ptr<arrow::Table> table;
ASSERT_OK(ars.fetch_all_native(&table, false));
ASSERT_TRUE(expected_table->Equals(*table));
Expand Down
15 changes: 11 additions & 4 deletions python/turbodbc/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _numpy_batch_generator(self):
first_run = False
yield result_batch

def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False):
def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False, truncate_timestamps=False):
"""
Fetches rows in the active result set generated with ``execute()`` or
``executemany()`` as an iterable of arrow tables.
Expand All @@ -320,6 +320,8 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
smallest possible integer type in which all values can be
stored. Be aware that here the type depends on the resulting
data.
:param truncate_timestamps: If true, instead of throwing if a timestamp is not between
the valid years 1400 and 9999, a truncated value will be returned.
:return: generator of ``pyarrow.Table``
"""
Expand All @@ -329,7 +331,8 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
rs = make_arrow_result_set(
self.impl.get_result_set(),
strings_as_dictionary,
adaptive_integers)
adaptive_integers,
truncate_timestamps)
first_run = True
while True:
table = rs.fetch_next_batch()
Expand All @@ -341,7 +344,7 @@ def fetcharrowbatches(self, strings_as_dictionary=False, adaptive_integers=False
else:
raise Error(_NO_ARROW_SUPPORT_MSG)

def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False, truncate_timestamps=False):
"""
Fetches all rows in the active result set generated with ``execute()`` or
``executemany()``.
Expand All @@ -355,6 +358,9 @@ def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
stored. Be aware that here the type depends on the resulting
data.
:param truncate_timestamps: If true, instead of throwing if a timestamp is not between
the valid years 1400 and 9999, a truncated value will be returned.
:return: ``pyarrow.Table``
"""
self._assert_valid_result_set()
Expand All @@ -363,7 +369,8 @@ def fetchallarrow(self, strings_as_dictionary=False, adaptive_integers=False):
return make_arrow_result_set(
self.impl.get_result_set(),
strings_as_dictionary,
adaptive_integers).fetch_all()
adaptive_integers,
truncate_timestamps).fetch_all()
else:
raise Error(_NO_ARROW_SUPPORT_MSG)

Expand Down

0 comments on commit c488cc5

Please sign in to comment.