Skip to content

Commit

Permalink
Revert "Replacing InboundMessage with NativeInboundMessage for deprec…
Browse files Browse the repository at this point in the history
…ation (opensearch-project#13126)"

This reverts commit f5c3ef9.

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Aug 28, 2024
1 parent 8d17c8d commit 13a2304
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -114,7 +113,7 @@ public void aggregate(ReleasableBytesReference content) {
}
}

public NativeInboundMessage finishAggregation() throws IOException {
public InboundMessage finishAggregation() throws IOException {
ensureOpen();
final ReleasableBytesReference releasableContent;
if (isFirstContent()) {
Expand All @@ -128,7 +127,7 @@ public NativeInboundMessage finishAggregation() throws IOException {
}

final BreakerControl breakerControl = new BreakerControl(circuitBreaker);
final NativeInboundMessage aggregated = new NativeInboundMessage(currentHeader, releasableContent, breakerControl);
final InboundMessage aggregated = new InboundMessage(currentHeader, releasableContent, breakerControl);
boolean success = false;
try {
if (aggregated.getHeader().needsToReadVariableHeader()) {
Expand All @@ -143,7 +142,7 @@ public NativeInboundMessage finishAggregation() throws IOException {
if (isShortCircuited()) {
aggregated.close();
success = true;
return new NativeInboundMessage(aggregated.getHeader(), aggregationException);
return new InboundMessage(aggregated.getHeader(), aggregationException);
} else {
success = true;
return aggregated;
Expand Down
108 changes: 108 additions & 0 deletions server/src/main/java/org/opensearch/transport/InboundMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.transport;

import org.opensearch.common.annotation.DeprecatedApi;
import org.opensearch.common.bytes.ReleasableBytesReference;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;

/**
* Inbound data as a message
* This api is deprecated, please use {@link org.opensearch.transport.nativeprotocol.NativeInboundMessage} instead.
* @opensearch.api
*/
@DeprecatedApi(since = "2.14.0")
public class InboundMessage implements Releasable, ProtocolInboundMessage {

private final NativeInboundMessage nativeInboundMessage;

public InboundMessage(Header header, ReleasableBytesReference content, Releasable breakerRelease) {
this.nativeInboundMessage = new NativeInboundMessage(header, content, breakerRelease);
}

public InboundMessage(Header header, Exception exception) {
this.nativeInboundMessage = new NativeInboundMessage(header, exception);
}

public InboundMessage(Header header, boolean isPing) {
this.nativeInboundMessage = new NativeInboundMessage(header, isPing);
}

public Header getHeader() {
return this.nativeInboundMessage.getHeader();
}

public int getContentLength() {
return this.nativeInboundMessage.getContentLength();
}

public Exception getException() {
return this.nativeInboundMessage.getException();
}

public boolean isPing() {
return this.nativeInboundMessage.isPing();
}

public boolean isShortCircuit() {
return this.nativeInboundMessage.getException() != null;
}

public Releasable takeBreakerReleaseControl() {
return this.nativeInboundMessage.takeBreakerReleaseControl();
}

public StreamInput openOrGetStreamInput() throws IOException {
return this.nativeInboundMessage.openOrGetStreamInput();
}

@Override
public void close() {
this.nativeInboundMessage.close();
}

@Override
public String toString() {
return this.nativeInboundMessage.toString();
}

@Override
public String getProtocol() {
return this.nativeInboundMessage.getProtocol();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
import org.opensearch.transport.nativeprotocol.NativeOutboundHandler;

import java.io.EOFException;
Expand Down Expand Up @@ -119,7 +118,7 @@ public void messageReceived(
long slowLogThresholdMs,
TransportMessageListener messageListener
) throws IOException {
NativeInboundMessage inboundMessage = (NativeInboundMessage) message;
InboundMessage inboundMessage = (InboundMessage) message;
TransportLogger.logInboundMessage(channel, inboundMessage);
if (inboundMessage.isPing()) {
keepAlive.receiveKeepAlive(channel);
Expand All @@ -130,7 +129,7 @@ public void messageReceived(

private void handleMessage(
TcpChannel channel,
NativeInboundMessage message,
InboundMessage message,
long startTime,
long slowLogThresholdMs,
TransportMessageListener messageListener
Expand Down Expand Up @@ -202,7 +201,7 @@ private Map<String, Collection<String>> extractHeaders(Map<String, String> heade
private <T extends TransportRequest> void handleRequest(
TcpChannel channel,
Header header,
NativeInboundMessage message,
InboundMessage message,
TransportMessageListener messageListener
) throws IOException {
final String action = header.getActionName();
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,18 @@ protected void serverAcceptedChannel(TcpChannel channel) {
*/
protected abstract void stopInternal();

/**
* @deprecated use {@link #inboundMessage(TcpChannel, ProtocolInboundMessage)}
* Handles inbound message that has been decoded.
*
* @param channel the channel the message is from
* @param message the message
*/
@Deprecated(since = "2.14.0", forRemoval = true)
public void inboundMessage(TcpChannel channel, InboundMessage message) {
inboundMessage(channel, (ProtocolInboundMessage) message);
}

/**
* Handles inbound message that has been decoded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;

import java.io.IOException;

Expand All @@ -65,7 +64,7 @@ static void logInboundMessage(TcpChannel channel, BytesReference message) {
}
}

static void logInboundMessage(TcpChannel channel, NativeInboundMessage message) {
static void logInboundMessage(TcpChannel channel, InboundMessage message) {
if (logger.isTraceEnabled()) {
try {
String logMessage = format(channel, message, "READ");
Expand Down Expand Up @@ -137,7 +136,7 @@ private static String format(TcpChannel channel, BytesReference message, String
return sb.toString();
}

private static String format(TcpChannel channel, NativeInboundMessage message, String event) throws IOException {
private static String format(TcpChannel channel, InboundMessage message, String event) throws IOException {
final StringBuilder sb = new StringBuilder();
sb.append(channel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.transport.InboundAggregator;
import org.opensearch.transport.InboundBytesHandler;
import org.opensearch.transport.InboundDecoder;
import org.opensearch.transport.InboundMessage;
import org.opensearch.transport.ProtocolInboundMessage;
import org.opensearch.transport.StatsTracker;
import org.opensearch.transport.TcpChannel;
Expand All @@ -31,7 +32,7 @@
public class NativeInboundBytesHandler implements InboundBytesHandler {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
private static final NativeInboundMessage PING_MESSAGE = new NativeInboundMessage(null, true);
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);

private final ArrayDeque<ReleasableBytesReference> pending;
private final InboundDecoder decoder;
Expand Down Expand Up @@ -151,7 +152,7 @@ private void forwardFragments(
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (NativeInboundMessage aggregated = aggregator.finishAggregation()) {
try (InboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.nativeprotocol.NativeInboundMessage;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -108,7 +107,7 @@ public void testInboundAggregation() throws IOException {
}

// Signal EOS
NativeInboundMessage aggregated = aggregator.finishAggregation();
InboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(aggregated.isPing());
Expand Down Expand Up @@ -139,7 +138,7 @@ public void testInboundUnknownAction() throws IOException {
assertEquals(0, content.refCount());

// Signal EOS
NativeInboundMessage aggregated = aggregator.finishAggregation();
InboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertTrue(aggregated.isShortCircuit());
Expand All @@ -162,7 +161,7 @@ public void testCircuitBreak() throws IOException {
content1.close();

// Signal EOS
NativeInboundMessage aggregated1 = aggregator.finishAggregation();
InboundMessage aggregated1 = aggregator.finishAggregation();

assertEquals(0, content1.refCount());
assertThat(aggregated1, notNullValue());
Expand All @@ -181,7 +180,7 @@ public void testCircuitBreak() throws IOException {
content2.close();

// Signal EOS
NativeInboundMessage aggregated2 = aggregator.finishAggregation();
InboundMessage aggregated2 = aggregator.finishAggregation();

assertEquals(1, content2.refCount());
assertThat(aggregated2, notNullValue());
Expand All @@ -200,7 +199,7 @@ public void testCircuitBreak() throws IOException {
content3.close();

// Signal EOS
NativeInboundMessage aggregated3 = aggregator.finishAggregation();
InboundMessage aggregated3 = aggregator.finishAggregation();

assertEquals(1, content3.refCount());
assertThat(aggregated3, notNullValue());
Expand Down Expand Up @@ -264,7 +263,7 @@ public void testFinishAggregationWillFinishHeader() throws IOException {
content.close();

// Signal EOS
NativeInboundMessage aggregated = aggregator.finishAggregation();
InboundMessage aggregated = aggregator.finishAggregation();

assertThat(aggregated, notNullValue());
assertFalse(header.needsToReadVariableHeader());
Expand Down
Loading

0 comments on commit 13a2304

Please sign in to comment.