diff --git a/mockrxandroidble/src/main/java/com/polidea/rxandroidble/mockrxandroidble/RxBleConnectionMock.java b/mockrxandroidble/src/main/java/com/polidea/rxandroidble/mockrxandroidble/RxBleConnectionMock.java index 979257ba6..9813aee9c 100644 --- a/mockrxandroidble/src/main/java/com/polidea/rxandroidble/mockrxandroidble/RxBleConnectionMock.java +++ b/mockrxandroidble/src/main/java/com/polidea/rxandroidble/mockrxandroidble/RxBleConnectionMock.java @@ -10,6 +10,7 @@ import com.polidea.rxandroidble.RxBleDeviceServices; import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException; import com.polidea.rxandroidble.internal.connection.ImmediateSerializedBatchAckStrategy; +import com.polidea.rxandroidble.internal.connection.NoRetryStrategy; import com.polidea.rxandroidble.internal.util.ObservableUtil; import java.util.HashMap; @@ -283,6 +284,8 @@ public LongWriteOperationBuilder createNewLongWriteBuilder() { private WriteOperationAckStrategy writeOperationAckStrategy = // default new ImmediateSerializedBatchAckStrategy(); + private WriteOperationRetryStrategy writeOperationRetryStrategy = // default + new NoRetryStrategy(); @Override public LongWriteOperationBuilder setBytes(@NonNull byte[] bytes) { @@ -316,6 +319,13 @@ public LongWriteOperationBuilder setMaxBatchSize(int maxBatchSize) { return this; } + @Override + public LongWriteOperationBuilder setWriteOperationRetryStrategy( + @NonNull WriteOperationRetryStrategy writeOperationRetryStrategy) { + this.writeOperationRetryStrategy = writeOperationRetryStrategy; + return this; + } + @Override public LongWriteOperationBuilder setWriteOperationAckStrategy(@NonNull WriteOperationAckStrategy writeOperationAckStrategy) { this.writeOperationAckStrategy = writeOperationAckStrategy; diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/RxBleConnection.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/RxBleConnection.java index ea8e8f08b..cced48559 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/RxBleConnection.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/RxBleConnection.java @@ -12,6 +12,7 @@ import com.polidea.rxandroidble.exceptions.BleCannotSetCharacteristicNotificationException; import com.polidea.rxandroidble.exceptions.BleCharacteristicNotFoundException; import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException; +import com.polidea.rxandroidble.exceptions.BleException; import com.polidea.rxandroidble.exceptions.BleGattCannotStartException; import com.polidea.rxandroidble.exceptions.BleGattException; import com.polidea.rxandroidble.exceptions.BleGattOperationType; @@ -131,6 +132,23 @@ interface LongWriteOperationBuilder { */ LongWriteOperationBuilder setMaxBatchSize(@IntRange(from = 1, to = GATT_MTU_MAXIMUM - GATT_WRITE_MTU_OVERHEAD) int maxBatchSize); + /** + * Setter for a retry strategy in case something goes wrong when writing data. If any {@link BleException} is raised, + * a {@link WriteOperationRetryStrategy.LongWriteFailure} object is emitted. + * {@link WriteOperationRetryStrategy.LongWriteFailure} contains both the {@link BleException} and the batch number + * for which the write request failed. The {@link WriteOperationRetryStrategy.LongWriteFailure} emitted by the + * writeOperationRetryStrategy will be used to retry the specified batch number write request. + *
+ * If this is not specified - if batch write fails, the long write operation is stopped and whole operation is interrupted. + *
+ * It is expected that the Observable returned from the writeOperationRetryStrategy will emit exactly the same events as the source, + * however you may delay them at your pace. + * + * @param writeOperationRetryStrategy the retry strategy + * @return the LongWriteOperationBuilder + */ + LongWriteOperationBuilder setWriteOperationRetryStrategy(@NonNull WriteOperationRetryStrategy writeOperationRetryStrategy); + /** * Setter for a strategy used to mark batch write completed. Only after previous batch has finished, the next (if any left) can be * written. @@ -160,6 +178,54 @@ interface LongWriteOperationBuilder { Observable build(); } + /** + * Retry strategy allows retrying a long write operation. There are two supported scenarios: + * - Once the failure happens you may re-emit the failure you've received, applying your own transformations like a delay or any other, + * aiming to postpone the retry procedure. + * - If that Observable calls {@code onComplete} or {@code onError} then {@code retry} will call + * {@code onCompleted} or {@code onError} on the child subscription. The emission will be forwarded as an operation result. + * + * For general documentation related to retrying please refer to http://reactivex.io/documentation/operators/retry.html + */ + interface WriteOperationRetryStrategy extends Observable.Transformer { + + class LongWriteFailure { + + final int batchIndex; + final BleGattException cause; + + /** + * Default constructor + * + * @param batchIndex the zero-based batch index on which the write request failed + * @param cause the failed cause of the write request + */ + public LongWriteFailure(int batchIndex, BleGattException cause) { + this.batchIndex = batchIndex; + this.cause = cause; + } + + /** + * Get the batch index of the failed write request + * + * @return the zero-based batch index + */ + public int getBatchIndex() { + return batchIndex; + } + + /** + * Get the failed cause of the write request + * + * @return a {@link BleGattException} + */ + public BleGattException getCause() { + return cause; + } + } + } + interface WriteOperationAckStrategy extends Observable.Transformer { } @@ -442,7 +508,6 @@ Observable writeDescriptor(@NonNull UUID serviceUuid, @NonNull UUID char */ Observable writeDescriptor(@NonNull BluetoothGattDescriptor descriptor, @NonNull byte[] data); - /** * Performs a GATT request connection priority operation, which requests a connection parameter * update on the remote device. NOTE: peripheral may silently decline request. diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/LongWriteOperationBuilderImpl.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/LongWriteOperationBuilderImpl.java index c71c15bd0..17ab674d6 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/LongWriteOperationBuilderImpl.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/LongWriteOperationBuilderImpl.java @@ -23,6 +23,7 @@ public final class LongWriteOperationBuilderImpl implements RxBleConnection.Long private Observable writtenCharacteristicObservable; private PayloadSizeLimitProvider maxBatchSizeProvider; private RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy(); + private RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy = new NoRetryStrategy(); private byte[] bytes; @@ -63,6 +64,13 @@ public RxBleConnection.LongWriteOperationBuilder setMaxBatchSize(final int maxBa return this; } + @Override + public RxBleConnection.LongWriteOperationBuilder setWriteOperationRetryStrategy( + @NonNull RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy) { + this.writeOperationRetryStrategy = writeOperationRetryStrategy; + return this; + } + @Override public RxBleConnection.LongWriteOperationBuilder setWriteOperationAckStrategy( @NonNull RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy) { @@ -87,7 +95,7 @@ public Observable build() { public Observable call(BluetoothGattCharacteristic bluetoothGattCharacteristic) { return operationQueue.queue( operationsProvider.provideLongWriteOperation(bluetoothGattCharacteristic, - writeOperationAckStrategy, maxBatchSizeProvider, bytes) + writeOperationAckStrategy, writeOperationRetryStrategy, maxBatchSizeProvider, bytes) ); } }); diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/NoRetryStrategy.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/NoRetryStrategy.java new file mode 100644 index 000000000..1cbb8dea7 --- /dev/null +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/connection/NoRetryStrategy.java @@ -0,0 +1,19 @@ +package com.polidea.rxandroidble.internal.connection; + +import com.polidea.rxandroidble.RxBleConnection; + +import rx.Observable; +import rx.functions.Func1; + +public class NoRetryStrategy implements RxBleConnection.WriteOperationRetryStrategy { + + @Override + public Observable call(Observable observable) { + return observable.flatMap(new Func1>() { + @Override + public Observable call(LongWriteFailure longWriteFailure) { + return Observable.error(longWriteFailure.getCause()); + } + }); + } +} diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/CharacteristicLongWriteOperation.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/CharacteristicLongWriteOperation.java index a108c0604..9a181ffd8 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/CharacteristicLongWriteOperation.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/CharacteristicLongWriteOperation.java @@ -8,10 +8,13 @@ import com.polidea.rxandroidble.ClientComponent; import com.polidea.rxandroidble.RxBleConnection.WriteOperationAckStrategy; +import com.polidea.rxandroidble.RxBleConnection.WriteOperationRetryStrategy; import com.polidea.rxandroidble.exceptions.BleDisconnectedException; import com.polidea.rxandroidble.exceptions.BleException; import com.polidea.rxandroidble.exceptions.BleGattCallbackTimeoutException; import com.polidea.rxandroidble.exceptions.BleGattCannotStartException; +import com.polidea.rxandroidble.exceptions.BleGattCharacteristicException; +import com.polidea.rxandroidble.exceptions.BleGattException; import com.polidea.rxandroidble.exceptions.BleGattOperationType; import com.polidea.rxandroidble.internal.QueueOperation; import com.polidea.rxandroidble.internal.connection.ConnectionModule; @@ -25,7 +28,6 @@ import java.util.UUID; import bleshadow.javax.inject.Named; - import rx.Emitter; import rx.Observable; import rx.Scheduler; @@ -43,6 +45,7 @@ public class CharacteristicLongWriteOperation extends QueueOperation { private final BluetoothGattCharacteristic bluetoothGattCharacteristic; private final PayloadSizeLimitProvider batchSizeProvider; private final WriteOperationAckStrategy writeOperationAckStrategy; + private final WriteOperationRetryStrategy writeOperationRetryStrategy; private final byte[] bytesToWrite; private byte[] tempBatchArray; @@ -54,6 +57,7 @@ public class CharacteristicLongWriteOperation extends QueueOperation { BluetoothGattCharacteristic bluetoothGattCharacteristic, PayloadSizeLimitProvider batchSizeProvider, WriteOperationAckStrategy writeOperationAckStrategy, + WriteOperationRetryStrategy writeOperationRetryStrategy, byte[] bytesToWrite) { this.bluetoothGatt = bluetoothGatt; this.rxBleGattCallback = rxBleGattCallback; @@ -62,12 +66,13 @@ public class CharacteristicLongWriteOperation extends QueueOperation { this.bluetoothGattCharacteristic = bluetoothGattCharacteristic; this.batchSizeProvider = batchSizeProvider; this.writeOperationAckStrategy = writeOperationAckStrategy; + this.writeOperationRetryStrategy = writeOperationRetryStrategy; this.bytesToWrite = bytesToWrite; } @Override protected void protectedRun(final Emitter emitter, final QueueReleaseInterface queueReleaseInterface) throws Throwable { - int batchSize = batchSizeProvider.getPayloadSizeLimit(); + final int batchSize = batchSizeProvider.getPayloadSizeLimit(); if (batchSize <= 0) { throw new IllegalArgumentException("batchSizeProvider value must be greater than zero (now: " + batchSize + ")"); @@ -90,6 +95,7 @@ protected void protectedRun(final Emitter emitter, final QueueReleaseInt .repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed( writeOperationAckStrategy, byteBuffer, emitterWrapper )) + .retryWhen(errorIsRetryableAndAccordingTo(writeOperationRetryStrategy, byteBuffer, batchSize)) .toCompletable() .subscribe( new Action0() { @@ -209,4 +215,52 @@ public Boolean call(Object emission) { } }; } + + private static Func1, Observable> errorIsRetryableAndAccordingTo( + final WriteOperationRetryStrategy writeOperationRetryStrategy, + final ByteBuffer byteBuffer, + final int batchSize) { + return new Func1, Observable>() { + @Override + public Observable call(Observable emittedOnWriteFailure) { + return emittedOnWriteFailure + .flatMap(toLongWriteFailureOrError()) + .doOnNext(repositionByteBufferForRetry()) + .compose(writeOperationRetryStrategy); + } + + @NonNull + private Func1> toLongWriteFailureOrError() { + return new Func1>() { + @Override + public Observable call(Throwable throwable) { + if (!(throwable instanceof BleGattCharacteristicException || throwable instanceof BleGattCannotStartException)) { + return Observable.error(throwable); + } + final int failedBatchIndex = calculateFailedBatchIndex(byteBuffer, batchSize); + WriteOperationRetryStrategy.LongWriteFailure longWriteFailure = new WriteOperationRetryStrategy.LongWriteFailure( + failedBatchIndex, + (BleGattException) throwable + ); + return Observable.just(longWriteFailure); + } + }; + } + + @NonNull + private Action1 repositionByteBufferForRetry() { + return new Action1() { + @Override + public void call(WriteOperationRetryStrategy.LongWriteFailure longWriteFailure) { + final int newBufferPosition = longWriteFailure.getBatchIndex() * batchSize; + byteBuffer.position(newBufferPosition); + } + }; + } + + private int calculateFailedBatchIndex(ByteBuffer byteBuffer, int batchSize) { + return (int) Math.ceil(byteBuffer.position() / (float) batchSize) - 1; + } + }; + } } diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProvider.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProvider.java index 4526533d5..19d11dd9f 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProvider.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProvider.java @@ -15,6 +15,7 @@ public interface OperationsProvider { CharacteristicLongWriteOperation provideLongWriteOperation( BluetoothGattCharacteristic bluetoothGattCharacteristic, RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy, + RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy, PayloadSizeLimitProvider maxBatchSizeProvider, byte[] bytes); diff --git a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProviderImpl.java b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProviderImpl.java index 267d15117..53de16be2 100644 --- a/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProviderImpl.java +++ b/rxandroidble/src/main/java/com/polidea/rxandroidble/internal/operations/OperationsProviderImpl.java @@ -53,6 +53,7 @@ public class OperationsProviderImpl implements OperationsProvider { public CharacteristicLongWriteOperation provideLongWriteOperation( BluetoothGattCharacteristic bluetoothGattCharacteristic, RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy, + RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy, PayloadSizeLimitProvider maxBatchSizeProvider, byte[] bytes) { @@ -63,6 +64,7 @@ public CharacteristicLongWriteOperation provideLongWriteOperation( bluetoothGattCharacteristic, maxBatchSizeProvider, writeOperationAckStrategy, + writeOperationRetryStrategy, bytes); } diff --git a/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/OperationCharacteristicLongWriteTest.groovy b/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/OperationCharacteristicLongWriteTest.groovy index 58dc12d28..212845105 100644 --- a/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/OperationCharacteristicLongWriteTest.groovy +++ b/rxandroidble/src/test/groovy/com/polidea/rxandroidble/internal/operations/OperationCharacteristicLongWriteTest.groovy @@ -6,15 +6,14 @@ import android.bluetooth.BluetoothGattCharacteristic import com.polidea.rxandroidble.RxBleConnection import com.polidea.rxandroidble.exceptions.BleGattCallbackTimeoutException import com.polidea.rxandroidble.exceptions.BleGattCannotStartException +import com.polidea.rxandroidble.exceptions.BleGattCharacteristicException import com.polidea.rxandroidble.exceptions.BleGattOperationType -import com.polidea.rxandroidble.internal.serialization.QueueReleaseInterface import com.polidea.rxandroidble.internal.connection.ImmediateSerializedBatchAckStrategy +import com.polidea.rxandroidble.internal.connection.NoRetryStrategy import com.polidea.rxandroidble.internal.connection.RxBleGattCallback +import com.polidea.rxandroidble.internal.serialization.QueueReleaseInterface import com.polidea.rxandroidble.internal.util.ByteAssociation import com.polidea.rxandroidble.internal.util.MockOperationTimeoutConfiguration -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger import rx.Observable import rx.functions.Func1 import rx.internal.schedulers.ImmediateScheduler @@ -25,6 +24,10 @@ import spock.lang.Shared import spock.lang.Specification import spock.lang.Unroll +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + public class OperationCharacteristicLongWriteTest extends Specification { private static long DEFAULT_WRITE_DELAY = 1 @@ -35,6 +38,7 @@ public class OperationCharacteristicLongWriteTest extends Specification { RxBleGattCallback mockCallback = Mock RxBleGattCallback BluetoothGattCharacteristic mockCharacteristic = Mock BluetoothGattCharacteristic RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy + RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy def testSubscriber = new TestSubscriber() TestScheduler testScheduler = new TestScheduler() TestScheduler timeoutScheduler = new TestScheduler() @@ -42,7 +46,9 @@ public class OperationCharacteristicLongWriteTest extends Specification { PublishSubject> onCharacteristicWriteSubject = PublishSubject.create() QueueReleaseInterface mockQueueReleaseInterface = Mock QueueReleaseInterface CharacteristicLongWriteOperation objectUnderTest - @Shared Exception testException = new Exception("testException") + @Shared + Exception testException = new Exception("testException") + BleGattCharacteristicException bleGattCharacteristicException = Mock BleGattCharacteristicException def setup() { mockCharacteristic.getUuid() >> mockCharacteristicUUID @@ -197,30 +203,205 @@ public class OperationCharacteristicLongWriteTest extends Specification { 1 * mockCharacteristic.setValue(_) >> true } - def "should call next BluetoothGatt.writeCharacteristic() after the previous RxBleGattCallback.onCharacteristicWrite() emits and operation is acknowledged"() { + def "should attempt to write next batch after the previous has completed and has been acknowledged - no retry strategy"() { given: + this.writeOperationRetryStrategy = new NoRetryStrategy() AcknowledgementTrigger writeAckTrigger = givenWillTriggerWriteAcknowledgement() givenEachCharacteristicWriteOkAfterDefaultDelay() - prepareObjectUnderTest(20, byteArray(60)) + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) when: objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) advanceTimeForWritesToComplete(1) then: - 1 * mockCharacteristic.setValue(_) >> true + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true when: - advanceTimeForWrites(0) + writeAckTrigger.acknowledgeWrite() then: - 0 * mockCharacteristic.setValue(_) >> true + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + } + + def "should not attempt to write next batch or rewrite after the previous has failed - no retry strategy"() { + given: + this.writeOperationRetryStrategy = new NoRetryStrategy() + this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) when: - writeAckTrigger.acknowledgeWrite() + objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) then: - 1 * mockCharacteristic.setValue(_) >> true + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> false + testSubscriber.assertError(BleGattCannotStartException) + } + + def "attempt to rewrite the failed batch if the strategy has emitted the LongWriteFailure - first batch"() { + given: + this.writeOperationRetryStrategy = givenWillRetryWriteOperation() + this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + + when: + objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) + + then: + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> false + testSubscriber.assertNoTerminalEvent() + + when: + writeOperationRetryStrategy.triggerRetry() + + then: + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x3, 0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + testSubscriber.assertValueEquals([0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + testSubscriber.assertCompleted() + } + + def "attempt to rewrite the failed batch if the strategy has emitted the LongWriteFailure - mid batch"() { + given: + this.writeOperationRetryStrategy = givenWillRetryWriteOperation() + this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + + when: + objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) + + then: + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> false + + when: + writeOperationRetryStrategy.triggerRetry() + + then: + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x3, 0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + testSubscriber.assertValueEquals([0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + testSubscriber.assertCompleted() + } + + def "attempt to rewrite the failed batch if the strategy has emitted the LongWriteFailure - last batch"() { + given: + this.writeOperationRetryStrategy = givenWillRetryWriteOperation() + this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + + when: + objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) + + then: + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x3, 0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> false + + when: + writeOperationRetryStrategy.triggerRetry() + + then: + 1 * mockCharacteristic.setValue([0x3, 0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + testSubscriber.assertValueEquals([0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[]) + testSubscriber.assertCompleted() + } + + def "attempt to rewrite the failed batch if the strategy has emitted the LongWriteFailure - last batch, uneven count"() { + given: + this.writeOperationRetryStrategy = givenWillRetryWriteOperation() + this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3] as byte[]) + + when: + objectUnderTest.run(mockQueueReleaseInterface).subscribe(testSubscriber) + + then: + 1 * mockCharacteristic.setValue([0x1, 0x1] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x2, 0x2] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + then: + 1 * mockCharacteristic.setValue([0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> false + + when: + writeOperationRetryStrategy.triggerRetry() + + then: + 1 * mockCharacteristic.setValue([0x3] as byte[]) >> true + 1 * mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + onCharacteristicWriteSubject.onNext(new ByteAssociation(characteristic.getUuid(), [] as byte[])) + true + } + + testSubscriber.assertValueEquals([0x1, 0x1, 0x2, 0x2, 0x3] as byte[]) + testSubscriber.assertCompleted() } def "should release QueueReleaseInterface after successful write"() { @@ -364,7 +545,8 @@ public class OperationCharacteristicLongWriteTest extends Specification { } private void givenWillWriteNextBatchImmediatelyAfterPrevious() { - writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy(); + writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy() + writeOperationRetryStrategy = new NoRetryStrategy() } private AcknowledgementTrigger givenWillTriggerWriteAcknowledgement() { @@ -373,6 +555,12 @@ public class OperationCharacteristicLongWriteTest extends Specification { return trigger } + private RetryWriteOperation givenWillRetryWriteOperation() { + def retry = new RetryWriteOperation() + this.writeOperationRetryStrategy = retry + return retry + } + class AcknowledgementTrigger implements RxBleConnection.WriteOperationAckStrategy { private final PublishSubject triggerSubject = PublishSubject.create() @@ -395,6 +583,28 @@ public class OperationCharacteristicLongWriteTest extends Specification { } } + class RetryWriteOperation implements RxBleConnection.WriteOperationRetryStrategy { + + private final PublishSubject triggerSubject = PublishSubject.create() + + public void triggerRetry() { + triggerSubject.with { + onNext(true) + onCompleted() + } + } + + @Override + Observable call( + Observable longWriteFailureObservable) { + return longWriteFailureObservable.flatMap({ longWriteFailure -> + return triggerSubject.map({ aBoolean -> + return longWriteFailure + }) + }) + } + } + private static byte[] byteArray(int size) { byte[] bytes = new byte[size]; for (int i = 0; i < size; i++) { @@ -498,6 +708,26 @@ public class OperationCharacteristicLongWriteTest extends Specification { } } + private givenCharacteristicWriteOkButEventuallyFailsToWrite(int failingWriteIndex) { + AtomicInteger writeIndex = new AtomicInteger(0) + + mockGatt.writeCharacteristic(mockCharacteristic) >> { BluetoothGattCharacteristic characteristic -> + UUID uuid = characteristic.getUuid() + byte[] returnBytes = new byte[0] + + testScheduler.createWorker().schedule({ + int currentIndex = writeIndex.getAndIncrement() + if (currentIndex == failingWriteIndex) { + onCharacteristicWriteSubject.onError(bleGattCharacteristicException) + } else { + onCharacteristicWriteSubject.onNext(new ByteAssociation(uuid, returnBytes)) + } + }, DEFAULT_WRITE_DELAY, TimeUnit.SECONDS) + + true + } + } + private givenCharacteristicWriteOkButEventuallyStalls(int failingWriteIndex) { AtomicInteger writeIndex = new AtomicInteger(0) @@ -525,6 +755,7 @@ public class OperationCharacteristicLongWriteTest extends Specification { mockCharacteristic, { maxBatchSize }, writeOperationAckStrategy, + writeOperationRetryStrategy, testData ) }