Skip to content

Commit

Permalink
spotlessApply changes
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Peterson <matt.peterson@swirldslabs.com>
  • Loading branch information
mattp-swirldslabs committed Nov 13, 2024
1 parent c8d7f81 commit ea87a87
Show file tree
Hide file tree
Showing 16 changed files with 601 additions and 369 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import io.helidon.http.Header;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.common.buffers.BufferData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2StreamState;

import java.util.function.UnaryOperator;

public interface GrpcDataProcessor {
void data(@NonNull final Http2FrameHeader header, @NonNull final BufferData data);

void setCurrentStreamState(UnaryOperator<Http2StreamState> operator);

Http2StreamState getCurrentStreamState();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import static java.util.Objects.requireNonNull;

import com.hedera.pbj.grpc.helidon.config.PbjConfig;
import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
Expand All @@ -10,12 +28,9 @@
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2StreamState;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

import static java.util.Objects.requireNonNull;

public class GrpcDataProcessorImpl implements GrpcDataProcessor {

/**
Expand Down Expand Up @@ -68,8 +83,7 @@ enum ReadState {
private Pipeline<? super Bytes> pipeline;

public GrpcDataProcessorImpl(
@NonNull final PbjConfig config,
@NonNull final Http2StreamState currentStreamState) {
@NonNull final PbjConfig config, @NonNull final Http2StreamState currentStreamState) {

this.config = requireNonNull(config);
this.currentStreamState = new AtomicReference<>(requireNonNull(currentStreamState));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import com.hedera.pbj.runtime.grpc.Pipeline;
Expand All @@ -7,6 +23,8 @@

interface HeadersProcessor {
void setPipeline(@NonNull final Pipeline<? super Bytes> pipeline);

void cancelDeadlineFuture(boolean isCancelled);

ServiceInterface.RequestOptions options();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,33 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.pbj.grpc.helidon;

import static com.hedera.pbj.grpc.helidon.Constants.GRPC_ENCODING_IDENTITY;
import static com.hedera.pbj.grpc.helidon.Constants.IDENTITY;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_TIMEOUT;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO;
import static java.lang.System.Logger.Level.ERROR;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import com.hedera.pbj.runtime.grpc.Pipeline;
Expand All @@ -19,7 +47,6 @@
import io.helidon.http.http2.Http2StreamState;
import io.helidon.http.http2.Http2StreamWriter;
import io.helidon.http.http2.StreamFlowControl;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -28,23 +55,12 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static com.hedera.pbj.grpc.helidon.Constants.GRPC_ENCODING_IDENTITY;
import static com.hedera.pbj.grpc.helidon.Constants.IDENTITY;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ACCEPT_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_ENCODING;
import static com.hedera.pbj.grpc.helidon.GrpcHeaders.GRPC_TIMEOUT;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_JSON;
import static com.hedera.pbj.runtime.grpc.ServiceInterface.RequestOptions.APPLICATION_GRPC_PROTO;
import static java.lang.System.Logger.Level.ERROR;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

public class HeadersProcessorImpl implements HeadersProcessor {
private final System.Logger LOGGER = System.getLogger(this.getClass().getName());

/** The regular expression used to parse the grpc-timeout header. */
private static final String GRPC_TIMEOUT_REGEX = "(\\d{1,8})([HMSmun])";

private static final Pattern GRPC_TIMEOUT_PATTERN = Pattern.compile(GRPC_TIMEOUT_REGEX);

/**
Expand All @@ -68,14 +84,14 @@ public class HeadersProcessorImpl implements HeadersProcessor {
* {@link ServiceInterface} and method to invoke, as well as metrics, and other information.
*/
private final PbjMethodRoute route;

private final Http2StreamWriter streamWriter;
private final StreamFlowControl flowControl;
private final int streamId;
private ServiceInterface.RequestOptions options;
private final GrpcDataProcessor grpcDataProcessor;
private Pipeline<? super Bytes> pipeline;


HeadersProcessorImpl(
@NonNull final Http2Headers headers,
@NonNull final Http2StreamWriter streamWriter,
Expand Down Expand Up @@ -199,7 +215,8 @@ public class HeadersProcessorImpl implements HeadersProcessor {
route.failedUnknownRequestCounter().increment();
LOGGER.log(ERROR, "Failed to initialize grpc protocol handler", unknown);
new TrailerOnlyBuilder(streamWriter, streamId, flowControl)
.grpcStatus(GrpcStatus.UNKNOWN).send();
.grpcStatus(GrpcStatus.UNKNOWN)
.send();
error();
}
}
Expand Down Expand Up @@ -289,19 +306,19 @@ private ScheduledFuture<?> scheduleDeadline(@NonNull final String timeout) {
final var deadline =
System.nanoTime()
* TimeUnit.NANOSECONDS.convert(
num,
switch (unit) {
case "H" -> TimeUnit.HOURS;
case "M" -> TimeUnit.MINUTES;
case "S" -> TimeUnit.SECONDS;
case "m" -> TimeUnit.MILLISECONDS;
case "u" -> TimeUnit.MICROSECONDS;
case "n" -> TimeUnit.NANOSECONDS;
// This should NEVER be reachable, because the matcher
// would not have matched.
default -> throw new GrpcException(
GrpcStatus.INTERNAL, "Invalid unit: " + unit);
});
num,
switch (unit) {
case "H" -> TimeUnit.HOURS;
case "M" -> TimeUnit.MINUTES;
case "S" -> TimeUnit.SECONDS;
case "m" -> TimeUnit.MILLISECONDS;
case "u" -> TimeUnit.MICROSECONDS;
case "n" -> TimeUnit.NANOSECONDS;
// This should NEVER be reachable, because the matcher
// would not have matched.
default -> throw new GrpcException(
GrpcStatus.INTERNAL, "Invalid unit: " + unit);
});
return deadlineDetector.scheduleDeadline(
deadline,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,19 @@

package com.hedera.pbj.grpc.helidon;

import com.hedera.pbj.runtime.grpc.GrpcException;
import com.hedera.pbj.runtime.grpc.GrpcStatus;
import static java.util.Objects.requireNonNull;

import com.hedera.pbj.runtime.grpc.Pipeline;
import com.hedera.pbj.runtime.grpc.ServiceInterface;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.helidon.common.buffers.BufferData;
import io.helidon.http.http2.Http2Flag;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2RstStream;
import io.helidon.http.http2.Http2StreamState;
import io.helidon.http.http2.Http2StreamWriter;
import io.helidon.http.http2.Http2WindowUpdate;
import io.helidon.http.http2.StreamFlowControl;
import io.helidon.webserver.http2.spi.Http2SubProtocolSelector;

import java.util.Objects;
import java.util.concurrent.Flow;

import static java.lang.System.Logger.Level.ERROR;
import static java.util.Objects.requireNonNull;

/**
* Implementation of gRPC based on PBJ. This class specifically contains the glue logic for bridging
Expand All @@ -52,6 +42,7 @@ final class PbjProtocolHandler implements Http2SubProtocolSelector.SubProtocolHa
* {@link ServiceInterface} and method to invoke, as well as metrics, and other information.
*/
private final PbjMethodRoute route;

private final Pipeline<? super Bytes> pipeline;
private final GrpcDataProcessor grpcDataProcessor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,36 @@ public SubProtocolResult subProtocol(
true, new RouteNotFoundHandler(streamWriter, streamId, currentStreamState));
}

final GrpcDataProcessorImpl grpcDataProcessor = new GrpcDataProcessorImpl(config, currentStreamState);
final HeadersProcessor headersProcessor = new HeadersProcessorImpl(
headers, streamWriter, streamId, flowControl, route, deadlineDetector, grpcDataProcessor);
final GrpcDataProcessorImpl grpcDataProcessor =
new GrpcDataProcessorImpl(config, currentStreamState);
final HeadersProcessor headersProcessor =
new HeadersProcessorImpl(
headers,
streamWriter,
streamId,
flowControl,
route,
deadlineDetector,
grpcDataProcessor);

final SendToClientSubscriber sendToClientSubscriber = new SendToClientSubscriber(
streamWriter, streamId, flowControl, route, grpcDataProcessor, headersProcessor);
final PipelineBuilder pipelineBuilder = new PipelineBuilder(
streamWriter,
streamId,
flowControl,
route,
headersProcessor.options(),
sendToClientSubscriber.subscriber(),
grpcDataProcessor,
headersProcessor);
final SendToClientSubscriber sendToClientSubscriber =
new SendToClientSubscriber(
streamWriter,
streamId,
flowControl,
route,
grpcDataProcessor,
headersProcessor);
final PipelineBuilder pipelineBuilder =
new PipelineBuilder(
streamWriter,
streamId,
flowControl,
route,
headersProcessor.options(),
sendToClientSubscriber.subscriber(),
grpcDataProcessor,
headersProcessor);

final Pipeline<? super Bytes> pipeline = pipelineBuilder.createPipeline();
grpcDataProcessor.setPipeline(pipeline);
Expand All @@ -150,10 +165,6 @@ public SubProtocolResult subProtocol(

// This is a valid call!
return new SubProtocolResult(
true,
new PbjProtocolHandler(
route,
grpcDataProcessor,
pipeline));
true, new PbjProtocolHandler(route, grpcDataProcessor, pipeline));
}
}
Loading

0 comments on commit ea87a87

Please sign in to comment.