Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retry mechanism parameter for LongWriteOperation #357

Merged
merged 19 commits into from
Mar 12, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.polidea.rxandroidble.RxBleDeviceServices;
import com.polidea.rxandroidble.exceptions.BleConflictingNotificationAlreadySetException;
import com.polidea.rxandroidble.internal.connection.ImmediateSerializedBatchAckStrategy;
import com.polidea.rxandroidble.internal.connection.NoRetryStrategy;
import com.polidea.rxandroidble.internal.util.ObservableUtil;

import java.util.HashMap;
Expand Down Expand Up @@ -283,6 +284,8 @@ public LongWriteOperationBuilder createNewLongWriteBuilder() {

private WriteOperationAckStrategy writeOperationAckStrategy = // default
new ImmediateSerializedBatchAckStrategy();
private WriteOperationRetryStrategy writeOperationRetryStrategy = // default
new NoRetryStrategy();

@Override
public LongWriteOperationBuilder setBytes(@NonNull byte[] bytes) {
Expand Down Expand Up @@ -316,6 +319,13 @@ public LongWriteOperationBuilder setMaxBatchSize(int maxBatchSize) {
return this;
}

@Override
public LongWriteOperationBuilder setWriteOperationRetryStrategy(
@NonNull WriteOperationRetryStrategy writeOperationRetryStrategy) {
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
return this;
}

@Override
public LongWriteOperationBuilder setWriteOperationAckStrategy(@NonNull WriteOperationAckStrategy writeOperationAckStrategy) {
this.writeOperationAckStrategy = writeOperationAckStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func2;

/**
* The BLE connection handle, supporting GATT operations. Operations are enqueued and the library makes sure that they are not
Expand Down Expand Up @@ -131,6 +132,13 @@ interface LongWriteOperationBuilder {
*/
LongWriteOperationBuilder setMaxBatchSize(@IntRange(from = 1, to = GATT_MTU_MAXIMUM - GATT_WRITE_MTU_OVERHEAD) int maxBatchSize);

/**
* Setter for a retry strategy in case something goes wrong when writing data.
* @param writeOperationRetryStrategy the retry strategy
* @return the LongWriteOperationBuilder
*/
LongWriteOperationBuilder setWriteOperationRetryStrategy(@NonNull WriteOperationRetryStrategy writeOperationRetryStrategy);

/**
* Setter for a strategy used to mark batch write completed. Only after previous batch has finished, the next (if any left) can be
* written.
Expand Down Expand Up @@ -442,6 +450,9 @@ Observable<byte[]> writeDescriptor(@NonNull UUID serviceUuid, @NonNull UUID char
*/
Observable<byte[]> writeDescriptor(@NonNull BluetoothGattDescriptor descriptor, @NonNull byte[] data);

interface WriteOperationRetryStrategy extends Func2<Integer, Throwable, Boolean> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be placed near WriteOperationAckStrategy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.


}

/**
* Performs a GATT request connection priority operation, which requests a connection parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public final class LongWriteOperationBuilderImpl implements RxBleConnection.Long
private Observable<BluetoothGattCharacteristic> writtenCharacteristicObservable;
private PayloadSizeLimitProvider maxBatchSizeProvider;
private RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy();
private RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy = new NoRetryStrategy();

private byte[] bytes;

Expand Down Expand Up @@ -63,6 +64,13 @@ public RxBleConnection.LongWriteOperationBuilder setMaxBatchSize(final int maxBa
return this;
}

@Override
public RxBleConnection.LongWriteOperationBuilder setWriteOperationRetryStrategy(
@NonNull RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy) {
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
return this;
}

@Override
public RxBleConnection.LongWriteOperationBuilder setWriteOperationAckStrategy(
@NonNull RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy) {
Expand All @@ -87,7 +95,7 @@ public Observable<byte[]> build() {
public Observable<byte[]> call(BluetoothGattCharacteristic bluetoothGattCharacteristic) {
return operationQueue.queue(
operationsProvider.provideLongWriteOperation(bluetoothGattCharacteristic,
writeOperationAckStrategy, maxBatchSizeProvider, bytes)
writeOperationAckStrategy, writeOperationRetryStrategy, maxBatchSizeProvider, bytes)
);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.polidea.rxandroidble.internal.connection;

import com.polidea.rxandroidble.RxBleConnection;

public class NoRetryStrategy implements RxBleConnection.WriteOperationRetryStrategy {

@Override
public Boolean call(Integer integer, Throwable throwable) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.polidea.rxandroidble.ClientComponent;
import com.polidea.rxandroidble.RxBleConnection.WriteOperationAckStrategy;
import com.polidea.rxandroidble.RxBleConnection.WriteOperationRetryStrategy;
import com.polidea.rxandroidble.exceptions.BleDisconnectedException;
import com.polidea.rxandroidble.exceptions.BleException;
import com.polidea.rxandroidble.exceptions.BleGattCallbackTimeoutException;
Expand All @@ -33,6 +34,7 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {

Expand All @@ -43,8 +45,10 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
private final BluetoothGattCharacteristic bluetoothGattCharacteristic;
private final PayloadSizeLimitProvider batchSizeProvider;
private final WriteOperationAckStrategy writeOperationAckStrategy;
private final WriteOperationRetryStrategy writeOperationRetryStrategy;
private final byte[] bytesToWrite;
private byte[] tempBatchArray;
private int retryCounter;

CharacteristicLongWriteOperation(
BluetoothGatt bluetoothGatt,
Expand All @@ -54,6 +58,7 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
BluetoothGattCharacteristic bluetoothGattCharacteristic,
PayloadSizeLimitProvider batchSizeProvider,
WriteOperationAckStrategy writeOperationAckStrategy,
WriteOperationRetryStrategy writeOperationRetryStrategy,
byte[] bytesToWrite) {
this.bluetoothGatt = bluetoothGatt;
this.rxBleGattCallback = rxBleGattCallback;
Expand All @@ -62,12 +67,14 @@ public class CharacteristicLongWriteOperation extends QueueOperation<byte[]> {
this.bluetoothGattCharacteristic = bluetoothGattCharacteristic;
this.batchSizeProvider = batchSizeProvider;
this.writeOperationAckStrategy = writeOperationAckStrategy;
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
this.bytesToWrite = bytesToWrite;
this.retryCounter = 0;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of variables that are changed as a side effect. This usually makes room for a mistake and makes the code harder to trace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean we should not have a separated counter in this Operation class at all?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that Darek meant putting a reset in a doOnNext. AFAIK you can safely reset the counter before the rx subscription (around line 86.)

}

@Override
protected void protectedRun(final Emitter<byte[]> emitter, final QueueReleaseInterface queueReleaseInterface) throws Throwable {
int batchSize = batchSizeProvider.getPayloadSizeLimit();
final int batchSize = batchSizeProvider.getPayloadSizeLimit();

if (batchSize <= 0) {
throw new IllegalArgumentException("batchSizeProvider value must be greater than zero (now: " + batchSize + ")");
Expand All @@ -87,9 +94,11 @@ protected void protectedRun(final Emitter<byte[]> emitter, final QueueReleaseInt
timeoutObservable,
timeoutConfiguration.timeoutScheduler
)
.doOnNext(resetRetryCounter())
.repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retry(retryStrategy(byteBuffer, batchSize))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should go with the .retryWhen() function rather than with just .retry(). This is because the .retryWhen() is more flexible. It allows for differentiation of the retry behaviour i.e. one may want to retry the write after some time or depending on other external factor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, that makes sense.

.toCompletable()
.subscribe(
new Action0() {
Expand Down Expand Up @@ -209,4 +218,30 @@ public Boolean call(Object emission) {
}
};
}

private Action1<ByteAssociation<UUID>> resetRetryCounter() {
return new Action1<ByteAssociation<UUID>>() {
@Override
public void call(ByteAssociation<UUID> uuidByteAssociation) {
retryCounter = 0;
}
};
}

private Func2<Integer, Throwable, Boolean> retryStrategy(final ByteBuffer byteBuffer, final int batchSize) {
return new Func2<Integer, Throwable, Boolean>() {
@Override
public Boolean call(Integer integer, Throwable throwable) {
// Individual counter for each batch payload.
retryCounter++;
final Boolean retry = writeOperationRetryStrategy.call(retryCounter, throwable);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The biggest question here is to:

  1. Define what data one will need to decide on a retry
  2. Smart way to extend that data without breaking the API in the future (DTO? @dariuszseweryn)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, probably we should go for WriteOperationRetryStrategy with something like:

class LongWriteFailure {
  final int batchNumber;
  // final int retryCount; –> this should not be needed as it is trivial for the user to manage
  final BleException cause;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're reading in my mind :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came up with the following retryWhen() function:

private static Func1<Observable<? extends Throwable>, Observable<?>> retryOperationStrategy(
            final WriteOperationRetryStrategy writeOperationRetryStrategy,
            final ByteBuffer byteBuffer,
            final int batchSize) {
        return new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable
                        .flatMap(applyRetryStrategy())
                        .map(replaceByteBufferPositionForRetry());
            }

            @NonNull
            private Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>> applyRetryStrategy() {
                return new Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>>() {
                    @Override
                    public Observable<WriteOperationRetryStrategy.LongWriteFailure> call(Throwable cause) {
                        if (!(cause instanceof BleException)) {
                            return Observable.error(cause);
                        }

                        final int failedBatchNumber = (byteBuffer.position() - batchSize) / batchSize;
                        final WriteOperationRetryStrategy.LongWriteFailure longWriteFailure =
                                new WriteOperationRetryStrategy.LongWriteFailure(failedBatchNumber, (BleException) cause);

                        return writeOperationRetryStrategy.call(Observable.just(longWriteFailure));
                    }
                };
            }

            @NonNull
            private Func1<WriteOperationRetryStrategy.LongWriteFailure, Object> replaceByteBufferPositionForRetry() {
                return new Func1<WriteOperationRetryStrategy.LongWriteFailure, Object>() {
                    @Override
                    public Object call(WriteOperationRetryStrategy.LongWriteFailure longWriteFailure) {
                        final int newBufferPosition = longWriteFailure.getBatchNumber() * batchSize;
                        byteBuffer.position(newBufferPosition);
                        return longWriteFailure;
                    }
                };
            }
        };
    }

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you could push another commit to this PR I could then comment on specific lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done!

if (!retry) {
return false;
}
// Reset buffer to last position
byteBuffer.position(byteBuffer.position() - batchSize);
return true;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface OperationsProvider {
CharacteristicLongWriteOperation provideLongWriteOperation(
BluetoothGattCharacteristic bluetoothGattCharacteristic,
RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy,
RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy,
PayloadSizeLimitProvider maxBatchSizeProvider,
byte[] bytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class OperationsProviderImpl implements OperationsProvider {
public CharacteristicLongWriteOperation provideLongWriteOperation(
BluetoothGattCharacteristic bluetoothGattCharacteristic,
RxBleConnection.WriteOperationAckStrategy writeOperationAckStrategy,
RxBleConnection.WriteOperationRetryStrategy writeOperationRetryStrategy,
PayloadSizeLimitProvider maxBatchSizeProvider,
byte[] bytes) {

Expand All @@ -59,6 +60,7 @@ public CharacteristicLongWriteOperation provideLongWriteOperation(
bluetoothGattCharacteristic,
maxBatchSizeProvider,
writeOperationAckStrategy,
writeOperationRetryStrategy,
bytes);
}

Expand Down