diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java index f8b206f435..8200fa7011 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessInfo.java @@ -17,6 +17,8 @@ package org.apache.uniffle.coordinator; +import java.util.Collections; +import java.util.Map; import java.util.Set; import com.google.common.collect.Sets; @@ -24,14 +26,16 @@ public class AccessInfo { private final String accessId; private final Set tags; + private final Map extraProperties; - public AccessInfo(String accessId, Set tags) { + public AccessInfo(String accessId, Set tags, Map extraProperties) { this.accessId = accessId; this.tags = tags; + this.extraProperties = extraProperties == null ? Collections.emptyMap() : extraProperties; } public AccessInfo(String accessId) { - this(accessId, Sets.newHashSet()); + this(accessId, Sets.newHashSet(), Collections.emptyMap()); } public String getAccessId() { @@ -42,11 +46,16 @@ public Set getTags() { return tags; } + public Map getExtraProperties() { + return extraProperties; + } + @Override public String toString() { return "AccessInfo{" - + "accessId='" + accessId + '\'' - + ", tags=" + tags - + '}'; + + "accessId='" + accessId + '\'' + + ", tags=" + tags + + ", extraProperties=" + extraProperties + + '}'; } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java index 4fcff39ead..b3bbf728cf 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java @@ -206,7 +206,12 @@ public void accessCluster(AccessClusterRequest request, StreamObserver legalNames = Arrays.asList("v1", "v2", "v3"); + + public MockedAccessChecker(AccessManager accessManager) throws Exception { + // ignore + } + + @Override + public AccessCheckResult check(AccessInfo accessInfo) { + Map reservedData = accessInfo.getExtraProperties(); + if (legalNames.contains(reservedData.get(key))) { + return new AccessCheckResult(true, ""); + } + return new AccessCheckResult(false, ""); + } + + @Override + public void close() throws IOException { + // ignore. + } + } + + @Test + public void testUsingCustomExtraProperties() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setString( + "rss.coordinator.access.checkers", + "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"); + createCoordinatorServer(coordinatorConf); + startServers(); + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + + // case1: empty map + String accessID = "acessid"; + RssAccessClusterRequest request = new RssAccessClusterRequest( + accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000); + RssAccessClusterResponse response = coordinatorClient.accessCluster(request); + assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode()); + + // case2: illegal names + Map extraProperties = new HashMap<>(); + extraProperties.put("key", "illegalName"); + request = new RssAccessClusterRequest( + accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, extraProperties); + response = coordinatorClient.accessCluster(request); + assertEquals(ResponseStatusCode.ACCESS_DENIED, response.getStatusCode()); + + // case3: legal names + extraProperties.clear(); + extraProperties.put("key", "v1"); + request = new RssAccessClusterRequest( + accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000, extraProperties); + response = coordinatorClient.accessCluster(request); + assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode()); + + shutdownServers(); + } + @Test public void test(@TempDir File tempDir) throws Exception { File cfgFile = File.createTempFile("tmp", ".conf", tempDir); @@ -57,14 +125,13 @@ public void test(@TempDir File tempDir) throws Exception { coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 2); coordinatorConf.setString("rss.coordinator.access.candidates.path", cfgFile.getAbsolutePath()); coordinatorConf.setString( - "rss.coordinator.access.checkers", - "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker"); + "rss.coordinator.access.checkers", + "org.apache.uniffle.coordinator.AccessCandidatesChecker,org.apache.uniffle.coordinator.AccessClusterLoadChecker"); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); createShuffleServer(shuffleServerConf); startServers(); - Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); String accessId = "111111"; RssAccessClusterRequest request = new RssAccessClusterRequest( @@ -100,6 +167,7 @@ public void test(@TempDir File tempDir) throws Exception { assertEquals(ResponseStatusCode.SUCCESS, response.getStatusCode()); assertTrue(response.getMessage().startsWith("SUCCESS")); shuffleServer.stopServer(); + shutdownServers(); } } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java index 219dadbac9..41b2a86f57 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java @@ -250,6 +250,7 @@ public RssAccessClusterResponse accessCluster(RssAccessClusterRequest request) { .newBuilder() .setAccessId(request.getAccessId()) .addAllTags(request.getTags()) + .putAllExtraProperties(request.getExtraProperties()) .build(); AccessClusterResponse rpcResponse; try { diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java index adf21f57a2..ac5523ac4b 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssAccessClusterRequest.java @@ -17,6 +17,8 @@ package org.apache.uniffle.client.request; +import java.util.Collections; +import java.util.Map; import java.util.Set; public class RssAccessClusterRequest { @@ -24,11 +26,28 @@ public class RssAccessClusterRequest { private final String accessId; private final Set tags; private final int timeoutMs; + /** + * The map is to pass the extra data to the coordinator and to + * extend more pluggable {@code AccessCheckers} easily. + */ + private final Map extraProperties; public RssAccessClusterRequest(String accessId, Set tags, int timeoutMs) { this.accessId = accessId; this.tags = tags; this.timeoutMs = timeoutMs; + this.extraProperties = Collections.emptyMap(); + } + + public RssAccessClusterRequest( + String accessId, + Set tags, + int timeoutMs, + Map extraProperties) { + this.accessId = accessId; + this.tags = tags; + this.timeoutMs = timeoutMs; + this.extraProperties = extraProperties; } public String getAccessId() { @@ -42,4 +61,8 @@ public Set getTags() { public int getTimeoutMs() { return timeoutMs; } + + public Map getExtraProperties() { + return extraProperties; + } } diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto index 5e16fddf1d..491316d87b 100644 --- a/proto/src/main/proto/Rss.proto +++ b/proto/src/main/proto/Rss.proto @@ -335,6 +335,7 @@ message CheckServiceAvailableResponse { message AccessClusterRequest { string accessId = 1; repeated string tags = 2; + map extraProperties = 3; } message AccessClusterResponse {