Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.server.ShuffleServerConf.RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration test for the
 * {@link org.apache.uniffle.server.storage.local.ChainableLocalStorageSelector} multiple-disk
 * selection strategy from the reader's perspective: once the first disk reaches its high
 * watermark the server must switch writes to another local disk, and the read client must
 * still be able to fetch every block across both disks.
 */
public class ShuffleClientWithLocalMultiDiskTest extends ShuffleReadWriteBase {

  private static ShuffleServerInfo shuffleServerInfo;
  private ShuffleWriteClientImpl shuffleWriteClientImpl;

  /**
   * Starts a coordinator and a single shuffle server configured with two local data
   * directories, so the storage selector has more than one disk to choose from.
   */
  @BeforeAll
  public static void setupShuffleServers() throws Exception {
    CoordinatorConf coordinatorConf = getCoordinatorConf();
    createCoordinatorServer(coordinatorConf);

    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    // Keep the app registered for the whole test run regardless of heartbeat timing.
    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000000);
    File tmpDir = Files.createTempDir();
    tmpDir.deleteOnExit();
    File dataDir1 = new File(tmpDir, "data1");
    File dataDir2 = new File(tmpDir, "data2");
    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
    shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
    shuffleServerConf.setString("rss.storage.basePath", basePath);
    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 10 * 1024 * 1024);
    shuffleServerConf.set(RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE, true);
    createShuffleServer(shuffleServerConf);
    startServers();
    shuffleServerInfo =
        new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT);
  }

  @BeforeEach
  public void createClient() {
    shuffleWriteClientImpl = new ShuffleWriteClientImpl(
        ClientType.GRPC.name(), 3, 1000, 1,
        1, 1, 1, true, 1, 1, 10, 10);
  }

  @AfterEach
  public void closeClient() {
    shuffleWriteClientImpl.close();
  }

  /**
   * Writes two batches of blocks: the first batch lands on one disk; that disk is then
   * marked as having reached its high watermark so the second batch must land on another
   * disk. Finally a read client validates that all blocks are readable across both disks.
   */
  @Test
  public void testClientRemoteReadFromMultipleDisk() throws Exception {
    String appId = "testClientRemoteReadFromMultipleDisk_appId";
    shuffleWriteClientImpl.registerShuffle(
        shuffleServerInfo,
        appId,
        0,
        Lists.newArrayList(new PartitionRange(0, 0)),
        new RemoteStorageInfo(""),
        ShuffleDataDistributionType.NORMAL
    );

    Map<Long, byte[]> expectedData = Maps.newHashMap();
    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();

    // First committing, blocks will be written to one disk
    List<ShuffleBlockInfo> blocks = createShuffleBlockList(
        0, 0, 0, 3, 25, blockIdBitmap,
        expectedData, Lists.newArrayList(shuffleServerInfo));
    SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
    assertEquals(0, result.getFailedBlockIds().size());
    assertEquals(3, result.getSuccessBlockIds().size());

    boolean commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
    assertTrue(commitResult);

    // Mark one storage reaching high watermark, it should switch another storage for next writing
    ShuffleServer shuffleServer = shuffleServers.get(0);
    ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 0);
    LocalStorage storage1 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
    storage1.getMetaData().setSize(20 * 1024 * 1024);

    blocks = createShuffleBlockList(
        0, 0, 0, 3, 25, blockIdBitmap,
        expectedData, Lists.newArrayList(shuffleServerInfo));
    result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
    assertEquals(0, result.getFailedBlockIds().size());
    assertEquals(3, result.getSuccessBlockIds().size());
    commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
    assertTrue(commitResult);

    // The second batch must have landed on a different local storage than the first one.
    readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 1);
    LocalStorage storage2 = (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
    assertNotEquals(storage1, storage2);

    ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(
        StorageType.LOCALFILE.name(),
        appId,
        0,     // shuffleId
        0,     // partitionId
        100,   // indexReadLimit
        1,     // partitionNumPerRange
        1,     // partitionNum
        1000,  // readBufferSize
        "",    // storageBasePath
        blockIdBitmap,
        Roaring64NavigableMap.bitmapOf(0),  // taskIdBitmap
        Lists.newArrayList(shuffleServerInfo),
        new Configuration(),
        new DefaultIdHelper()
    );

    try {
      TestUtils.validateResult(readClient, expectedData);
      readClient.checkProcessedBlockIds();
    } finally {
      // Always release the read client, even when validation throws.
      readClient.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public static List<ShuffleDataSegment> readShuffleIndexSegments(
int readBufferSize) {
// read index file
RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest(
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0);
ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult();
return new FixedSizeSegmentSplitter(readBufferSize).split(shuffleIndexResult);
}
Expand All @@ -152,7 +152,7 @@ public static ShuffleDataResult readShuffleData(
ShuffleDataSegment segment = sds.get(segmentIndex);
RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest(
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum,
segment.getOffset(), segment.getLength());
segment.getOffset(), segment.getLength(), 0);

return new ShuffleDataResult(
shuffleServerClient.getShuffleData(rgsdr).getShuffleData(),
Expand All @@ -170,7 +170,7 @@ public static ShuffleDataResult readShuffleData(
int segmentIndex,
SegmentSplitter segmentSplitter) {
RssGetShuffleIndexRequest rgsir = new RssGetShuffleIndexRequest(
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum, 0);
ShuffleIndexResult shuffleIndexResult = shuffleServerClient.getShuffleIndex(rgsir).getShuffleIndexResult();
if (shuffleIndexResult == null) {
return new ShuffleDataResult();
Expand All @@ -184,7 +184,7 @@ public static ShuffleDataResult readShuffleData(
ShuffleDataSegment segment = sds.get(segmentIndex);
RssGetShuffleDataRequest rgsdr = new RssGetShuffleDataRequest(
appId, shuffleId, partitionId, partitionNumPerRange, partitionNum,
segment.getOffset(), segment.getLength());
segment.getOffset(), segment.getLength(), 0);

// read shuffle data
return new ShuffleDataResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public void rpcMetricsTest() throws Exception {
ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get();
try {
shuffleServerClient.getShuffleIndex(new RssGetShuffleIndexRequest(
appId, shuffleId, 1, 1, 3));
appId, shuffleId, 1, 1, 3, 0));
} catch (Exception e) {
// ignore the exception, just test metrics value
}
Expand All @@ -637,7 +637,7 @@ public void rpcMetricsTest() throws Exception {
try {
shuffleServerClient.getShuffleData(new RssGetShuffleDataRequest(
appId, shuffleId, 0, 1, 3,
0, 100));
0, 100, 0));
} catch (Exception e) {
// ignore the exception, just test metrics value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request
.setOffset(request.getOffset())
.setLength(request.getLength())
.setTimestamp(start)
.setStorageId(request.getStorageId())
.build();
GetLocalShuffleDataResponse rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest);
String requestInfo = "appId[" + request.getAppId() + "], shuffleId["
Expand Down Expand Up @@ -571,6 +572,7 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ
.setPartitionId(request.getPartitionId())
.setPartitionNumPerRange(request.getPartitionNumPerRange())
.setPartitionNum(request.getPartitionNum())
.setStorageId(request.getStorageId())
.build();
long start = System.currentTimeMillis();
GetLocalShuffleIndexResponse rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ public class RssGetShuffleDataRequest {
private final int partitionNum;
private final long offset;
private final int length;
private final int storageId;

public RssGetShuffleDataRequest(String appId, int shuffleId, int partitionId, int partitionNumPerRange,
int partitionNum, long offset, int length) {
int partitionNum, long offset, int length, int storageId) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.partitionNumPerRange = partitionNumPerRange;
this.partitionNum = partitionNum;
this.offset = offset;
this.length = length;
this.storageId = storageId;
}

public String getAppId() {
Expand Down Expand Up @@ -65,4 +67,8 @@ public long getOffset() {
public int getLength() {
return length;
}

// Identifier of the server-side local storage (disk) this data request targets.
// NOTE(review): presumably maps to the storage index chosen at write time — confirm
// against the server-side selector.
public int getStorageId() {
return storageId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ public class RssGetShuffleIndexRequest {
private final int partitionId;
private final int partitionNumPerRange;
private final int partitionNum;
private final int storageId;

public RssGetShuffleIndexRequest(
String appId,
int shuffleId,
int partitionId,
int partitionNumPerRange,
int partitionNum) {
int partitionNum,
int storageId) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.partitionNumPerRange = partitionNumPerRange;
this.partitionNum = partitionNum;
this.storageId = storageId;
}

public String getAppId() {
Expand All @@ -58,4 +61,7 @@ public int getPartitionNum() {
return partitionNum;
}

// Identifier of the server-side local storage (disk) this index request targets.
public int getStorageId() {
return storageId;
}
}
2 changes: 2 additions & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ message GetLocalShuffleDataRequest {
int64 offset = 6;
int32 length = 7;
int64 timestamp = 8;
int32 storageId = 9;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a good design choice. It's leaking too much details to client.
And there are other storage types that don't need local storage, such as memory and memory_hdfs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we just reuse the meta data in
private final Map<PartitionUnionKey, ChainableLocalStorage> partitionsOfLocalStorage;?

Or another way would be that look up all the disk dirs to find the correct storage path?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think twice.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original shuffle-data reading will follow the rule

  1. Reading the remote whole index file to split and filter to get the required segments
  2. Reading the shuffle-data according to above segment's offset and length one by one

If we expose a unified abstraction for the client to follow the above reading sequence, it means we have to compose multiple files into an abstract one and re-calculate the offset and length of every request to map it to the real file.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is a good design choice. It's leaking too much details to client.

Yes. I also prefer giving a unified abstraction to hide the multiple under storages' detail for client, but currently I have no good ideas on this.

And there're other storage types that doesn't need local storage, such as memory and memory_hdfs.

Emmm... Only localfile storage will use this proto. Memory reading will use an independent API, and HDFS reading will fetch data directly from the HDFS datanode instead of fetching remote data from the shuffle server.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid using storageId. It exposes too many details.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any idea?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet.

}

message GetLocalShuffleDataResponse {
Expand Down Expand Up @@ -108,6 +109,7 @@ message GetLocalShuffleIndexRequest {
int32 partitionId = 3;
int32 partitionNumPerRange = 4;
int32 partitionNum = 5;
int32 storageId = 6;
}

message GetLocalShuffleIndexResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,30 @@

package org.apache.uniffle.server;

import com.google.common.annotations.VisibleForTesting;

public class ShuffleDataReadEvent {

private String appId;
private int shuffleId;
private int partitionId;
private int startPartition;
private int storageIndex = 0;

/**
 * Creates a read event for one partition's shuffle data.
 *
 * @param appId application the data belongs to
 * @param shuffleId shuffle the data belongs to
 * @param partitionId partition being read
 * @param startPartitionOfRange first partition of the partition range this data
 *        was written under
 * @param storageIndex index of the local storage (disk) to read from when
 *        multiple-disk selection is in use
 */
public ShuffleDataReadEvent(
String appId,
int shuffleId,
int partitionId,
int startPartitionOfRange,
int storageIndex) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.startPartition = startPartitionOfRange;
this.storageIndex = storageIndex;
}

@VisibleForTesting
public ShuffleDataReadEvent(String appId, int shuffleId, int partitionId, int startPartitionOfRange) {
this.appId = appId;
this.shuffleId = shuffleId;
Expand All @@ -46,4 +63,8 @@ public int getPartitionId() {
public int getStartPartition() {
return startPartition;
}

// Index of the local storage (disk) the storage manager should select for this read;
// defaults to 0 when the 4-arg constructor is used.
public int getStorageIndex() {
return storageIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription("The max concurrency of single partition writer, the data partition file number is "
+ "equal to this value. Default value is 1.");

/**
 * Whether the local storage selector may spread a single partition's shuffle data across
 * multiple local disks, switching to another disk once the current one reaches its
 * high watermark. Disabled by default.
 */
public static final ConfigOption<Boolean> RSS_LOCAL_STORAGE_MULTIPLE_DISK_SELECTION_ENABLE = ConfigOptions
    .key("rss.server.localstorage.multiple.disk.selection.enable")
    .booleanType()
    .defaultValue(false)
    .withDescription("Whether to enable the multiple disk selection strategy for local storage, "
        + "which allows a single partition's shuffle data to be written across multiple local "
        + "disks when a disk reaches its high watermark. Default value is false.");

public ShuffleServerConf() {
}

Expand Down
Loading