Skip to content

Commit

Permalink
Merge pull request #220 from trustin/fix_context_propagation
Browse files Browse the repository at this point in the history
Fix RequestContext propagation and callback invocation
  • Loading branch information
trustin authored Aug 9, 2016
2 parents 6780ee4 + c9b17f3 commit eb3f036
Show file tree
Hide file tree
Showing 16 changed files with 674 additions and 183 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client;

import java.time.Duration;

import com.linecorp.armeria.common.RequestContextWrapper;

public class ClientRequestContextWrapper
extends RequestContextWrapper<ClientRequestContext> implements ClientRequestContext {

protected ClientRequestContextWrapper(ClientRequestContext delegate) {
super(delegate);
}

@Override
public Endpoint endpoint() {
return delegate().endpoint();
}

@Override
public ClientOptions options() {
return delegate().options();
}

@Override
public long writeTimeoutMillis() {
return delegate().writeTimeoutMillis();
}

@Override
public void setWriteTimeoutMillis(long writeTimeoutMillis) {
delegate().setWriteTimeoutMillis(writeTimeoutMillis);
}

@Override
public void setWriteTimeout(Duration writeTimeout) {
delegate().setWriteTimeout(writeTimeout);
}

@Override
public long responseTimeoutMillis() {
return delegate().responseTimeoutMillis();
}

@Override
public void setResponseTimeoutMillis(long responseTimeoutMillis) {
delegate().setResponseTimeoutMillis(responseTimeoutMillis);
}

@Override
public void setResponseTimeout(Duration responseTimeout) {
delegate().setResponseTimeout(responseTimeout);
}

@Override
public long maxResponseLength() {
return delegate().maxResponseLength();
}

@Override
public void setMaxResponseLength(long maxResponseLength) {
delegate().setMaxResponseLength(maxResponseLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import com.linecorp.armeria.common.AbstractRequestContext;
import com.linecorp.armeria.common.NonWrappingRequestContext;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.http.DefaultHttpHeaders;
import com.linecorp.armeria.common.http.HttpHeaders;
Expand All @@ -38,7 +38,8 @@
/**
* Default {@link ClientRequestContext} implementation.
*/
public final class DefaultClientRequestContext extends AbstractRequestContext implements ClientRequestContext {
public final class DefaultClientRequestContext extends NonWrappingRequestContext
implements ClientRequestContext {

private final EventLoop eventLoop;
private final ClientOptions options;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/linecorp/armeria/client/UserClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.function.Supplier;

import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RequestContext.PushHandle;
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.SessionProtocol;

Expand Down Expand Up @@ -74,7 +76,7 @@ protected final O execute(

final ClientRequestContext ctx = new DefaultClientRequestContext(
eventLoop, sessionProtocol, endpoint, method, path, options, req);
try {
try (PushHandle ignored = RequestContext.push(ctx)) {
return delegate().execute(ctx, req);
} catch (Throwable cause) {
ctx.responseLogBuilder().end(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@

package com.linecorp.armeria.common;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

import com.linecorp.armeria.internal.DefaultAttributeMap;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
Expand All @@ -33,49 +29,7 @@
/**
* Default {@link RequestContext} implementation.
*/
public abstract class AbstractRequestContext extends DefaultAttributeMap implements RequestContext {

private final SessionProtocol sessionProtocol;
private final String method;
private final String path;
private final Object request;
private List<Runnable> onEnterCallbacks;
private List<Runnable> onExitCallbacks;

/**
* Creates a new instance.
*
* @param sessionProtocol the {@link SessionProtocol} of the invocation
* @param request the request associated with this context
*/
protected AbstractRequestContext(
SessionProtocol sessionProtocol, String method, String path, Object request) {
this.sessionProtocol = sessionProtocol;
this.method = method;
this.path = path;
this.request = request;
}

@Override
public final SessionProtocol sessionProtocol() {
return sessionProtocol;
}

@Override
public final String method() {
return method;
}

@Override
public final String path() {
return path;
}

@Override
@SuppressWarnings("unchecked")
public final <T> T request() {
return (T) request;
}
public abstract class AbstractRequestContext implements RequestContext {

@Override
public final EventLoop contextAwareEventLoop() {
Expand Down Expand Up @@ -139,36 +93,7 @@ private PushHandle propagateContextIfNotPresent() {
"sure you are not saving callbacks into shared state.");
}
return () -> {};
}, () -> {
final PushHandle handle = RequestContext.push(this);
final List<Runnable> onEnterCallbacks = this.onEnterCallbacks;
if (onEnterCallbacks != null) {
onEnterCallbacks.forEach(Runnable::run);
}
return (PushHandle) () -> {
handle.close();
final List<Runnable> onExitCallbacks = this.onExitCallbacks;
if (onExitCallbacks != null) {
onExitCallbacks.forEach(Runnable::run);
}
};
});
}

@Override
public final void onEnter(Runnable callback) {
if (onEnterCallbacks == null) {
onEnterCallbacks = new ArrayList<>(4);
}
onEnterCallbacks.add(callback);
}

@Override
public final void onExit(Runnable callback) {
if (onExitCallbacks == null) {
onExitCallbacks = new ArrayList<>(4);
}
onExitCallbacks.add(callback);
}, () -> RequestContext.push(this, true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2016 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.linecorp.armeria.internal.DefaultAttributeMap;

import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

/**
* Default {@link RequestContext} implementation.
*/
public abstract class NonWrappingRequestContext extends AbstractRequestContext {

private final DefaultAttributeMap attrs = new DefaultAttributeMap();
private final SessionProtocol sessionProtocol;
private final String method;
private final String path;
private final Object request;
private List<Runnable> onEnterCallbacks;
private List<Runnable> onExitCallbacks;

/**
* Creates a new instance.
*
* @param sessionProtocol the {@link SessionProtocol} of the invocation
* @param request the request associated with this context
*/
protected NonWrappingRequestContext(
SessionProtocol sessionProtocol, String method, String path, Object request) {
this.sessionProtocol = sessionProtocol;
this.method = method;
this.path = path;
this.request = request;
}

@Override
public final SessionProtocol sessionProtocol() {
return sessionProtocol;
}

@Override
public final String method() {
return method;
}

@Override
public final String path() {
return path;
}

@Override
@SuppressWarnings("unchecked")
public final <T> T request() {
return (T) request;
}

@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return attrs.attr(key);
}

@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return attrs.hasAttr(key);
}

@Override
public Iterator<Attribute<?>> attrs() {
return attrs.attrs();
}

@Override
public final void onEnter(Runnable callback) {
if (onEnterCallbacks == null) {
onEnterCallbacks = new ArrayList<>(4);
}
onEnterCallbacks.add(callback);
}

@Override
public final void onExit(Runnable callback) {
if (onExitCallbacks == null) {
onExitCallbacks = new ArrayList<>(4);
}
onExitCallbacks.add(callback);
}

@Override
public void invokeOnEnterCallbacks() {
final List<Runnable> onEnterCallbacks = this.onEnterCallbacks;
if (onEnterCallbacks != null) {
onEnterCallbacks.forEach(Runnable::run);
}
}

@Override
public void invokeOnExitCallbacks() {
final List<Runnable> onExitCallbacks = this.onExitCallbacks;
if (onExitCallbacks != null) {
onExitCallbacks.forEach(Runnable::run);
}
}
}
Loading

0 comments on commit eb3f036

Please sign in to comment.