feat: add supports for write structured data - NamedTuple, Data Classes #330

Merged · 9 commits · Oct 12, 2021
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -108,7 +108,7 @@ jobs:
pydocstyle --count influxdb_client
check-examples:
docker:
- image: *default-python
- image: "cimg/python:3.8"
environment:
PIPENV_VENV_IN_PROJECT: true
- image: *default-influxdb
@@ -123,6 +123,7 @@ jobs:
export PYTHONPATH="$PWD"
python examples/monitoring_and_alerting.py
python examples/buckets_management.py
python examples/write_structured_data.py
check-sphinx:
docker:
- image: *default-python
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## 1.22.0 [unreleased]

### Features
1. [#330](https://github.com/influxdata/influxdb-client-python/pull/330): Add support for writing structured data - `NamedTuple`, `Data Classes`

### Documentation
1. [#331](https://github.com/influxdata/influxdb-client-python/pull/331): Add [Migration Guide](MIGRATION_GUIDE.rst)

13 changes: 8 additions & 5 deletions README.rst
@@ -362,11 +362,14 @@ The data could be written as

1. ``string`` or ``bytes`` that is formatted as InfluxDB's line protocol
2. `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16>`__ structure
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time``
4. List of above items
5. A ``batching`` type of write also supports an ``Observable`` that produce one of an above item
3. Dictionary style mapping with keys: ``measurement``, ``tags``, ``fields`` and ``time`` or custom structure
4. `NamedTuple <https://docs.python.org/3/library/collections.html#collections.namedtuple>`_
5. `Data Classes <https://docs.python.org/3/library/dataclasses.html>`_
6. `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_
7. List of above items
8. A ``batching`` type of write also supports an ``Observable`` that produces one of the above items

You can find write examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#writes>`__.
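The dictionary mapping in item 3 follows the line-protocol layout directly. As a rough illustration of how the ``measurement``/``tags``/``fields``/``time`` keys map onto the protocol, here is a toy formatter (illustrative only; the real client also handles escaping, integer suffixes, and timestamp precision):

```python
# Illustrative only: a toy line-protocol formatter for the default
# dictionary structure. Not the library implementation.
def to_line_protocol(record: dict) -> str:
    tags = ','.join(f"{k}={v}" for k, v in record.get('tags', {}).items())
    fields = ','.join(f"{k}={v}" for k, v in record['fields'].items())
    head = record['measurement'] + (',' + tags if tags else '')
    return f"{head} {fields} {record['time']}"

record = {
    "measurement": "h2o_feet",
    "tags": {"location": "coyote_creek"},
    "fields": {"water_level": 1.0},
    "time": 1,
}
print(to_line_protocol(record))
# h2o_feet,location=coyote_creek water_level=1.0 1
```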

Batching
""""""""
@@ -532,7 +535,7 @@ In a `init <https://docs.python.org/3/library/configparser.html>`_ configuration
customer = California Miner
data_center = ${env.data_center}

You could also use a `TOML <https://toml.io/en/>`_ format for the configuration file.
You can also use a `TOML <https://toml.io/en/>`_ format for the configuration file.

Via Environment Properties
__________________________
@@ -1048,7 +1051,7 @@ The second example shows how to use client capabilities to realtime visualization
Other examples
""""""""""""""

You could find all examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`_.
You can find all examples at GitHub: `influxdb-client-python/examples <https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples>`__.

.. marker-examples-end

6 changes: 6 additions & 0 deletions docs/api.rst
@@ -19,6 +19,12 @@ WriteApi
.. autoclass:: influxdb_client.WriteApi
:members:

.. autoclass:: influxdb_client.client.write.point.Point
:members:

.. autoclass:: influxdb_client.domain.write_precision.WritePrecision
:members:

BucketsApi
""""""""""
.. autoclass:: influxdb_client.BucketsApi
1 change: 1 addition & 0 deletions examples/README.md
@@ -7,6 +7,7 @@
- [ingest_large_dataframe.py](ingest_large_dataframe.py) - How to ingest large DataFrame
- [iot_sensor.py](iot_sensor.py) - How to write sensor data every minute by [RxPY](https://rxpy.readthedocs.io/en/latest/)
- [import_data_set_sync_batching.py](import_data_set_sync_batching.py) - How to use [RxPY](https://rxpy.readthedocs.io/en/latest/) to prepare batches for synchronous write into InfluxDB
- [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)

## Queries
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
2 changes: 1 addition & 1 deletion examples/example.py
@@ -37,4 +37,4 @@
print("val count: ", val_count)

response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
print (codecs.decode(response.data))
print(codecs.decode(response.data))
66 changes: 66 additions & 0 deletions examples/write_structured_data.py
@@ -0,0 +1,66 @@
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS


class Sensor(namedtuple('Sensor', ['name', 'location', 'version', 'pressure', 'temperature', 'timestamp'])):
"""
Named structure - Sensor
"""
pass


@dataclass
class Car:
"""
DataClass structure - Car
"""
engine: str
type: str
speed: float


"""
Initialize client
"""
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
write_api = client.write_api(write_options=SYNCHRONOUS)

"""
Sensor "current" state
"""
sensor = Sensor(name="sensor_pt859",
location="warehouse_125",
version="2021.06.05.5874",
pressure=125,
temperature=10,
timestamp=datetime.utcnow())
print(sensor)

"""
Synchronous write
"""
write_api.write(bucket="my-bucket",
record=sensor,
record_measurement_key="name",
record_time_key="timestamp",
record_tag_keys=["location", "version"],
record_field_keys=["pressure", "temperature"])

"""
Car "current" speed
"""
car = Car('12V-BT', 'sport-cars', 125.25)
print(car)

"""
Synchronous write
"""
write_api.write(bucket="my-bucket",
record=car,
record_measurement_name="performance",
record_tag_keys=["engine", "type"],
record_field_keys=["speed"])
82 changes: 74 additions & 8 deletions influxdb_client/client/write/point.py
@@ -61,16 +61,82 @@ def measurement(measurement):
return p

@staticmethod
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION):
"""Initialize point from 'dict' structure."""
point = Point(dictionary['measurement'])
if 'tags' in dictionary:
def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs):
"""
Initialize point from 'dict' structure.

The expected dict structure is:
- measurement
- tags
- fields
- time

Example:
.. code-block:: python

# Use default dictionary structure
dict_structure = {
"measurement": "h2o_feet",
"tags": {"location": "coyote_creek"},
"fields": {"water_level": 1.0},
"time": 1
}
point = Point.from_dict(dict_structure, WritePrecision.NS)

Example:
.. code-block:: python

# Use custom dictionary structure
dictionary = {
"name": "sensor_pt859",
"location": "warehouse_125",
"version": "2021.06.05.5874",
"pressure": 125,
"temperature": 10,
"created": 1632208639,
}
point = Point.from_dict(dictionary,
write_precision=WritePrecision.S,
record_measurement_key="name",
record_time_key="created",
record_tag_keys=["location", "version"],
record_field_keys=["pressure", "temperature"])

:param dictionary: dictionary to serialize into a data Point
:param write_precision: sets the precision for the supplied time values
:key record_measurement_key: key of the dictionary that holds the measurement
:key record_measurement_name: static measurement name for the data Point
:key record_time_key: key of the dictionary that holds the timestamp
:key record_tag_keys: list of dictionary keys to use as tags
:key record_field_keys: list of dictionary keys to use as fields
:return: new data point
"""
measurement_ = kwargs.get('record_measurement_name', None)
if measurement_ is None:
measurement_ = dictionary[kwargs.get('record_measurement_key', 'measurement')]
point = Point(measurement_)

record_tag_keys = kwargs.get('record_tag_keys', None)
if record_tag_keys is not None:
for tag_key in record_tag_keys:
if tag_key in dictionary:
point.tag(tag_key, dictionary[tag_key])
elif 'tags' in dictionary:
for tag_key, tag_value in dictionary['tags'].items():
point.tag(tag_key, tag_value)
for field_key, field_value in dictionary['fields'].items():
point.field(field_key, field_value)
if 'time' in dictionary:
point.time(dictionary['time'], write_precision=write_precision)

record_field_keys = kwargs.get('record_field_keys', None)
if record_field_keys is not None:
for field_key in record_field_keys:
if field_key in dictionary:
point.field(field_key, dictionary[field_key])
else:
for field_key, field_value in dictionary['fields'].items():
point.field(field_key, field_value)

record_time_key = kwargs.get('record_time_key', 'time')
if record_time_key in dictionary:
point.time(dictionary[record_time_key], write_precision=write_precision)
return point

def __init__(self, measurement_name):
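The key-resolution order in `Point.from_dict` above — explicit `record_*` kwargs first, then the default `measurement`/`tags`/`fields`/`time` layout — can be sketched in plain Python. This is an illustrative reimplementation, not the library code:

```python
# Illustrative sketch of Point.from_dict's key resolution. The real
# method builds a Point; this just returns the resolved pieces.
def resolve_point(dictionary: dict, **kwargs):
    # Static measurement name wins over a measurement key lookup.
    measurement = kwargs.get('record_measurement_name')
    if measurement is None:
        measurement = dictionary[kwargs.get('record_measurement_key', 'measurement')]

    # Explicit tag keys override the nested 'tags' mapping.
    tag_keys = kwargs.get('record_tag_keys')
    if tag_keys is not None:
        tags = {k: dictionary[k] for k in tag_keys if k in dictionary}
    else:
        tags = dictionary.get('tags', {})

    # Explicit field keys override the nested 'fields' mapping.
    field_keys = kwargs.get('record_field_keys')
    if field_keys is not None:
        fields = {k: dictionary[k] for k in field_keys if k in dictionary}
    else:
        fields = dictionary['fields']

    time_key = kwargs.get('record_time_key', 'time')
    return measurement, tags, fields, dictionary.get(time_key)

m, tags, fields, ts = resolve_point(
    {"name": "sensor_pt859", "location": "warehouse_125",
     "pressure": 125, "created": 1632208639},
    record_measurement_key="name",
    record_time_key="created",
    record_tag_keys=["location"],
    record_field_keys=["pressure"])
print(m, tags, fields, ts)
# sensor_pt859 {'location': 'warehouse_125'} {'pressure': 125} 1632208639
```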
82 changes: 72 additions & 10 deletions influxdb_client/client/write_api.py
@@ -8,7 +8,7 @@
from enum import Enum
from random import random
from time import sleep
from typing import Union, Any, Iterable
from typing import Union, Any, Iterable, NamedTuple

import rx
from rx import operators as ops, Observable
@@ -24,6 +24,15 @@
logger = logging.getLogger(__name__)


try:
import dataclasses
from dataclasses import dataclass

_HAS_DATACLASS = True
except ModuleNotFoundError:
_HAS_DATACLASS = False


class WriteType(Enum):
"""Configuration which type of writes will client use."""

@@ -173,7 +182,20 @@ def _body_reduce(batch_items):


class WriteApi:
"""Implementation for '/api/v2/write' endpoint."""
"""
Implementation for '/api/v2/write' endpoint.

Example:
.. code-block:: python

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS


# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
"""

def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(),
point_settings: PointSettings = PointSettings()) -> None:
@@ -217,21 +239,51 @@ def __init__(self, influxdb_client, write_options: WriteOptions = WriteOptions(),
def write(self, bucket: str, org: str = None,
record: Union[
str, Iterable['str'], Point, Iterable['Point'], dict, Iterable['dict'], bytes, Iterable['bytes'],
Observable] = None,
Observable, NamedTuple, Iterable['NamedTuple'], 'dataclass', Iterable['dataclass']
] = None,
write_precision: WritePrecision = DEFAULT_WRITE_PRECISION, **kwargs) -> Any:
"""
Write time-series data into InfluxDB.

:param str bucket: specifies the destination bucket for writes (required)
:param str, Organization org: specifies the destination organization for writes;
takes the ID, Name or Organization;
if not specified, the default from ``client.org`` is used.
:param str bucket: specifies the destination bucket for writes (required)
:param WritePrecision write_precision: specifies the precision for the unix timestamps within
the body line-protocol. The precision specified on a Point takes precedence
and is used for the write.
:param record: Points, line protocol, Pandas DataFrame, RxPY Observable to write
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame
:key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields
:param record: Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or
RxPY Observable to write
:key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - ``DataFrame``
:key data_frame_tag_columns: list of DataFrame columns which are tags,
rest columns will be fields - ``DataFrame``
:key record_measurement_key: key of record with specified measurement -
``dictionary``, ``NamedTuple``, ``dataclass``
:key record_measurement_name: static measurement name - ``dictionary``, ``NamedTuple``, ``dataclass``
:key record_time_key: key of record with specified timestamp - ``dictionary``, ``NamedTuple``, ``dataclass``
:key record_tag_keys: list of record keys to use as a tag - ``dictionary``, ``NamedTuple``, ``dataclass``
:key record_field_keys: list of record keys to use as a field - ``dictionary``, ``NamedTuple``, ``dataclass``

Example:
.. code-block:: python

# Record as Line Protocol
write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
"measurement": "h2o_feet",
"tags": {"location": "us-west"},
"fields": {"level": 125},
"time": 1
}
write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
write_api.write("my-bucket", "my-org", point)

"""
org = get_org_query_param(org=org, client=self._influxdb_client)

@@ -309,12 +361,16 @@ def _serialize(self, record, write_precision, payload, **kwargs):
self._serialize(record.to_line_protocol(), record.write_precision, payload, **kwargs)

elif isinstance(record, dict):
self._serialize(Point.from_dict(record, write_precision=write_precision),
self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),
write_precision, payload, **kwargs)
elif 'DataFrame' in type(record).__name__:
serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs)
self._serialize(serializer.serialize(), write_precision, payload, **kwargs)

elif hasattr(record, "_asdict"):
# noinspection PyProtectedMember
self._serialize(record._asdict(), write_precision, payload, **kwargs)
elif _HAS_DATACLASS and dataclasses.is_dataclass(record):
self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs)
elif isinstance(record, Iterable):
for item in record:
self._serialize(item, write_precision, payload, **kwargs)
@@ -334,7 +390,7 @@ def _write_batching(self, bucket, org, data,
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)

elif isinstance(data, dict):
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision),
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),
precision, **kwargs)

elif 'DataFrame' in type(data).__name__:
@@ -344,6 +400,12 @@
self._write_batching(bucket, org,
serializer.serialize(chunk_idx),
precision, **kwargs)
elif hasattr(data, "_asdict"):
# noinspection PyProtectedMember
self._write_batching(bucket, org, data._asdict(), precision, **kwargs)

elif _HAS_DATACLASS and dataclasses.is_dataclass(data):
self._write_batching(bucket, org, dataclasses.asdict(data), precision, **kwargs)

elif isinstance(data, Iterable):
for item in data:
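The `_asdict`/`dataclasses.asdict` checks in the dispatch above are what let one code path serve dictionaries, NamedTuples, and Data Classes alike. A stdlib-only sketch of that normalization step (the record types here are hypothetical, not from the library):

```python
from collections import namedtuple
from dataclasses import asdict, dataclass, is_dataclass

# Hypothetical record types for illustration.
Sensor = namedtuple('Sensor', ['name', 'pressure'])

@dataclass
class Car:
    engine: str
    speed: float

def to_record_dict(record):
    """Reduce a structured record to a plain dict, as _serialize does."""
    if hasattr(record, '_asdict'):   # NamedTuple
        return record._asdict()
    if is_dataclass(record):         # Data Class
        return asdict(record)
    return record                    # already a dict (or other record)

print(to_record_dict(Sensor('sensor_pt859', 125)))
print(to_record_dict(Car('12V-BT', 125.25)))
```

After this reduction, both structured types flow through the same `Point.from_dict` path as a plain dictionary, with the `record_*` kwargs selecting measurement, tags, fields, and timestamp.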