Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retry mechanism parameter for LongWriteOperation #357

Merged
merged 19 commits into from
Mar 12, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException;
import com.polidea.rxandroidble.internal.connection.ImmediateSerializedBatchAckStrategy;
import com.polidea.rxandroidble.internal.connection.NoRetryStrategy;
import com.polidea.rxandroidble.internal.util.ObservableUtil;

import java.util.HashMap;
Expand Down Expand Up @@ -283,6 +284,8 @@ public LongWriteOperationBuilder createNewLongWriteBuilder() {

private WriteOperationAckStrategy writeOperationAckStrategy = // default
new ImmediateSerializedBatchAckStrategy();
private WriteOperationRetryStrategy writeOperationRetryStrategy = // default
new NoRetryStrategy();

@Override
public LongWriteOperationBuilder setBytes(@NonNull byte[] bytes) {
Expand Down Expand Up @@ -316,6 +319,13 @@ public LongWriteOperationBuilder setMaxBatchSize(int maxBatchSize) {
return this;
}

@Override
public LongWriteOperationBuilder setWriteOperationRetryStrategy(
@NonNull WriteOperationRetryStrategy writeOperationRetryStrategy) {
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
return this;
}

@Override
public LongWriteOperationBuilder setWriteOperationAckStrategy(@NonNull WriteOperationAckStrategy writeOperationAckStrategy) {
this.writeOperationAckStrategy = writeOperationAckStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.polidea.rxandroidble.exceptions.BleCannotSetCharacteristicNotificationException;
import com.polidea.rxandroidble.exceptions.BleCharacteristicNotFoundException;
import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.exceptions.BleGattCannotStartException;
import com.polidea.rxandroidble.exceptions.BleGattException;
import com.polidea.rxandroidble.exceptions.BleGattOperationType;
Expand Down Expand Up @@ -131,6 +132,23 @@ interface LongWriteOperationBuilder {
*/
LongWriteOperationBuilder setMaxBatchSize(@IntRange(from = 1, to = GATT_MTU_MAXIMUM - GATT_WRITE_MTU_OVERHEAD) int maxBatchSize);

/**
* Setter for a retry strategy in case something goes wrong when writing data. If any {@link BleException} is raised,
* a {@link WriteOperationRetryStrategy.LongWriteFailure} object is emitted.
* {@link WriteOperationRetryStrategy.LongWriteFailure} contains both the {@link BleException} and the batch number
* for which the write request failed. The {@link WriteOperationRetryStrategy.LongWriteFailure} emitted by the
* writeOperationRetryStrategy will be used to retry the specified batch number write request.
* <br>
* If this is not specified - if batch write fails, the long write operation is stopped and whole operation is interrupted.
* <br>
* It is expected that the Observable returned from the writeOperationRetryStrategy will emit exactly the same events as the source,
* however you may delay them at your pace.
*
* @param writeOperationRetryStrategy the retry strategy
* @return the LongWriteOperationBuilder
*/
LongWriteOperationBuilder setWriteOperationRetryStrategy(@NonNull WriteOperationRetryStrategy writeOperationRetryStrategy);

/**
* Setter for a strategy used to mark batch write completed. Only after previous batch has finished, the next (if any left) can be
* written.
Expand Down Expand Up @@ -160,6 +178,45 @@ interface LongWriteOperationBuilder {
Observable<byte[]> build();
}

interface WriteOperationRetryStrategy extends Observable.Transformer<WriteOperationRetryStrategy.LongWriteFailure,
WriteOperationRetryStrategy.LongWriteFailure> {

class LongWriteFailure {

final int batchIndex;
final BleGattException cause;

/**
* Default constructor
*
* @param batchIndex the zero-based batch index on which the write request failed
* @param cause the failed cause of the write request
*/
public LongWriteFailure(int batchIndex, BleGattException cause) {
this.batchIndex = batchIndex;
this.cause = cause;
}

/**
* Get the batch index of the failed write request
*
* @return the zero-based batch index
*/
public int getBatchIndex() {
return batchIndex;
}

/**
* Get the failed cause of the write request
*
* @return a {@link BleGattException}
*/
public BleGattException getCause() {
return cause;
}
}
}

interface WriteOperationAckStrategy extends Observable.Transformer<Boolean, Boolean> {

}
Expand Down Expand Up @@ -442,7 +499,6 @@ Observable<byte[]> writeDescriptor(@NonNull UUID serviceUuid, @NonNull UUID char
*/
Observable<byte[]> writeDescriptor(@NonNull BluetoothGattDescriptor descriptor, @NonNull byte[] data);


/**
* Performs a GATT request connection priority operation, which requests a connection parameter
* update on the remote device. NOTE: peripheral may silently decline request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class LongWriteOperationBuilderImpl implements RxBleConnection.Long
private Observable<BluetoothGattCharacteristic> writtenCharacteristicObservable;
private PayloadSizeLimitProvider maxBatchSizeProvider;
private RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy();
private RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy = new NoRetryStrategy();

private byte[] bytes;

Expand Down Expand Up @@ -63,6 +64,13 @@ public RxBleConnection.LongWriteOperationBuilder setMaxBatchSize(final int maxBa
return this;
}

@Override
public RxBleConnection.LongWriteOperationBuilder setWriteOperationRetryStrategy(
@NonNull RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy) {
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
return this;
}

@Override
public RxBleConnection.LongWriteOperationBuilder setWriteOperationAckStrategy(
@NonNull RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy) {
Expand All @@ -87,7 +95,7 @@ public Observable<byte[]> build() {
public Observable<byte[]> call(BluetoothGattCharacteristic bluetoothGattCharacteristic) {
return operationQueue.queue(
operationsProvider.provideLongWriteOperation(bluetoothGattCharacteristic,
writeOperationAckStrategy, maxBatchSizeProvider, bytes)
writeOperationAckStrategy, writeOperationRetryStrategy, maxBatchSizeProvider, bytes)
);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.polidea.rxandroidble.internal.connection;

import com.polidea.rxandroidble.RxBleConnection;

import rx.Observable;
import rx.functions.Func1;

public class NoRetryStrategy implements RxBleConnection.WriteOperationRetryStrategy {

@Override
public Observable<LongWriteFailure> call(Observable<LongWriteFailure> observable) {
return observable.flatMap(new Func1<LongWriteFailure, Observable<LongWriteFailure>>() {
@Override
public Observable<LongWriteFailure> call(LongWriteFailure longWriteFailure) {
return Observable.error(longWriteFailure.getCause());
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import com.polidea.rxandroidble.ClientComponent;
import com.polidea.rxandroidble.RxBleConnection.WriteOperationAckStrategy;
import com.polidea.rxandroidble.RxBleConnection.WriteOperationRetryStrategy;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.exceptions.BleGattCallbackTimeoutException;
import com.polidea.rxandroidble.exceptions.BleGattCannotStartException;
import com.polidea.rxandroidble.exceptions.BleGattCharacteristicException;
import com.polidea.rxandroidble.exceptions.BleGattException;
import com.polidea.rxandroidble.exceptions.BleGattOperationType;
import com.polidea.rxandroidble.internal.QueueOperation;
import com.polidea.rxandroidble.internal.connection.ConnectionModule;
Expand All @@ -25,7 +28,6 @@
import java.util.UUID;

import bleshadow.javax.inject.Named;

import rx.Emitter;
import rx.Observable;
import rx.Scheduler;
Expand All @@ -43,6 +45,7 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
private final BluetoothGattCharacteristic bluetoothGattCharacteristic;
private final PayloadSizeLimitProvider batchSizeProvider;
private final WriteOperationAckStrategy writeOperationAckStrategy;
private final WriteOperationRetryStrategy writeOperationRetryStrategy;
private final byte[] bytesToWrite;
private byte[] tempBatchArray;

Expand All @@ -54,6 +57,7 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
BluetoothGattCharacteristic bluetoothGattCharacteristic,
PayloadSizeLimitProvider batchSizeProvider,
WriteOperationAckStrategy writeOperationAckStrategy,
WriteOperationRetryStrategy writeOperationRetryStrategy,
byte[] bytesToWrite) {
this.bluetoothGatt = bluetoothGatt;
this.rxBleGattCallback = rxBleGattCallback;
Expand All @@ -62,12 +66,13 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
this.bluetoothGattCharacteristic = bluetoothGattCharacteristic;
this.batchSizeProvider = batchSizeProvider;
this.writeOperationAckStrategy = writeOperationAckStrategy;
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
this.bytesToWrite = bytesToWrite;
}

@Override
protected void protectedRun(final Emitter<byte[]> emitter, final QueueReleaseInterface queueReleaseInterface) throws Throwable {
int batchSize = batchSizeProvider.getPayloadSizeLimit();
final int batchSize = batchSizeProvider.getPayloadSizeLimit();

if (batchSize <= 0) {
throw new IllegalArgumentException("batchSizeProvider value must be greater than zero (now: " + batchSize + ")");
Expand All @@ -90,6 +95,7 @@ protected void protectedRun(final Emitter<byte[]> emitter, final QueueReleaseInt
.repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retryWhen(errorIsRetryableAndAccordingTo(writeOperationRetryStrategy, byteBuffer, batchSize))
.toCompletable()
.subscribe(
new Action0() {
Expand Down Expand Up @@ -209,4 +215,52 @@ public Boolean call(Object emission) {
}
};
}

private static Func1<Observable<? extends Throwable>, Observable<?>> errorIsRetryableAndAccordingTo(
final WriteOperationRetryStrategy writeOperationRetryStrategy,
final ByteBuffer byteBuffer,
final int batchSize) {
return new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> emittedOnWriteFailure) {
return emittedOnWriteFailure
.flatMap(toLongWriteFailureOrError())
.doOnNext(repositionByteBufferForRetry())
.compose(writeOperationRetryStrategy);
}

@NonNull
private Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>> toLongWriteFailureOrError() {
return new Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>>() {
@Override
public Observable<WriteOperationRetryStrategy.LongWriteFailure> call(Throwable throwable) {
if (!(throwable instanceof BleGattCharacteristicException || throwable instanceof BleGattCannotStartException)) {
return Observable.error(throwable);
}
final int failedBatchIndex = calculateFailedBatchIndex(byteBuffer, batchSize);
WriteOperationRetryStrategy.LongWriteFailure longWriteFailure = new WriteOperationRetryStrategy.LongWriteFailure(
failedBatchIndex,
(BleGattException) throwable
);
return Observable.just(longWriteFailure);
}
};
}

@NonNull
private Action1<WriteOperationRetryStrategy.LongWriteFailure> repositionByteBufferForRetry() {
return new Action1<WriteOperationRetryStrategy.LongWriteFailure>() {
@Override
public void call(WriteOperationRetryStrategy.LongWriteFailure longWriteFailure) {
final int newBufferPosition = longWriteFailure.getBatchIndex() * batchSize;
byteBuffer.position(newBufferPosition);
}
};
}

private int calculateFailedBatchIndex(ByteBuffer byteBuffer, int batchSize) {
return (int) Math.ceil(byteBuffer.position() / (float) batchSize) - 1;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface OperationsProvider {
CharacteristicLongWriteOperation provideLongWriteOperation(
BluetoothGattCharacteristic bluetoothGattCharacteristic,
RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy,
RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy,
PayloadSizeLimitProvider maxBatchSizeProvider,
byte[] bytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class OperationsProviderImpl implements OperationsProvider {
public CharacteristicLongWriteOperation provideLongWriteOperation(
BluetoothGattCharacteristic bluetoothGattCharacteristic,
RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy,
RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy,
PayloadSizeLimitProvider maxBatchSizeProvider,
byte[] bytes) {

Expand All @@ -63,6 +64,7 @@ public CharacteristicLongWriteOperation provideLongWriteOperation(
bluetoothGattCharacteristic,
maxBatchSizeProvider,
writeOperationAckStrategy,
writeOperationRetryStrategy,
bytes);
}

Expand Down
Loading