From 32c6891b070f5ac31fb537c22f3ee45de80b5626 Mon Sep 17 00:00:00 2001 From: Khorbaladze A Date: Fri, 1 Aug 2025 20:20:46 +0400 Subject: [PATCH 1/2] fix value error affecting XVR tests --- .../streaming_taxifare_prediction.yaml | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml index 994235440d80..6c4c65ff425e 100644 --- a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml +++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml @@ -53,8 +53,13 @@ pipeline: timestamp: callable: | from datetime import datetime, timezone + import re + + _PAD_MICROS = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def fn(row): - return datetime.fromisoformat(row.timestamp).astimezone(timezone.utc) + ts = _PAD_MICROS.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), row.timestamp) + return datetime.fromisoformat(ts).astimezone(timezone.utc) # Assign windows to each element of the unbounded PCollection. - type: WindowInto @@ -178,28 +183,48 @@ pipeline: pickup_datetime_year: callable: | from datetime import datetime + import re + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def normalize(ts): + return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) def fn(row): - return datetime.fromisoformat(row.pickup_datetime).year + return datetime.fromisoformat(normalize(row.pickup_datetime)).year pickup_datetime_month: callable: | from datetime import datetime + import re + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def normalize(ts): + return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) def fn(row): - return datetime.fromisoformat(row.pickup_datetime).month + return datetime.fromisoformat(normalize(row.pickup_datetime)).month pickup_datetime_day: callable: | from datetime import datetime + import re + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def normalize(ts): + return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) def fn(row): - return datetime.fromisoformat(row.pickup_datetime).day + return datetime.fromisoformat(normalize(row.pickup_datetime)).day pickup_datetime_weekday: callable: | from datetime import datetime + import re + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def normalize(ts): + return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) def fn(row): - return datetime.fromisoformat(row.pickup_datetime).weekday() + return datetime.fromisoformat(normalize(row.pickup_datetime)).weekday() pickup_datetime_hour: callable: | from datetime import datetime + import re + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') + def normalize(ts): + return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) def fn(row): - return datetime.fromisoformat(row.pickup_datetime).hour + return datetime.fromisoformat(normalize(row.pickup_datetime)).hour # With VertexAIModelHandlerJSON model handler, # RunInference transform performs remote inferences by From 39cf1cad0df7fdd87d8e98e9748d01f1de5b17f5 Mon Sep 17 00:00:00 2001 From: Khorbaladze A Date: Sat, 2 Aug 2025 05:04:36 +0400 Subject: [PATCH 2/2] only apply fn(row) to python <3.11 --- .../streaming_taxifare_prediction.yaml | 67 ++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml index 6c4c65ff425e..2f3cfa560270 100644 --- a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml +++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml @@ -53,13 +53,17 @@ pipeline: timestamp: callable: | from datetime import datetime, timezone + import sys import re - _PAD_MICROS = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def fn(row): - ts = _PAD_MICROS.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), row.timestamp) - return datetime.fromisoformat(ts).astimezone(timezone.utc) + ts_str = row.timestamp + # For Python < 3.11, datetime.fromisoformat requires + # microseconds to be padded to 6 digits. + if sys.version_info < (3, 11): + _PAD_MICROS = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD_MICROS.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).astimezone(timezone.utc) # Assign windows to each element of the unbounded PCollection. - type: WindowInto @@ -183,48 +187,63 @@ pipeline: pickup_datetime_year: callable: | from datetime import datetime + import sys import re - _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def normalize(ts): - return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) + def fn(row): - return datetime.fromisoformat(normalize(row.pickup_datetime)).year + ts_str = row.pickup_datetime + if sys.version_info < (3, 11): + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).year pickup_datetime_month: callable: | from datetime import datetime + import sys import re - _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def normalize(ts): - return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) + def fn(row): - return datetime.fromisoformat(normalize(row.pickup_datetime)).month + ts_str = row.pickup_datetime + if sys.version_info < (3, 11): + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).month pickup_datetime_day: callable: | from datetime import datetime + import sys import re - _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def normalize(ts): - return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) + def fn(row): - return datetime.fromisoformat(normalize(row.pickup_datetime)).day + ts_str = row.pickup_datetime + if sys.version_info < (3, 11): + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).day pickup_datetime_weekday: callable: | from datetime import datetime + import sys import re - _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def normalize(ts): - return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) + def fn(row): - return datetime.fromisoformat(normalize(row.pickup_datetime)).weekday() + ts_str = row.pickup_datetime + if sys.version_info < (3, 11): + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).weekday() pickup_datetime_hour: callable: | from datetime import datetime + import sys import re - _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2})$') - def normalize(ts): - return _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts) + def fn(row): - return datetime.fromisoformat(normalize(row.pickup_datetime)).hour + ts_str = row.pickup_datetime + if sys.version_info < (3, 11): + _PAD = re.compile(r'\.(\d{1,5})([+-]\d{2}:\d{2}|Z)$') + ts_str = _PAD.sub(lambda m: '.' + m.group(1).ljust(6, '0') + m.group(2), ts_str) + return datetime.fromisoformat(ts_str).hour # With VertexAIModelHandlerJSON model handler, # RunInference transform performs remote inferences by