Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions common/src/main/java/io/netty/util/CompositeFileRegion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.netty.util;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

import io.netty.channel.FileRegion;

import org.apache.uniffle.common.netty.protocol.AbstractFileRegion;

public class CompositeFileRegion extends AbstractFileRegion {
private final FileRegion[] regions;
private long totalSize = 0;
private long bytesTransferred = 0;

public CompositeFileRegion(FileRegion... regions) {
this.regions = regions;
for (FileRegion region : regions) {
totalSize += region.count();
}
}

@Override
public long position() {
return bytesTransferred;
}

@Override
public long count() {
return totalSize;
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
long totalBytesTransferred = 0;

for (FileRegion region : regions) {
if (position >= region.count()) {
position -= region.count();
} else {
long currentBytesTransferred = region.transferTo(target, position);
totalBytesTransferred += currentBytesTransferred;
bytesTransferred += currentBytesTransferred;

if (currentBytesTransferred < region.count() - position) {
break;
}
position = 0;
}
}

return totalBytesTransferred;
}

@Override
public long transferred() {
return bytesTransferred;
}

@Override
public AbstractFileRegion retain() {
super.retain();
for (FileRegion region : regions) {
region.retain();
}
return this;
}

@Override
public AbstractFileRegion retain(int increment) {
super.retain(increment);
for (FileRegion region : regions) {
region.retain(increment);
}
return this;
}

@Override
public boolean release() {
boolean released = super.release();
for (FileRegion region : regions) {
if (!region.release()) {
released = false;
}
}
return released;
}

@Override
public boolean release(int decrement) {
boolean released = super.release(decrement);
for (FileRegion region : regions) {
if (!region.release(decrement)) {
released = false;
}
}
return released;
}

@Override
protected void deallocate() {
for (FileRegion region : regions) {
if (region instanceof AbstractReferenceCounted) {
((AbstractReferenceCounted) region).deallocate();
}
}
}

@Override
public AbstractFileRegion touch() {
super.touch();
for (FileRegion region : regions) {
region.touch();
}
return this;
}

@Override
public AbstractFileRegion touch(Object hint) {
super.touch(hint);
for (FileRegion region : regions) {
region.touch(hint);
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
public class ShuffleDataSegment {
private final long offset;
private final int length;

private final int storageId;
private final List<BufferSegment> bufferSegments;

public ShuffleDataSegment(long offset, int length, List<BufferSegment> bufferSegments) {
public ShuffleDataSegment(
long offset, int length, int storageId, List<BufferSegment> bufferSegments) {
this.offset = offset;
this.length = length;
this.storageId = storageId;
this.bufferSegments = bufferSegments;
}

Expand All @@ -46,4 +50,8 @@ public int getLength() {
public List<BufferSegment> getBufferSegments() {
return bufferSegments;
}

public int getStorageId() {
return storageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@

public class ShuffleIndexResult {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexResult.class);
private static final int[] DEFAULT_STORAGE_IDS = new int[] {0};

private final ManagedBuffer buffer;
private final int[] storageIds;
private long dataFileLen;
private String dataFileName;

Expand All @@ -44,15 +46,28 @@ public ShuffleIndexResult(byte[] data, long dataFileLen) {
}

public ShuffleIndexResult(ByteBuffer data, long dataFileLen) {
this.buffer =
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
this.dataFileLen = dataFileLen;
this(
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER),
dataFileLen,
null,
DEFAULT_STORAGE_IDS);
}

public ShuffleIndexResult(ManagedBuffer buffer, long dataFileLen, String dataFileName) {
this(buffer, dataFileLen, dataFileName, DEFAULT_STORAGE_IDS);
}

public ShuffleIndexResult(
ManagedBuffer buffer, long dataFileLen, String dataFileName, int storageId) {
this(buffer, dataFileLen, dataFileName, new int[] {storageId});
}

public ShuffleIndexResult(
ManagedBuffer buffer, long dataFileLen, String dataFileName, int[] storageIds) {
this.buffer = buffer;
this.dataFileLen = dataFileLen;
this.dataFileName = dataFileName;
this.storageIds = storageIds;
}

public byte[] getData() {
Expand Down Expand Up @@ -99,4 +114,8 @@ public ManagedBuffer getManagedBuffer() {
public String getDataFileName() {
return dataFileName;
}

public int[] getStorageIds() {
return storageIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
header.writeInt(bodyLength);
in.encode(header);
if (header.writableBytes() != 0) {
throw new RssException("header's writable bytes should be 0");
throw new RssException(
"header's writable bytes should be 0, but it is " + header.writableBytes());
}

if (body != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.netty.buffer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.FileRegion;
import io.netty.util.CompositeFileRegion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A wrapper of multiple {@link FileSegmentManagedBuffer}, used for combine shuffle index files. */
public class MultiFileSegmentManagedBuffer extends ManagedBuffer {

private static final Logger LOG = LoggerFactory.getLogger(MultiFileSegmentManagedBuffer.class);
private final List<ManagedBuffer> managedBuffers;

public MultiFileSegmentManagedBuffer(List<ManagedBuffer> managedBuffers) {
this.managedBuffers = managedBuffers;
}

@Override
public int size() {
return managedBuffers.stream().mapToInt(ManagedBuffer::size).sum();
}

@Override
public ByteBuf byteBuf() {
return Unpooled.wrappedBuffer(this.nioByteBuffer());
}

@Override
public ByteBuffer nioByteBuffer() {
ByteBuffer merged = ByteBuffer.allocate(size());
for (ManagedBuffer managedBuffer : managedBuffers) {
ByteBuffer buffer = managedBuffer.nioByteBuffer();
merged.put(buffer.slice());
}
merged.flip();
return merged;
}

@Override
public ManagedBuffer retain() {
return this;
}

@Override
public ManagedBuffer release() {
return this;
}

@Override
public Object convertToNetty() {
List<FileRegion> fileRegions = new ArrayList<>(managedBuffers.size());
for (ManagedBuffer managedBuffer : managedBuffers) {
Object object = managedBuffer.convertToNetty();
if (object instanceof FileRegion) {
fileRegions.add((FileRegion) object);
}
}
return new CompositeFileRegion(fileRegions.toArray(new FileRegion[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class GetLocalShuffleDataRequest extends RequestMessage {
private long offset;
private int length;
private long timestamp;
private int storageId;

public GetLocalShuffleDataRequest(
long requestId,
Expand All @@ -41,6 +42,30 @@ public GetLocalShuffleDataRequest(
long offset,
int length,
long timestamp) {
this(
requestId,
appId,
shuffleId,
partitionId,
partitionNumPerRange,
partitionNum,
offset,
length,
-1,
timestamp);
}

protected GetLocalShuffleDataRequest(
long requestId,
String appId,
int shuffleId,
int partitionId,
int partitionNumPerRange,
int partitionNum,
long offset,
int length,
int storageId,
long timestamp) {
super(requestId);
this.appId = appId;
this.shuffleId = shuffleId;
Expand All @@ -49,6 +74,7 @@ public GetLocalShuffleDataRequest(
this.partitionNum = partitionNum;
this.offset = offset;
this.length = length;
this.storageId = storageId;
this.timestamp = timestamp;
}

Expand Down Expand Up @@ -132,6 +158,10 @@ public long getTimestamp() {
return timestamp;
}

public int getStorageId() {
return storageId;
}

@Override
public String getOperationType() {
return "getLocalShuffleData";
Expand Down
Loading