Skip to content

Commit

Permalink
#2: Implemented retry_interval
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Aug 8, 2019
1 parent 60c9fc1 commit 9671c4d
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 23 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m
| **batch_size** | the number of data point to collect in batch | `1000` |
| **flush_interval** | the number of milliseconds before the batch is written | `1000` |
| **jitter_interval** | the number of milliseconds to increase the batch flush interval by a random amount | `0` |
| **retry_interval** | the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. | `1000` |

##### write_type
* `batching` - data are writes in batches defined by `batch_size`, `flush_interval`, ...
Expand Down
27 changes: 17 additions & 10 deletions influxdb2/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import rx
from rx import operators as ops, Observable
from rx.core import GroupedObservable
from rx.scheduler import NewThreadScheduler
from rx.scheduler import ThreadPoolScheduler
from rx.subject import Subject

from influxdb2 import WritePrecision
from influxdb2.client.abstract_client import AbstractClient
from influxdb2.client.write.point import Point
from influxdb2.rest import ApiException

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +28,7 @@ class WriteType(Enum):
class WriteOptions(object):

def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=0,
retry_interval=None, write_scheduler=NewThreadScheduler()) -> None:
retry_interval=1_000, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
self.write_type = write_type
self.batch_size = batch_size
self.flush_interval = flush_interval
Expand Down Expand Up @@ -100,10 +101,6 @@ def _group_to_batch(group: GroupedObservable):
ops.map(_create_batch(group)))


def _retry_handler(exception, source, data):
return rx.just(_BatchResponse(exception=exception, data=data))


def _window_to_group(value):
return value.pipe(
ops.to_iterable(),
Expand All @@ -124,7 +121,7 @@ def __init__(self, service, write_options=WriteOptions()) -> None:
.pipe(ops.window_with_time_or_count(count=write_options.batch_size,
timespan=timedelta(milliseconds=write_options.flush_interval)),
ops.flat_map(lambda v: _window_to_group(v)),
ops.map(mapper=lambda x: self._retryable(x)),
ops.map(mapper=lambda x: self._retryable(data=x, delay=self._jitter_delay())),
ops.merge_all()) \
.subscribe(self._on_next, self._on_error)
else:
Expand Down Expand Up @@ -207,14 +204,24 @@ def _post_write(self, _async_req, bucket, org, body, precision):
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
async_req=_async_req)

def _retryable(self, data: str):
def _retryable(self, data: str, delay: timedelta):

return rx.of(data).pipe(
ops.delay(duetime=self._jitter_delay()),
ops.delay(duetime=delay, scheduler=self._write_options.write_scheduler),
ops.map(lambda x: self._http(x)),
ops.catch(handler=lambda exception, source: _retry_handler(exception, source, data)),
ops.catch(handler=lambda exception, source: self._retry_handler(exception, source, data)),
)

def _retry_handler(self, exception, source, data):

if isinstance(exception, ApiException):

if exception.status == 429 or exception.status == 503:
_delay = self._jitter_delay() + timedelta(milliseconds=self._write_options.retry_interval)
return self._retryable(data, delay=_delay)

return rx.just(_BatchResponse(exception=exception, data=data))

def _jitter_delay(self):
return timedelta(milliseconds=random() * self._write_options.jitter_interval)

Expand Down
38 changes: 26 additions & 12 deletions influxdb2_test/rx_playground.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,23 @@ def __init__(self, data, exception=None):
pass

def __str__(self) -> str:
return '_Notification[status:\'{}\', \'{}\']'\
return '_Notification[status:\'{}\', \'{}\']' \
.format("failed" if self.exception else "success", self.data)


class _RxWriter(object):

success_count = 0
failed_count = 0
raise_retry_exception = 0

