From 2fc9505c35df76ff0195168fb6a9d46d022635fd Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 26 Jun 2024 10:48:04 +0300 Subject: [PATCH 1/2] IGNITE-22561 wip --- .../ignite/internal/util/ArrayUtils.java | 4 + .../network/annotations/Transferable.java | 2 + .../network/direct/DirectMessageWriter.java | 6 +- .../raft/util/OptimizedMarshaller.java | 2 +- .../ignite/raft/jraft/core/NodeImpl.java | 4 +- .../raft/jraft/core/ReadOnlyServiceImpl.java | 8 +- .../ignite/raft/jraft/core/Replicator.java | 12 +- .../ignite/raft/jraft/rpc/RpcRequests.java | 19 ++-- .../raft/jraft/storage/FileService.java | 6 +- .../storage/snapshot/remote/CopySession.java | 5 +- .../raft/jraft/util/AsciiStringUtil.java | 9 -- .../ignite/raft/jraft/util/ByteString.java | 103 ------------------ .../ignite/raft/jraft/util/BytesUtil.java | 30 +++++ .../jraft/util/RecyclableByteBufferList.java | 4 +- .../raft/jraft/core/ReadOnlyServiceTest.java | 11 +- .../raft/jraft/core/ReplicatorGroupTest.java | 4 +- .../raft/jraft/core/ReplicatorTest.java | 28 ++--- .../jraft/rpc/AppendEntriesBenchmark.java | 7 +- .../raft/jraft/storage/FileServiceTest.java | 5 +- .../jraft/storage/SnapshotExecutorTest.java | 10 +- .../local/LocalSnapshotCopierTest.java | 5 +- .../snapshot/remote/CopySessionTest.java | 6 +- .../CheckCatalogVersionOnAppendEntries.java | 2 +- 23 files changed, 104 insertions(+), 188 deletions(-) delete mode 100644 modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteString.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java index 8bd8cfdac49..35e3f90cc99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -59,6 +60,9 @@ public final class ArrayUtils { /** Empty string array. */ public static final String[] STRING_EMPTY_ARRAY = new String[0]; + /** Empty array-based byte buffer. Not read-only. */ + public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(BYTE_EMPTY_ARRAY); + /** {@code byte} array factory. */ public static final ArrayFactory BYTE_ARRAY = len -> { if (len < 0) { diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java index e0f28f44e18..ff33271612f 100644 --- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java +++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/annotations/Transferable.java @@ -21,6 +21,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.nio.ByteBuffer; import java.util.BitSet; import java.util.UUID; import org.apache.ignite.internal.lang.IgniteUuid; @@ -66,6 +67,7 @@ *
  • {@link UUID};
  • *
  • {@link IgniteUuid};
  • *
  • {@link BitSet};
  • + *
  • {@link ByteBuffer};
  • *
  • Nested {@code NetworkMessage};
  • *
  • Array of primitive types, corresponding boxed types or other directly marshallable types;
  • *
  • {@code Collection} of boxed primitive types or other directly marshallable types;
  • diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java index 9c2b93f9911..d26661d92d6 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.network.direct; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; + import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; @@ -32,7 +34,6 @@ import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.network.serialization.MessageWriter; -import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.jetbrains.annotations.Nullable; @@ -40,9 +41,6 @@ * Message writer implementation. */ public class DirectMessageWriter implements MessageWriter { - /** Empty array-based byte buffer. Not read-only. */ - public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(ArrayUtils.BYTE_EMPTY_ARRAY); - /** State. */ private final DirectMessageState state; diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java index 36bea3c8348..4afa1bfb155 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.raft.util; -import static org.apache.ignite.internal.network.direct.DirectMessageWriter.EMPTY_BYTE_BUFFER; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import java.nio.ByteBuffer; import java.nio.ByteOrder; diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 007e9387f41..57202200b5a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -19,6 +19,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.RingBuffer; @@ -119,7 +120,6 @@ import org.apache.ignite.raft.jraft.storage.SnapshotExecutor; import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.Describer; import org.apache.ignite.raft.jraft.util.DisruptorMetricSet; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; @@ -2258,7 +2258,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi // Parse request long index = prevLogIndex; final List entries = new ArrayList<>(entriesCount); - ByteBuffer allData = request.data() != null ? request.data().asReadOnlyByteBuffer() : ByteString.EMPTY.asReadOnlyByteBuffer(); + ByteBuffer allData = request.data() != null ? request.data().asReadOnlyBuffer() : EMPTY_BYTE_BUFFER.asReadOnlyBuffer(); final Collection entriesList = request.entriesList(); for (RaftOutter.EntryMeta entry : entriesList) { diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java index 4914833ce97..19df46b7731 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java @@ -16,10 +16,12 @@ */ package org.apache.ignite.raft.jraft.core; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.RingBuffer; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -39,7 +41,6 @@ import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType; import org.apache.ignite.raft.jraft.disruptor.NodeIdAware; import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; -import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.ReadIndexState; import org.apache.ignite.raft.jraft.entity.ReadIndexStatus; import org.apache.ignite.raft.jraft.error.OverloadException; @@ -51,7 +52,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest; import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse; import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.Bytes; import org.apache.ignite.raft.jraft.util.DisruptorMetricSet; import org.apache.ignite.raft.jraft.util.OnlyForTest; @@ -232,11 +232,11 @@ private void executeReadIndexEvents(final List events) { .serverId(this.node.getServerId().toString()); List states = new ArrayList<>(events.size()); - List entries = new ArrayList<>(events.size()); + List entries = new ArrayList<>(events.size()); for (ReadIndexEvent event : events) { byte[] ctx = event.requestContext.get(); - entries.add(ctx == null ? ByteString.EMPTY : new ByteString(ctx)); + entries.add(ctx == null ? EMPTY_BYTE_BUFFER : ByteBuffer.wrap(ctx)); states.add(new ReadIndexState(event.requestContext, event.done, event.startTime)); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java index ea78b6db4ee..6a760a95efa 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java @@ -19,6 +19,7 @@ import static com.codahale.metrics.MetricRegistry.name; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; @@ -56,7 +57,6 @@ import org.apache.ignite.raft.jraft.rpc.RaftClientService; import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest; import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse; -import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse; import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest; import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotResponse; import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest; @@ -65,7 +65,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; import org.apache.ignite.raft.jraft.util.ByteBufferCollector; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.OnlyForTest; import org.apache.ignite.raft.jraft.util.Recyclable; import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList; @@ -782,7 +781,7 @@ public void run(final Status status) { else { // No entries and has empty data means a probe request. // TODO refactor, adds a new flag field? https://issues.apache.org/jira/browse/IGNITE-14832 - rb.data(ByteString.EMPTY); + rb.data(EMPTY_BYTE_BUFFER); request = rb.build(); // Sending a probe request. this.statInfo.runningState = RunningState.APPENDING_ENTRIES; @@ -1402,8 +1401,7 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight if (request.entriesList() != null) { r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime); r.nodeMetrics.recordSize("replicate-entries-count", request.entriesList().size()); - r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().size() - : 0); + r.nodeMetrics.recordSize("replicate-entries-bytes", request.data() != null ? request.data().capacity() : 0); } final boolean isLogDebugEnabled = LOG.isDebugEnabled(); @@ -1667,7 +1665,7 @@ private boolean sendEntries(final long nextSendingIndex) { } final ByteBuffer buf = dataBuf.getBuffer(); buf.flip(); - rb.data(new ByteString(buf)); + rb.data(buf); } } finally { @@ -1716,7 +1714,7 @@ public void run(final Status status) { ThrowUtil.throwException(t); } addInflight(RequestType.AppendEntries, nextSendingIndex, Utils.size(request.entriesList()), - request.data() == null ? 0 : request.data().size(), seq, rpcFuture); + request.data() == null ? 0 : request.data().capacity(), seq, rpcFuture); return true; } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java index e6a9d2379a9..ca37da3ce5f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java @@ -19,6 +19,7 @@ import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp; import static org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -29,7 +30,6 @@ import org.apache.ignite.raft.jraft.entity.RaftOutter.EntryMeta; import org.apache.ignite.raft.jraft.error.RaftError; import org.apache.ignite.raft.jraft.rpc.impl.SMThrowable; -import org.apache.ignite.raft.jraft.util.ByteString; import org.jetbrains.annotations.Nullable; public final class RpcRequests { @@ -158,7 +158,7 @@ public interface RequestVoteResponse extends Message { boolean granted(); } - @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST) + @Transferable(RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST) public interface AppendEntriesRequest extends Message { String groupId(); @@ -177,9 +177,7 @@ public interface AppendEntriesRequest extends Message { long committedIndex(); - @Nullable - @Marshallable - ByteString data(); + @Nullable ByteBuffer data(); long timestampLong(); @@ -216,26 +214,23 @@ public interface GetFileRequest extends Message { boolean readPartly(); } - @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE) + @Transferable(RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE) public interface GetFileResponse extends Message { boolean eof(); long readSize(); - @Marshallable - ByteString data(); + ByteBuffer data(); } - @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST) + @Transferable(RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST) public interface ReadIndexRequest extends Message { String groupId(); @Nullable String serverId(); - @Nullable - @Marshallable - List entriesList(); + @Nullable List entriesList(); @Nullable String peerId(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/FileService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/FileService.java index 111eda38293..6f3d76f2e47 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/FileService.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/FileService.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.storage; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; @@ -34,7 +35,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.GetFileRequest; import org.apache.ignite.raft.jraft.storage.io.FileReader; import org.apache.ignite.raft.jraft.util.ByteBufferCollector; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.OnlyForTest; import org.apache.ignite.raft.jraft.util.Utils; @@ -110,11 +110,11 @@ public Message handleGetFile(final GetFileRequest request, final RpcRequestClosu buf.flip(); if (!buf.hasRemaining()) { // skip empty data - responseBuilder.data(ByteString.EMPTY); + responseBuilder.data(EMPTY_BYTE_BUFFER); } else { // TODO check hole https://issues.apache.org/jira/browse/IGNITE-14832 - responseBuilder.data(new ByteString(buf)); + responseBuilder.data(buf); } return responseBuilder.build(); } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java index fe4d756d1c2..f235505b068 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySession.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.storage.snapshot.remote; +import static org.apache.ignite.raft.jraft.util.BytesUtil.writeTo; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -242,7 +243,7 @@ void onRpcReturned(final Status status, final GetFileResponse response) { } if (this.outputStream != null) { try { - response.data().writeTo(this.outputStream); + writeTo(this.outputStream, response.data()); } catch (final IOException e) { LOG.error("Fail to write into file {}", this.destPath, e); @@ -252,7 +253,7 @@ void onRpcReturned(final Status status, final GetFileResponse response) { } } else { - this.destBuf.put(response.data().asReadOnlyByteBuffer()); + this.destBuf.put(response.data().asReadOnlyBuffer()); } if (response.eof()) { onFinished(); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java index a3556f3ab65..0fb8134fd1a 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java @@ -48,15 +48,6 @@ public static String unsafeDecode(final byte[] in) { return unsafeDecode(in, 0, in.length); } - public static String unsafeDecode(final ByteString in) { - final int len = in.size(); - final char[] out = new char[len]; - for (int i = 0; i < len; i++) { - out[i] = (char) (in.byteAt(i) & 0xFF); - } - return moveToString(out); - } - private AsciiStringUtil() { } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteString.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteString.java deleted file mode 100644 index 1ed74407369..00000000000 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/ByteString.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ignite.raft.jraft.util; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; - -// TODO asch readResolve for empty string. Get rid and use utility class class ByteArray ? IGNITE-14832 -public class ByteString implements Externalizable { - public static final ByteString EMPTY = new ByteString(ByteBuffer.wrap(new byte[0])); - - private ByteBuffer buf; - - public ByteString() { - // Externalizable. - } - - public ByteString(ByteBuffer buf) { - this.buf = buf; - } - - public ByteString(byte[] bytes) { - this.buf = ByteBuffer.wrap(bytes); - } - - public int size() { - return buf.capacity(); - } - - public ByteBuffer asReadOnlyByteBuffer() { - return buf.asReadOnlyBuffer(); - } - - public byte byteAt(int pos) { - return buf.get(pos); - } - - public void writeTo(OutputStream outputStream) throws IOException { - WritableByteChannel channel = Channels.newChannel(outputStream); - - channel.write(buf); - } - - public byte[] toByteArray() { - byte[] arr = new byte[buf.remaining()]; - buf.get(arr); - buf.flip(); - return arr; - } - - public ByteString copy() { - return this == EMPTY ? EMPTY : new ByteString(toByteArray()); - } - - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - ByteString that = (ByteString) o; - - return buf.equals(that.buf); - } - - @Override public int hashCode() { - return buf.hashCode(); - } - - @Override public void writeExternal(ObjectOutput out) throws IOException { - byte[] bytes = toByteArray(); - out.writeInt(bytes.length); - out.write(bytes); - } - - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int len = in.readInt(); - byte[] data = new byte[len]; - in.readFully(data); - - buf = ByteBuffer.wrap(data); - } -} diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java index 5f3b282bd2f..55eb506b967 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java @@ -16,7 +16,12 @@ */ package org.apache.ignite.raft.jraft.util; +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.util.Comparator; /** @@ -174,6 +179,31 @@ public static byte[] hexStringToByteArray(final String s) { return bytes; } + /** + * Converts a byte buffer to a byte array. + * + * @param buf Byte buffer from which we copy bytes. + */ + public static byte[] toByteArray(ByteBuffer buf) { + byte[] arr = new byte[buf.remaining()]; + buf.get(arr); + buf.flip(); + return arr; + } + + /** + * Writes a byte buffer to the output stream. + * + * @param outputStream Output stream into which bytes will be written. + * @param buf Byte buffer from which bytes are to be retrieved. + * @throws IOException When writing to the output stream. + */ + public static void writeTo(OutputStream outputStream, ByteBuffer buf) throws IOException { + WritableByteChannel channel = Channels.newChannel(outputStream); + + channel.write(buf); + } + private BytesUtil() { } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferList.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferList.java index ef6e1cddfb1..6d922d474b6 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferList.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/RecyclableByteBufferList.java @@ -55,10 +55,10 @@ public static RecyclableByteBufferList newInstance(final int minCapacity) { * * @param buffers Buffers. */ - public static ByteString concatenate(List buffers) { + public static ByteBuffer concatenate(List buffers) { final ByteBuffer combined = ByteBuffer.allocate(buffers.stream().mapToInt(Buffer::remaining).sum()); buffers.stream().forEach(b -> combined.put(b.duplicate())); - return new ByteString(combined); + return combined; } public int getCapacity() { diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java index be3e148b3ca..b5875d59703 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.core; +import static org.apache.ignite.raft.jraft.util.BytesUtil.toByteArray; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -139,7 +140,7 @@ public void run(final Status status, final long index, final byte[] reqCtx) { final ReadIndexRequest req = (ReadIndexRequest) argument; return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId()) && Utils.size(req.entriesList()) == 1 - && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray()); + && Arrays.equals(requestContext, toByteArray(req.entriesList().get(0))); } return false; } @@ -171,7 +172,7 @@ public void run(final Status status, final long index, final byte[] reqCtx) { final ReadIndexRequest req = (ReadIndexRequest) argument; return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId()) && Utils.size(req.entriesList()) == 1 - && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray()); + && Arrays.equals(requestContext, toByteArray(req.entriesList().get(0))); } return false; } @@ -216,7 +217,7 @@ public void run(final Status status, final long index, final byte[] reqCtx) { final ReadIndexRequest req = (ReadIndexRequest) argument; return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId()) && Utils.size(req.entriesList()) == 1 - && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray()); + && Arrays.equals(requestContext, toByteArray(req.entriesList().get(0))); } return false; } @@ -259,7 +260,7 @@ public void run(final Status status, final long index, final byte[] reqCtx) { final ReadIndexRequest req = (ReadIndexRequest) argument; return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId()) && Utils.size(req.entriesList()) == 1 - && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray()); + && Arrays.equals(requestContext, toByteArray(req.entriesList().get(0))); } return false; } @@ -332,7 +333,7 @@ public boolean matches(ReadIndexRequest argument) { ReadIndexRequest req = (ReadIndexRequest) argument; return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId()) && Utils.size(req.entriesList()) == 1 - && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray()); + && Arrays.equals(requestContext, toByteArray(req.entriesList().get(0))); } return false; } diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupTest.java index c82bdda64f2..e0ddd4ae367 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorGroupTest.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.core; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -39,7 +40,6 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.storage.LogManager; import org.apache.ignite.raft.jraft.storage.SnapshotStorage; -import org.apache.ignite.raft.jraft.util.ByteString; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -284,7 +284,7 @@ private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestToPeer(final P .prevLogIndex(10) .prevLogTerm(1) .committedIndex(0) - .data(ByteString.EMPTY) + .data(EMPTY_BYTE_BUFFER) .build(); } } diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java index 69f4df5a4b9..798db78e072 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReplicatorTest.java @@ -16,6 +16,16 @@ */ package org.apache.ignite.raft.jraft.core; +import static org.apache.ignite.internal.util.ArrayUtils.EMPTY_BYTE_BUFFER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -48,7 +58,6 @@ import org.apache.ignite.raft.jraft.storage.LogManager; import org.apache.ignite.raft.jraft.storage.SnapshotStorage; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.ThreadId; import org.apache.ignite.raft.jraft.util.Utils; @@ -65,15 +74,6 @@ import org.mockito.quality.Strictness; import org.mockito.stubbing.Answer; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.same; - @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class ReplicatorTest extends BaseIgniteAbstractTest { @@ -151,7 +151,7 @@ private RpcRequests.AppendEntriesRequest createEmptyEntriesRequest(final boolean .prevLogTerm(1) .committedIndex(0); if (!isHeartbeat) { - rb.data(ByteString.EMPTY); + rb.data(EMPTY_BYTE_BUFFER); } return rb.build(); } @@ -294,7 +294,7 @@ public void testOnRpcReturnedMoreLogs() { .peerId(this.peerId.toString()) .term(1) .prevLogIndex(9) - .data(ByteString.EMPTY) + .data(EMPTY_BYTE_BUFFER) .prevLogTerm(1) .committedIndex(0) .build(); @@ -336,7 +336,7 @@ public void testOnRpcReturnedLessLogs() { .term(1) .prevLogIndex(8) .prevLogTerm(1) - .data(ByteString.EMPTY) + .data(EMPTY_BYTE_BUFFER) .committedIndex(0) .build(); Mockito.when(this.rpcService.appendEntries(eq(this.peerId), eq(newReq), eq(-1), Mockito.any())) @@ -464,7 +464,7 @@ public void testContinueSendingEntries() throws Exception { rb.entriesList(entries); - rb.data(new ByteString(new byte[totalDataLen])); + rb.data(ByteBuffer.wrap(new byte[totalDataLen])); final RpcRequests.AppendEntriesRequest request = rb.build(); Mockito.when(this.rpcService.appendEntries(eq(this.peerId), eq(request), eq(-1), Mockito.any())) diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java index edd6145bb3a..f29f43922aa 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AppendEntriesBenchmark.java @@ -22,7 +22,6 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.util.AdaptiveBufAllocator; import org.apache.ignite.raft.jraft.util.ByteBufferCollector; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.JDKMarshaller; import org.apache.ignite.raft.jraft.util.RecyclableByteBufferList; import org.apache.ignite.raft.jraft.util.RecycleUtil; @@ -170,7 +169,7 @@ private byte[] sendEntries1() { } final ByteBuffer buf = dataBuffer.getBuffer(); buf.flip(); - rb.data(new ByteString(buf)); + rb.data(buf); return JDKMarshaller.INSTANCE.marshall(rb.build()); } @@ -187,7 +186,7 @@ private byte[] sendEntries2() { } final ByteBuffer buf = dataBuffer.getBuffer(); buf.flip(); - rb.data(new ByteString(buf)); + rb.data(buf); return JDKMarshaller.INSTANCE.marshall(rb.build()); } finally { @@ -210,7 +209,7 @@ private byte[] sendEntries3() { buf.flip(); final int remaining = buf.remaining(); handleThreadLocal.get().record(remaining); - rb.data(new ByteString(buf)); + rb.data(buf); return JDKMarshaller.INSTANCE.marshall(rb.build()); } finally { diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/FileServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/FileServiceTest.java index 3655f36cb6e..38783968c59 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/FileServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/FileServiceTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.raft.jraft.storage; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.ignite.raft.jraft.util.BytesUtil.toByteArray; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -123,7 +124,7 @@ public void testGetFileData() throws IOException { assertTrue(msg instanceof RpcRequests.GetFileResponse); RpcRequests.GetFileResponse response = (RpcRequests.GetFileResponse) msg; assertTrue(response.eof()); - assertEquals("jraft is great!", new String(response.data().toByteArray(), UTF_8)); + assertEquals("jraft is great!", new String(toByteArray(response.data()), UTF_8)); assertEquals(-1, response.readSize()); } @@ -155,7 +156,7 @@ public void testGetLargeFileData() throws IOException { assertTrue(msg instanceof RpcRequests.GetFileResponse); final RpcRequests.GetFileResponse response = (RpcRequests.GetFileResponse) msg; final byte[] sourceArray = data.getBytes(UTF_8); - final byte[] respData = response.data().toByteArray(); + final byte[] respData = toByteArray(response.data()); final int length = sourceArray.length; int offset = 0; diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java index f5f0f139f61..bc5a438675e 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.storage; +import static org.apache.ignite.raft.jraft.util.BytesUtil.toByteArray; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -67,7 +68,6 @@ import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotWriter; import org.apache.ignite.raft.jraft.test.MockAsyncContext; import org.apache.ignite.raft.jraft.test.TestUtils; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.Utils; import org.junit.jupiter.api.AfterEach; @@ -221,7 +221,7 @@ public void run() { RpcResponseClosure closure = argument.getValue(); final ByteBuffer metaBuf = this.table.saveToByteBufferAsRemote(); closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(metaBuf.remaining()).eof(true) - .data(new ByteString(metaBuf).copy()).build()); + .data(ByteBuffer.wrap(toByteArray(metaBuf))).build()); //mock get file argument = ArgumentCaptor.forClass(RpcResponseClosure.class); @@ -235,7 +235,7 @@ public void run() { Thread.sleep(500); closure = argument.getValue(); closure.setResponse(raftOptions.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true) - .data(new ByteString(new byte[100]).copy()).build()); + .data(ByteBuffer.wrap(new byte[100])).build()); final ArgumentCaptor loadSnapshotArg = ArgumentCaptor.forClass(LoadSnapshotClosure.class); Mockito.when(this.fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true); @@ -293,7 +293,7 @@ public void testInstallSnapshot() throws Exception { RpcResponseClosure closure = argument.getValue(); final ByteBuffer metaBuf = table.saveToByteBufferAsRemote(); closure.setResponse(msgFactory.getFileResponse().readSize(metaBuf.remaining()).eof(true) - .data(new ByteString(metaBuf)).build()); + .data(metaBuf).build()); // Mock get file argument = ArgumentCaptor.forClass(RpcResponseClosure.class); @@ -309,7 +309,7 @@ public void testInstallSnapshot() throws Exception { closure = argument.getValue(); closure.setResponse(msgFactory.getFileResponse().readSize(100).eof(true) - .data(new ByteString(new byte[100])).build()); + .data(ByteBuffer.wrap(new byte[100])).build()); ArgumentCaptor loadSnapshotArg = ArgumentCaptor.forClass(LoadSnapshotClosure.class); Mockito.when(fSMCaller.onSnapshotLoad(loadSnapshotArg.capture())).thenReturn(true); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java index 84a8f4240e2..e1caeb5a28f 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotCopierTest.java @@ -44,7 +44,6 @@ import org.apache.ignite.raft.jraft.storage.snapshot.Snapshot; import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; import org.apache.ignite.raft.jraft.test.TestUtils; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.Utils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -196,7 +195,7 @@ public void testStartJoinFinishOK() throws Exception { .getFileResponse() .readSize(metaBuf.remaining()) .eof(true) - .data(new ByteString(metaBuf)) + .data(metaBuf) .build(); closure.setResponse(response); @@ -218,7 +217,7 @@ public void testStartJoinFinishOK() throws Exception { .getFileResponse() .readSize(100) .eof(true) - .data(new ByteString(new byte[100])) + .data(ByteBuffer.wrap(new byte[100])) .build(); closure.setResponse(response); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java index 00352b580ad..7136af28e89 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/remote/CopySessionTest.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.storage.snapshot.remote; +import java.nio.ByteBuffer; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -33,7 +34,6 @@ import org.apache.ignite.raft.jraft.rpc.RaftClientService; import org.apache.ignite.raft.jraft.rpc.RpcRequests; import org.apache.ignite.raft.jraft.util.ByteBufferCollector; -import org.apache.ignite.raft.jraft.util.ByteString; import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.Utils; import org.junit.jupiter.api.AfterEach; @@ -115,7 +115,7 @@ public void testOnRpcReturnedEOF() throws Exception { this.session.setDestBuf(bufRef); this.session.onRpcReturned(Status.OK(), raftOpts.getRaftMessagesFactory().getFileResponse().readSize(100).eof(true) - .data(new ByteString(new byte[100])).build()); + .data(ByteBuffer.wrap(new byte[100])).build()); assertEquals(100, bufRef.capacity()); //should be flip assertEquals(0, bufRef.getBuffer().position()); @@ -147,7 +147,7 @@ public void testOnRpcReturnedOK() { .thenReturn(future); this.session.onRpcReturned(Status.OK(), raftOpts.getRaftMessagesFactory().getFileResponse().readSize(100).eof(false) - .data(new ByteString(new byte[100])).build()); + .data(ByteBuffer.wrap(new byte[100])).build()); assertEquals(100, bufRef.capacity()); assertEquals(100, bufRef.getBuffer().position()); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java index 519b833c8da..ae66da9cdb6 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/CheckCatalogVersionOnAppendEntries.java @@ -59,7 +59,7 @@ public CheckCatalogVersionOnAppendEntries(CatalogService catalogService) { Node node = (Node) service; - ByteBuffer allData = request.data().asReadOnlyByteBuffer(); + ByteBuffer allData = request.data().asReadOnlyBuffer(); int offset = 0; for (RaftOutter.EntryMeta entry : request.entriesList()) { From ec7f4dce8c33f2bddf00d0920b8e2dc1c95ad78d Mon Sep 17 00:00:00 2001 From: Kirill Tkalenko Date: Wed, 26 Jun 2024 15:05:57 +0300 Subject: [PATCH 2/2] IGNITE-22561 after review --- .../java/org/apache/ignite/raft/jraft/util/BytesUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java index 55eb506b967..2914d81f96f 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/BytesUtil.java @@ -194,9 +194,9 @@ public static byte[] toByteArray(ByteBuffer buf) { /** * Writes a byte buffer to the output stream. * - * @param outputStream Output stream into which bytes will be written. - * @param buf Byte buffer from which bytes are to be retrieved. - * @throws IOException When writing to the output stream. + * @param outputStream Output stream to write bytes into. + * @param buf Byte buffer to retrieve bytes from. + * @throws IOException If writing to the output stream failed. */ public static void writeTo(OutputStream outputStream, ByteBuffer buf) throws IOException { WritableByteChannel channel = Channels.newChannel(outputStream);