Skip to content

Commit

Permalink
Read/write support for TIME64 with usec precision
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd committed Jul 1, 2024
1 parent ce896df commit f68383d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 2 deletions.
35 changes: 35 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,41 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadDate) {
ASSERT_EQ(data_buffer[1], 47482);
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadTime) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyTime;
data.size_bytes = sizeof(kTestPgCopyTime);

auto col_type = PostgresType(PostgresTypeId::kTime);
PostgresType input_type(PostgresTypeId::kRecord);
input_type.AppendChild("col", col_type);

PostgresCopyStreamTester tester;
ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
ASSERT_EQ(tester.ReadAll(&data), ENODATA);
ASSERT_EQ(data.data.as_uint8 - kTestPgCopyTime, sizeof(kTestPgCopyTime));
ASSERT_EQ(data.size_bytes, 0);

nanoarrow::UniqueArray array;
ASSERT_EQ(tester.GetArray(array.get()), NANOARROW_OK);
ASSERT_EQ(array->length, 4);
ASSERT_EQ(array->n_children, 1);

auto validity = reinterpret_cast<const uint8_t*>(array->children[0]->buffers[0]);
auto data_buffer = reinterpret_cast<const int64_t*>(array->children[0]->buffers[1]);
ASSERT_NE(validity, nullptr);
ASSERT_NE(data_buffer, nullptr);

ASSERT_TRUE(ArrowBitGet(validity, 0));
ASSERT_TRUE(ArrowBitGet(validity, 1));
ASSERT_TRUE(ArrowBitGet(validity, 2));
ASSERT_FALSE(ArrowBitGet(validity, 3));

ASSERT_EQ(data_buffer[0], 0);
ASSERT_EQ(data_buffer[1], 86399000000);
ASSERT_EQ(data_buffer[2], 49376123456);
}

TEST(PostgresCopyUtilsTest, PostgresCopyReadNumeric) {
ArrowBufferView data;
data.data.as_uint8 = kTestPgCopyNumeric;
Expand Down
9 changes: 9 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ static const uint8_t kTestPgCopyDate[] = {
0x04, 0xff, 0xff, 0x71, 0x54, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00,
0x00, 0x8e, 0xad, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

// COPY (SELECT CAST("col" AS TIME) AS "col" FROM ( VALUES ('00:00:00'), ('23:59:59'),
// ('13:42:57.123456'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
static const uint8_t kTestPgCopyTime[] = {
0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
0x14, 0x1d, 0xc8, 0x1d, 0xc0, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
0x0b, 0x7f, 0x0b, 0xda, 0x40, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};

// COPY (SELECT CAST(col AS TIMESTAMP) FROM ( VALUES ('1900-01-01 12:34:56'),
// ('2100-01-01 12:34:56'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY);
static const uint8_t kTestPgCopyTimestamp[] = {
Expand Down
31 changes: 31 additions & 0 deletions c/driver/postgresql/copy/postgres_copy_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,37 @@ TEST(PostgresCopyUtilsTest, PostgresCopyWriteDate) {
}
}

TEST(PostgresCopyUtilsTest, PostgresCopyWriteTime) {
adbc_validation::Handle<struct ArrowSchema> schema;
adbc_validation::Handle<struct ArrowArray> array;
struct ArrowError na_error;

const enum ArrowTimeUnit unit = NANOARROW_TIME_UNIT_MICRO;
const auto values =
std::vector<std::optional<int64_t>>{0, 86399000000, 49376123456, std::nullopt};

ArrowSchemaInit(&schema.value);
ArrowSchemaSetTypeStruct(&schema.value, 1);
ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIME64, unit, nullptr);
ArrowSchemaSetName(schema->children[0], "col");
ASSERT_EQ(
adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values),
ADBC_STATUS_OK);

PostgresCopyStreamWriteTester tester;
ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);

const struct ArrowBuffer buf = tester.WriteBuffer();
// The last 2 bytes of a message can be transmitted via PQputCopyData
// so no need to test those bytes from the Writer
constexpr size_t buf_size = sizeof(kTestPgCopyTime) - 2;
ASSERT_EQ(buf.size_bytes, buf_size);
for (size_t i = 0; i < buf_size; i++) {
ASSERT_EQ(buf.data[i], kTestPgCopyTime[i]);
}
}

// This buffer is similar to the read variant above but removes special values
// nan, ±inf as they are not supported via the Arrow Decimal types
// COPY (SELECT CAST(col AS NUMERIC) AS col FROM ( VALUES (NULL), (-123.456),
Expand Down
9 changes: 7 additions & 2 deletions c/driver/postgresql/copy/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,13 @@ static inline ArrowErrorCode MakeCopyFieldReader(
}

case NANOARROW_TYPE_TIME64: {
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>();
return NANOARROW_OK;
switch (pg_type.type_id()) {
case PostgresTypeId::kTime:
*out = std::make_unique<PostgresCopyNetworkEndianFieldReader<int64_t>>();
return NANOARROW_OK;
default:
return ErrorCantConvert(error, pg_type, schema_view);
}
}

case NANOARROW_TYPE_TIMESTAMP:
Expand Down
9 changes: 9 additions & 0 deletions c/driver/postgresql/copy/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,15 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
PostgresCopyNetworkEndianFieldWriter<int32_t, kPostgresDateEpoch>>();
return NANOARROW_OK;
}
case NANOARROW_TYPE_TIME64: {
switch (schema_view.time_unit) {
case NANOARROW_TIME_UNIT_MICRO:
*out = std::make_unique<PostgresCopyNetworkEndianFieldWriter<int64_t>>();
return NANOARROW_OK;
default:
return ADBC_STATUS_INVALID_STATE;
}
}
case NANOARROW_TYPE_FLOAT:
*out = std::make_unique<PostgresCopyFloatFieldWriter>();
return NANOARROW_OK;
Expand Down

0 comments on commit f68383d

Please sign in to comment.