Skip to content

Commit

Permalink
refactor: make rest-swagger connector modern pt.2
Browse files Browse the repository at this point in the history
  • Loading branch information
zregvart committed Apr 15, 2019
1 parent dccdb02 commit 280d61d
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 128 deletions.
18 changes: 10 additions & 8 deletions app/connector/rest-swagger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
<dependency>
<groupId>io.syndesis.integration</groupId>
<artifactId>integration-component-proxy</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.syndesis.connector</groupId>
<artifactId>connector-support-processor</artifactId>
<groupId>io.syndesis.integration</groupId>
<artifactId>integration-runtime</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -109,12 +111,6 @@


<!-- testing -->
<dependency>
<groupId>io.syndesis.integration</groupId>
<artifactId>integration-runtime</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down Expand Up @@ -146,6 +142,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,18 @@
public class AuthenticationCustomizer implements ComponentProxyCustomizer {

@Override
public void customize(final ComponentProxyComponent component, final Map<String, Object> options) {
public void customize(final ComponentProxyComponent proxyComponent, final Map<String, Object> options) {
consumeOption(options, "authenticationType", authenticationTypeObject -> {
final AuthenticationType authenticationType = AuthenticationType.valueOf(String.valueOf(authenticationTypeObject));
if (authenticationType == AuthenticationType.none) {
return;
}

final SwaggerProxyComponent component = (SwaggerProxyComponent) proxyComponent;

final CamelContext context = component.getCamelContext();
final Configuration configuration = new Configuration(this, context, options);

if (component instanceof SwaggerProxyComponent) {
((SwaggerProxyComponent) component).setConfiguration(configuration);
}
component.setConfiguration(configuration);

if (authenticationType == AuthenticationType.oauth2) {
OAuth.setup(component, configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public Configuration(final Map<String, Object> initial, final ComponentProxyCust
this.options = options;
}

public boolean booleanOption(final String key) {
final Boolean value = value(key, Boolean.class);

if (value == null) {
return false;
}

return value.booleanValue();
}

public long longOption(final String key) {
final Long value = value(key, Long.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ abstract class PayloadConverterBase implements Processor {

private final DataShapeKinds kind;

public PayloadConverterBase(final DataShapeKinds kind) {
PayloadConverterBase(final DataShapeKinds kind) {
this.kind = kind;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.syndesis.common.model.DataShape;
import io.syndesis.common.model.InputDataShapeAware;
import io.syndesis.common.model.OutputDataShapeAware;
import io.syndesis.connector.support.processor.SyndesisHeaderStrategy;
import io.syndesis.integration.component.proxy.ComponentProxyComponent;
import io.syndesis.integration.component.proxy.ComponentProxyCustomizer;
import io.syndesis.integration.component.proxy.Processors;
import io.syndesis.integration.runtime.util.SyndesisHeaderStrategy;

import org.apache.camel.CamelContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import io.syndesis.common.model.DataShape;
import io.syndesis.common.model.DataShapeKinds;
import io.syndesis.connector.support.processor.SyndesisHeaderStrategy;
import io.syndesis.integration.runtime.util.SyndesisHeaderStrategy;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package io.syndesis.connector.rest.swagger;

import io.syndesis.connector.rest.swagger.auth.oauth.OAuthRefreshingEndpoint;
import java.util.function.Function;

import io.syndesis.integration.component.proxy.ComponentProxyComponent;

import org.apache.camel.Endpoint;

public final class SwaggerProxyComponent extends ComponentProxyComponent {

private Configuration configuration;

private Function<Endpoint, Endpoint> endpointOverride = Function.identity();

public SwaggerProxyComponent(final String componentId, final String componentScheme) {
super(componentId, componentScheme);
}
Expand All @@ -38,7 +42,11 @@ public Endpoint createEndpoint(final String uri) throws Exception {
return endpoint;
}

return new OAuthRefreshingEndpoint(this, configuration, endpoint);
return endpointOverride.apply(endpoint);
}

public void overrideEndpoint(final Function<Endpoint, Endpoint> endpointOverride) {
this.endpointOverride = endpointOverride;
}

public void setConfiguration(final Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public boolean accept(final XMLStreamReader reader) {
processText(reader);
break;
default:
// do nothing
break;
}

return (inRequest && inPayload) || !inRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.syndesis.connector.rest.swagger.auth;

import io.syndesis.connector.support.processor.SyndesisHeaderStrategy;
import io.syndesis.integration.runtime.util.SyndesisHeaderStrategy;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package io.syndesis.connector.rest.swagger.auth.oauth;

import io.syndesis.connector.rest.swagger.Configuration;
import io.syndesis.connector.rest.swagger.SwaggerProxyComponent;
import io.syndesis.connector.rest.swagger.auth.SetAuthorizationHeader;
import io.syndesis.integration.component.proxy.ComponentProxyComponent;
import io.syndesis.integration.component.proxy.Processors;

public final class OAuth {
Expand All @@ -26,14 +26,41 @@ private OAuth() {
// utility class
}

public static void setup(final ComponentProxyComponent component, final Configuration configuration) {
final OAuthRefreshTokenProcessor refreshing = new OAuthRefreshTokenProcessor(configuration);
public static void setup(final SwaggerProxyComponent component, final Configuration configuration) {
final boolean canProcessRefresh = canProcessRefresh(configuration);
final boolean retriesOnAuthenticationErrors = retriesOnAuthenticationErrors(configuration);

final String authorizationHeaderValue = "Bearer " + configuration.stringOption("accessToken");
Processors.addBeforeProducer(component, new SetAuthorizationHeader(authorizationHeaderValue));
final OAuthState state = OAuthState.createFrom(configuration);

if (refreshing.canProcessRefresh()) {
Processors.addBeforeProducer(component, refreshing);
if (canProcessRefresh && !retriesOnAuthenticationErrors) {
Processors.addBeforeProducer(component, new OAuthRefreshTokenProcessor(state, configuration));
} else if (retriesOnAuthenticationErrors) {
Processors.addBeforeProducer(component, new OAuthRefreshTokenProcessor(state, configuration));

final OAuthRefreshTokenOnFailProcessor refreshOnFailure = new OAuthRefreshTokenOnFailProcessor(state, configuration);
component.overrideEndpoint(e -> new OAuthRefreshingEndpoint(component, e, refreshOnFailure));
} else {
final String authorizationHeaderValue = "Bearer " + configuration.stringOption("accessToken");
Processors.addBeforeProducer(component, new SetAuthorizationHeader(authorizationHeaderValue));
}
}

private static boolean canProcessRefresh(final Configuration configuration) {
final String clientId = configuration.stringOption("clientId");
final String clientSecret = configuration.stringOption("clientSecret");
final String refreshToken = configuration.stringOption("refreshToken");
final String authorizationEndpoint = configuration.stringOption("authorizationEndpoint");
final boolean authorizeUsingParameters = configuration.booleanOption("authorizeUsingParameters");

final boolean hasBasicRefreshOptions = refreshToken != null && authorizationEndpoint != null;
final boolean hasParametersIfNeeded = authorizeUsingParameters && clientId != null && clientSecret != null;

return hasBasicRefreshOptions && (!authorizeUsingParameters || hasParametersIfNeeded);
}

private static boolean retriesOnAuthenticationErrors(final Configuration configuration) {
final String statuses = configuration.stringOption("refreshTokenRetryStatuses");

return statuses != null && !statuses.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class OAuthRefreshTokenOnFailProcessor extends OAuthRefreshTokenProcessor {

private Set<Integer> statusesToRefreshFor = new HashSet<>();

OAuthRefreshTokenOnFailProcessor(final Configuration configuration) {
super(configuration);
OAuthRefreshTokenOnFailProcessor(final OAuthState state, final Configuration configuration) {
super(state, configuration);

final String statuses = configuration.stringOption("refreshTokenRetryStatuses");
if (statuses != null) {
Expand All @@ -46,21 +46,27 @@ class OAuthRefreshTokenOnFailProcessor extends OAuthRefreshTokenProcessor {

@Override
public void process(final Exchange exchange) throws Exception {
final HttpOperationFailedException httpFailure = (HttpOperationFailedException) exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
LOG.warn("Failed invoking the remote API, status: {} {}, response body: {}", httpFailure.getStatusCode(),
httpFailure.getStatusText(), httpFailure.getResponseBody());
final Exception caught = (Exception) exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
if (caught instanceof HttpOperationFailedException) {
final HttpOperationFailedException httpFailure = (HttpOperationFailedException) caught;
LOG.warn("Failed invoking the remote API, status: {} {}, response body: {}", httpFailure.getStatusCode(),
httpFailure.getStatusText(), httpFailure.getResponseBody());

if (!shouldTryRefreshingAccessCode(httpFailure)) {
if (!shouldTryRefreshingAccessCode(httpFailure)) {
throw httpFailure;
}

// we don't check the return value as we will throw `httpFailure`
// anyhow
tryToRefreshAccessToken();

// we need to throw the failure so that the exchange fails,
// otherwise it might be considered successful and we do not perform
// any retry, and that would lead to data inconsistencies
throw httpFailure;
}

// we don't check the return value as we will throw `httpFailure` anyhow
tryToRefreshAccessToken();

// we need to throw the failure so that the exchange fails, otherwise it
// might be considered successful and we do not perform any
// retry, and that would lead to data inconsistencies
throw httpFailure;
super.process(exchange);
}

boolean shouldTryRefreshingAccessCode(final HttpOperationFailedException httpFailure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.syndesis.connector.rest.swagger.Configuration;
import io.syndesis.connector.support.processor.SyndesisHeaderStrategy;
import io.syndesis.integration.runtime.util.SyndesisHeaderStrategy;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
Expand Down Expand Up @@ -56,55 +56,37 @@ class OAuthRefreshTokenProcessor implements Processor {

private static final Logger LOG = LoggerFactory.getLogger(OAuthRefreshTokenProcessor.class);

String accessToken;

long accessTokenExpiresAt;

Optional<Long> expiresInOverride = Optional.ofNullable(System.getenv().get("AUTHTOKEN_EXPIRES_IN_OVERRIDE")).map(Long::valueOf);

// Always refresh on (re)start
final AtomicReference<Boolean> isFirstTime = new AtomicReference<>(Boolean.TRUE);

final AtomicReference<String> lastRefreshTokenTried = new AtomicReference<>(null);

String refreshToken;
final OAuthState state;

private final String authorizationEndpoint;

private boolean authorizeUsingParameters;

private final String clientId;
private final boolean authorizeUsingParameters;

private final String clientSecret;

public OAuthRefreshTokenProcessor(final Configuration configuration) {
clientId = configuration.stringOption("clientId");
clientSecret = configuration.stringOption("clientSecret");
accessToken = configuration.stringOption("accessToken");
refreshToken = configuration.stringOption("refreshToken");
OAuthRefreshTokenProcessor(final OAuthState state, final Configuration configuration) {
this.state = state;
authorizationEndpoint = configuration.stringOption("authorizationEndpoint");
accessTokenExpiresAt = configuration.longOption("accessTokenExpiresAt");
authorizeUsingParameters = configuration.booleanOption("authorizeUsingParameters");
}

@Override
public void process(final Exchange exchange) throws Exception {
if (refreshToken != null && (isFirstTime.get() || accessTokenExpiresAt - AHEAD_OF_TIME_REFRESH_MILIS <= now())) {
if (state.getRefreshToken() != null && (isFirstTime.get() || state.getAccessTokenExpiresAt() - AHEAD_OF_TIME_REFRESH_MILIS <= now())) {
tryToRefreshAccessToken();
}

final Message in = exchange.getIn();
in.setHeader("Authorization", "Bearer " + accessToken);
in.setHeader("Authorization", "Bearer " + state.getAccessToken());

SyndesisHeaderStrategy.whitelist(exchange, "Authorization");
}

boolean canProcessRefresh() {
final boolean hasBasicRefreshOptions = refreshToken != null && authorizationEndpoint != null;
final boolean hasParametersIfNeeded = authorizeUsingParameters && clientId != null && clientSecret != null;

return hasBasicRefreshOptions && (!authorizeUsingParameters || hasParametersIfNeeded);
}

CloseableHttpClient createHttpClient() {
return HttpClientBuilder.create().build();
}
Expand All @@ -113,11 +95,11 @@ HttpUriRequest createHttpRequest() {
final RequestBuilder builder = RequestBuilder.post(authorizationEndpoint);

if (authorizeUsingParameters) {
builder.addParameter("client_id", clientId)
.addParameter("client_secret", clientSecret);
builder.addParameter("client_id", state.getClientId())
.addParameter("client_secret", state.getClientSecret());
}

builder.addParameter("refresh_token", refreshToken)
builder.addParameter("refresh_token", state.getRefreshToken())
.addParameter("grant_type", "refresh_token");

return builder.build();
Expand All @@ -137,7 +119,7 @@ void processRefreshTokenResponse(final HttpEntity entity) throws IOException, Js

final JsonNode accessToken = body.get("access_token");
if (isPresentAndHasValue(accessToken)) {
this.accessToken = accessToken.asText();
final String accessTokenValue = accessToken.asText();
isFirstTime.set(Boolean.FALSE);
LOG.info("Successful access token refresh");

Expand All @@ -150,21 +132,26 @@ void processRefreshTokenResponse(final HttpEntity entity) throws IOException, Js
expiresInSeconds = expiresIn.asLong();
}
}

long accessTokenExpiresAt = 0;
if (expiresInSeconds != null) {
accessTokenExpiresAt = now() + expiresInSeconds * 1000;
}

final JsonNode refreshToken = body.get("refresh_token");
String refreshTokenValue = null;
if (isPresentAndHasValue(refreshToken)) {
this.refreshToken = refreshToken.asText();
refreshTokenValue = refreshToken.asText();

lastRefreshTokenTried.compareAndSet(this.refreshToken, null);
lastRefreshTokenTried.compareAndSet(refreshTokenValue, null);
}

state.update(accessTokenValue, accessTokenExpiresAt, refreshTokenValue);
}
}

void tryToRefreshAccessToken() {
final String currentRefreshToken = refreshToken;
final String currentRefreshToken = state.getRefreshToken();
lastRefreshTokenTried.getAndUpdate(last -> {
if (isFirstTime.get()) {
return null;
Expand Down
Loading

0 comments on commit 280d61d

Please sign in to comment.