Skip to content

Commit

Permalink
Support Timestamp type in xlang JDBC Read and Write (#22561)
Browse files Browse the repository at this point in the history
Support Timestamp type in xlang JDBC Read and Write
  • Loading branch information
TheNeuralBit committed Aug 29, 2022
2 parents 7cc48e9 + b0484e7 commit 45cf3c4
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed Python cross-language JDBC IO Connector being unable to read or write rows containing Timestamp type values ([#19817](https://github.com/apache/beam/issues/19817)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ examples:
# import typing
# import apache_beam as beam
# class Test(typing.NamedTuple):
# f_map: typing.Mapping[str,int]
# f_map: typing.Mapping[str,typing.Optional[int]]
# schema = beam.typehints.schemas.named_tuple_to_schema(Test)
# coder = beam.coders.row_coder.RowCoder(schema)
# print("payload = %s" % schema.SerializeToString())
Expand Down Expand Up @@ -453,6 +453,13 @@ coder:
examples:
"\x03\x00\x02\x00\xb6\x95\xd5\xf9\x05\xc0\xc4\x07\x1b2020-08-13T14:14:14.123456Z\xc0\xf7\x85\xda\xae\x98\xeb\x02": {f_timestamp: {seconds: 1597328054, micros: 123456}, f_string: "2020-08-13T14:14:14.123456Z", f_int: 1597328054123456}

coder:
urn: "beam:coder:row:v1"
# f_timestamp: logical(millis_instant), f_string: string, f_int: int64
payload: "\n:\n\x0bf_timestamp\x1a+:)\n#beam:logical_type:millis_instant:v1\x1a\x02\x10\x04\n\x0e\n\x08f_string\x1a\x02\x10\x07\n\x0b\n\x05f_int\x1a\x02\x10\x04\x12$80be749a-5700-4ede-89d8-dd9a4433a3f8"
examples:
"\x03\x00\x80\x00\x01s\xe8+\xd7k\x182020-08-13T14:14:14.123Z\xeb\xae\xaf\xc1\xbe.": {f_timestamp: -9223370439526721685, f_string: "2020-08-13T14:14:14.123Z", f_int: 1597328054123}

---

coder:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ message LogicalTypes {
// amount of time since the epoch.
MICROS_INSTANT = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:micros_instant:v1"];

// A URN for MillisInstant type
// - Representation type: INT64
// - A timestamp without a timezone represented by the number of
// milliseconds since the epoch. The INT64 value is big-endian encoded
// and shifted such that lexicographic ordering of the bytes
// corresponds to chronological order.
MILLIS_INSTANT = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:millis_instant:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,9 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
return (String) value;
case BOOLEAN:
return (Boolean) value;
case DATETIME:
// convert shifted millis to epoch millis as in InstantCoder
return new Instant((Long) value + -9223372036854775808L);
case BYTES:
// extract String as byte[]
return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
Expand Down Expand Up @@ -465,7 +468,7 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
return fieldType
.getLogicalType()
.toInputType(parseField(value, fieldType.getLogicalType().getBaseType()));
default: // DECIMAL, DATETIME
default: // DECIMAL
throw new IllegalArgumentException("Unsupported type name: " + fieldType.getTypeName());
}
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/test/regression/coders/fromyaml/fromyaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var unimplementedCoders = map[string]bool{
var filteredCases = []struct{ filter, reason string }{
{"logical", "BEAM-9615: Support logical types"},
{"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "https://github.com/apache/beam/issues/21206: Support encoding position."},
{"80be749a-5700-4ede-89d8-dd9a4433a3f8", "https://github.com/apache/beam/issues/19817: Support millis_instant."},
}

// Coder is a representation of a serialized beam coder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.model.pipeline.v1.SchemaApi.ArrayTypeValue;
import org.apache.beam.model.pipeline.v1.SchemaApi.AtomicTypeValue;
Expand Down Expand Up @@ -64,9 +65,13 @@
})
public class SchemaTranslation {

private static final String URN_BEAM_LOGICAL_DATETIME = "beam:logical_type:datetime:v1";
private static final String URN_BEAM_LOGICAL_DECIMAL = "beam:logical_type:decimal:v1";
private static final String URN_BEAM_LOGICAL_JAVASDK = "beam:logical_type:javasdk:v1";
private static final String URN_BEAM_LOGICAL_MILLIS_INSTANT =
SchemaApi.LogicalTypes.Enum.MILLIS_INSTANT
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

// TODO(https://github.com/apache/beam/issues/19715): Populate this with a LogicalTypeRegistrar,
// which includes a way to construct
Expand Down Expand Up @@ -198,7 +203,7 @@ private static SchemaApi.FieldType fieldTypeToProto(
case DATETIME:
builder.setLogicalType(
SchemaApi.LogicalType.newBuilder()
.setUrn(URN_BEAM_LOGICAL_DATETIME)
.setUrn(URN_BEAM_LOGICAL_MILLIS_INSTANT)
.setRepresentation(fieldTypeToProto(FieldType.INT64, serializeLogicalType))
.build());
break;
Expand Down Expand Up @@ -358,7 +363,7 @@ private static FieldType fieldTypeFromProtoWithoutNullable(SchemaApi.FieldType p
}
// Special-case for DATETIME and DECIMAL which are logical types in portable representation,
// but not yet in Java. (https://github.com/apache/beam/issues/19817)
if (urn.equals(URN_BEAM_LOGICAL_DATETIME)) {
if (urn.equals(URN_BEAM_LOGICAL_MILLIS_INSTANT)) {
return FieldType.DATETIME;
} else if (urn.equals(URN_BEAM_LOGICAL_DECIMAL)) {
return FieldType.DECIMAL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadWithPartitions;
import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -189,6 +190,18 @@ static JdbcIO.PreparedStatementSetCaller getPreparedStatementSetCaller(
}

String logicalTypeName = fieldType.getLogicalType().getIdentifier();

if (logicalTypeName.equals(MicrosInstant.IDENTIFIER)) {
// Process timestamp of MicrosInstant kind, which should only be passed from other type
// systems such as SQL and other Beam SDKs.
return (element, ps, i, fieldWithIndex) -> {
// MicrosInstant uses native java.time.Instant instead of joda.Instant.
java.time.Instant value =
element.getLogicalTypeValue(fieldWithIndex.getIndex(), java.time.Instant.class);
ps.setTimestamp(i + 1, value == null ? null : new Timestamp(value.toEpochMilli()));
};
}

JDBCType jdbcType = JDBCType.valueOf(logicalTypeName);
switch (jdbcType) {
case DATE:
Expand Down
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from apache_beam.coders.coders import NullableCoder
from apache_beam.coders.coders import SinglePrecisionFloatCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.coders.coders import TimestampCoder
from apache_beam.coders.coders import VarIntCoder
from apache_beam.portability import common_urns
from apache_beam.portability.api import schema_pb2
Expand Down Expand Up @@ -168,10 +169,15 @@ def _nonnull_coder_from_type(field_type):
_coder_from_type(field_type.map_type.key_type),
_coder_from_type(field_type.map_type.value_type))
elif type_info == "logical_type":
# Special case for the Any logical type. Just use the default coder for an
# unknown Python object.
if field_type.logical_type.urn == PYTHON_ANY_URN:
# Special case for the Any logical type. Just use the default coder for an
# unknown Python object.
return typecoders.registry.get_coder(object)
elif field_type.logical_type.urn == common_urns.millis_instant.urn:
# Special case for the millis instant logical type, used to handle the
# Java SDK's millis Instant. It explicitly uses TimestampCoder, which
# deals with fixed-length 8-byte big-endian longs, instead of the VarInt
# coder.
return TimestampCoder()

logical_type = LogicalType.from_runner_api(field_type.logical_type)
return LogicalTypeCoder(
Expand Down
58 changes: 43 additions & 15 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

# pytype: skip-file

import datetime
import logging
import time
import typing
import unittest
from typing import Callable
from typing import Union

import pytz
from parameterized import parameterized

import apache_beam as beam
Expand All @@ -33,6 +36,9 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
Expand All @@ -53,19 +59,14 @@

JdbcReadTestRow = typing.NamedTuple(
"JdbcReadTestRow",
[
("f_int", int),
],
[("f_int", int), ("f_timestamp", Timestamp)],
)
coders.registry.register_coder(JdbcReadTestRow, coders.RowCoder)

JdbcWriteTestRow = typing.NamedTuple(
"JdbcWriteTestRow",
[
("f_id", int),
("f_real", float),
("f_string", str),
],
[("f_id", int), ("f_real", float), ("f_string", str),
("f_timestamp", Timestamp)],
)
coders.registry.register_coder(JdbcWriteTestRow, coders.RowCoder)

Expand Down Expand Up @@ -127,10 +128,15 @@ def test_xlang_jdbc_write(self, database):
self._setUpTestCase(container_init, db_string, driver)
table_name = 'jdbc_external_test_write'
self.engine.execute(
"CREATE TABLE {}(f_id INTEGER, f_real FLOAT, f_string VARCHAR(100))".
format(table_name))
"CREATE TABLE {}(f_id INTEGER, f_real FLOAT, f_string VARCHAR(100), f_timestamp TIMESTAMP(3))" # pylint: disable=line-too-long
.format(table_name))
inserted_rows = [
JdbcWriteTestRow(i, i + 0.1, 'Test{}'.format(i))
JdbcWriteTestRow(
i,
i + 0.1,
'Test{}'.format(i),
# In alignment with Java Instant which supports milli precision.
Timestamp.of(seconds=round(time.time(), 3)))
for i in range(ROW_COUNT)
]

Expand All @@ -152,7 +158,11 @@ def test_xlang_jdbc_write(self, database):

fetched_data = self.engine.execute("SELECT * FROM {}".format(table_name))
fetched_rows = [
JdbcWriteTestRow(int(row[0]), float(row[1]), str(row[2]))
JdbcWriteTestRow(
int(row[0]),
float(row[1]),
str(row[2]),
Timestamp.from_utc_datetime(row[3].replace(tzinfo=pytz.UTC)))
for row in fetched_data
]

Expand All @@ -168,10 +178,25 @@ def test_xlang_jdbc_read(self, database):
CrossLanguageJdbcIOTest.DB_CONTAINER_CLASSPATH_STRING[database])
self._setUpTestCase(container_init, db_string, driver)
table_name = 'jdbc_external_test_read'
self.engine.execute("CREATE TABLE {}(f_int INTEGER)".format(table_name))
self.engine.execute(
"CREATE TABLE {}(f_int INTEGER, f_timestamp TIMESTAMP)".format(
table_name))

all_timestamps = []
for i in range(ROW_COUNT):
self.engine.execute("INSERT INTO {} VALUES({})".format(table_name, i))
# prepare timestamp
strtime = Timestamp.now().to_utc_datetime().strftime('%Y-%m-%dT%H:%M:%S')
dttime = datetime.datetime.strptime(
strtime, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=pytz.UTC)
all_timestamps.append(Timestamp.from_utc_datetime(dttime))

# write records using sqlalchemy engine
self.engine.execute(
"INSERT INTO {} VALUES({},'{}')".format(table_name, i, strtime))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

with TestPipeline() as p:
p.not_use_test_runner_api = True
Expand All @@ -188,7 +213,10 @@ def test_xlang_jdbc_read(self, database):
classpath=classpath))

assert_that(
result, equal_to([JdbcReadTestRow(i) for i in range(ROW_COUNT)]))
result,
equal_to([
JdbcReadTestRow(i, all_timestamps[i]) for i in range(ROW_COUNT)
]))

# Creating a container with testcontainers sometimes raises ReadTimeout
# error. In java there are 2 retries set by default.
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@
java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP

micros_instant = LogicalTypes.Enum.MICROS_INSTANT
millis_instant = LogicalTypes.Enum.MILLIS_INSTANT
python_callable = LogicalTypes.Enum.PYTHON_CALLABLE
52 changes: 50 additions & 2 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
bytes <-----> BYTES
ByteString ------> BYTES
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
Timestamp <------ LogicalType(urn="beam:logical_type:millis_instant:v1")
Mapping <-----> MapType
Sequence <-----> ArrayType
NamedTuple <-----> RowType
Expand Down Expand Up @@ -571,13 +572,13 @@ def argument(self):
"""Return the argument for this instance of the LogicalType."""
raise NotImplementedError()

def to_representation_type(value):
def to_representation_type(self, value):
# type: (LanguageT) -> RepresentationT

"""Convert an instance of LanguageT to RepresentationT."""
raise NotImplementedError()

def to_language_type(value):
def to_language_type(self, value):
# type: (RepresentationT) -> LanguageT

"""Convert an instance of RepresentationT to LanguageT."""
Expand All @@ -587,6 +588,7 @@ def to_language_type(value):
def register_logical_type(cls, logical_type_cls):
"""Register an implementation of LogicalType."""
cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls)
return logical_type_cls

@classmethod
def from_typing(cls, typ):
Expand Down Expand Up @@ -655,9 +657,54 @@ def _from_typing(cls, typ):
('micros', np.int64)])


