Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22561 Get rid of ByteString in messages #3987

Merged
merged 4 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.util;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -59,6 +60,9 @@ public final class ArrayUtils {
/** Empty string array. */
public static final String[] STRING_EMPTY_ARRAY = new String[0];

/** Empty array-based byte buffer. Not read-only. */
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(BYTE_EMPTY_ARRAY);

/** {@code byte} array factory. */
public static final ArrayFactory<byte[]> BYTE_ARRAY = len -> {
if (len < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteUuid;
Expand Down Expand Up @@ -66,6 +67,7 @@
* <li>{@link UUID};</li>
* <li>{@link IgniteUuid};</li>
* <li>{@link BitSet};</li>
* <li>{@link ByteBuffer};</li>
* <li>Nested {@code NetworkMessage};</li>
* <li>Array of primitive types, corresponding boxed types or other directly marshallable types;</li>
* <li>{@code Collection} of boxed primitive types or other directly marshallable types;</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.ignite.internal.network.direct;

import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
Expand All @@ -32,17 +34,13 @@
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.network.serialization.MessageWriter;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;

/**
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
/** Empty array-based byte buffer. Not read-only. */
public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(ArrayUtils.BYTE_EMPTY_ARRAY);

/** State. */
private final DirectMessageState<StateItem> state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.internal.raft.util;

import static org.apache.ignite.internal.network.direct.DirectMessageWriter.EMPTY_BYTE_BUFFER;
import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
Expand Down Expand Up @@ -119,7 +120,6 @@
import org.apache.ignite.raft.jraft.storage.SnapshotExecutor;
import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
Expand Down Expand Up @@ -2258,7 +2258,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = request.data() != null ? request.data().asReadOnlyByteBuffer() : ByteString.EMPTY.asReadOnlyByteBuffer();
ByteBuffer allData = request.data() != null ? request.data().asReadOnlyBuffer() : EMPTY_BYTE_BUFFER.asReadOnlyBuffer();

final Collection<RaftOutter.EntryMeta> entriesList = request.entriesList();
for (RaftOutter.EntryMeta entry : entriesList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.apache.ignite.raft.jraft.core;

import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -39,7 +41,6 @@
import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.ReadIndexState;
import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
import org.apache.ignite.raft.jraft.error.OverloadException;
Expand All @@ -51,7 +52,6 @@
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Bytes;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
Expand Down Expand Up @@ -232,11 +232,11 @@ private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
.serverId(this.node.getServerId().toString());

List<ReadIndexState> states = new ArrayList<>(events.size());
List<ByteString> entries = new ArrayList<>(events.size());
List<ByteBuffer> entries = new ArrayList<>(events.size());

for (ReadIndexEvent event : events) {
byte[] ctx = event.requestContext.get();
entries.add(ctx == null ? ByteString.EMPTY : new ByteString(ctx));
entries.add(ctx == null ? EMPTY_BYTE_BUFFER : ByteBuffer.wrap(ctx));
states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.codahale.metrics.MetricRegistry.name;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.apache.ignite.raft.jraft.rpc.RaftClientService;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
Expand All @@ -65,7 +65,6 @@
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Recyclable;
import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList;
Expand Down Expand Up @@ -782,7 +781,7 @@ public void run(final Status status) {
else {
// No entries and has empty data means a probe request.
// TODO refactor, adds a new flag field? https://issues.apache.org/jira/browse/IGNITE-14832
rb.data(ByteString.EMPTY);
rb.data(EMPTY_BYTE_BUFFER);
request = rb.build();
// Sending a probe request.
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
Expand Down Expand Up @@ -1402,8 +1401,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
if (request.entriesList() != null) {
r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
r.nodeMetrics.recordSize("replicate-entries-count", request.entriesList().size());
r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().size()
: 0);
r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().capacity() : 0);
}

final boolean isLogDebugEnabled = LOG.isDebugEnabled();
Expand Down Expand Up @@ -1667,7 +1665,7 @@ private boolean sendEntries(final long nextSendingIndex) {
}
final ByteBuffer buf = dataBuf.getBuffer();
buf.flip();
rb.data(new ByteString(buf));
rb.data(buf);
}
}
finally {
Expand Down Expand Up @@ -1716,7 +1714,7 @@ public void run(final Status status) {
ThrowUtil.throwException(t);
}
addInflight(RequestType.AppendEntries, nextSendingIndex, Utils.size(request.entriesList()),
request.data() == null ? 0 : request.data().size(), seq, rpcFuture);
request.data() == null ? 0 : request.data().capacity(), seq, rpcFuture);

return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import org.apache.ignite.internal.hlc.HybridTimestamp;
Expand All @@ -29,7 +30,6 @@
import org.apache.ignite.raft.jraft.entity.RaftOutter.EntryMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.jetbrains.annotations.Nullable;

public final class RpcRequests {
Expand Down Expand Up @@ -158,7 +158,7 @@ public interface RequestVoteResponse extends Message {
boolean granted();
}

@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST)
@Transferable(RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST)
public interface AppendEntriesRequest extends Message {
String groupId();

Expand All @@ -177,9 +177,7 @@ public interface AppendEntriesRequest extends Message {

long committedIndex();

@Nullable
@Marshallable
ByteString data();
@Nullable ByteBuffer data();

long timestampLong();

Expand Down Expand Up @@ -216,26 +214,23 @@ public interface GetFileRequest extends Message {
boolean readPartly();
}

@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE)
@Transferable(RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE)
public interface GetFileResponse extends Message {
boolean eof();

long readSize();

@Marshallable
ByteString data();
ByteBuffer data();
}

@Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST)
@Transferable(RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST)
public interface ReadIndexRequest extends Message {
String groupId();

@Nullable
String serverId();

@Nullable
@Marshallable
List<ByteString> entriesList();
@Nullable List<ByteBuffer> entriesList();

@Nullable
String peerId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.storage;

import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -34,7 +35,6 @@
import org.apache.ignite.raft.jraft.rpc.RpcRequests.GetFileRequest;
import org.apache.ignite.raft.jraft.storage.io.FileReader;
import org.apache.ignite.raft.jraft.util.ByteBufferCollector;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Utils;

Expand Down Expand Up @@ -110,11 +110,11 @@ public Message handleGetFile(final GetFileRequest request, final RpcRequestClosu
buf.flip();
if (!buf.hasRemaining()) {
// skip empty data
responseBuilder.data(ByteString.EMPTY);
responseBuilder.data(EMPTY_BYTE_BUFFER);
}
else {
// TODO check hole https://issues.apache.org/jira/browse/IGNITE-14832
responseBuilder.data(new ByteString(buf));
responseBuilder.data(buf);
}
return responseBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.storage.snapshot.remote;

import static org.apache.ignite.raft.jraft.util.BytesUtil.writeTo;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -242,7 +243,7 @@ void onRpcReturned(final Status status, final GetFileResponse response) {
}
if (this.outputStream != null) {
try {
response.data().writeTo(this.outputStream);
writeTo(this.outputStream, response.data());
}
catch (final IOException e) {
LOG.error("Fail to write into file {}", this.destPath, e);
Expand All @@ -252,7 +253,7 @@ void onRpcReturned(final Status status, final GetFileResponse response) {
}
}
else {
this.destBuf.put(response.data().asReadOnlyByteBuffer());
this.destBuf.put(response.data().asReadOnlyBuffer());
}
if (response.eof()) {
onFinished();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,6 @@ public static String unsafeDecode(final byte[] in) {
return unsafeDecode(in, 0, in.length);
}

public static String unsafeDecode(final ByteString in) {
final int len = in.size();
final char[] out = new char[len];
for (int i = 0; i < len; i++) {
out[i] = (char) (in.byteAt(i) & 0xFF);
}
return moveToString(out);
}

private AsciiStringUtil() {
}

Expand Down
Loading