Skip to content

Commit

Permalink
Fix logical type with same language type gets completely hidden intro…
Browse files Browse the repository at this point in the history
…duced in #22679
  • Loading branch information
Abacn committed Aug 26, 2022
1 parent e40ece1 commit b0484e7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand Down Expand Up @@ -192,6 +194,10 @@ def test_xlang_jdbc_read(self, database):
self.engine.execute(
"INSERT INTO {} VALUES({},'{}')".format(table_name, i, strtime))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

with TestPipeline() as p:
p.not_use_test_runner_api = True
result = (
Expand Down
15 changes: 12 additions & 3 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ def to_language_type(self, value):
def register_logical_type(cls, logical_type_cls):
"""Register an implementation of LogicalType."""
cls._known_logical_types.add(logical_type_cls.urn(), logical_type_cls)
return logical_type_cls

@classmethod
def from_typing(cls, typ):
Expand Down Expand Up @@ -661,9 +662,16 @@ class MillisInstant(NoArgumentLogicalType[Timestamp, np.int64]):
"""Millisecond-precision instant logical type handles values consistent with
that encoded by ``InstantCoder`` in the Java SDK.
This class handles Timestamp language type as :class:`MicrosInstant`, but it
only provides millisecond precision, because it is aimed to handle data
encoded by Java sdk's InstantCoder which has same precision level.
This class handles :class:`apache_beam.utils.timestamp.Timestamp` language
type as :class:`MicrosInstant`, but it only provides millisecond precision,
because it is aimed to handle data encoded by Java sdk's InstantCoder which
has same precision level.
Timestamp is handled by `MicrosInstant` by default. In some scenario, such as
read from cross-language transform with rows containing InstantCoder encoded
timestamps, one may need to override the mapping of Timetamp to MillisInstant.
To do this, re-register this class with
:func:`~LogicalType.register_logical_type`.
"""
@classmethod
def representation_type(cls):
Expand Down Expand Up @@ -722,6 +730,7 @@ def to_language_type(self, value):

@LogicalType.register_logical_type
class PythonCallable(NoArgumentLogicalType[PythonCallableWithSource, str]):
"""A logical type for PythonCallableSource objects."""
@classmethod
def urn(cls):
return common_urns.python_callable.urn
Expand Down

0 comments on commit b0484e7

Please sign in to comment.