Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.exception;

public class FileNotFoundException extends RuntimeException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use Java.io.FileNotFoundException?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java.io.FileNotFoundException is IOException which need to catch in invoking side, so introduce this extending the RuntimeException.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do u think @frankliee


public FileNotFoundException(String message) {
super(message);
}

public FileNotFoundException(String message, Throwable e) {
super(message, e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;

import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.MemoryQuorumClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ShuffleServerWithLocalOfExceptionTest extends ShuffleReadWriteBase {

private ShuffleServerGrpcClient shuffleServerClient;
private static String REMOTE_STORAGE = HDFS_URI + "rss/test";

@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);

ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File tmpDir = Files.createTempDir();
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
shuffleServerConf.setString("rss.storage.basePath", basePath);
shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", "5000");
createShuffleServer(shuffleServerConf);

startServers();
}

@BeforeEach
public void createClient() {
shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
}

@AfterEach
public void closeClient() {
shuffleServerClient.close();
}

@Test
public void testReadWhenConnectionFailedShouldThrowException() throws Exception {
String testAppId = "testReadWhenException";
int shuffleId = 0;
int partitionId = 0;

MemoryQuorumClientReadHandler memoryQuorumClientReadHandler = new MemoryQuorumClientReadHandler(
testAppId, shuffleId, partitionId, 150, Lists.newArrayList(shuffleServerClient));
ClientReadHandler[] handlers = new ClientReadHandler[1];
handlers[0] = memoryQuorumClientReadHandler;
ComposedClientReadHandler composedClientReadHandler = new ComposedClientReadHandler(handlers);
shuffleServers.get(0).stopServer();
try {
ShuffleDataResult sdr = composedClientReadHandler.readShuffleData();
fail("Should throw connection exception directly.");
} catch (RssException rssException) {
assertTrue(rssException.getMessage().contains("Failed to read shuffle data from HOT handler"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {

Expand Down Expand Up @@ -232,13 +232,7 @@ public void writeReadTest() throws Exception {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());

try {
readClient.readShuffleBlockData();
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to read all replicas for"));
}
assertNull(readClient.readShuffleBlockData());
readClient.close();

// send 2nd commit, data will be persisted to disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,20 @@ public void readTest3() throws Exception {

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 2, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks);

ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 0, 100, 1, 10, 1000,
"", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
FileUtils.deleteDirectory(new File(DATA_DIR1.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
FileUtils.deleteDirectory(new File(DATA_DIR2.getAbsolutePath() + "/" + testAppId + "/0/0-0"));
// sleep to wait delete operation
Thread.sleep(2000);

try {
readClient.readShuffleBlockData();
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to read all replicas for"));
}
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(),
testAppId, 0, 0, 100, 1, 10, 1000,
"", blockIdBitmap, taskIdBitmap, shuffleServerInfo, null, new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
readClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatResponse;
Expand Down Expand Up @@ -533,6 +534,12 @@ public void getLocalShuffleIndex(GetLocalShuffleIndexRequest request,

builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
reply = builder.build();
} catch (FileNotFoundException indexFileNotFoundException) {
LOG.warn("Index file for {} is not found, maybe the data has been flushed to cold storage.",
requestInfo, indexFileNotFoundException);
reply = GetLocalShuffleIndexResponse.newBuilder()
.setStatus(valueOf(status))
.build();
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;


public class ComposedClientReadHandler implements ClientReadHandler {

private static final Logger LOG = LoggerFactory.getLogger(ComposedClientReadHandler.class);
Expand Down Expand Up @@ -127,7 +127,7 @@ public ShuffleDataResult readShuffleData() {
return null;
}
} catch (Exception e) {
LOG.error("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
throw new RssException("Failed to read shuffle data from " + getCurrentHandlerName() + " handler", e);
}
// when is no data for current handler, and the upmostLevel is not reached,
// then try next one if there has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
Expand Down Expand Up @@ -80,7 +81,7 @@ private void prepareFilePath(
File baseFolder = new File(fullShufflePath);
if (!baseFolder.exists()) {
// the partition doesn't exist in this base folder, skip
throw new RuntimeException("Can't find folder " + fullShufflePath);
throw new FileNotFoundException("Can't find folder " + fullShufflePath);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change exception to FileNotFoundException ,this should not be thrown to client.

For example:
When using the MEMORY_LOCALFILE_HDFS type, sometime, the localfile storage may be empty due to the big event flushed to HDFS directly. So when I want to fast fail in ComposedClientReadHandler, this problem should be involved. So this exception should be handled in server's grpc api and return the empty index result.

}
File[] indexFiles;
String failedGetIndexFileMsg = "No index file found in " + storageBasePath;
Expand All @@ -93,7 +94,7 @@ public boolean accept(File dir, String name) {
}
});
} catch (Exception e) {
throw new RuntimeException(failedGetIndexFileMsg, e);
throw new FileNotFoundException(failedGetIndexFileMsg, e);
}

if (indexFiles != null && indexFiles.length > 0) {
Expand Down