Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription support for the dynamic client #838

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public enum OperationType {
QUERY,
MUTATION
MUTATION,
SUBSCRIPTION
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.smallrye.graphql.client.Request;
import io.smallrye.graphql.client.Response;
import io.smallrye.graphql.client.core.Document;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public interface DynamicGraphQLClient extends AutoCloseable {
Expand All @@ -17,4 +18,8 @@ public interface DynamicGraphQLClient extends AutoCloseable {

Uni<Response> executeAsync(Request request);

Multi<Response> subscription(Document document);

Multi<Response> subscription(Request request);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import io.smallrye.graphql.client.dynamic.SmallRyeGraphQLDynamicClientLogging;
import io.smallrye.graphql.client.dynamic.SmallRyeGraphQLDynamicClientMessages;
import io.smallrye.graphql.client.dynamic.api.DynamicGraphQLClient;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

public class VertxDynamicGraphQLClient implements DynamicGraphQLClient {

private final WebClient webClient;
private final HttpClient httpClient;
private final String url;
private final MultiMap headers;

VertxDynamicGraphQLClient(WebClientOptions options, Vertx vertx, String url, MultiMap headers) {
this.webClient = WebClient.create(vertx, options);
VertxDynamicGraphQLClient(Vertx vertx, String url, MultiMap headers) {
this.httpClient = vertx.createHttpClient();
this.webClient = WebClient.wrap(httpClient);
this.headers = headers;
this.url = url;
}
Expand Down Expand Up @@ -73,13 +78,39 @@ public Uni<Response> executeAsync(Request request) {
.map(response -> readFrom(response.body()));
}

public Multi<Response> subscription(Document document) {
return subscription(buildRequest(document));
}

public Multi<Response> subscription(Request request) {
String WSURL = url.replaceFirst("http", "ws");
return Multi.createFrom()
.emitter(e -> {
httpClient.webSocketAbs(WSURL, headers, WebsocketVersion.V13, new ArrayList<>(), result -> {
if (result.succeeded()) {
WebSocket socket = result.result();
socket.writeTextMessage(request.toJson());
socket.handler(message -> {
e.emit(readFrom(message));
});
socket.closeHandler((v) -> {
e.complete();
});
e.onTermination(socket::close);
} else {
e.fail(result.cause());
}
});
});
}

public Request buildRequest(Document document) {
return new RequestImpl(document.build());
}

@Override
public void close() {
webClient.close();
httpClient.close();
}

private ResponseImpl readFrom(Buffer input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.ext.web.client.WebClientOptions;

/**
* Implementation of dynamic client builder that creates GraphQL clients using Vert.x under the hood.
Expand Down Expand Up @@ -39,12 +38,11 @@ public DynamicGraphQLClientBuilder url(String url) {

@Override
public DynamicGraphQLClient build() {
WebClientOptions options = new WebClientOptions();
if (url == null) {
throw new IllegalArgumentException("URL is required");
}
Vertx toUseVertx = vertx != null ? vertx : Vertx.vertx();
return new VertxDynamicGraphQLClient(options, toUseVertx, url, headersMap);
return new VertxDynamicGraphQLClient(toUseVertx, url, headersMap);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public String build() {
case MUTATION:
builder.append("mutation");
break;
case SUBSCRIPTION:
builder.append("subscription");
break;
default:
throw new BuildException("Operation type must be one of QUERY, MUTATION or SUBSCRIPTION");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@

import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonReaderFactory;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import javax.json.bind.JsonbConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
Expand All @@ -23,7 +21,6 @@
import org.reactivestreams.Subscription;

import graphql.ExecutionResult;
import graphql.schema.GraphQLSchema;
import io.smallrye.graphql.cdi.config.GraphQLConfig;
import io.smallrye.graphql.execution.ExecutionResponse;
import io.smallrye.graphql.execution.ExecutionService;
Expand All @@ -37,25 +34,26 @@
public class SubscriptionWebSocket {

private static final JsonReaderFactory jsonReaderFactory = Json.createReaderFactory(null);
private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();
private final ConcurrentHashMap<String, AtomicReference<Subscription>> subscriptionRefs = new ConcurrentHashMap<>();

@Inject
ExecutionService executionService;

@Inject
GraphQLSchema graphQLSchema;

@Inject
GraphQLConfig config;

@OnClose
public void onClose(Session session) throws IOException {
subscriptionRef.set(null);
public void onClose(Session session) {
unsubscribe(session.getId());
}

@OnError
public void onError(Session session, Throwable throwable) throws IOException {
session.getBasicRemote().sendText(throwable.getMessage());
throwable.printStackTrace();
unsubscribe(session.getId());
if (session.isOpen()) {
session.close();
}
}

@OnMessage
Expand All @@ -72,54 +70,69 @@ public void handleMessage(Session session, String message) {

@Override
public void onSubscribe(Subscription s) {
subscriptionRef.set(s);
request(1, session);
AtomicReference<Subscription> subRef = subscriptionRefs.get(session.getId());
if (subRef == null) {
subRef = new AtomicReference<>(s);
subscriptionRefs.put(session.getId(), subRef);
s.request(1);
return;
}
if (subRef.compareAndSet(null, s)) {
s.request(1);
} else {
s.cancel();
}
}

@Override
public void onNext(ExecutionResult er) {

try {
if (session.isOpen()) {
ExecutionResponse executionResponse = new ExecutionResponse(er, config);
session.getBasicRemote().sendText(executionResponse.getExecutionResultAsString());
Subscription s = subscriptionRefs.get(session.getId()).get();
s.request(1);
} else {
unsubscribe(session.getId());
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
request(1, session);

}

@Override
public void onError(Throwable t) {
t.printStackTrace();
unsubscribe(session.getId());
try {
session.getBasicRemote().sendText(t.getMessage());
} catch (IOException ex) {
throw new RuntimeException(ex);
session.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void onComplete() {
unsubscribe(session.getId());
try {
session.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
ex.printStackTrace();
}
}
});
}
}

}

private void request(int n, Session session) {
Subscription subscription = subscriptionRef.get();
if (subscription != null && session.isOpen()) {
subscription.request(n);
private void unsubscribe(String sessionId) {
AtomicReference<Subscription> subscription = subscriptionRefs.get(sessionId);
subscriptionRefs.remove(sessionId);

if (subscription != null && subscription.get() != null) {
subscription.get().cancel();
subscription.set(null);
}
}

private static final Jsonb JSONB = JsonbBuilder.create(new JsonbConfig().withNullValues(true));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.smallrye.graphql.tests.client.dynamic.subscription;

import org.eclipse.microprofile.graphql.GraphQLApi;
import org.eclipse.microprofile.graphql.Query;

import io.smallrye.graphql.api.Subscription;
import io.smallrye.mutiny.Multi;

@GraphQLApi
public class DynamicClientSubscriptionApi {

// this needs to be here because a GraphQLApi with only subscriptions does not work
@Query
public String nothing() {
return null;
}

@Subscription
public Multi<Integer> countToFive() {
return Multi.createFrom().range(0, 5);
}

@Subscription
public Multi<Integer> failingImmediately() {
return Multi.createFrom().failure(new RuntimeException("blabla"));
}

}
Loading