Skip to content

Commit

Permalink
feat: Added possibility to use datetime nanoseconds precision by `pan…
Browse files Browse the repository at this point in the history
…das.Timestamp` (#141)
  • Loading branch information
bednar authored Aug 11, 2020
1 parent 7d5f2c2 commit bfa0ac4
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 40 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

### Features
1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip verification of the SSL certificate
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp`

## 1.9.0 [2020-07-17]

Expand Down
68 changes: 67 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,76 @@ The following forward compatible APIs are available:

For detail info see `InfluxDB 1.8 example <examples/influxdb_18_example.py>`_.

Nanosecond precision
^^^^^^^^^^^^^^^^^^^^

Python's `datetime <https://docs.python.org/3/library/datetime.html>`_ doesn't support nanosecond precision,
so during writes and queries the library ignores everything after microseconds.

If you would like to use ``datetime`` with nanosecond precision, you should use
`pandas.Timestamp <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html#pandas.Timestamp>`_,
which is a replacement for the Python ``datetime.datetime`` object, and you should also set a proper ``DateTimeHelper`` (e.g. ``PandasDateTimeHelper``) for the client library.

* sources - `nanosecond_precision.py <https://github.com/influxdata/influxdb-client-python/blob/master/examples/nanosecond_precision.py>`_

.. code-block:: python
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS
"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils
date_utils.date_helper = PandasDateTimeHelper()
"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
point = Point("h2o_feet") \
.field("water_level", 10) \
.tag("location", "pacific") \
.time('1996-02-25T21:20:00.001001231Z')
print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()
write_api.write(bucket="my-bucket", record=point)
"""
Query: using Stream
"""
query = '''
from(bucket:"my-bucket")
|> range(start: 0, stop: now())
|> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)
for record in records:
print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
"""
Close client
"""
client.__del__()
Local tests
-----------

.. code-block:: python
.. code-block:: console
# start/restart InfluxDB2 on local machine using docker
./scripts/influxdb-restart.sh
Expand Down
50 changes: 50 additions & 0 deletions examples/nanosecond_precision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Example: write and query a point whose timestamp has nanosecond precision."""
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS

import influxdb_client.client.util.date_utils as date_utils

# Set PandasDate helper which supports nanoseconds.
date_utils.date_helper = PandasDateTimeHelper()

# Prepare client.
client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

# Everything that talks to the server runs inside ``try`` so the client is
# released even when the write or the query raises.
try:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    # Prepare data: the timestamp string carries nine fractional digits.
    point = Point("h2o_feet") \
        .field("water_level", 10) \
        .tag("location", "pacific") \
        .time('1996-02-25T21:20:00.001001231Z')

    print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
    print()

    write_api.write(bucket="my-bucket", record=point)

    # Query: using Stream
    query = '''
    from(bucket:"my-bucket")
    |> range(start: 0, stop: now())
    |> filter(fn: (r) => r._measurement == "h2o_feet")
    '''
    records = query_api.query_stream(query)

    for record in records:
        print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
finally:
    # Close client
    client.close()
18 changes: 0 additions & 18 deletions influxdb_client/client/date_utils.py

This file was deleted.

7 changes: 2 additions & 5 deletions influxdb_client/client/flux_csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from urllib3 import HTTPResponse

from influxdb_client.client.date_utils import get_date_parse_function
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.flux_table import FluxTable, FluxColumn, FluxRecord


Expand Down Expand Up @@ -208,10 +208,7 @@ def _to_value(self, str_val, column):
return base64.b64decode(str_val)

if "dateTime:RFC3339" == column.data_type or "dateTime:RFC3339Nano" == column.data_type:
# todo nanosecods precision
# return str_val
return get_date_parse_function()(str_val)
# return timestamp_parser(str_val)
return get_date_helper().parse_date(str_val)

if "duration" == column.data_type:
# todo better type ?
Expand Down
1 change: 1 addition & 0 deletions influxdb_client/client/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Utils package."""
49 changes: 49 additions & 0 deletions influxdb_client/client/util/date_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""Utils to get right Date parsing function."""

from dateutil import parser

date_helper = None


class DateHelper:
    """Group the date operations that may have different implementations."""

    def parse_date(self, date_string: str):
        """
        Parse string into Date or Timestamp.

        This base implementation is a placeholder and returns ``None``.

        :param date_string: textual date to parse
        :return: Returns a :class:`datetime.datetime` object or compliant implementation
                 like :class:`pandas._libs.tslibs.timestamps.Timestamp`
        """
        return None

    def to_nanoseconds(self, delta):
        """
        Get number of nanoseconds in timedelta.

        Solution comes from v1 client. Thx.
        https://github.com/influxdata/influxdb-python/pull/811

        :param delta: object exposing ``days``, ``seconds`` and ``microseconds``
        :return: total duration expressed in nanoseconds
        """
        # Fold days into seconds first, then scale once to nanoseconds.
        total_seconds = delta.days * 86400 + delta.seconds
        return total_seconds * 10 ** 9 + delta.microseconds * 10 ** 3


def get_date_helper() -> DateHelper:
    """
    Return DateHelper with proper implementation.

    If there is a 'ciso8601' then use 'ciso8601.parse_datetime' else use 'dateutil.parse'.
    The helper is created on first use and cached in the module-level ``date_helper``;
    a helper already assigned to ``date_helper`` is returned unchanged.

    :return: the cached :class:`DateHelper` instance
    """
    global date_helper
    if date_helper is None:
        try:
            import ciso8601
            parse_function = ciso8601.parse_datetime
        except ImportError:
            # ``ImportError`` also covers ``ModuleNotFoundError``, so an installed
            # but unloadable 'ciso8601' falls back to dateutil as well.
            parse_function = parser.parse

        # Configure the helper fully before publishing it, so a failure above can
        # never leave a cached helper whose ``parse_date`` is the base placeholder.
        helper = DateHelper()
        helper.parse_date = parse_function
        date_helper = helper

    return date_helper
15 changes: 15 additions & 0 deletions influxdb_client/client/util/date_utils_pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Pandas date utils."""
from influxdb_client.client.util.date_utils import DateHelper
from influxdb_client.extras import pd


class PandasDateTimeHelper(DateHelper):
    """DateHelper implementation that uses the Pandas library for nanosecond precision."""

    def parse_date(self, date_string: str):
        """
        Parse date string into `pandas._libs.tslibs.timestamps.Timestamp`.

        :param date_string: textual date to parse
        :return: result of ``pd.to_datetime`` for the given string
        """
        parsed = pd.to_datetime(date_string)
        return parsed

    def to_nanoseconds(self, delta):
        """
        Get number of nanoseconds with nanos precision.

        :param delta: time difference; a ``nanoseconds`` attribute is added when present
        :return: total duration expressed in nanoseconds
        """
        base_nanos = super().to_nanoseconds(delta)
        # Deltas without a ``nanoseconds`` attribute contribute nothing extra.
        extra_nanos = getattr(delta, 'nanoseconds', 0)
        return base_nanos + extra_nanos
19 changes: 4 additions & 15 deletions influxdb_client/client/write/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pytz import UTC
from six import iteritems

from influxdb_client.client.date_utils import get_date_parse_function
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.domain.write_precision import WritePrecision

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
Expand Down Expand Up @@ -164,24 +164,13 @@ def _escape_string(value):
return str(value).translate(_ESCAPE_STRING)


def _to_nanoseconds(delta):
"""
Solution comes from v1 client. Thx.
https://github.com/influxdata/influxdb-python/pull/811
"""
nanoseconds_in_days = delta.days * 86400 * 10 ** 9
nanoseconds_in_seconds = delta.seconds * 10 ** 9
nanoseconds_in_micros = delta.microseconds * 10 ** 3
return nanoseconds_in_days + nanoseconds_in_seconds + nanoseconds_in_micros


def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
date_helper = get_date_helper()
if isinstance(timestamp, Integral):
return timestamp # assume precision is correct if timestamp is int

if isinstance(timestamp, str):
timestamp = get_date_parse_function()(timestamp)
timestamp = date_helper.parse_date(timestamp)

if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):

Expand All @@ -192,7 +181,7 @@ def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
timestamp = timestamp.astimezone(UTC)
timestamp = timestamp - EPOCH

ns = _to_nanoseconds(timestamp)
ns = date_helper.to_nanoseconds(timestamp)

if precision is None or precision == WritePrecision.NS:
return ns
Expand Down
35 changes: 35 additions & 0 deletions tests/test_PandasDateTimeHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import unittest
from datetime import datetime, timedelta

from pytz import UTC

from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper


class PandasDateTimeHelperTest(unittest.TestCase):
    """Unit tests for ``PandasDateTimeHelper``."""

    def setUp(self) -> None:
        """Create a fresh helper for every test."""
        self.helper = PandasDateTimeHelper()

    def test_parse_date(self):
        """Every component of the parsed value, down to nanoseconds, matches the input string."""
        parsed = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')

        expected_components = (
            ('year', 2020),
            ('month', 8),
            ('day', 7),
            ('hour', 6),
            ('minute', 21),
            ('second', 57),
            ('microsecond', 331249),
            ('nanosecond', 158),
        )
        for attribute, expected in expected_components:
            self.assertEqual(getattr(parsed, attribute), expected)

    def test_to_nanoseconds(self):
        """The offset from the epoch keeps all nine fractional digits."""
        epoch = UTC.localize(datetime.utcfromtimestamp(0))
        parsed = self.helper.parse_date('2020-08-07T06:21:57.331249158Z')

        self.assertEqual(self.helper.to_nanoseconds(parsed - epoch), 1596781317331249158)

    def test_to_nanoseconds_buildin_timedelta(self):
        """A standard-library ``timedelta`` is converted as well."""
        one_day = timedelta(days=1)

        self.assertEqual(self.helper.to_nanoseconds(one_day), 86400000000000)
36 changes: 36 additions & 0 deletions tests/test_WriteApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,42 @@ def test_check_write_permission_by_empty_data(self):

client.__del__()

def test_write_query_data_nanoseconds(self):
    """Write a point with a nanosecond timestamp and check the queried record keeps it."""
    from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
    import influxdb_client.client.util.date_utils as date_utils

    # The helper is installed in a module-level global. The ``finally`` below
    # resets it even when an assertion fails, so it cannot leak into other tests.
    date_utils.date_helper = PandasDateTimeHelper()
    try:
        bucket = self.create_test_bucket()

        point = Point("h2o_feet") \
            .field("water_level", 155) \
            .tag("location", "creek level") \
            .time('1996-02-25T21:20:00.001001231Z')

        self.write_client.write(bucket.name, self.org, [point])

        flux_result = self.client.query_api().query(
            f'from(bucket:"{bucket.name}") |> range(start: 1970-01-01T00:00:00.000000001Z)')
        self.assertEqual(1, len(flux_result))

        record = flux_result[0].records[0]

        self.assertEqual(self.id_tag, record["id"])
        self.assertEqual(record["_value"], 155)
        self.assertEqual(record["location"], "creek level")
        self.assertEqual(record["_time"].year, 1996)
        self.assertEqual(record["_time"].month, 2)
        self.assertEqual(record["_time"].day, 25)
        self.assertEqual(record["_time"].hour, 21)
        self.assertEqual(record["_time"].minute, 20)
        self.assertEqual(record["_time"].second, 0)
        self.assertEqual(record["_time"].microsecond, 1001)
        self.assertEqual(record["_time"].nanosecond, 231)
    finally:
        date_utils.date_helper = None


class AsynchronousWriteTest(BaseTest):

Expand Down

0 comments on commit bfa0ac4

Please sign in to comment.