Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request_id query param if addRequestIds is enabled #678

Merged
merged 7 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions android/src/main/java/io/ably/lib/push/ActivationStateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.ably.lib.types.*;
import io.ably.lib.util.IntentUtils;
import io.ably.lib.util.Log;
import io.ably.lib.util.ParamsUtils;
import io.ably.lib.util.Serialisation;

public class ActivationStateMachine {
Expand Down Expand Up @@ -135,10 +136,7 @@ public ActivationStateMachine.State transition(final ActivationStateMachine.Even
ably.http.request(new Http.Execute<JsonObject>() {
@Override
public void execute(HttpScheduler http, Callback<JsonObject> callback) throws AblyException {
Param[] params = null;
if(ably.options.pushFullWait) {
params = Param.push(null, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, ably.options);
/* this is authenticated using the Ably library credentials, plus the deviceSecret in the request body */
http.post("/push/deviceRegistrations", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, body, new Serialisation.HttpResponseHandler<JsonObject>(), true, callback);
}
Expand Down Expand Up @@ -425,11 +423,7 @@ private void updateRegistration() {
ably.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}

Param[] params = ParamsUtils.enrichParams(null, ably.options);
http.patch("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, body, null, false, callback);
}
}).async(new Callback<Void>() {
Expand Down Expand Up @@ -473,11 +467,7 @@ private void validateRegistration() {
ably.http.request(new Http.Execute<JsonObject>() {
@Override
public void execute(HttpScheduler http, Callback<JsonObject> callback) throws AblyException {
Param[] params = null;
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}

Param[] params = ParamsUtils.enrichParams(null, ably.options);
final HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(device.toJsonObject(), ably.options.useBinaryProtocol);
http.put("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, body, new Serialisation.HttpResponseHandler<JsonObject>(), true, callback);
}
Expand Down Expand Up @@ -521,10 +511,7 @@ private void deregister() {
ably.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = new Param[0];
if (ably.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(new Param[0], ably.options);
http.del("/push/deviceRegistrations/" + device.id, ably.push.pushRequestHeaders(true), params, null, true, callback);
}
}).async(new Callback<Void>() {
Expand Down
12 changes: 3 additions & 9 deletions android/src/main/java/io/ably/lib/push/PushChannel.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package io.ably.lib.push;

import android.content.Context;
import com.google.gson.JsonObject;
import io.ably.lib.http.*;
import io.ably.lib.realtime.CompletionListener;
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Channel;
import io.ably.lib.rest.DeviceDetails;
import io.ably.lib.types.*;
import io.ably.lib.util.ParamsUtils;

public class PushChannel {
protected final Channel channel;
Expand Down Expand Up @@ -64,10 +64,7 @@ protected Http.Request<Void> postSubscription(JsonObject bodyJson) {
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.post("/push/channelSubscriptions", rest.push.pushRequestHeaders(true), params, body, null, true, callback);
}
});
Expand Down Expand Up @@ -109,10 +106,7 @@ protected Http.Request<Void> unsubscribeDeviceImpl() {
}

protected Http.Request<Void> delSubscription(Param[] params) {
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = params;
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down
13 changes: 11 additions & 2 deletions lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ private AblyRequestWithFallback(
this.path = path;
this.requireAblyAuth = requireAblyAuth;
}

private String extendMessage(String msg) {
return Param.getFirst(params, "request_id") == null ?
msg : String.format("%s request_id=%s", msg, Param.getFirst(params, "request_id"));
}

@Override
public void run() {
String candidateHost = httpCore.hosts.getPreferredHost();
Expand All @@ -202,17 +208,20 @@ public void run() {
break;
} catch (AblyException.HostFailedException e) {
if(--retryCountRemaining < 0) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
}
Log.d(TAG, "Connection failed to host `" + candidateHost + "`. Searching for new host...");
Log.d(TAG, extendMessage("Connection failed to host `" + candidateHost + "`. Searching for new host..."));
candidateHost = httpCore.hosts.getFallback(candidateHost);
if (candidateHost == null) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
}
Log.d(TAG, "Switched to `" + candidateHost + "`.");
Log.d(TAG, extendMessage("Switched to `" + candidateHost + "`."));
} catch(AblyException e) {
e.errorInfo.message = extendMessage(e.errorInfo.message);
setError(e.errorInfo);
break;
} finally {
Expand Down
39 changes: 9 additions & 30 deletions lib/src/main/java/io/ably/lib/push/PushBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.ably.lib.types.PaginatedResult;
import io.ably.lib.types.Param;
import io.ably.lib.util.Log;
import io.ably.lib.util.ParamsUtils;
import io.ably.lib.util.Serialisation;
import io.ably.lib.util.StringUtils;

Expand Down Expand Up @@ -71,11 +72,7 @@ public void execute(HttpScheduler http, Callback<Void> callback) throws AblyExce
bodyJson.add(entry.getKey(), entry.getValue());
}
HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(bodyJson, rest.options.useBinaryProtocol);

Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);

http.post("/push/publish", HttpUtils.defaultAcceptHeaders(rest.options.useBinaryProtocol), params, body, null, true, callback);
}
Expand All @@ -101,11 +98,8 @@ protected Http.Request<DeviceDetails> saveImpl(final DeviceDetails device) {
final HttpCore.RequestBody body = HttpUtils.requestBodyFromGson(device.toJsonObject(), rest.options.useBinaryProtocol);
return rest.http.request(new Http.Execute<DeviceDetails>() {
@Override
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) {
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.put("/push/deviceRegistrations/" + device.id, rest.push.pushRequestHeaders(device.id), params, body, DeviceDetails.httpResponseHandler, true, callback);
}
});
Expand All @@ -124,10 +118,7 @@ protected Http.Request<DeviceDetails> getImpl(final String deviceId) {
return rest.http.request(new Http.Execute<DeviceDetails>() {
@Override
public void execute(HttpScheduler http, Callback<DeviceDetails> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.get("/push/deviceRegistrations/" + deviceId, rest.push.pushRequestHeaders(deviceId), params, DeviceDetails.httpResponseHandler, true, callback);
}
});
Expand Down Expand Up @@ -167,10 +158,7 @@ protected Http.Request<Void> removeImpl(final String deviceId) {
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.del("/push/deviceRegistrations/" + deviceId, rest.push.pushRequestHeaders(deviceId), params, null, true, callback);
}
});
Expand All @@ -186,10 +174,7 @@ public void removeWhereAsync(Param[] params, CompletionListener listener) {

protected Http.Request<Void> removeWhereImpl(Param[] params) {
Log.v(TAG, "removeWhereImpl(): params=" + Arrays.toString(params));
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = params;
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down Expand Up @@ -222,10 +207,7 @@ protected Http.Request<ChannelSubscription> saveImpl(final ChannelSubscription s
return rest.http.request(new Http.Execute<ChannelSubscription>() {
@Override
public void execute(HttpScheduler http, Callback<ChannelSubscription> callback) throws AblyException {
Param[] params = null;
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
Param[] params = ParamsUtils.enrichParams(null, rest.options);
http.post("/push/channelSubscriptions", rest.push.pushRequestHeaders(subscription.deviceId), params, body, ChannelSubscription.httpResponseHandler, true, callback);
}
});
Expand Down Expand Up @@ -279,11 +261,8 @@ public void removeWhereAsync(Param[] params, CompletionListener listener) {
protected Http.Request<Void> removeWhereImpl(Param[] params) {
Log.v(TAG, "removeWhereImpl(): params=" + Arrays.toString(params));
String deviceId = HttpUtils.getParam(params, "deviceId");
if (rest.options.pushFullWait) {
params = Param.push(params, "fullWait", "true");
}
final Param[] finalParams = ParamsUtils.enrichParams(params, rest.options);
final Param[] finalHeaders = rest.push.pushRequestHeaders(deviceId);
final Param[] finalParams = params;
return rest.http.request(new Http.Execute<Void>() {
@Override
public void execute(HttpScheduler http, Callback<Void> callback) throws AblyException {
Expand Down
8 changes: 5 additions & 3 deletions lib/src/main/java/io/ably/lib/rest/AblyBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ public void timeAsync(Callback<Long> callback) {
}

private Http.Request<Long> timeImpl() {
final Param[] params = this.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
return http.request(new Http.Execute<Long>() {
@Override
public void execute(HttpScheduler http, Callback<Long> callback) throws AblyException {
http.get("/time", HttpUtils.defaultAcceptHeaders(false), null, new HttpCore.ResponseHandler<Long>() {
http.get("/time", HttpUtils.defaultAcceptHeaders(false), params, new HttpCore.ResponseHandler<Long>() {
@Override
public Long handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
if(error != null) {
Expand Down Expand Up @@ -251,7 +252,7 @@ public void publishBatchAsync(Message.Batch[] pubSpecs, ChannelOptions channelOp
publishBatchImpl(pubSpecs, channelOptions, params).async(callback);
}

private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] params) throws AblyException {
private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] pubSpecs, ChannelOptions channelOptions, final Param[] initialParams) throws AblyException {
boolean hasClientSuppliedId = false;
for(Message.Batch spec : pubSpecs) {
for(Message message : spec.messages) {
Expand All @@ -264,7 +265,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
}
if(!hasClientSuppliedId && options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
String messageId = Crypto.getRandomId();
for (int i = 0; i < spec.messages.length; i++) {
spec.messages[i].id = messageId + ':' + i;
}
Expand All @@ -274,6 +275,7 @@ private Http.Request<PublishResponse[]> publishBatchImpl(final Message.Batch[] p
@Override
public void execute(HttpScheduler http, final Callback<PublishResponse[]> callback) throws AblyException {
HttpCore.RequestBody requestBody = options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(pubSpecs) : MessageSerializer.asJSONRequest(pubSpecs);
final Param[] params = options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams ; // RSC7c
http.post("/messages", HttpUtils.defaultAcceptHeaders(options.useBinaryProtocol), params, requestBody, new HttpCore.ResponseHandler<PublishResponse[]>() {
@Override
public PublishResponse[] handleResponse(HttpCore.Response response, ErrorInfo error) throws AblyException {
Expand Down
14 changes: 9 additions & 5 deletions lib/src/main/java/io/ably/lib/rest/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ public void execute(HttpScheduler http, final Callback<Void> callback) throws Ab
}
if(!hasClientSuppliedId && ably.options.idempotentRestPublishing) {
/* RSL1k1: populate the message id with a library-generated id */
String messageId = Crypto.getRandomMessageId();
String messageId = Crypto.getRandomId();
for (int i = 0; i < messages.length; i++) {
messages[i].id = messageId + ':' + i;
}
}

HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages);
final Param[] params = ably.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c

http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), null, requestBody, null, true, callback);
http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, null, true, callback);
}
});
}
Expand Down Expand Up @@ -139,8 +140,9 @@ public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<Message>>
historyImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] initialParams) {
HttpCore.BodyHandler<Message> bodyHandler = MessageSerializer.getMessageResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<Message>(ably.http, basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand Down Expand Up @@ -169,8 +171,9 @@ public void getAsync(Param[] params, Callback<AsyncPaginatedResult<PresenceMessa
getImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<PresenceMessage> getImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<PresenceMessage> getImpl(Param[] initialParams) {
HttpCore.BodyHandler<PresenceMessage> bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<PresenceMessage>(ably.http, basePath + "/presence", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand All @@ -195,8 +198,9 @@ public void historyAsync(Param[] params, Callback<AsyncPaginatedResult<PresenceM
historyImpl(params).async(callback);
}

private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] params) {
private BasePaginatedQuery.ResultRequest<PresenceMessage> historyImpl(Param[] initialParams) {
HttpCore.BodyHandler<PresenceMessage> bodyHandler = PresenceSerializer.getPresenceResponseHandler(options);
final Param[] params = ably.options.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
return (new BasePaginatedQuery<PresenceMessage>(ably.http, basePath + "/presence/history", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, bodyHandler)).get();
}

Expand Down
6 changes: 6 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,10 @@ public ClientOptions(String key) throws AblyException {
* before responding.
*/
public boolean pushFullWait = false;

/**
If enabled, every REST request to Ably includes a `request_id` query string parameter. This request ID
remain the same if a request is retried to a fallback host.
*/
public boolean addRequestIds = false;
}
Loading