Document uses of ConnectionSharingAdapter #185

Closed
RobLewis opened this issue Apr 27, 2017 · 18 comments

Comments

@RobLewis

Summary

Please provide some discussion of the use of ConnectionSharingAdapter

Preconditions

I/O on more than one Characteristic using a BLE connection


Apologies for double-posting this: I first tried StackOverflow but need an answer ASAP.

I have been developing an app for a few months that connects to a BLE peripheral. The device has 4 characteristics: 2 write and 2 notify, all working simultaneously. I also monitor connection state changes and BLE signal strength.

It seems to be working OK without the use of ConnectionSharingAdapter, which I have only recently discovered. Have I just been lucky? When I first obtain the BLE connection, I store it in a static variable and reference the variable when I need a connection.

Could we have more extensive documentation of ConnectionSharingAdapter—what it does, when it's appropriate to use, etc.?

@dariuszseweryn
Owner

Hello @RobLewis
As for what the ConnectionSharingAdapter does: it is a very simple class and well named. It is an adapter that shares the RxBleConnection between different Subscribers of an Observable<RxBleConnection> (which most probably comes from RxBleDevice.establishConnection(boolean)). You can look into the sources and check it out yourself.
When is it appropriate to use? I am against using it at all, as any usage of reactive programming that needs more than one .subscribe() introduces state somewhere and makes it harder to reason about what is actually going on. But you could potentially use it in a situation where you want to perform more than one task and those tasks are independent of each other and cannot interfere with one another (for instance, reading two different characteristics).

@RobLewis
Author

Thanks. I almost understand what you're saying.
I've been using the following method to deliver Connection Observables to my various subscribers, and it seems to work:

    Observable<RxBleConnection> getConnectionObservable() {
        if( isConnected() ) {
            Log.d( LOG_TAG, "getConnectionObservable used existing connection");
            return Observable.just( bleConnection ); // variable where I store connection
        }
        else {
            Log.d( LOG_TAG, "getConnectionObservable re-establishing connection...");
            return bleDevice.establishConnection( NO_AUTO_CONNECT ); 
        }
    }

I'm thinking to replace this method with:

    Observable<RxBleConnection> getConnectionObservable() {
        return sharedConnectionObservable;  
    }

And this:

    Observable<RxBleConnection> sharedConnectionObservable = this.bleDevice
            .establishConnection( NO_AUTO_CONNECT )
            .retry( CONNECTION_RETRIES ) 
            .compose( new ConnectionSharingAdapter() ); 

Am I on the right track?

@dariuszseweryn
Owner

Should work.

@NitroG42

Hi, can we avoid using this class when we need to share the connection between multiple activities? Let's suppose my app controls a BLE device and there are multiple parameters (read the battery status, change a setting later, etc.).
Shouldn't the connection be kept (using the ConnectionSharingAdapter) for it to be more reactive?
Or is the alternative to keep the RxBleConnection somewhere and unsubscribe when it is no longer needed?

@RobLewis
Author

As I noted in my OP, when I first obtain the BLE connection, I store it in a static variable and reference the variable when I need a connection. I use it to send data on 2 different characteristics, receive notifications on two others, and read the RSSI, all at the same time. It works across two different Activities and a bound Service. So I don't understand the need for a separate "connection sharing" component, though I would love to know the rationale for it!
At some point I will probably try inserting the ConnectionSharingAdapter to see its effect. But things seem to work fine without it.

I wrote up an analysis of the ConnectionSharingAdapter code for my own education. Here it is, if you're interested:

Observable.Transformer is a class with a call() method that takes Observable<T> as its argument and returns Observable<R>
Here, T and R are both RxBleConnection, so the Transformer implemented by ConnectionSharingAdapter 
is transforming Observable<RxBleConnection> into another Observable<RxBleConnection>. 
What it does: 
	Instance member connectionObservable is an uninitialized AtomicReference to Observable<RxBleConnection> (so it's initially null). 
	Thus connectionObservable has atomic get() and set() methods. Also, the call() method is synchronized on it. Should be thread safe. 
	This call() method is supposed to transform one Observable<RxBleConnection> ("source") into another when invoked by .compose(). 
		It starts by getting the current value of connectionObservable (initially null) and copying it to rxBleConnectionObservable. 
		If this rxBleConnectionObservable is not null, it is returned as the "transformed" Observable<RxBleConnection>. 
		If it is null
			it creates newConnectionObservable from the supplied source
			adds .doOnUnsubscribe() that sets connectionObservable to null
			adds .replay(1) that returns a ConnectableObservable (doesn't emit RxBleConnection until ConnectableObservable's connect() method is called)
				(When connect() is called, all waiting Subscribers are served, but you can add more Subscribers later.)
				Any new subscribers get "replayed" the last 1 item emitted, which in this case would be the RxBleConnection. 
			adds .refCount() that operates on a ConnectableObservable and returns an ordinary Observable: 
				when the first Observer subscribes, it connects to the underlying ConnectableObservable
				when the last Observer unsubscribes, it disconnects (i.e., unsubscribes from the underlying ConnectableObservable)
			sets the instance member connectionObservable to this newConnectionObservable
			returns this newConnectionObservable

Narrative: 
You use .establishConnection() to establish the RxBleConnection, then .compose() a new ConnectionSharingAdapter. 
It transforms the Observable<RxBleConnection> into a different one: 
The first Subscriber finds that the transformation hasn't taken place (connectionObservable is null), so 
a new Observable is created that
	automatically connects to the original Observable when the first Subscriber comes along
	delivers the single RxBleConnection to the first and all subsequent subscribers
	when the last subscriber unsubscribes, unsubscribes from the original Observable and sets connectionObservable null so it will have to be re-created
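
To make the narrative above concrete, here is a minimal plain-Java sketch of the same lifecycle. It deliberately does not use RxJava or RxAndroidBle: `RefCountedShare`, `SharingDemo`, and the `connect`/`disconnect` callbacks are hypothetical names invented for this illustration, and the model only approximates what `.replay(1).refCount()` does (connect on the first subscriber, replay the cached value to later ones, tear down when the last one leaves).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical plain-Java model of replay(1).refCount(); not the library class. */
final class RefCountedShare<T> {
    private final Supplier<T> connect;     // stands in for subscribing to establishConnection()
    private final Consumer<T> disconnect;  // stands in for unsubscribing from it
    private T current;                     // the replayed value, null while disconnected
    private int subscribers;

    RefCountedShare(Supplier<T> connect, Consumer<T> disconnect) {
        this.connect = connect;
        this.disconnect = disconnect;
    }

    /** Subscribes and returns a handle; calling run() on the handle unsubscribes. */
    synchronized Runnable subscribe(Consumer<T> onConnection) {
        if (subscribers == 0) {
            current = connect.get();       // the first subscriber triggers the real connection
        }
        subscribers++;
        onConnection.accept(current);      // later subscribers get the cached value replayed
        boolean[] done = {false};
        return () -> unsubscribe(done);
    }

    private synchronized void unsubscribe(boolean[] done) {
        if (done[0]) {
            return;                        // unsubscribing twice has no further effect
        }
        done[0] = true;
        if (--subscribers == 0) {
            disconnect.accept(current);    // the last subscriber leaving tears the connection down
            current = null;
        }
    }
}

final class SharingDemo {
    /** Runs two overlapping subscribers, then a third one after both have left. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        int[] connects = {0};
        RefCountedShare<String> shared = new RefCountedShare<>(
                () -> "connection#" + (++connects[0]),
                conn -> log.add("closed " + conn));

        Runnable subA = shared.subscribe(conn -> log.add("A got " + conn));
        Runnable subB = shared.subscribe(conn -> log.add("B got " + conn)); // overlaps with A
        subA.run();
        subB.run();                                                         // last one out
        Runnable subC = shared.subscribe(conn -> log.add("C got " + conn)); // must reconnect
        subC.run();
        return log;
    }

    public static void main(String[] args) {
        // [A got connection#1, B got connection#1, closed connection#1,
        //  C got connection#2, closed connection#2]
        System.out.println(run());
    }
}
```

In this model A and B see the same connection because their subscriptions overlap, while C arrives after the count has dropped to zero and therefore gets a fresh one.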

Honestly, I still don't understand why it's needed. Can anyone explain?

@dariuszseweryn
Owner

For reference: this Stack Overflow answer contains a part regarding ConnectionSharingAdapter.

@RobLewis
Author

I guess there is no point in leaving this open.

@uKL
Collaborator

uKL commented Jul 12, 2017

I'll reopen it anyway, we'll document it more in the future.

@uKL uKL reopened this Jul 12, 2017
@streetsofboston

streetsofboston commented Oct 12, 2017

I had some issues using the ConnectionSharingAdapter when dealing with an Rx chain that re-subscribes over and over again (e.g. reading a characteristic periodically).

I had to use the following construct instead to prevent a reconnection from happening over and over again whenever a characteristic is re-read, e.g.:

// Scan for myServiceUUID, connect to it, and then read myCharUUID every 5 seconds
bleClient.scanBleDevices(settings)
        .filter { scanResult -> isMyService(scanResult.scanRecord.serviceUuids, myServiceUUID) }
        .take(1)
        .flatMap { scanResult -> scanResult.bleDevice.establishConnection(false) }
        .replay()
        .autoConnect(1)
        .flatMap { connection -> connection.readCharacteristic(myCharUUID) }
        .repeatWhen { completed -> completed.delay(5, TimeUnit.SECONDS) }

I needed the replay and autoConnect(1) to make this happen. Using the ConnectionSharingAdapter, it kept reconnecting (kept calling establishConnection each time).

@dariuszseweryn
Owner

Hello @streetsofboston
That is the intended behaviour for ConnectionSharingAdapter. It allows for sharing the connection but it is not persisting it if there are no subscribers. So depending on how you have written the flow with ConnectionSharingAdapter it would connect each time a read was about to happen or persist the connection. If you have a flow with a single .subscribe() (which should be the case) then the ConnectionSharingAdapter is not needed at all.
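
The difference between the two sharing strategies discussed here can be sketched in plain Java. This is a simplified model rather than RxJava code: `ShareModel`, `Policy`, and `connectsFor` are hypothetical names made up for the illustration. `REF_COUNT` stands for the `.replay(1).refCount()` behaviour inside ConnectionSharingAdapter, and `AUTO_CONNECT` stands for `.replay().autoConnect(1)`, which is assumed here to stay connected once the first subscriber has arrived.

```java
/** Hypothetical model: counts how often the upstream "connection" is (re)established. */
final class ShareModel {
    enum Policy { REF_COUNT, AUTO_CONNECT }

    private final Policy policy;
    private int subscribers;
    private boolean connected;
    private int connectCount;

    ShareModel(Policy policy) {
        this.policy = policy;
    }

    void subscribe() {
        subscribers++;
        if (!connected) {          // first subscriber, or first one after a teardown
            connected = true;
            connectCount++;
        }
    }

    void unsubscribe() {
        subscribers--;
        // Only the refCount-style policy drops the upstream when nobody is listening.
        if (subscribers == 0 && policy == Policy.REF_COUNT) {
            connected = false;
        }
    }

    int connectCount() {
        return connectCount;
    }

    /** Simulates `reads` sequential subscribe/read/unsubscribe cycles, as a repeat would do. */
    static int connectsFor(Policy policy, int reads) {
        ShareModel model = new ShareModel(policy);
        for (int i = 0; i < reads; i++) {
            model.subscribe();     // the periodic read subscribes...
            model.unsubscribe();   // ...and is done before the next one starts
        }
        return model.connectCount();
    }

    public static void main(String[] args) {
        System.out.println(connectsFor(Policy.REF_COUNT, 3));    // prints 3
        System.out.println(connectsFor(Policy.AUTO_CONNECT, 3)); // prints 1
    }
}
```

The trade-off in this model is that the `AUTO_CONNECT` variant never releases the connection on its own, so something else has to take care of closing it.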

@streetsofboston

Hi Dariusz,

In my example, the subscription happens over and over again, every 5 seconds, through the repeatWhen statement.

If I were to replace the replay and autoConnect with the ConnectionSharingAdapter, the re-subscriptions of the repeatWhen don't share the connection:

// Here, the myCharUUID is read every 5 seconds, 
// but the `establishConnection` is called each time as well.
bleClient.scanBleDevices(settings)
        .filter { scanResult -> isMyService(scanResult.scanRecord.serviceUuids, myServiceUUID) }
        .take(1)
        .flatMap { scanResult -> scanResult.bleDevice.establishConnection(false) }
        .compose(ConnectionSharingAdapter())
        // Share the connection?
        .flatMap { connection -> connection.readCharacteristic(myCharUUID) }
        .repeatWhen { completed -> completed.delay(5, TimeUnit.SECONDS) }

If I'm understanding correctly, the ConnectionSharingAdapter shares the connection as long as the subscription count is larger than 0. As soon as all subscribers unsubscribe (either through a terminal state of the observable or through an explicit unsubscribe), the connection will be closed and no longer shared. Is this correct?

@dariuszseweryn
Owner

If I'm understanding correctly, the ConnectionSharingAdapter shares the connection as long as the subscription count is larger than 0. As soon as all subscribers unsubscribe (either through a terminal state of the observable or through an explicit unsubscribe), the connection will be closed and no longer shared. Is this correct?

Yes, correct.

The above example would never complete unless the connection were lost, though. The Observable returned by .establishConnection() does not complete on its own unless an error happens or the downstream unsubscribes. So the .repeatWhen() would never be called in the above code.

@streetsofboston

Thanks!

The example I gave is just showing the creation of the Observable of that characteristic. It doesn't show any subscription to that Observable. If the code in the example is all the code that there is, none of those things would happen, indeed; no scanning, no connection, no reading 😄

@dariuszseweryn
Owner

Yes, but even if you subscribed to it, it would never complete and the .repeatWhen() would not be executed.

@streetsofboston

streetsofboston commented Oct 12, 2017

I modified the example somewhat, but the code is basically the same that I have in my project.
The code works; after subscribing, every 5 seconds the readCharacteristic is called....

@dariuszseweryn
Owner

There is a little bit of a difference between .flatMap { connection -> connection.readCharacteristic() }.repeatWhen() and .flatMap { connection -> connection.readCharacteristic().repeatWhen() } ;)

Either way, good that you have managed to get your use case to work.

To not be completely off-topic: I will try to wrap up the documentation for the ConnectionSharingAdapter in the near future (as this may be the only thing I have time for).

Best Regards

@streetsofboston

Thank you, Dariusz! This library is great and makes working with BLE sooooooo much easier!

@uKL
Collaborator

uKL commented Mar 16, 2018

ConnectionSharingAdapter is going to be deprecated. Related to #379

@uKL uKL closed this as completed Mar 16, 2018