def __init__(self) -> None:
self._subject = Subject()
obs = self._subject.pipe(ops.observe_on(ThreadPoolScheduler(max_workers=1)))
self._scheduler = ThreadPoolScheduler(max_workers=1)
obs = self._subject.pipe(ops.observe_on(self._scheduler))
self._disposable = obs \
.pipe(ops.window_with_time_or_count(count=5, timespan=datetime.timedelta(milliseconds=10_000)),
ops.flat_map(lambda x: self._window_to_group(x)),
ops.map(mapper=lambda x: self._retryable(x)),
ops.map(mapper=lambda x: self._retryable(data=x, delay=self._jitter_delay(jitter_interval=1000))),
ops.merge_all()) \
.subscribe(self._result)
pass
Expand All @@ -73,12 +74,12 @@ def _window_to_group(self, value):
ops.group_by(_group_by), ops.map(_group_to_batch), ops.merge_all())),
ops.merge_all())

def _retryable(self, data: str):
def _retryable(self, data: str, delay: datetime.timedelta):

return rx.of(data).pipe(
ops.delay(duetime=self._jitter_delay(jitter_interval=1000)),
ops.delay(duetime=delay, scheduler=self._scheduler),
ops.map(lambda x: self._http(x)),
ops.catch(handler=lambda exception, source: _retry_handler(exception, source, data)),
ops.catch(handler=lambda exception, source: self._retry_handler(exception, source, data)),
)

def _http(self, data: str):
Expand All @@ -87,6 +88,15 @@ def _http(self, data: str):
raise Exception('unexpected token: {}'.format(data))
pass

if "alpha" in data:
if self.raise_retry_exception < 2:
self.raise_retry_exception += 1
print('server is temporarily unavailable to accept writes[{}]: {}'.format(current_thread().name, data))
raise Exception('server is temporarily unavailable to accept writes: {}'.format(data))
else:
print("server is OK: {}".format(datetime.datetime.now()))
pass

print("http[" + current_thread().name + "]: " + data)
return _Notification(data=data)

Expand All @@ -111,12 +121,16 @@ def _jitter_delay(self, jitter_interval=0):
print('jitter: {}'.format(_jitter))
return _jitter

def _retry_handler(self, exception, source, data):
print('retry_handler: {}, source: {}'.format(exception, source))

def _retry_handler(exception, source, data):
print('retry_handler: {}, source: {}'.format(exception, source))
notification = _Notification(exception=exception, data=data)
if "server is temporarily" in str(exception):
print("RETRY!!!: {}".format(datetime.datetime.now()))
return self._retryable(data, delay=datetime.timedelta(seconds=2))

return rx.just(notification)
notification = _Notification(exception=exception, data=data)

return rx.just(notification)


def _create_batch(group: GroupedObservable):
Expand Down Expand Up @@ -159,7 +173,7 @@ def _group_to_batch(group: GroupedObservable):
rxWriter.write("balloon")

print("\n== finish writing ==\n")
time.sleep(2)
time.sleep(5)

print("\n== __del__ ==\n")
rxWriter.__del__()
Expand Down
35 changes: 34 additions & 1 deletion influxdb2_test/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def setUp(self) -> None:
header_value="Token my-token")

self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2, flush_interval=5_000))
write_options=WriteOptions(batch_size=2, flush_interval=5_000,
retry_interval=3_000))

def tearDown(self) -> None:
pass
Expand Down Expand Up @@ -184,6 +185,38 @@ def test_jitter_interval(self):
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
httpretty.httpretty.latest_requests[1].parsed_body)

def test_retry_interval(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=429)
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=503)

self._write_client.write("my-bucket", "my-org",
["h2o_feet,location=coyote_creek level\\ water_level=1.0 1",
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"])

time.sleep(1)
self.assertEqual(1, len(httpretty.httpretty.latest_requests))

time.sleep(3)

self.assertEqual(2, len(httpretty.httpretty.latest_requests))

time.sleep(3)

self.assertEqual(3, len(httpretty.httpretty.latest_requests))

self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
httpretty.httpretty.latest_requests[0].parsed_body)
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
httpretty.httpretty.latest_requests[1].parsed_body)
self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=1.0 1\n"
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2",
httpretty.httpretty.latest_requests[2].parsed_body)

pass

def test_recover_from_error(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=400)
Expand Down

0 comments on commit 9671c4d

Please sign in to comment.