Skip to content

Commit

Permalink
Merge branch 'main' into dogac/add-flat-schema-support
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhee17 authored Jul 19, 2024
2 parents 2001329 + 3596b3b commit 9319c0c
Show file tree
Hide file tree
Showing 221 changed files with 7,425 additions and 811 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/actions_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ jobs:
-PbuildJdkVersion=${{ env.BUILD_JDK_VERSION }} \
-PtestJavaVersion=${{ matrix.java }} \
${{ matrix.min-java && format('-PminimumJavaVersion={0}', matrix.min-java) || '' }} \
-Porg.gradle.java.installations.paths=${{ steps.setup-build-jdk.outputs.path }},${{ steps.setup-jdk.outputs.path }}
-Porg.gradle.java.installations.paths=${{ steps.setup-build-jdk.outputs.path }},${{ steps.setup-jdk.outputs.path }} \
-PpreferShadedTests=${{ github.ref_name != 'main' }}
# Unshaded tests are skipped for PRs to avoid running the same tests twice.
shell: bash
env:
COMMIT_SHA: ${{ github.event.pull_request.head.sha }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/gradle-enterprise-postjob.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:

- name: Download artifact
id: download-artifact
uses: dawidd6/action-download-artifact@v3
uses: dawidd6/action-download-artifact@v6
with:
workflow_conclusion: ""
run_id: ${{ env.RUN_ID }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public class RoutersBenchmark {
SERVICES = ImmutableList.of(newServiceConfig(route1), newServiceConfig(route2));
FALLBACK_SERVICE = newServiceConfig(Route.ofCatchAll());
HOST = new VirtualHost(
"localhost", "localhost", 0, null, SERVICES, FALLBACK_SERVICE, RejectedRouteHandler.DISABLED,
"localhost", "localhost", 0, null,
null, SERVICES, FALLBACK_SERVICE, RejectedRouteHandler.DISABLED,
unused -> NOPLogger.NOP_LOGGER, FALLBACK_SERVICE.defaultServiceNaming(),
FALLBACK_SERVICE.defaultLogName(), 0, 0, false,
AccessLogWriter.disabled(), CommonPools.blockingTaskExecutor(), 0, SuccessFunction.ofDefault(),
Expand Down
13 changes: 9 additions & 4 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,21 @@ mrJarVersions.each { version->
options.release.set(targetJavaVersion)
}

task "testJava${version}"(type: Test, group: 'Verification', description: "Runs unit tests for Java ${version} source set") {
tasks.register("testJava${version}", Test) {
group = 'Verification'
description = "Runs unit tests for Java ${version} source set"
testClassesDirs = sourceSets."java${version}Test".output.classesDirs
classpath = sourceSets."java${version}Test".runtimeClasspath

project.ext.configureCommonTestSettings(it)
enabled = project.ext.testJavaVersion >= targetJavaVersion

check.dependsOn it
}

configurations."java${version}Implementation".extendsFrom configurations.implementation
configurations."java${version}TestImplementation".extendsFrom configurations.testImplementation
configurations."java${version}TestRuntimeClasspath".extendsFrom configurations.testRuntimeClasspath

check.dependsOn "testJava${version}"
}

tasks.withType(Jar) {
Expand All @@ -63,7 +65,7 @@ tasks.trimShadedJar.doLast {
def trimmed = tasks.trimShadedJar.outJarFiles[0].toPath()

ant.jar(destfile: trimmed.toString(), update: true, duplicate: 'fail') {
zipfileset(src: tasks.shadedJar.archivePath) {
zipfileset(src: tasks.shadedJar.archiveFile.get().asFile) {
include(name: 'META-INF/versions/**')
}

Expand Down Expand Up @@ -225,6 +227,9 @@ if (tasks.findByName('trimShadedJar')) {
keep "class com.linecorp.armeria.internal.shaded.bouncycastle.jcajce.provider.asymmetric.ec.** { *; }"
keep "class com.linecorp.armeria.internal.shaded.bouncycastle.jcajce.provider.asymmetric.rsa.** { *; }"
keep "class com.linecorp.armeria.internal.shaded.bouncycastle.jcajce.provider.asymmetric.x509.** { *; }"
// Keep the Guava classes accessed during testing.
keep "class com.linecorp.armeria.internal.shaded.guava.net.HttpHeaders { *; }"
keep "class com.linecorp.armeria.internal.shaded.guava.net.MediaType { *; }"
dontnote
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.ResponseCompleteException;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.logging.RequestLogBuilder;
Expand All @@ -50,6 +52,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.proxy.ProxyConnectException;

Expand All @@ -61,6 +64,7 @@ enum State {
NEEDS_TO_WRITE_FIRST_HEADER,
NEEDS_DATA,
NEEDS_DATA_OR_TRAILERS,
NEEDS_100_CONTINUE,
DONE
}

Expand Down Expand Up @@ -143,6 +147,11 @@ public final void operationComplete(ChannelFuture future) throws Exception {
responseWrapper.initTimeout();
}

if (state == State.NEEDS_100_CONTINUE) {
assert responseWrapper != null;
responseWrapper.initTimeout();
}

onWriteSuccess();
return;
}
Expand All @@ -169,14 +178,14 @@ final boolean tryInitialize() {
"Can't send requests. ID: " + id + ", session active: " +
session.isAcquirable(responseDecoder.keepAliveHandler()));
}
session.deactivate();
session.markUnacquirable();
// No need to send RST because we didn't send any packet and this will be disconnected anyway.
fail(UnprocessedRequestException.of(exception));
return false;
}

this.session = session;
responseWrapper = responseDecoder.addResponse(id, originalRes, ctx, ch.eventLoop());
responseWrapper = responseDecoder.addResponse(this, id, originalRes, ctx, ch.eventLoop());

if (timeoutMillis > 0) {
// The timer would be executed if the first message has not been sent out within the timeout.
Expand All @@ -187,34 +196,39 @@ final boolean tryInitialize() {
return true;
}

RequestHeaders mergedRequestHeaders(RequestHeaders headers) {
final HttpHeaders internalHeaders;
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension == null) {
internalHeaders = HttpHeaders.of();
} else {
internalHeaders = ctxExtension.internalRequestHeaders();
}
return mergeRequestHeaders(
headers, ctx.defaultRequestHeaders(), ctx.additionalRequestHeaders(), internalHeaders);
}

/**
* Writes the {@link RequestHeaders} to the {@link Channel}.
* The {@link RequestHeaders} is merged with {@link ClientRequestContext#additionalRequestHeaders()}
* before being written.
* Note that the written data is not flushed by this method. The caller should explicitly call
* {@link Channel#flush()} when each write unit is done.
*/
final void writeHeaders(RequestHeaders headers) {
final void writeHeaders(RequestHeaders headers, boolean needs100Continue) {
final SessionProtocol protocol = session.protocol();
assert protocol != null;
if (headersOnly) {
if (needs100Continue) {
state = State.NEEDS_100_CONTINUE;
} else if (headersOnly) {
state = State.DONE;
} else if (allowTrailers) {
state = State.NEEDS_DATA_OR_TRAILERS;
} else {
state = State.NEEDS_DATA;
}

final HttpHeaders internalHeaders;
final ClientRequestContextExtension ctxExtension = ctx.as(ClientRequestContextExtension.class);
if (ctxExtension == null) {
internalHeaders = HttpHeaders.of();
} else {
internalHeaders = ctxExtension.internalRequestHeaders();
}
final RequestHeaders merged = mergeRequestHeaders(
headers, ctx.defaultRequestHeaders(), ctx.additionalRequestHeaders(), internalHeaders);
logBuilder.requestHeaders(merged);
logBuilder.requestHeaders(headers);

final String connectionOption = headers.get(HttpHeaderNames.CONNECTION);
if (CLOSE_STRING.equalsIgnoreCase(connectionOption) || !keepAlive) {
Expand All @@ -223,16 +237,44 @@ final void writeHeaders(RequestHeaders headers) {
// connection by sending a GOAWAY frame that will be sent after receiving the corresponding
// response from the remote peer. The "Connection: close" header is stripped when it is converted to
// a Netty HTTP/2 header.
session.deactivate();
session.markUnacquirable();
}

final ChannelPromise promise = ch.newPromise();
// Attach a listener first to make the listener early handle a cause raised while writing headers
// before any other callbacks like `onStreamClosed()` are invoked.
promise.addListener(this);
encoder.writeHeaders(id, streamId(), merged, headersOnly, promise);
encoder.writeHeaders(id, streamId(), headers, headersOnly, promise);
}

static boolean needs100Continue(RequestHeaders headers) {
return headers.contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE.toString());
}

void handle100Continue(ResponseHeaders responseHeaders) {
if (state != State.NEEDS_100_CONTINUE) {
return;
}

if (responseHeaders.status() == HttpStatus.CONTINUE) {
state = State.NEEDS_DATA_OR_TRAILERS;
resume();
// TODO(minwoox): reset the timeout
} else {
// We do not retry the request when HttpStatus.EXPECTATION_FAILED is received
// because:
// - Most servers support 100-continue.
// - It's much simpler to just fail the request and let the user retry.
state = State.DONE;
logBuilder.endRequest();
discardRequestBody();
}
}

abstract void resume();

abstract void discardRequestBody();

/**
* Writes the {@link HttpData} to the {@link Channel}.
* Note that the written data is not flushed by this method. The caller should explicitly call
Expand Down Expand Up @@ -329,6 +371,12 @@ private void fail(Throwable cause) {
}

final void failAndReset(Throwable cause) {
if (cause instanceof WriteTimeoutException) {
final HttpSession session = HttpSession.get(ch);
// Mark the session as unhealthy so that subsequent requests do not use it.
session.markUnacquirable();
}

if (cause instanceof ProxyConnectException || cause instanceof ResponseCompleteException) {
// - ProxyConnectException is handled by HttpSessionHandler.exceptionCaught().
// - ResponseCompleteException means the response is successfully received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ static AbstractHttpRequestSubscriber of(Channel channel, ClientHttpObjectEncoder
}

private final HttpRequest request;
private final boolean http1WebSocket;

@Nullable
private Subscription subscription;
Expand All @@ -62,10 +63,11 @@ static AbstractHttpRequestSubscriber of(Channel channel, ClientHttpObjectEncoder
HttpResponseDecoder responseDecoder,
HttpRequest request, DecodedHttpResponse originalRes,
ClientRequestContext ctx, long timeoutMillis, boolean allowTrailers,
boolean keepAlive) {
boolean keepAlive, boolean http1WebSocket) {
super(ch, encoder, responseDecoder, originalRes, ctx, timeoutMillis, request.isEmpty(), allowTrailers,
keepAlive);
this.request = request;
this.http1WebSocket = http1WebSocket;
}

@Override
Expand All @@ -77,15 +79,22 @@ public void onSubscribe(Subscription subscription) {
return;
}

final RequestHeaders headers = mergedRequestHeaders(mapHeaders(request.headers()));
final boolean needs100Continue = needs100Continue(headers);
if (needs100Continue && http1WebSocket) {
failRequest(new IllegalArgumentException(
"a WebSocket request is not allowed to have Expect: 100-continue header"));
return;
}

if (!tryInitialize()) {
return;
}

// NB: This must be invoked at the end of this method because otherwise the callback methods in this
// class can be called before the member fields (subscription, id, responseWrapper and
// timeoutFuture) are initialized.
// It is because the successful write of the first headers will trigger subscription.request(1).
writeHeaders(mapHeaders(request.headers()));
writeHeaders(headers, needs100Continue(headers));
channel().flush();
}

Expand All @@ -111,6 +120,13 @@ public void onComplete() {

@Override
void onWriteSuccess() {
if (state() == State.NEEDS_100_CONTINUE) {
return;
}
request();
}

private void request() {
// Request more messages regardless whether the state is DONE. It makes the producer have
// a chance to produce the last call such as 'onComplete' and 'onError' when there are
// no more messages it can produce.
Expand All @@ -126,4 +142,14 @@ void cancel() {
assert subscription != null;
subscription.cancel();
}

@Override
final void resume() {
request();
}

@Override
void discardRequestBody() {
cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ public InboundTrafficController inboundTrafficController() {
}

@Override
public HttpResponseWrapper addResponse(
int id, DecodedHttpResponse res, ClientRequestContext ctx, EventLoop eventLoop) {
public HttpResponseWrapper addResponse(@Nullable AbstractHttpRequestHandler requestHandler,
int id, DecodedHttpResponse res,
ClientRequestContext ctx, EventLoop eventLoop) {
final HttpResponseWrapper newRes =
new HttpResponseWrapper(res, eventLoop, ctx,
new HttpResponseWrapper(requestHandler, res, eventLoop, ctx,
ctx.responseTimeoutMillis(), ctx.maxResponseLength());
final HttpResponseWrapper oldRes = responses.put(id, newRes);
keepAliveHandler().increaseNumRequests();
Expand Down
Loading

0 comments on commit 9319c0c

Please sign in to comment.