Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,8 @@ public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId,
}
}
String msg = "Error happened when getShuffleAssignments with appId[" + appId + "], shuffleId[" + shuffleId
+ "], numMaps[" + partitionNum + "], partitionNumPerRange[" + partitionNumPerRange + "] to coordinator";
+ "], numMaps[" + partitionNum + "], partitionNumPerRange[" + partitionNumPerRange + "] to coordinator. "
+ "Error message: " + response.getMessage();
throwExceptionIfNecessary(response, msg);

return new ShuffleAssignmentsInfo(response.getPartitionToServers(), response.getServerToPartitionRanges());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,14 @@ public void getShuffleAssignments(
logAssignmentResult(appId, shuffleId, pra);
responseObserver.onNext(response);
} catch (Exception e) {
LOG.error(e.getMessage());
response = GetShuffleAssignmentsResponse.newBuilder().setStatus(StatusCode.INTERNAL_ERROR).build();
LOG.error("Errors on getting shuffle assignments for app: {}, shuffleId: {}, partitionNum: {}, "
+ "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
appId, shuffleId, partitionNum, partitionNumPerRange, replica, requiredTags, e);
response = GetShuffleAssignmentsResponse
.newBuilder()
.setStatus(StatusCode.INTERNAL_ERROR)
.setRetMsg(e.getMessage())
.build();
responseObserver.onNext(response);
} finally {
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ public class CoordinatorServer {

public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
this.coordinatorConf = coordinatorConf;
initialization();

try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want every exception message raised during initialization to be recorded in the log files. Sometimes, because of an abnormal configuration, the server or coordinator exits immediately without recording any log, which makes the failure hard to debug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There will be an error message in the stdout file when the server hits an exception. But this is also an acceptable change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The stdout messages are lost when Ansible is used to deploy a Uniffle cluster.

initialization();
} catch (Exception e) {
LOG.error("Errors on initializing coordinator server.", e);
throw e;
}
}

public static void main(String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public RssGetShuffleAssignmentsResponse getShuffleAssignments(RssGetShuffleAssig
response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.TIMEOUT);
break;
default:
response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.INTERNAL_ERROR);
response = new RssGetShuffleAssignmentsResponse(ResponseStatusCode.INTERNAL_ERROR, rpcResponse.getRetMsg());
}

return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public RssGetShuffleAssignmentsResponse(ResponseStatusCode statusCode) {
super(statusCode);
}

/**
 * Creates a response that carries a status code together with a message.
 * Both arguments are forwarded unchanged to the superclass constructor.
 *
 * @param statusCode status code handed to the superclass
 * @param message message handed to the superclass
 */
public RssGetShuffleAssignmentsResponse(ResponseStatusCode statusCode, String message) {
super(statusCode, message);
}

public Map<Integer, List<ShuffleServerInfo>> getPartitionToServers() {
return partitionToServers;
}
Expand Down
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ message PartitionRangeAssignment {
// Response message for getShuffleAssignments.
message GetShuffleAssignmentsResponse {
// Outcome of the request; the handler sets INTERNAL_ERROR when it catches an exception.
StatusCode status = 1;
// Zero or more partition-range assignments.
repeated PartitionRangeAssignment assignments = 2;
// Free-form message; the handler's error path fills it with the caught exception's
// getMessage() text. Presumably left unset on success — TODO confirm.
string retMsg = 3;
}

message ReportShuffleClientOpRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ public class ShuffleServer {

/**
 * Creates a shuffle server from the given configuration.
 *
 * <p>Stores the configuration and calls {@code initialization()}. The call inside the
 * try block passes any exception to {@code LOG.error} and then rethrows it unchanged.
 *
 * <p>NOTE(review): this text is a rendered diff hunk (header "-67,7 +67,12"), so the bare
 * {@code initialization();} right after the assignment appears to be the removed
 * pre-change line and the try/catch its replacement — confirm against the merged file.
 *
 * @param shuffleServerConf configuration stored in {@code this.shuffleServerConf}
 * @throws Exception whatever {@code initialization()} throws
 */
public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
initialization();
try {
initialization();
} catch (Exception e) {
// Record the failure in the log before propagating it to the caller.
LOG.error("Errors on initializing shuffle server.", e);
throw e;
}
}

/**
Expand Down