Skip to content

Commit

Permalink
Fixed calling BluetoothGatt.disconnect() on a proper thread.
Browse files Browse the repository at this point in the history
Summary: #84

Reviewers: michal.zielinski, pawel.urban

Reviewed By: pawel.urban

Differential Revision: https://phabricator.polidea.com/D1906
  • Loading branch information
dariuszseweryn committed Oct 25, 2016
1 parent ae5029f commit 484e88f
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.polidea.rxandroidble.internal.util.BleConnectionCompat;

import java.util.concurrent.atomic.AtomicReference;
import rx.android.schedulers.AndroidSchedulers;

public class RxBleConnectionConnectorOperationsProvider {

Expand Down Expand Up @@ -37,7 +38,8 @@ public RxBleOperations provide(Context context,
final RxBleRadioOperationDisconnect operationDisconnect = new RxBleRadioOperationDisconnect(
gattCallback,
bluetoothGattAtomicReference,
(BluetoothManager) context.getSystemService(Context.BLUETOOTH_SERVICE)
(BluetoothManager) context.getSystemService(Context.BLUETOOTH_SERVICE),
AndroidSchedulers.mainThread()
);
// getBluetoothGatt completed when the connection is unsubscribed
operationConnect.getBluetoothGatt().subscribe(bluetoothGattAtomicReference::set, ignored -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,32 @@
package com.polidea.rxandroidble.internal.operations;

import static rx.Observable.just;

import android.bluetooth.BluetoothGatt;
import android.bluetooth.BluetoothManager;
import android.bluetooth.BluetoothProfile;

import com.polidea.rxandroidble.RxBleConnection;
import com.polidea.rxandroidble.internal.RxBleRadioOperation;
import com.polidea.rxandroidble.internal.connection.RxBleGattCallback;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;

import static rx.Observable.just;
import rx.Scheduler;

public class RxBleRadioOperationDisconnect extends RxBleRadioOperation<Void> {

private static final int TIMEOUT_DISCONNECT = 10;
private final RxBleGattCallback rxBleGattCallback;
private final AtomicReference<BluetoothGatt> bluetoothGattAtomicReference;
private final BluetoothManager bluetoothManager;
private final Scheduler mainThreadScheduler;

public RxBleRadioOperationDisconnect(RxBleGattCallback rxBleGattCallback, AtomicReference<BluetoothGatt> bluetoothGattAtomicReference,
BluetoothManager bluetoothManager) {
BluetoothManager bluetoothManager, Scheduler mainThreadScheduler) {
this.rxBleGattCallback = rxBleGattCallback;
this.bluetoothGattAtomicReference = bluetoothGattAtomicReference;
this.bluetoothManager = bluetoothManager;
this.mainThreadScheduler = mainThreadScheduler;
}

@Override
Expand All @@ -37,7 +36,7 @@ protected void protectedRun() {
.filter(bluetoothGatt -> bluetoothGatt != null)
.flatMap(bluetoothGatt -> isDisconnected(bluetoothGatt) ? just(bluetoothGatt) : disconnect(bluetoothGatt))
.doOnTerminate(() -> releaseRadio())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(mainThreadScheduler)
.subscribe(
bluetoothGatt -> bluetoothGatt.close(),
throwable -> onError(throwable),
Expand All @@ -56,14 +55,26 @@ private boolean isDisconnected(BluetoothGatt bluetoothGatt) {
* 2. The same BluetoothGatt - in this situation we should probably cancel the pending BluetoothGatt.close() call
*/
private Observable<BluetoothGatt> disconnect(BluetoothGatt bluetoothGatt) {
return rxBleGattCallback
.getOnConnectionStateChange()
.doOnSubscribe(bluetoothGatt::disconnect)
// It should never happen because if connection was never acquired then it will complete earlier.
// Just in case timeout here.
.timeout(TIMEOUT_DISCONNECT, TimeUnit.SECONDS, just(RxBleConnection.RxBleConnectionState.DISCONNECTED))
.filter(rxBleConnectionState -> rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTED)
.take(1)
.map(rxBleConnectionState -> bluetoothGatt);
return new DisconnectGattObservable(bluetoothGatt, rxBleGattCallback, mainThreadScheduler)
.timeout(TIMEOUT_DISCONNECT, TimeUnit.SECONDS, just(bluetoothGatt));
}

private static class DisconnectGattObservable extends Observable<BluetoothGatt> {

DisconnectGattObservable(
BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler disconnectScheduler
) {
super(subscriber -> {
rxBleGattCallback
.getOnConnectionStateChange()
.filter(rxBleConnectionState -> rxBleConnectionState == RxBleConnection.RxBleConnectionState.DISCONNECTED)
.take(1)
.map(rxBleConnectionState -> bluetoothGatt)
.subscribe(subscriber);
disconnectScheduler.createWorker().schedule(bluetoothGatt::disconnect);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import rx.Scheduler
import rx.android.plugins.RxAndroidPlugins
import rx.android.plugins.RxAndroidSchedulersHook
import rx.android.schedulers.AndroidSchedulers
import rx.internal.schedulers.ImmediateScheduler
import rx.observers.TestSubscriber
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
Expand Down Expand Up @@ -133,7 +134,7 @@ public class RxBleRadioOperationDisconnectTest extends Specification {
}

private prepareObjectUnderTest() {
objectUnderTest = new RxBleRadioOperationDisconnect(mockGattCallback, gattAtomicReference, mockBluetoothManager)
objectUnderTest = new RxBleRadioOperationDisconnect(mockGattCallback, gattAtomicReference, mockBluetoothManager, ImmediateScheduler.INSTANCE)
objectUnderTest.setRadioBlockingSemaphore(mockSemaphore)
objectUnderTest.asObservable().subscribe(testSubscriber)
}
Expand Down

0 comments on commit 484e88f

Please sign in to comment.