Skip to content

Commit

Permalink
Add callbacks to be notified of connect/disconnect/error events
Browse files Browse the repository at this point in the history
This enables a partial fix/workaround for #6, which otherwise wouldn't even
be possible in its current state.

Future changes will allow the client to automatically reconnect when the
connection has been broken for any reason other than explicit disconnect().
  • Loading branch information
Joe Hansche committed Mar 27, 2017
1 parent d4f154b commit 5ee79d9
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public interface ParseLiveQueryClient {

void disconnect();

void registerListener(ParseLiveQueryClientCallbacks listener);

void unregisterListener(ParseLiveQueryClientCallbacks listener);

class Factory {

public static ParseLiveQueryClient getClient() {
Expand All @@ -38,5 +42,4 @@ static ParseLiveQueryClient getClient(URI uri, WebSocketClientFactory webSocketC
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.parse;

public interface ParseLiveQueryClientCallbacks {
void onLiveQueryClientConnected(ParseLiveQueryClient client);

void onLiveQueryClientDisconnected(ParseLiveQueryClient client);

void onLiveQueryError(ParseLiveQueryClient client, LiveQueryException reason);

void onSocketError(ParseLiveQueryClient client, Throwable reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

Expand All @@ -30,6 +32,8 @@
private final WebSocketClientFactory webSocketClientFactory;
private final WebSocketClient.WebSocketClientCallback webSocketClientCallback;

private final List<ParseLiveQueryClientCallbacks> mCallbacks = new ArrayList<>();

private WebSocketClient webSocketClient;
private int requestIdCount = 1;
private boolean userInitiatedDisconnect = false;
Expand Down Expand Up @@ -139,6 +143,16 @@ public void disconnect() {
}
}

@Override
public void registerListener(ParseLiveQueryClientCallbacks listener) {
mCallbacks.add(listener);
}

@Override
public void unregisterListener(ParseLiveQueryClientCallbacks listener) {
mCallbacks.remove(listener);
}

// Private methods

private synchronized int requestIdGenerator() {
Expand Down Expand Up @@ -189,6 +203,7 @@ private void parseMessage(String message) throws LiveQueryException {

switch (rawOperation) {
case "connected":
dispatchConnected();
Log.v(LOG_TAG, "Connected, sending pending subscription");
for (int i = 0; i < subscriptions.size(); i++) {
sendSubscription(subscriptions.valueAt(i));
Expand Down Expand Up @@ -231,6 +246,31 @@ private void parseMessage(String message) throws LiveQueryException {
}
}

private void dispatchConnected() {
for (ParseLiveQueryClientCallbacks callback : mCallbacks) {
callback.onLiveQueryClientConnected(this);
}
}

private void dispatchDisconnected() {
for (ParseLiveQueryClientCallbacks callback : mCallbacks) {
callback.onLiveQueryClientDisconnected(this);
}
}


private void dispatchServerError(LiveQueryException exc) {
for (ParseLiveQueryClientCallbacks callback : mCallbacks) {
callback.onLiveQueryError(this, exc);
}
}

private void dispatchSocketError(Throwable reason) {
for (ParseLiveQueryClientCallbacks callback : mCallbacks) {
callback.onSocketError(this, reason);
}
}

private <T extends ParseObject> void handleSubscribedEvent(JSONObject jsonObject) throws JSONException {
final int requestId = jsonObject.getInt("requestId");
final Subscription<T> subscription = subscriptionForRequestId(requestId);
Expand Down Expand Up @@ -263,9 +303,13 @@ private <T extends ParseObject> void handleErrorEvent(JSONObject jsonObject) thr
String error = jsonObject.getString("error");
Boolean reconnect = jsonObject.getBoolean("reconnect");
final Subscription<T> subscription = subscriptionForRequestId(requestId);
LiveQueryException exc = new LiveQueryException.ServerReportedException(code, error, reconnect);

if (subscription != null) {
subscription.didEncounter(new LiveQueryException.ServerReportedException(code, error, reconnect), subscription.getQuery());
subscription.didEncounter(exc, subscription.getQuery());
}

dispatchServerError(exc);
}

private <T extends ParseObject> Subscription<T> subscriptionForRequestId(int requestId) {
Expand Down Expand Up @@ -341,11 +385,13 @@ public Void then(Task<Void> task) {
@Override
public void onClose() {
Log.v(LOG_TAG, "Socket onClose");
dispatchDisconnected();
}

@Override
public void onError(Throwable exception) {
Log.e(LOG_TAG, "Socket onError", exception);
dispatchSocketError(exception);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;
import org.robolectric.util.Transcript;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.Queue;
Expand Down Expand Up @@ -398,6 +399,46 @@ public void testDisconnectOnBackgroundThread() throws Exception {
verify(webSocketClient, times(1)).close();
}

@Test
public void testCallbackNotifiedOnDisconnect() throws Exception {
LoggingCallbacks callbacks = new LoggingCallbacks();
parseLiveQueryClient.registerListener(callbacks);
callbacks.transcript.assertNoEventsSoFar();

webSocketClientCallback.onClose();
callbacks.transcript.assertEventsSoFar("onLiveQueryClientDisconnected");
}

@Test
public void testCallbackNotifiedOnConnect() throws Exception {
LoggingCallbacks callbacks = new LoggingCallbacks();
parseLiveQueryClient.registerListener(callbacks);
callbacks.transcript.assertNoEventsSoFar();

reconnect();
callbacks.transcript.assertEventsSoFar("onLiveQueryClientConnected");
}

@Test
public void testCallbackNotifiedOnSocketError() throws Exception {
LoggingCallbacks callbacks = new LoggingCallbacks();
parseLiveQueryClient.registerListener(callbacks);
callbacks.transcript.assertNoEventsSoFar();

webSocketClientCallback.onError(new IOException("bad things happened"));
callbacks.transcript.assertEventsSoFar("onSocketError: java.io.IOException: bad things happened");
}

@Test
public void testCallbackNotifiedOnServerError() throws Exception {
LoggingCallbacks callbacks = new LoggingCallbacks();
parseLiveQueryClient.registerListener(callbacks);
callbacks.transcript.assertNoEventsSoFar();

webSocketClientCallback.onMessage(createErrorMessage(1).toString());
callbacks.transcript.assertEventsSoFar("onLiveQueryError: com.parse.LiveQueryException$ServerReportedException: Server reported error; code: 1, error: testError, reconnect: true");
}

private SubscriptionHandling<ParseObject> createSubscription(ParseQuery<ParseObject> parseQuery,
SubscriptionHandling.HandleSubscribeCallback<ParseObject> subscribeMockCallback) throws Exception {
SubscriptionHandling<ParseObject> subscriptionHandling = parseLiveQueryClient.subscribe(parseQuery).handleSubscribe(subscribeMockCallback);
Expand Down Expand Up @@ -498,6 +539,30 @@ private static JSONObject createObjectDeleteMessage(int requestId, ParseObject p
return jsonObject;
}

private static class LoggingCallbacks implements ParseLiveQueryClientCallbacks {
final Transcript transcript = new Transcript();

@Override
public void onLiveQueryClientConnected(ParseLiveQueryClient client) {
transcript.add("onLiveQueryClientConnected");
}

@Override
public void onLiveQueryClientDisconnected(ParseLiveQueryClient client) {
transcript.add("onLiveQueryClientDisconnected");
}

@Override
public void onLiveQueryError(ParseLiveQueryClient client, LiveQueryException reason) {
transcript.add("onLiveQueryError: " + reason);
}

@Override
public void onSocketError(ParseLiveQueryClient client, Throwable reason) {
transcript.add("onSocketError: " + reason);
}
}

private static class PauseableExecutor implements Executor {
private boolean isPaused = false;
private final Queue<Runnable> queue = new LinkedList<>();
Expand Down

0 comments on commit 5ee79d9

Please sign in to comment.