Skip to content

Commit

Permalink
Implement idempotent REST publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
paddybyers committed Nov 19, 2018
1 parent 4439cbd commit b0d68c6
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 19 deletions.
4 changes: 2 additions & 2 deletions lib/src/main/java/io/ably/lib/debug/DebugOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public interface RawProtocolListener {

public interface RawHttpListener {
public void onRawHttpRequest(String id, HttpURLConnection conn, String method, String authHeader, Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody);
public void onRawHttpResponse(String id, HttpCore.Response response);
public void onRawHttpException(String id, Throwable t);
public void onRawHttpResponse(String id, String method, HttpCore.Response response);
public void onRawHttpException(String id, String method, Throwable t);
}

public DebugOptions(String key) throws AblyException { super(key); }
Expand Down
5 changes: 3 additions & 2 deletions lib/src/main/java/io/ably/lib/http/HttpCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,12 @@ <T> T httpExecute(HttpURLConnection conn, String method, Param[] headers, Reques
}
response = readResponse(conn);
if(rawHttpListener != null) {
rawHttpListener.onRawHttpResponse(id, response);
rawHttpListener.onRawHttpResponse(id, method, response);
}
} catch(IOException ioe) {
ioe.printStackTrace();
if(rawHttpListener != null) {
rawHttpListener.onRawHttpException(id, ioe);
rawHttpListener.onRawHttpException(id, method, ioe);
}
throw AblyException.fromThrowable(ioe);
}
Expand Down
12 changes: 12 additions & 0 deletions lib/src/main/java/io/ably/lib/rest/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.ably.lib.types.Param;
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.PresenceSerializer;
import io.ably.lib.util.Crypto;

/**
* A class representing a Channel in the Ably REST API.
Expand Down Expand Up @@ -93,11 +94,22 @@ private Http.Request<Void> publishImpl(final Message[] messages) {
return ably.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, final Callback<Void> callback) throws AblyException {
/* handle message ids */
boolean hasClientSuppliedId = false;
for(Message message : messages) {
/* RSL1k2 */
hasClientSuppliedId |= (message.id != null);
/* RTL6g3 */
ably.auth.checkClientId(message, true, false);
message.encode(options);
}
if(!hasClientSuppliedId && ably.options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
for (int i = 0; i < messages.length; i++) {
messages[i].id = messageId + ':' + i;
}
}

HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages);

Expand Down
5 changes: 4 additions & 1 deletion lib/src/main/java/io/ably/lib/transport/Defaults.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import io.ably.lib.BuildConfig;
import io.ably.lib.types.ClientOptions;

import java.text.DecimalFormat;

public class Defaults {
/* versions */
public static final String ABLY_VERSION = "1.0";
public static final float ABLY_VERSION_NUMBER = 1.0f;
public static final String ABLY_VERSION = new DecimalFormat("0.0").format(ABLY_VERSION_NUMBER);
public static final String ABLY_LIB_VERSION = String.format("%s-%s", BuildConfig.LIBRARY_NAME, BuildConfig.VERSION);

/* params */
Expand Down
5 changes: 5 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public ClientOptions(String key) throws AblyException {
*/
public String environment;

/**
* Spec: TO3n
*/
public boolean idempotentRestPublishing = (Defaults.ABLY_VERSION_NUMBER >= 1.2);

/**
* Spec: TO313
*/
Expand Down
6 changes: 6 additions & 0 deletions lib/src/main/java/io/ably/lib/util/Crypto.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ private static final int getPaddedLength(int plaintextLength) {
};
}

public static String getRandomMessageId() {
byte[] entropy = new byte[9];
secureRandom.nextBytes(entropy);
return Base64Coder.encode(entropy).toString();
}

/**
* Determine whether or not 256-bit AES is supported. (If this determines that
* it is not supported, install the JCE unlimited strength JCE extensions).
Expand Down
4 changes: 2 additions & 2 deletions lib/src/test/java/io/ably/lib/test/common/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ public void onRawHttpRequest(String id, HttpURLConnection conn, String method, S
}

@Override
public void onRawHttpResponse(String id, HttpCore.Response response) {
public void onRawHttpResponse(String id, String method, HttpCore.Response response) {
/* duplicating if necessary, ensure lower-case versions of header names are present */
Map<String, List<String>> headers = response.headers;
Map<String, List<String>> normalisedHeaders = new HashMap<String, List<String>>();
Expand All @@ -744,7 +744,7 @@ public void onRawHttpResponse(String id, HttpCore.Response response) {
}

@Override
public void onRawHttpException(String id, Throwable t) {
public void onRawHttpException(String id, String method, Throwable t) {
RawHttpRequest req = get(id);
if(req != null) {
req.error = t;
Expand Down
8 changes: 4 additions & 4 deletions lib/src/test/java/io/ably/lib/test/rest/RestAuthTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1390,10 +1390,10 @@ public void onRawHttpRequest(String id, HttpURLConnection conn, String method, S
}

@Override
public void onRawHttpResponse(String id, HttpCore.Response response) {}
public void onRawHttpResponse(String id, String method, HttpCore.Response response) {}

@Override
public void onRawHttpException(String id, Throwable t) {}
public void onRawHttpException(String id, String method, Throwable t) {}
};
}};
fillInOptions(options);
Expand Down Expand Up @@ -1457,10 +1457,10 @@ public void onRawHttpRequest(String id, HttpURLConnection conn, String method, S
}

@Override
public void onRawHttpResponse(String id, HttpCore.Response response) {}
public void onRawHttpResponse(String id, String method, HttpCore.Response response) {}

@Override
public void onRawHttpException(String id, Throwable t) {}
public void onRawHttpException(String id, String method, Throwable t) {}
};
}};
fillInOptions(options);
Expand Down
Loading

0 comments on commit b0d68c6

Please sign in to comment.