From 70276e6d2f947f9d8a73028629a259a9d4ab7b2a Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 7 Aug 2020 08:59:28 +0200 Subject: [PATCH 1/3] feat: prepare for customization of date operation --- influxdb_client/client/date_utils.py | 47 +++++++++++++++++++---- influxdb_client/client/flux_csv_parser.py | 7 +--- influxdb_client/client/write/point.py | 19 ++------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/influxdb_client/client/date_utils.py b/influxdb_client/client/date_utils.py index 7afb5287..1bea4b9f 100644 --- a/influxdb_client/client/date_utils.py +++ b/influxdb_client/client/date_utils.py @@ -2,17 +2,48 @@ from dateutil import parser -parse_function = None +date_helper = None -def get_date_parse_function(): - """If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else use 'dateutil.parse'.""" - global parse_function - if parse_function is None: +class DateHelper: + """DateHelper to groups different implementations of date operations.""" + + def parse_date(self, date_string: str): + """ + Parse string into Date or Timestamp. + + :return: Returns a :class:`datetime.datetime` object or compliant implementation + like :class:`class 'pandas._libs.tslibs.timestamps.Timestamp` + """ + pass + + def to_nanoseconds(self, delta): + """ + Get number of nanoseconds in timedelta. + + Solution comes from v1 client. Thx. + https://github.com/influxdata/influxdb-python/pull/811 + """ + nanoseconds_in_days = delta.days * 86400 * 10 ** 9 + nanoseconds_in_seconds = delta.seconds * 10 ** 9 + nanoseconds_in_micros = delta.microseconds * 10 ** 3 + + return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros + + +def get_date_helper() -> DateHelper: + """ + Return DateHelper with proper implementation. + + If there is a 'ciso8601' than use 'ciso8601.parse_datetime' else use 'dateutil.parse'. + """ + global date_helper + if date_helper is None: + date_helper = DateHelper() try: import ciso8601 - parse_function = ciso8601.parse_datetime + date_helper.parse_date = ciso8601.parse_datetime except ModuleNotFoundError: - parse_function = parser.parse + date_helper.parse_date = parser.parse - return parse_function + return date_helper diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index 21595933..7995782d 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -9,7 +9,7 @@ from urllib3 import HTTPResponse -from influxdb_client.client.date_utils import get_date_parse_function +from influxdb_client.client.date_utils import get_date_helper from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord @@ -208,10 +208,7 @@ def _to_value(self, str_val, column): return base64.b64decode(str_val) if "dateTime:RFC3339" == column.data_type or "dateTime:RFC3339Nano" == column.data_type: - # todo nanosecods precision - # return str_val - return get_date_parse_function()(str_val) - # return timestamp_parser(str_val) + return get_date_helper().parse_date(str_val) if "duration" == column.data_type: # todo better type ? diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 1cb4b052..81443dd8 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -10,7 +10,7 @@ from pytz import UTC from six import iteritems -from influxdb_client.client.date_utils import get_date_parse_function +from influxdb_client.client.date_utils import get_date_helper from influxdb_client.domain.write_precision import WritePrecision EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) @@ -164,24 +164,13 @@ def _escape_string(value): return str(value).translate(_ESCAPE_STRING) -def _to_nanoseconds(delta): - """ - Solution comes from v1 client. Thx. - - https://github.com/influxdata/influxdb-python/pull/811 - """ - nanoseconds_in_days = delta.days * 86400 * 10 ** 9 - nanoseconds_in_seconds = delta.seconds * 10 ** 9 - nanoseconds_in_micros = delta.microseconds * 10 ** 3 - return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros - - def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): + date_helper = get_date_helper() if isinstance(timestamp, Integral): return timestamp # assume precision is correct if timestamp is int if isinstance(timestamp, str): - timestamp = get_date_parse_function()(timestamp) + timestamp = date_helper.parse_date(timestamp) if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime): @@ -192,7 +181,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION): timestamp = timestamp.astimezone(UTC) timestamp = timestamp - EPOCH - ns = _to_nanoseconds(timestamp) + ns = date_helper.to_nanoseconds(timestamp) if precision is None or precision == WritePrecision.NS: return ns From afae27d3280c49f3742f09511e2f43bae8805629 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 7 Aug 2020 10:09:14 +0200 Subject: [PATCH 2/3] feat: add possibility to customize parsing and serialization method --- influxdb_client/client/flux_csv_parser.py | 2 +- influxdb_client/client/util/__init__.py | 1 + .../client/{ => util}/date_utils.py | 0 .../client/util/date_utils_pandas.py | 15 ++++++++ influxdb_client/client/write/point.py | 2 +- tests/test_PandasDateTimeHelper.py | 35 ++++++++++++++++++ tests/test_WriteApi.py | 36 +++++++++++++++++++ 7 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 influxdb_client/client/util/__init__.py rename influxdb_client/client/{ => util}/date_utils.py (100%) create mode 100644 influxdb_client/client/util/date_utils_pandas.py create mode 100644 tests/test_PandasDateTimeHelper.py diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py index 7995782d..979fb70e 100644 --- a/influxdb_client/client/flux_csv_parser.py +++ b/influxdb_client/client/flux_csv_parser.py @@ -9,7 +9,7 @@ from urllib3 import HTTPResponse -from influxdb_client.client.date_utils import get_date_helper +from influxdb_client.client.util.date_utils import get_date_helper from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord diff --git a/influxdb_client/client/util/__init__.py b/influxdb_client/client/util/__init__.py new file mode 100644 index 00000000..f9a83206 --- /dev/null +++ b/influxdb_client/client/util/__init__.py @@ -0,0 +1 @@ +"""Utils package.""" diff --git a/influxdb_client/client/date_utils.py b/influxdb_client/client/util/date_utils.py similarity index 100% rename from influxdb_client/client/date_utils.py rename to influxdb_client/client/util/date_utils.py diff --git a/influxdb_client/client/util/date_utils_pandas.py b/influxdb_client/client/util/date_utils_pandas.py new file mode 100644 index 00000000..9a87c6ab --- /dev/null +++ b/influxdb_client/client/util/date_utils_pandas.py @@ -0,0 +1,15 @@ +"""Pandas date utils.""" +from influxdb_client.client.util.date_utils import DateHelper +from influxdb_client.extras import pd + + +class PandasDateTimeHelper(DateHelper): + """DateHelper that use Pandas library with nanosecond precision.""" + + def parse_date(self, date_string: str): + """Parse date string into `class 'pandas._libs.tslibs.timestamps.Timestamp`.""" + return pd.to_datetime(date_string) + + def to_nanoseconds(self, delta): + """Get number of nanoseconds with nanos precision.""" + return super().to_nanoseconds(delta) + (delta.nanoseconds if hasattr(delta, 'nanoseconds') else 0) diff --git a/influxdb_client/client/write/point.py b/influxdb_client/client/write/point.py index 81443dd8..a09099c2 100644 --- a/influxdb_client/client/write/point.py +++ b/influxdb_client/client/write/point.py @@ -10,7 +10,7 @@ from pytz import UTC from six import iteritems -from influxdb_client.client.date_utils import get_date_helper +from influxdb_client.client.util.date_utils import get_date_helper from influxdb_client.domain.write_precision import WritePrecision EPOCH = UTC.localize(datetime.utcfromtimestamp(0)) diff --git a/tests/test_PandasDateTimeHelper.py b/tests/test_PandasDateTimeHelper.py new file mode 100644 index 00000000..c77f7163 --- /dev/null +++ b/tests/test_PandasDateTimeHelper.py @@ -0,0 +1,35 @@ +import unittest +from datetime import datetime, timedelta + +from pytz import UTC + +from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper + + +class PandasDateTimeHelperTest(unittest.TestCase): + + def setUp(self) -> None: + self.helper = PandasDateTimeHelper() + + def test_parse_date(self): + date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z') + + self.assertEqual(date.year, 2020) + self.assertEqual(date.month, 8) + self.assertEqual(date.day, 7) + self.assertEqual(date.hour, 6) + self.assertEqual(date.minute, 21) + self.assertEqual(date.second, 57) + self.assertEqual(date.microsecond, 331249) + self.assertEqual(date.nanosecond, 158) + + def test_to_nanoseconds(self): + date = self.helper.parse_date('2020-08-07T06:21:57.331249158Z') + nanoseconds = self.helper.to_nanoseconds(date - UTC.localize(datetime.utcfromtimestamp(0))) + + self.assertEqual(nanoseconds, 1596781317331249158) + + def test_to_nanoseconds_buildin_timedelta(self): + nanoseconds = self.helper.to_nanoseconds(timedelta(days=1)) + + self.assertEqual(nanoseconds, 86400000000000) diff --git a/tests/test_WriteApi.py b/tests/test_WriteApi.py index 4d0c5f17..d5c97b68 100644 --- a/tests/test_WriteApi.py +++ b/tests/test_WriteApi.py @@ -383,6 +383,42 @@ def test_check_write_permission_by_empty_data(self): client.__del__() + def test_write_query_data_nanoseconds(self): + + from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper + import influxdb_client.client.util.date_utils as date_utils + + date_utils.date_helper = PandasDateTimeHelper() + + bucket = self.create_test_bucket() + + point = Point("h2o_feet") \ + .field("water_level", 155) \ + .tag("location", "creek level")\ + .time('1996-02-25T21:20:00.001001231Z') + + self.write_client.write(bucket.name, self.org, [point]) + + flux_result = self.client.query_api().query( + f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)') + self.assertEqual(1, len(flux_result)) + + record = flux_result[0].records[0] + + self.assertEqual(self.id_tag, record["id"]) + self.assertEqual(record["_value"], 155) + self.assertEqual(record["location"], "creek level") + self.assertEqual(record["_time"].year, 1996) + self.assertEqual(record["_time"].month, 2) + self.assertEqual(record["_time"].day, 25) + self.assertEqual(record["_time"].hour, 21) + self.assertEqual(record["_time"].minute, 20) + self.assertEqual(record["_time"].second, 00) + self.assertEqual(record["_time"].microsecond, 1001) + self.assertEqual(record["_time"].nanosecond, 231) + + date_utils.date_helper = None + class AsynchronousWriteTest(BaseTest): From 55492c540891720949d366f8c59abaae4e004e27 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Fri, 7 Aug 2020 11:06:35 +0200 Subject: [PATCH 3/3] docs: updated CHANGELOG and README --- CHANGELOG.md | 3 +- README.rst | 68 +++++++++++++++++++++++++++++++- examples/nanosecond_precision.py | 50 +++++++++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) create mode 100644 examples/nanosecond_precision.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 29207c13..6d8b7860 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ### Features 1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip of verifying SSL certificate -1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties +1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties +1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp` ## 1.9.0 [2020-07-17] diff --git a/README.rst b/README.rst index 7f414500..738c23dc 100644 --- a/README.rst +++ b/README.rst @@ -899,10 +899,76 @@ The following forward compatible APIs are available: For detail info see `InfluxDB 1.8 example `_. +Nanosecond precision +^^^^^^^^^^^^^^^^^^^^ + +The Python's `datetime `_ doesn't support precision with nanoseconds +so the library during writes and queries ignores everything after microseconds. + +If you would like to use ``datetime`` with nanosecond precision you should use +`pandas.Timestamp `_ +that is replacement for python ``datetime.datetime`` object and also you should set a proper ``DateTimeHelper`` to the client. + +* sources - `nanosecond_precision.py `_ + +.. code-block:: python + + from influxdb_client import Point, InfluxDBClient + from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper + from influxdb_client.client.write_api import SYNCHRONOUS + + """ + Set PandasDate helper which supports nanoseconds. + """ + import influxdb_client.client.util.date_utils as date_utils + + date_utils.date_helper = PandasDateTimeHelper() + + """ + Prepare client. + """ + client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org") + + write_api = client.write_api(write_options=SYNCHRONOUS) + query_api = client.query_api() + + """ + Prepare data + """ + + point = Point("h2o_feet") \ + .field("water_level", 10) \ + .tag("location", "pacific") \ + .time('1996-02-25T21:20:00.001001231Z') + + print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}') + print() + + write_api.write(bucket="my-bucket", record=point) + + """ + Query: using Stream + """ + query = ''' + from(bucket:"my-bucket") + |> range(start: 0, stop: now()) + |> filter(fn: (r) => r._measurement == "h2o_feet") + ''' + records = query_api.query_stream(query) + + for record in records: + print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}') + + """ + Close client + """ + client.__del__() + + Local tests ----------- -.. code-block:: python +.. code-block:: console # start/restart InfluxDB2 on local machine using docker ./scripts/influxdb-restart.sh diff --git a/examples/nanosecond_precision.py b/examples/nanosecond_precision.py new file mode 100644 index 00000000..e242d282 --- /dev/null +++ b/examples/nanosecond_precision.py @@ -0,0 +1,50 @@ +from influxdb_client import Point, InfluxDBClient +from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper +from influxdb_client.client.write_api import SYNCHRONOUS + +""" +Set PandasDate helper which supports nanoseconds. +""" +import influxdb_client.client.util.date_utils as date_utils + +date_utils.date_helper = PandasDateTimeHelper() + +""" +Prepare client. +""" +client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org") + +write_api = client.write_api(write_options=SYNCHRONOUS) +query_api = client.query_api() + +""" +Prepare data +""" + +point = Point("h2o_feet") \ + .field("water_level", 10) \ + .tag("location", "pacific") \ + .time('1996-02-25T21:20:00.001001231Z') + +print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}') +print() + +write_api.write(bucket="my-bucket", record=point) + +""" +Query: using Stream +""" +query = ''' +from(bucket:"my-bucket") + |> range(start: 0, stop: now()) + |> filter(fn: (r) => r._measurement == "h2o_feet") +''' +records = query_api.query_stream(query) + +for record in records: + print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}') + +""" +Close client +""" +client.__del__()