diff --git a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
index 994235440d80..2f3cfa560270 100644
--- a/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
+++ b/sdks/python/apache_beam/yaml/examples/transforms/ml/taxi_fare/streaming_taxifare_prediction.yaml
@@ -53,8 +53,17 @@ pipeline:
         timestamp:
           callable: |
             from datetime import datetime, timezone
+            import sys
+            import re
+
             def fn(row):
-              return datetime.fromisoformat(row.timestamp).astimezone(timezone.utc)
+              ts_str = row.timestamp
+              # Python < 3.11: datetime.fromisoformat rejects a 'Z' suffix and
+              # fractional seconds that are not exactly 3 or 6 digits long.
+              if sys.version_info < (3, 11):
+                ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+              return datetime.fromisoformat(ts_str).astimezone(timezone.utc)
 
     # Assign windows to each element of the unbounded PCollection.
     - type: WindowInto
@@ -178,28 +187,68 @@ pipeline:
           pickup_datetime_year:
             callable: |
               from datetime import datetime
+              import sys
+              import re
+
               def fn(row):
-                return datetime.fromisoformat(row.pickup_datetime).year
+                ts_str = row.pickup_datetime
+                # Python < 3.11 cannot parse 'Z' or fractions that are not 3/6 digits.
+                if sys.version_info < (3, 11):
+                  ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                  ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+                return datetime.fromisoformat(ts_str).year
           pickup_datetime_month:
             callable: |
               from datetime import datetime
+              import sys
+              import re
+
               def fn(row):
-                return datetime.fromisoformat(row.pickup_datetime).month
+                ts_str = row.pickup_datetime
+                # Python < 3.11 cannot parse 'Z' or fractions that are not 3/6 digits.
+                if sys.version_info < (3, 11):
+                  ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                  ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+                return datetime.fromisoformat(ts_str).month
           pickup_datetime_day:
             callable: |
               from datetime import datetime
+              import sys
+              import re
+
               def fn(row):
-                return datetime.fromisoformat(row.pickup_datetime).day
+                ts_str = row.pickup_datetime
+                # Python < 3.11 cannot parse 'Z' or fractions that are not 3/6 digits.
+                if sys.version_info < (3, 11):
+                  ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                  ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+                return datetime.fromisoformat(ts_str).day
           pickup_datetime_weekday:
             callable: |
               from datetime import datetime
+              import sys
+              import re
+
               def fn(row):
-                return datetime.fromisoformat(row.pickup_datetime).weekday()
+                ts_str = row.pickup_datetime
+                # Python < 3.11 cannot parse 'Z' or fractions that are not 3/6 digits.
+                if sys.version_info < (3, 11):
+                  ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                  ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+                return datetime.fromisoformat(ts_str).weekday()
           pickup_datetime_hour:
             callable: |
               from datetime import datetime
+              import sys
+              import re
+
               def fn(row):
-                return datetime.fromisoformat(row.pickup_datetime).hour
+                ts_str = row.pickup_datetime
+                # Python < 3.11 cannot parse 'Z' or fractions that are not 3/6 digits.
+                if sys.version_info < (3, 11):
+                  ts_str = re.sub(r'Z$', '+00:00', ts_str)
+                  ts_str = re.sub(r'\.(\d+)', lambda m: '.' + m.group(1)[:6].ljust(6, '0'), ts_str, count=1)
+                return datetime.fromisoformat(ts_str).hour
 
     # With VertexAIModelHandlerJSON model handler,
     # RunInference transform performs remote inferences by