Skip to content

Commit

Permalink
Merge pull request #678 from ably/574-RSC7c-addRequestIds-clientOptions
Browse files Browse the repository at this point in the history
Add request_id query param if addRequestIds is enabled
  • Loading branch information
QuintinWillison authored Jul 28, 2021
2 parents 370a0f1 + 636842a commit 28d2f81
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 69 deletions.
23 changes: 5 additions & 18 deletions android/src/main/java/io/ably/lib/push/ActivationStateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.ably.lib.types.*;
import io.ably.lib.util.IntentUtils;
import io.ably.lib.util.Log;
import io.ably.lib.util.ParamsUtils;
import io.ably.lib.util.Serialisation;

public class ActivationStateMachine {
Expand Down Expand Up @@ -135,10 +136,7 @@ public ActivationStateMachine.State transition(final ActivationStateMachine.Even
ably.http.request(new Http.Execute<JsonObject>() {
@Override
public void execute(HttpScheduler http, Callback<JsonObject> callback) throws AblyException {
Param[] params = null;
if(ably.options.pushFullWait) {
params = Param.push(null, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, ably.options);
/* this is authenticated using the Ably library credentials, plus the deviceSecret in the request body */
http.post("/push/deviceRegistrations", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, body, new Serialisation.HttpResponseHandler<JsonObject>(), true, callback);
}
Expand Down Expand Up @@ -425,11 +423,7 @@ private void updateRegistration() {
ably.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}

Param[] params = ParamsUtils.enrichParams(null, ably.options);
http.patch("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, body, null, false, callback);
}
}).async(new Callback<Void>() {
Expand Down Expand Up @@ -473,11 +467,7 @@ private void validateRegistration() {
ably.http.request(new Http.Execute<JsonObject>() {
@Override
public void execute(HttpScheduler http, Callback<JsonObject> callback) throws AblyException {
Param[] params = null;
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}

Param[] params = ParamsUtils.enrichParams(null, ably.options);
final HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(device.toJsonObject(), ably.options.useBinaryProtocol);
http.put("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, body, new Serialisation.HttpResponseHandler<JsonObject>(), true, callback);
}
Expand Down Expand Up @@ -521,10 +511,7 @@ private void deregister() {
ably.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = new Param[0];
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(new Param[0], ably.options);
http.del("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, null, true, callback);
}
}).async(new Callback<Void>() {
Expand Down
12 changes: 3 additions & 9 deletions android/src/main/java/io/ably/lib/push/PushChannel.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package io.ably.lib.push;

import android.content.Context;
import com.google.gson.JsonObject;
import io.ably.lib.http.*;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Channel;
import io.ably.lib.rest.DeviceDetails;
import io.ably.lib.types.*;
import io.ably.lib.util.ParamsUtils;

public class PushChannel {
protected final Channel channel;
Expand Down Expand Up @@ -64,10 +64,7 @@ protected Http.Request<Void> postSubscription(JsonObject bodyJson) {
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.post("/push/channelSubscriptions", rest.push.pushRequestHeaders(true), params, body, null, true, callback);
}
});
Expand Down Expand Up @@ -109,10 +106,7 @@ protected Http.Request<Void> unsubscribeDeviceImpl() {
}

protected Http.Request<Void> delSubscription(Param[] params) {
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = params;
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down
13 changes: 11 additions & 2 deletions lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ private AblyRequestWithFallback(
this.path = path;
this.requireAblyAuth = requireAblyAuth;
}

private String extendMessage(String msg) {
return Param.getFirst(params, "request_id") == null ?
msg : String.format("%s request_id=%s", msg, Param.getFirst(params, "request_id"));
}

@Override
public void run() {
String candidateHost = httpCore.hosts.getPreferredHost();
Expand All @@ -202,17 +208,20 @@ public void run() {
break;
} catch (AblyException.HostFailedException e) {
if(--retryCountRemaining < 0) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
}
Log.d(TAG, "Connection failed to host `" + candidateHost + "`. Searching for new host...");
Log.d(TAG, extendMessage("Connection failed to host `" + candidateHost + "`. Searching for new host..."));
candidateHost = httpCore.hosts.getFallback(candidateHost);
if (candidateHost == null) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
}
Log.d(TAG, "Switched to `" + candidateHost + "`.");
Log.d(TAG, extendMessage("Switched to `" + candidateHost + "`."));
} catch(AblyException e) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
} finally {
Expand Down
39 changes: 9 additions & 30 deletions lib/src/main/java/io/ably/lib/push/PushBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import io.ably.lib.util.Log;
import io.ably.lib.util.ParamsUtils;
import io.ably.lib.util.Serialisation;
import io.ably.lib.util.StringUtils;

Expand Down Expand Up @@ -71,11 +72,7 @@ public void execute(HttpScheduler http, Callback<Void> callback) throws AblyExce
bodyJson.add(entry.getKey(), entry.getValue());
}
HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(bodyJson, rest.options.useBinaryProtocol);

Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);

http.post("/push/publish", HttpUtils.defaultAcceptHeaders(rest.options.useBinaryProtocol), params, body, null, true, callback);
}
Expand All @@ -101,11 +98,8 @@ protected Http.Request<DeviceDetails> saveImpl(final DeviceDetails device) {
final HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(device.toJsonObject(), rest.options.useBinaryProtocol);
return rest.http.request(new Http.Execute<DeviceDetails>() {
@Override
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) {
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.put("/push/deviceRegistrations/" + device.id, rest.push.pushRequestHeaders(device.id), params, body, DeviceDetails.httpResponseHandler, true, callback);
}
});
Expand All @@ -124,10 +118,7 @@ protected Http.Request<DeviceDetails> getImpl(final String deviceId) {
return rest.http.request(new Http.Execute<DeviceDetails>() {
@Override
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.get("/push/deviceRegistrations/" + deviceId, rest.push.pushRequestHeaders(deviceId), params, DeviceDetails.httpResponseHandler, true, callback);
}
});
Expand Down Expand Up @@ -167,10 +158,7 @@ protected Http.Request<Void> removeImpl(final String deviceId) {
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.del("/push/deviceRegistrations/" + deviceId, rest.push.pushRequestHeaders(deviceId), params, null, true, callback);
}
});
Expand All @@ -186,10 +174,7 @@ public void removeWhereAsync(Param[] params, CompletionListener listener) {

protected Http.Request<Void> removeWhereImpl(Param[] params) {
Log.v(TAG, "removeWhereImpl(): params=" + Arrays.toString(params));
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = params;
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down Expand Up @@ -222,10 +207,7 @@ protected Http.Request<ChannelSubscription> saveImpl(final ChannelSubscription s
return rest.http.request(new Http.Execute<ChannelSubscription>() {
@Override
public void execute(HttpScheduler http, Callback<ChannelSubscription> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.post("/push/channelSubscriptions", rest.push.pushRequestHeaders(subscription.deviceId), params, body, ChannelSubscription.httpResponseHandler, true, callback);
}
});
Expand Down Expand Up @@ -279,11 +261,8 @@ public void removeWhereAsync(Param[] params, CompletionListener listener) {
protected Http.Request<Void> removeWhereImpl(Param[] params) {
Log.v(TAG, "removeWhereImpl(): params=" + Arrays.toString(params));
String deviceId = HttpUtils.getParam(params, "deviceId");
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
final Param[] finalHeaders = rest.push.pushRequestHeaders(deviceId);
final Param[] finalParams = params;
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down
8 changes: 5 additions & 3 deletions lib/src/main/java/io/ably/lib/rest/AblyBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ public void timeAsync(Callback<Long> callback) {
}

private Http.Request<Long> timeImpl() {
final Param[] params = this.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
return http.request(new Http.Execute<Long>() {
@Override
public void execute(HttpScheduler http, Callback<Long> callback) throws AblyException {
http.get("/time", HttpUtils.defaultAcceptHeaders(false), null, new HttpCore.ResponseHandler<Long>() {
http.get("/time", HttpUtils.defaultAcceptHeaders(false), params, new HttpCore.ResponseHandler<Long>() {
@Override
public Long handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
if(error != null) {
Expand Down Expand Up @@ -251,7 +252,7 @@ public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOp
publishBatchImpl(pubSpecs, channelOptions, params).async(callback);
}

private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] params) throws AblyException {
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] initialParams) throws AblyException {
boolean hasClientSuppliedId = false;
for(Message.Batch spec : pubSpecs) {
for(Message message : spec.messages) {
Expand All @@ -264,7 +265,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
}
if(!hasClientSuppliedId && options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
String messageId = Crypto.getRandomId();
for (int i = 0; i < spec.messages.length; i++) {
spec.messages[i].id = messageId + ':' + i;
}
Expand All @@ -274,6 +275,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
@Override
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
@Override
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
Expand Down
14 changes: 9 additions & 5 deletions lib/src/main/java/io/ably/lib/rest/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ public void execute(HttpScheduler http, final Callback<Void> callback) throws Ab
}
if(!hasClientSuppliedId && ably.options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
String messageId = Crypto.getRandomId();
for (int i = 0; i < messages.length; i++) {
messages[i].id = messageId + ':' + i;
}
}

HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages);
final Param[] params = ably.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c

http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), null, requestBody, null, true, callback);
http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, null, true, callback);
}
});
}
Expand Down Expand Up @@ -139,8 +140,9 @@ public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<Message>>
historyImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] initialParams) {
HttpCore.BodyHandler<Message> bodyHandler = MessageSerializer.getMessageResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<Message>(ably.http, basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand Down Expand Up @@ -169,8 +171,9 @@ public void getAsync(Param[] params, Callback<AsyncPaginatedResult<PresenceMessa
getImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<PresenceMessage> getImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<PresenceMessage> getImpl(Param[] initialParams) {
HttpCore.BodyHandler<PresenceMessage> bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<PresenceMessage>(ably.http, basePath + "/presence", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand All @@ -195,8 +198,9 @@ public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<PresenceM
historyImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] initialParams) {
HttpCore.BodyHandler<PresenceMessage> bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<PresenceMessage>(ably.http, basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand Down
6 changes: 6 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,10 @@ public ClientOptions(String key) throws AblyException {
* before responding.
*/
public boolean pushFullWait = false;

/**
If enabled, every REST request to Ably includes a `request_id` query string parameter. This request ID
remain the same if a request is retried to a fallback host.
*/
public boolean addRequestIds = false;
}
Loading

0 comments on commit 28d2f81

Please sign in to comment.