Added retry mechanism parameter for LongWriteOperation #357
Conversation
Hey! Great that you have taken this one on!
I have some comments, which I have put in the code, that I think should be addressed. I would also like @uKL to take a look at this.
Anyway, this will be a very neat feature and definitely very useful!
@@ -442,6 +450,9 @@ public String toString() {
     */
    Observable<byte[]> writeDescriptor(@NonNull BluetoothGattDescriptor descriptor, @NonNull byte[] data);

    interface WriteOperationRetryStrategy extends Func2<Integer, Throwable, Boolean> {
This should be placed near WriteOperationAckStrategy
Ok.
.repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
        writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retry(retryStrategy(byteBuffer, batchSize))
I think that we should go with the .retryWhen() function rather than with just .retry(). This is because .retryWhen() is more flexible: it allows for differentiation of the retry behaviour, e.g. one may want to retry the write after some time or depending on other external factors.
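(For illustration only, not code from this PR: a minimal RxJava 1.x sketch of that kind of time-based retry via .retryWhen(). Here "observable" stands in for the per-batch write and the 500 ms delay is arbitrary.)

// Sketch only: resubscribe (retry) after every failure, but wait 500 ms first.
// A real strategy could also inspect the Throwable or give up after N attempts.
observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> errors) {
        return errors.flatMap(new Func1<Throwable, Observable<Long>>() {
            @Override
            public Observable<Long> call(Throwable error) {
                return Observable.timer(500, TimeUnit.MILLISECONDS);
            }
        });
    }
});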
Indeed, that makes sense.
this.bytesToWrite = bytesToWrite;
this.retryCounter = 0;
I am not a big fan of variables that are changed as a side effect. This usually makes room for a mistake and makes the code harder to trace.
You mean we should not have a separate counter in this Operation class at all?
I believe that Darek meant putting a reset in a doOnNext. AFAIK you can safely reset the counter before the rx subscription (around line 86).
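(A sketch of that idea only, not code from the PR; it assumes retryCounter is a mutable field of the operation and resets it at subscription time instead of in the constructor.)

// Sketch only: reset the counter every time the write observable is (re)subscribed,
// keeping the side effect at the subscription boundary.
.doOnSubscribe(new Action0() {
    @Override
    public void call() {
        retryCounter = 0;
    }
})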
public Boolean call(Integer integer, Throwable throwable) {
    // Individual counter for each batch payload.
    retryCounter++;
    final Boolean retry = writeOperationRetryStrategy.call(retryCounter, throwable);
The biggest questions here are:
- what data one will need to decide on a retry
- a smart way to extend that data without breaking the API in the future (DTO? @dariuszseweryn)
Yup, probably we should go for WriteOperationRetryStrategy with something like:
class LongWriteFailure {
    final int batchNumber;
    // final int retryCount; -> this should not be needed as it is trivial for the user to manage
    final BleException cause;
}
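(Purely illustrative and assuming the strategy ends up shaped like Func1&lt;Observable&lt;LongWriteFailure&gt;, Observable&lt;LongWriteFailure&gt;&gt; or an equivalent Transformer, a user-side strategy over such a DTO could then manage the attempt count itself, e.g.:)

// Hypothetical user-side strategy: retry at most 3 times in total, then propagate the cause.
final WriteOperationRetryStrategy retryUpToThreeTimes = new WriteOperationRetryStrategy() {
    private int attempts;

    @Override
    public Observable<LongWriteFailure> call(Observable<LongWriteFailure> longWriteFailures) {
        return longWriteFailures.flatMap(new Func1<LongWriteFailure, Observable<LongWriteFailure>>() {
            @Override
            public Observable<LongWriteFailure> call(LongWriteFailure longWriteFailure) {
                return ++attempts <= 3
                        ? Observable.just(longWriteFailure) // emitting the failure triggers a retry
                        : Observable.<LongWriteFailure>error(longWriteFailure.cause); // give up
            }
        });
    }
};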
You're reading my mind :)
I came up with the following retryWhen() function:
private static Func1<Observable<? extends Throwable>, Observable<?>> retryOperationStrategy(
final WriteOperationRetryStrategy writeOperationRetryStrategy,
final ByteBuffer byteBuffer,
final int batchSize) {
return new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.flatMap(applyRetryStrategy())
.map(replaceByteBufferPositionForRetry());
}
@NonNull
private Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>> applyRetryStrategy() {
return new Func1<Throwable, Observable<WriteOperationRetryStrategy.LongWriteFailure>>() {
@Override
public Observable<WriteOperationRetryStrategy.LongWriteFailure> call(Throwable cause) {
if (!(cause instanceof BleException)) {
return Observable.error(cause);
}
final int failedBatchNumber = (byteBuffer.position() - batchSize) / batchSize;
final WriteOperationRetryStrategy.LongWriteFailure longWriteFailure =
new WriteOperationRetryStrategy.LongWriteFailure(failedBatchNumber, (BleException) cause);
return writeOperationRetryStrategy.call(Observable.just(longWriteFailure));
}
};
}
@NonNull
private Func1<WriteOperationRetryStrategy.LongWriteFailure, Object> replaceByteBufferPositionForRetry() {
return new Func1<WriteOperationRetryStrategy.LongWriteFailure, Object>() {
@Override
public Object call(WriteOperationRetryStrategy.LongWriteFailure longWriteFailure) {
final int newBufferPosition = longWriteFailure.getBatchNumber() * batchSize;
byteBuffer.position(newBufferPosition);
return longWriteFailure;
}
};
}
};
}
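(Presumably this would then plug in where the earlier .retry() call sat, roughly:)

.repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
        writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retryWhen(retryOperationStrategy(writeOperationRetryStrategy, byteBuffer, batchSize))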
If you could push another commit to this PR I could then comment on specific lines
Sure, done!
Better! :) I have written some comments in the code. We are getting closer.
class LongWriteFailure {

    final int batchNumber;
    final BleException cause;
This actually should be a BleGattCharacteristicException as disabling the BluetoothAdapter is not really retriable...
Both should be final.
Ok for BleGattCharacteristicException, but they are already both final :)
Yup. I must be tired...
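(After the two remarks above, the DTO would presumably end up roughly like this; the constructor and visibility are a guess:)

class LongWriteFailure {

    final int batchNumber;
    final BleGattCharacteristicException cause;

    LongWriteFailure(int batchNumber, BleGattCharacteristicException cause) {
        this.batchNumber = batchNumber;
        this.cause = cause;
    }
}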
final WriteOperationRetryStrategy.LongWriteFailure longWriteFailure =
        new WriteOperationRetryStrategy.LongWriteFailure(failedBatchNumber, (BleException) cause);

return writeOperationRetryStrategy.call(Observable.just(longWriteFailure));
Not an ideal solution, since the writeOperationRetryStrategy will get called each time a new failure happens, whereas it could be called only once per subscription. Check out how the Observable.compose() function or bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed works.
Basically this:
final int failedBatchNumber = (byteBuffer.position() - batchSize) / batchSize;
final WriteOperationRetryStrategy.LongWriteFailure longWriteFailure =
new WriteOperationRetryStrategy.LongWriteFailure(failedBatchNumber, (BleException) cause);
could be separated into an Observable.map(), and the writeOperationRetryStrategy could be treated as an Observable.Transformer (see .compose()).
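(For context, a sketch of that Transformer-based shape; the exact nesting and naming are up to the authors.)

// Sketch only: the retry strategy expressed as an RxJava 1.x Transformer over the failure DTO,
// so the stream of LongWriteFailure emissions inside .retryWhen() can simply be .compose()d with it.
interface WriteOperationRetryStrategy extends Observable.Transformer<LongWriteFailure, LongWriteFailure> {
}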
How close am I with something like this? :)
@Override
public Observable<?> call(Observable<? extends Throwable> emittedOnWriteFailure) {
return writeOperationRetryStrategy.call(
emittedOnWriteFailure.map(mapToLongWriteFailureObject())
)
.doOnNext(replaceByteBufferPositionForRetry());
}
No other Throwable than BleException can be emitted?
No other than BleGattCharacteristicException can be retried... Again, the new commit in this PR makes it easier to see the whole picture. :)
Sorry, I've pushed :)
Yup, I have seen it; I just have a bit of work now. Will get back to you as soon as possible. 👍
I have checked out your branch and I hope I will have a moment tomorrow to play with it and see if I can make it a little bit nicer. I cannot think of how to explain this, so I will try to show it ;)
}

@NonNull
private Func1<WriteOperationRetryStrategy.LongWriteFailure, Object> replaceByteBufferPositionForRetry() {
This could be a simple .doOnNext() as it only passes the longWriteFailure through.
Ok.
I didn't find a moment today to play with it. Tomorrow should be the day.
I have looked at it a bit and made one small change. I have added more thoughts in this review. This work has the potential to be really useful 👍
return new Func1<Throwable, WriteOperationRetryStrategy.LongWriteFailure>() {
    @Override
    public WriteOperationRetryStrategy.LongWriteFailure call(Throwable throwable) {
        final int failedBatchNumber = (byteBuffer.position() - batchSize) / batchSize;
This calculation will be wrong if the last batch is not equal in size to the default batchSize. Ideally we should be passing the batch size to this function from writeBatchAndObserve().
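(For example, assuming the position advances only by the remaining bytes on the last batch: with 5 bytes and batchSize == 2, the last one-byte batch leaves byteBuffer.position() at 5, so (5 - 2) / 2 == 1 even though the failing batch is the third one.)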
Indeed, but what is the advantage compared to something like this instead:
if (byteBuffer.hasRemaining()) {
return (byteBuffer.position() - batchSize) / batchSize;
} else {
return byteBuffer.position() / batchSize;
}
Ideally we should emit the batches to write as byte[] chunks which should be indexed. Those pairs of bytes and indexes should then be written (and retried) according to the strategy, to stay in line with as immutable an approach as possible. I am not sure how the performance would turn out...
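(A hypothetical sketch of that indexed-batch idea, not part of this PR; it assumes android.util.Pair plus java.util.ArrayList/Arrays/List imports.)

// Hypothetical: pre-slice the payload into immutable (index, bytes) pairs up front,
// so a retry could simply re-emit a batch instead of repositioning a shared ByteBuffer.
private static List<Pair<Integer, byte[]>> sliceIntoIndexedBatches(byte[] bytesToWrite, int batchSize) {
    final List<Pair<Integer, byte[]>> batches = new ArrayList<>();
    for (int index = 0, offset = 0; offset < bytesToWrite.length; index++, offset += batchSize) {
        final int end = Math.min(offset + batchSize, bytesToWrite.length);
        batches.add(Pair.create(index, Arrays.copyOfRange(bytesToWrite, offset, end)));
    }
    return batches;
}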
}

@NonNull
private Action1<WriteOperationRetryStrategy.LongWriteFailure> replaceByteBufferPositionForRetry() {
Maybe repositionByteBufferForRetry()?
Ok.
}

@NonNull
private Func1<Throwable, Observable<Throwable>> canRetryError() {
Maybe toLongWriteFailureOrError()? It would then look like .flatMap(toLongWriteFailureOrError()).
You mean merge those two?
.flatMap(canRetryError())
.map(toLongWriteFailureObject())
Yes, as the .map() contains pretty straightforward logic. Maybe toLongWriteFailureIfRetryable()? (not sure about the spelling)
I've split them based on comment: #357 (comment) :)
It was more about using .compose(writeOperationRetryStrategy).
Ok :)
return emittedOnWriteFailure
        .flatMap(canRetryError())
        .map(toLongWriteFailureObject())
        .compose(writeOperationRetryStrategy)
This is what I have changed which makes it easier to read in my opinion. I will refactor the ACK strategy one day...
@@ -209,4 +214,58 @@ public Boolean call(Object emission) {
            }
        };
    }

    private static Func1<Observable<? extends Throwable>, Observable<?>> retryOperationStrategy(
Maybe errorIsRetriableAndAccordingTo(...)? Then we could have .retryWhen(errorIsRetriableAndAccordingTo(writeOperationRetryStrategy, ...)).
Ok.
I have pushed my latest modifications according to your remarks.
Almost there!
I think that we should have tests for this feature.
This looks good :)
private int calculateFailedBatchNumber(ByteBuffer byteBuffer, int batchSize) {
    if (byteBuffer.hasRemaining()) {
        return (byteBuffer.position() - batchSize) / batchSize;
(byteBuffer.position() / batchSize) - 1 seems slightly easier to understand for me :)
return emittedOnWriteFailure
        .flatMap(toLongWriteFailureOrError())
        .compose(writeOperationRetryStrategy)
        .doOnNext(repositionByteBufferForRetry());
This line should go above the .compose() as the user could potentially emit a different LongWriteFailure than the one they were passed, which could break the repositioning of the buffer.
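i.e. roughly (sketch):

return emittedOnWriteFailure
        .flatMap(toLongWriteFailureOrError())
        .doOnNext(repositionByteBufferForRetry())
        .compose(writeOperationRetryStrategy);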
Hello @martiwi - any news? Maybe I can help you with something?
Hi there,
Hey @martiwi! Could you tick "Allow edits from maintainers" in your PR? I am not able to contribute to the unit tests.
Hey @uKL, it was already selected; @dariuszseweryn was able to publish before. I've unselected it and re-selected it. Is it better now?
Ok, I was doing it wrong. :)
Hi @dariuszseweryn - I've added one unit test but I am not sure I am using it right: when running the test in debug mode, the second batch fails to write but the retryWhen Observable is triggered twice. Meaning
Hello @martiwi
- Fixed issue that caused the last batch to be written with incorrect data (empty byte array)
- Added support for errors that were occurring when operation was started (not from the gatt callback)
Great job @martiwi! I've added some tests and fixed two issues + a documentation update.
given:
this.writeOperationRetryStrategy = givenWillRetryWriteOperation()
this.writeOperationAckStrategy = new ImmediateSerializedBatchAckStrategy()
prepareObjectUnderTest(2, [0x1, 0x1, 0x2, 0x2, 0x3, 0x3] as byte[])
Could you make the last batch have an uneven number of bytes? I think we can get wrong ByteBuffer positioning on the last batch if it is not full.
}

private int calculateFailedBatchNumber(ByteBuffer byteBuffer, int batchSize) {
    return (int) Math.ceil(byteBuffer.position() / (float) batchSize);
Doesn't it return the failed batch number + 1?
Having [1, 2, 3, 4, 5] and 2 bytes per batch: if the first batch fails, byteBuffer.position() would return 2. 2 / 2 == 1, but the failed batch index should be 0.
Data size 5, batch size 2, failure at the last batch (3rd)
(int) Math.ceil(5 / (float) 2) => 3
Data size 6, batch size 2, failure at the last batch (3rd)
(int) Math.ceil(6 / (float) 2) => 3
Data size 5, batch size 3, failure at the last batch (2nd)
(int) Math.ceil(5 / (float) 3) => 2
Is this what you wanted to check?
calculateFailedBatchNumber is misleading in this situation; either we should return (int) Math.ceil(byteBuffer.position() / (float) batchSize) - 1 or name the function differently?
It returns the "number", which I assumed starts from number one. LongWriteFailure also assumes "number". Do you think we should switch to zero-based indexes?
Yup. This is user (developer) facing data. I think we should switch to zero-based indexes and rename them to calculateFailedBatchIndex and longWriteFailure.getBatchIndex() respectively.
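i.e. roughly (sketch; zero-based, matching longWriteFailure.getBatchIndex()):

private int calculateFailedBatchIndex(ByteBuffer byteBuffer, int batchSize) {
    return (int) Math.ceil(byteBuffer.position() / (float) batchSize) - 1;
}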
Dear Contributor, similar to many open source projects, we kindly request that you sign our CLA if you'd like to contribute to our project. This license is for your protection as a Contributor as well as the protection of Polidea; it does not change your rights to use your own Contributions for any other purpose. You can find a link here: https://cla-assistant.io/Polidea/RxAndroidBle
PR for issue #352