diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
index 32f647a06ef5..33d526d07cb7 100644
--- a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -37,10 +37,69 @@ namespace {
 using namespace NYql;
 using namespace NKikimr::NMiniKQL;
 
-template <bool isOptional>
-std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
-    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
-    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
+ui64 GetMultiplierForTimestamp(NDB::FormatSettings::TimestampFormat format) {
+    switch (format) {
+        case NDB::FormatSettings::TimestampFormat::UnixTimeMilliseconds:
+            return 1000;
+        case NDB::FormatSettings::TimestampFormat::UnixTimeSeconds:
+            return 1000000;
+        case NDB::FormatSettings::TimestampFormat::UnixTimeMicroSeconds:
+        case NDB::FormatSettings::TimestampFormat::ISO:
+        case NDB::FormatSettings::TimestampFormat::POSIX:
+        case NDB::FormatSettings::TimestampFormat::Unspecified:
+            return 1;
+    }
+}
+
+ui64 GetMultiplierForTimestamp(arrow::DateUnit unit) {
+    switch (unit) {
+        case arrow::DateUnit::MILLI:
+            return 1000;
+        case arrow::DateUnit::DAY:
+            return 1000000 * 24 * 3600UL;
+    }
+}
+
+ui32 GetMultiplierForDatetime(arrow::DateUnit unit) {
+    switch (unit) {
+        case arrow::DateUnit::MILLI:
+            throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the datetime");
+        case arrow::DateUnit::DAY:
+            return 24 * 3600UL;
+    }
+}
+
+ui64 GetMultiplierForTimestamp(arrow::TimeUnit::type unit) {
+    switch (unit) {
+        case arrow::TimeUnit::SECOND:
+            return 1000000;
+        case arrow::TimeUnit::MILLI:
+            return 1000;
+        case arrow::TimeUnit::MICRO:
+            return 1;
+        case arrow::TimeUnit::NANO:
+            throw parquet::ParquetException(TStringBuilder() << "nanosecond accuracy does not fit into the timestamp");
+    }
+}
+
+ui32 GetMultiplierForDatetime(arrow::TimeUnit::type unit) {
+    switch (unit) {
+        case arrow::TimeUnit::SECOND:
+            return 1;
+        case arrow::TimeUnit::MILLI:
+            throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the datetime");
+        case arrow::TimeUnit::MICRO:
+            throw parquet::ParquetException(TStringBuilder() << "microsecond accuracy does not fit into the datetime");
+        case arrow::TimeUnit::NANO:
+            throw parquet::ParquetException(TStringBuilder() << "nanosecond accuracy does not fit into the datetime");
+    }
+}
+
+// DateTime Converters
+template <bool isOptional, typename TArrowType>
+std::shared_ptr<arrow::Array> ArrowTypeAsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui32 multiplier) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui32, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
     for (i64 i = 0; i < value->length(); ++i) {
         const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
         if constexpr (isOptional) {
@@ -49,26 +108,23 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
             if (!item) {
                 builder.Add(item);
                 continue;
             }
         } else if (!item) {
             throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
         }
 
-        const i32 v = item.As<i32>();
-        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
-            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
+        const TArrowType baseValue = item.As<TArrowType>();
+        if (baseValue < 0 && baseValue > static_cast<TArrowType>(::NYql::NUdf::MAX_DATETIME)) {
+            throw parquet::ParquetException(TStringBuilder() << "datetime in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATETIME << "]: " << baseValue);
         }
-        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
+
+        const ui64 v = baseValue * multiplier;
+        if (v > ::NYql::NUdf::MAX_DATETIME) {
+            throw parquet::ParquetException(TStringBuilder() << "datetime in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATETIME << "] after transformation: " << v);
+        }
+        builder.Add(NUdf::TBlockItem(static_cast<ui32>(v)));
     }
     return builder.Build(true).make_array();
 }
 
-TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
-    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
-        return isOptional
-            ? ArrowDate32AsYqlDate<true>(targetType, value)
-            : ArrowDate32AsYqlDate<false>(targetType, value);
-    };
-}
-
 template <bool isOptional>
 std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
     ::NYql::NUdf::TFixedSizeArrayBuilder<ui32, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -94,12 +150,34 @@ std::shared_ptr<arrow::Array> ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
     return builder.Build(true).make_array();
 }
 
-TColumnConverter ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
-    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
-        return isOptional
-            ? ArrowStringAsYqlDateTime<true>(targetType, value, formatSettings)
-            : ArrowStringAsYqlDateTime<false>(targetType, value, formatSettings);
-    };
+template <bool isOptional, typename TArrowType>
+std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
+        }
+
+        const TArrowType baseValue = item.As<TArrowType>();
+        if (baseValue < 0 && baseValue > static_cast<TArrowType>(::NYql::NUdf::MAX_TIMESTAMP)) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "]: " << baseValue);
+        }
+
+        if (static_cast<ui64>(baseValue) > ::NYql::NUdf::MAX_TIMESTAMP / multiplier) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "] after transformation: " << baseValue);
+        }
+
+        const ui64 v = baseValue * multiplier;
+        builder.Add(NUdf::TBlockItem(static_cast<ui64>(v)));
+    }
+    return builder.Build(true).make_array();
 }
 
 template <bool isOptional>
@@ -127,6 +205,102 @@ std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
     return builder.Build(true).make_array();
 }
 
+// Date Converters
+template <bool isOptional>
+std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for date could not be represented in non-optional type");
+        }
+
+        const i32 v = item.As<i32>();
+        if (v < 0 || v > ::NYql::NUdf::MAX_DATE) {
+            throw parquet::ParquetException(TStringBuilder() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v);
+        }
+        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
+    }
+    return builder.Build(true).make_array();
+}
+
+TColumnConverter ArrowUInt32AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, ui32>(targetType, value, 1)
+            : ArrowTypeAsYqlDatetime<false, ui32>(targetType, value, 1);
+    };
+}
+
+TColumnConverter ArrowInt64AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, i64>(targetType, value, 1)
+            : ArrowTypeAsYqlDatetime<false, i64>(targetType, value, 1);
+    };
+}
+
+TColumnConverter ArrowUInt16AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, ui16>(targetType, value, 24*3600)
+            : ArrowTypeAsYqlDatetime<false, ui16>(targetType, value, 24*3600);
+    };
+}
+
+TColumnConverter ArrowUInt64AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, ui64>(targetType, value, 1)
+            : ArrowTypeAsYqlDatetime<false, ui64>(targetType, value, 1);
+    };
+}
+
+TColumnConverter ArrowDate64AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlDatetime<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowTimestampAsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
+    return [targetType, isOptional, multiplier = GetMultiplierForDatetime(timeUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlDatetime<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowStringAsYqlDateTime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
+    return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowStringAsYqlDateTime<true>(targetType, value, formatSettings)
+            : ArrowStringAsYqlDateTime<false>(targetType, value, formatSettings);
+    };
+}
+
+TColumnConverter ArrowInt32AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, i32>(targetType, value, 1)
+            : ArrowTypeAsYqlDatetime<false, i32>(targetType, value, 1);
+    };
+}
+
+TColumnConverter ArrowDate32AsYqlDatetime(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlDatetime<true, i32>(targetType, value, multiplier)
+            : ArrowTypeAsYqlDatetime<false, i32>(targetType, value, multiplier);
+    };
+}
+
 TColumnConverter ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
     return [targetType, isOptional, formatSettings](const std::shared_ptr<arrow::Array>& value) {
         return isOptional
@@ -135,6 +309,81 @@ TColumnConverter ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, const NDB::FormatSettings& formatSettings) {
     };
 }
 
+TColumnConverter ArrowDate64AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowDate32AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, i32>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, i32>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowInt32AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, NDB::FormatSettings::TimestampFormat timestampFormat) {
+    return [targetType, isOptional, multiplier = GetMultiplierForTimestamp(timestampFormat)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, i32>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, i32>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowInt64AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, NDB::FormatSettings::TimestampFormat timestampFormat) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timestampFormat)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowUInt64AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, NDB::FormatSettings::TimestampFormat timestampFormat) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timestampFormat)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, ui64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, ui64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowUInt32AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, NDB::FormatSettings::TimestampFormat timestampFormat) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timestampFormat)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, ui32>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, ui32>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowUInt16AsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional) {
+    return [targetType, isOptional, multiplier=24*3600*1000000ULL](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, ui16>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, ui16>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowTimestampAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timeUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlTimestamp<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlTimestamp<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
+    if (unit == arrow::DateUnit::MILLI) {
+        throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
+    }
+    return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowDate32AsYqlDate<true>(targetType, value)
+            : ArrowDate32AsYqlDate<false>(targetType, value);
+    };
+}
+
 TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
     // TODO: support more than 1 optional level
     bool isOptional = false;
@@ -147,18 +396,95 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
     if (!slot) {
         return {};
     }
-
     auto slotItem = *slot;
     switch (originalType->id()) {
-    case arrow::Type::DATE32:
+    case arrow::Type::UINT16: {
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowUInt16AsYqlDatetime(targetType, isOptional);
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowUInt16AsYqlTimestamp(targetType, isOptional);
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::INT32: {
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowInt32AsYqlDatetime(targetType, isOptional);
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowInt32AsYqlTimestamp(targetType, isOptional, formatSettings.timestamp_format_name);
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::UINT32: {
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowUInt32AsYqlDatetime(targetType, isOptional);
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowUInt32AsYqlTimestamp(targetType, isOptional, formatSettings.timestamp_format_name);
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::INT64: {
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowInt64AsYqlDatetime(targetType, isOptional);
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowInt64AsYqlTimestamp(targetType, isOptional, formatSettings.timestamp_format_name);
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::UINT64: {
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowUInt64AsYqlDatetime(targetType, isOptional);
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowUInt64AsYqlTimestamp(targetType, isOptional, formatSettings.timestamp_format_name);
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::DATE32: {
+        auto& dateType = static_cast<arrow::Date32Type&>(*originalType);
         switch (slotItem) {
             case NUdf::EDataSlot::Date:
-                return ArrowDate32AsYqlDate(targetType, isOptional);
+                return ArrowDate32AsYqlDate(targetType, isOptional, dateType.unit());
+            case NUdf::EDataSlot::Datetime:
+                return ArrowDate32AsYqlDatetime(targetType, isOptional, dateType.unit());
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowDate32AsYqlTimestamp(targetType, isOptional, dateType.unit());
             default:
                 return {};
         }
         return {};
-    case arrow::Type::BINARY:
+    }
+    case arrow::Type::DATE64: {
+        auto& dateType = static_cast<arrow::Date64Type&>(*originalType);
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowDate64AsYqlDatetime(targetType, isOptional, dateType.unit());
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowDate64AsYqlTimestamp(targetType, isOptional, dateType.unit());
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::TIMESTAMP: {
+        auto& timestampType = static_cast<arrow::TimestampType&>(*originalType);
+        switch (slotItem) {
+            case NUdf::EDataSlot::Datetime:
+                return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
+            case NUdf::EDataSlot::Timestamp:
+                return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
+            default:
+                return {};
+        }
+    }
+    case arrow::Type::BINARY: {
         switch (slotItem) {
             case NUdf::EDataSlot::Datetime:
                 return ArrowStringAsYqlDateTime(targetType, isOptional, formatSettings);
@@ -167,7 +493,7 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType, const NDB::FormatSettings& formatSettings) {
             default:
                 return {};
         }
-        return {};
+    }
     default:
         return {};
     }
diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index 4ffc9ed05d6a..614dbf70dbe0 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -6,6 +6,8 @@
 import io
 
 import yatest
+import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
 
 from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
@@ -13,10 +15,11 @@
 import ydb.public.api.protos.ydb_value_pb2 as ydb
 import ydb.public.api.protos.draft.fq_pb2 as fq
 
-import ydb.tests.fq.s3.s3_helpers as s3_helpers
 from ydb.tests.tools.fq_runner.kikimr_utils import yq_all
-from google.protobuf import struct_pb2
+import ydb.tests.fq.s3.s3_helpers as s3_helpers
+import ydb.tests.library.common.yatest_common as yatest_common
 
+from google.protobuf import struct_pb2
 
 class TestS3(TestYdsBase):
     def create_bucket_and_upload_file(self, filename, s3, kikimr):
@@ -884,3 +887,810 @@ def test_string_not_null_multi(self, kikimr, s3, client, filename):
         assert data.result.result_set.rows[0].items[0].bytes_value == b"", str(data.result.result_set)
         assert data.result.result_set.rows[0].items[1].bytes_value == b"", str(data.result.result_set)
         assert data.result.result_set.rows[0].items[2].bytes_value == b"", str(data.result.result_set)
+
+
+    @yq_all
+    def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix):
+        # timestamp[ms] -> Timestamp
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ms'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        filename = 'test_parquet_converters_to_timestamp.parquet'
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "hcpp"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ));
+
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # timestamp[us] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('us'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # timestamp[s] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('s'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # timestamp[ns] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ns'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # date64 -> Timestamp
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [1712016000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ));
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # date32 -> Timestamp
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [19815]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int32 [UNIX_TIME_SECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_SECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int64 [UNIX_TIME_SECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_SECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int64 [UNIX_TIME_MILLISECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int64 [UNIX_TIME_MICROSECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint32 [UNIX_TIME_SECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_SECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint64 [UNIX_TIME_SECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_SECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint64 [UNIX_TIME_MILLISECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint64 [UNIX_TIME_MICROSECONDS] -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ),
+                `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS");
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint16 [default] -> Timestamp
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [19815]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint16())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Timestamp
+                ));
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+    @yq_all
+    def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix):
+        # timestamp[ms] -> Datetime
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ms'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        filename = 'test_parquet_converters_to_datetime.parquet'
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "hcpp"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Datetime
+                ));
+
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        issues = describe_result.query.issue
+        assert "millisecond accuracy does not fit into the datetime" in str(issues)
+
+        # timestamp[us] -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('us'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        issues = describe_result.query.issue
+        assert "microsecond accuracy does not fit into the datetime" in str(issues)
+
+        # timestamp[s] -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('s'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        issues = describe_result.query.issue
+        assert "millisecond accuracy does not fit into the datetime" in str(issues)
+
+        # timestamp[ns] -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260000000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ns'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.FAILED)
+        describe_result = client.describe_query(query_id).result
+        issues = describe_result.query.issue
+        assert "microsecond accuracy does not fit into the datetime" in str(issues)
+
+        # date64 -> Datetime
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [1712016000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Datetime
+                ));
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # date32 -> Datetime
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [19815]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int32 -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Datetime
+                ));
+
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00Z" as Datetime),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # int64 -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.int64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint32 -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint64 -> Datetime
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # uint16 [default] -> Datetime
+
+        # 2024-04-02T00:00:00.000Z
+        data = [['apple'], [19815]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.uint16())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        sql = f'''
+            $result = SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                    `fruit` Utf8 NOT NULL,
+                    `ts` Datetime
+                ));
+            SELECT Ensure(
+                0,
+                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime),
+                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
+            ) AS value FROM $result;
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
diff --git a/ydb/tests/fq/s3/ya.make b/ydb/tests/fq/s3/ya.make
index eb4f4829ebc8..a1705a15c38f 100644
--- a/ydb/tests/fq/s3/ya.make
+++ b/ydb/tests/fq/s3/ya.make
@@ -7,6 +7,7 @@ INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.in
 
 PEERDIR(
     contrib/python/boto3
+    contrib/python/pyarrow
     library/python/testing/recipe
     library/python/testing/yatest_common
     library/recipes/common
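
For reference, a minimal standalone sketch (not part of the patch) of the unit scaling the new converters apply: GetMultiplierForTimestamp maps Arrow seconds to x1000000, milliseconds to x1000, and microseconds to x1, because a YQL Timestamp is stored as microseconds since the epoch. The script below is illustrative only, built on plain pyarrow and the standard library; the helper names are invented here and it does not touch YDB.

    # unit_multiplier_sketch.py -- illustrative only, not part of the PR
    import datetime

    import pyarrow as pa

    # Mirrors GetMultiplierForTimestamp(arrow::TimeUnit::type): raw value -> microseconds.
    MULTIPLIER_FOR_TIMESTAMP = {"s": 1_000_000, "ms": 1_000, "us": 1}

    def to_yql_timestamp_us(raw: int, unit: str) -> int:
        # Same scaling the converters perform before range-checking against MAX_TIMESTAMP.
        return raw * MULTIPLIER_FOR_TIMESTAMP[unit]

    expected = datetime.datetime(2024, 4, 2, 12, 1, tzinfo=datetime.timezone.utc)
    for unit, raw in [("s", 1712059260), ("ms", 1712059260000), ("us", 1712059260000000)]:
        # pa.array(...) is how such a value would arrive from the parquet reader.
        arr = pa.array([raw], type=pa.timestamp(unit))
        got = datetime.datetime.fromtimestamp(
            to_yql_timestamp_us(raw, unit) / 1_000_000, tz=datetime.timezone.utc)
        assert got == expected, (unit, arr, got)
    print("all units map to", expected.isoformat())

Running it prints the same instant (2024-04-02T12:01:00+00:00) for every unit, which is exactly the invariant the three timestamp test sections above assert through SQL.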