@LogicalType.register_logical_type
class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]):
  """Logical type for millisecond-precision instants.

  Values of this type are consistent with those encoded by ``InstantCoder``
  in the Java SDK. Like :class:`MicrosInstant`, the language type is
  :class:`apache_beam.utils.timestamp.Timestamp`, but only millisecond
  precision is provided, matching the precision of Java's ``InstantCoder``.

  ``Timestamp`` is handled by :class:`MicrosInstant` by default. When that
  mapping needs to point at this class instead (for example when reading rows
  that contain ``InstantCoder``-encoded timestamps from a cross-language
  transform), register this class again with
  :func:`~LogicalType.register_logical_type`.
  """
  @classmethod
  def language_type(cls):
    return Timestamp

  @classmethod
  def urn(cls):
    return common_urns.millis_instant.urn

  @classmethod
  def representation_type(cls):
    # type: () -> type
    return np.int64

  def to_language_type(self, value):
    # type: (np.int64) -> Timestamp

    # The representation is shifted by 2**63, as in
    # apache_beam.coders.coder_impl.TimestampCoderImpl. Undo the shift by
    # adding 2**63 to negative values and subtracting it from the others.
    half_range = 1 << 63
    offset = half_range if value < 0 else -half_range
    return Timestamp(micros=(int(value) + offset) * 1000)


# Make sure MicrosInstant is registered after MillisInstant so that it
# overwrites the mapping of Timestamp language type representation choice and
# thus does not lose microsecond precision inside python sdk.
@LogicalType.register_logical_type
class MicrosInstant(NoArgumentLogicalType[Timestamp,
MicrosInstantRepresentation]):
"""Microsecond-precision instant logical type that handles ``Timestamp``."""
@classmethod
def urn(cls):
return common_urns.micros_instant.urn
Expand All @@ -683,6 +730,7 @@ def to_language_type(self, value):

@LogicalType.register_logical_type
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
"""A logical type for PythonCallableSource objects."""
@classmethod
def urn(cls):
return common_urns.python_callable.urn
Expand Down

0 comments on commit 45cf3c4

Please sign in to comment.