diff --git a/cli/src/alluxio.org/cli/cmd/process/process.go b/cli/src/alluxio.org/cli/cmd/process/process.go
index 9e1c3a2e9e0d..b4ec935da8d1 100644
--- a/cli/src/alluxio.org/cli/cmd/process/process.go
+++ b/cli/src/alluxio.org/cli/cmd/process/process.go
@@ -15,11 +15,15 @@ import (
 	"alluxio.org/cli/env"
 )
 
-var Service = &env.Service{
-	Name:        "process",
-	Description: "Start or stop cluster processes",
-	Commands: []env.Command{
-		&env.StartProcessCommand{},
-		&env.StopProcessCommand{},
-	},
-}
+var (
+	// Service groups the commands that start or stop cluster processes and remove workers.
+	Service = &env.Service{
+		Name:        "process",
+		Description: "Start/stop cluster processes or remove workers",
+		Commands: []env.Command{
+			&env.StartProcessCommand{},
+			&env.StopProcessCommand{},
+			RemoveWorker,
+		},
+	}
+)
diff --git a/cli/src/alluxio.org/cli/cmd/process/remove_worker.go b/cli/src/alluxio.org/cli/cmd/process/remove_worker.go
new file mode 100644
index 000000000000..daf1dce578ff
--- /dev/null
+++ b/cli/src/alluxio.org/cli/cmd/process/remove_worker.go
@@ -0,0 +1,65 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package process
+
+import (
+	"github.com/spf13/cobra"
+
+	"alluxio.org/cli/cmd/names"
+	"alluxio.org/cli/env"
+)
+
+// RemoveWorker is the remove-worker command; it runs the Java admin shell with "nodes remove".
+var RemoveWorker = &RemoveWorkerCommand{
+	BaseJavaCommand: &env.BaseJavaCommand{
+		CommandName:   "remove-worker",
+		JavaClassName: names.FileSystemAdminShellJavaClass,
+		Parameters:    []string{"nodes", "remove"},
+	},
+}
+
+// RemoveWorkerCommand holds the base Java command and the worker id read from the --name flag.
+type RemoveWorkerCommand struct {
+	*env.BaseJavaCommand
+	workerID string
+}
+
+// Base returns the embedded BaseJavaCommand.
+func (c *RemoveWorkerCommand) Base() *env.BaseJavaCommand {
+	return c.BaseJavaCommand
+}
+
+// ToCommand builds the cobra command and registers the required --name/-n flag.
+func (c *RemoveWorkerCommand) ToCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "remove-worker",
+		Short: "Remove a worker from ETCD membership",
+		Long: `Remove given worker from the cluster, so that clients and other workers will not consider the removed worker for services.
+The worker must have been stopped before it can be safely removed from the cluster.`,
+		Args: cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return c.Run(args)
+		},
+	}
+	const name = "name"
+	cmd.Flags().StringVarP(&c.workerID, name, "n", "", "Worker id")
+	// MarkFlagRequired only fails when the flag was never defined, which is a programming error.
+	if err := cmd.MarkFlagRequired(name); err != nil {
+		panic(err)
+	}
+	return cmd
+}
+
+// Run ignores positional arguments and delegates to the base command with "-n <worker id>".
+func (c *RemoveWorkerCommand) Run(_ []string) error {
+	return c.Base().Run([]string{"-n", c.workerID})
+}
diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
index bc3512a1d7ff..7e6d0045d430 100644
--- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
+++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java
@@ -14,6 +14,7 @@
 import alluxio.conf.AlluxioConfiguration;
 import alluxio.conf.PropertyKey;
 import alluxio.exception.status.AlreadyExistsException;
+import alluxio.exception.status.InvalidArgumentException;
 import alluxio.util.CommonUtils;
 import alluxio.wire.WorkerIdentity;
 import alluxio.wire.WorkerInfo;
@@ -256,7 +257,24 @@ public void stopHeartBeat(WorkerInfo worker) 
throws IOException {
 
   @Override
   public void decommission(WorkerInfo worker) throws IOException {
-    // TO BE IMPLEMENTED
+    Optional<WorkerInfo> targetWorker = getAllMembers().getWorkerById(worker.getIdentity());
+    if (!targetWorker.isPresent()) {
+      throw new InvalidArgumentException(
+          String.format("Unrecognized or non-existing worker: %s", worker.getIdentity()));
+    }
+    // The worker must already be offline (LOST) before it can be removed
+    if (targetWorker.get().getState() != WorkerState.LOST) {
+      throw new InvalidArgumentException(
+          String.format("Can't remove running worker: %s, stop the worker"
+              + " before removing", worker.getIdentity()));
+    }
+    // stop heartbeat if there is still an existing service discovery entry (although unlikely)
+    stopHeartBeat(worker);
+    String pathOnRing = new StringBuilder()
+        .append(getRingPathPrefix())
+        .append(worker.getIdentity()).toString();
+    mAlluxioEtcdClient.deleteForPath(pathOnRing, false);
+    LOG.info("Successfully removed worker:{}", worker.getIdentity());
   }
 
   @Override
diff --git a/dora/core/common/src/main/java/alluxio/wire/WorkerIdentity.java b/dora/core/common/src/main/java/alluxio/wire/WorkerIdentity.java
index 30a338df127b..c2180ae485d3 100644
--- a/dora/core/common/src/main/java/alluxio/wire/WorkerIdentity.java
+++ b/dora/core/common/src/main/java/alluxio/wire/WorkerIdentity.java
@@ -11,6 +11,8 @@
 
 package alluxio.wire;
 
+import alluxio.exception.status.InvalidArgumentException;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -112,6 +114,41 @@ public int hashCode() {
     return hashCode;
   }
 
+  /**
+   * Parses the string form produced by {@link #toString()} back into a worker identity.
+   * The part after the "worker-" prefix is tried as a UUID-based (V1) id first and then
+   * as a long-based (V0) id.
+   *
+   * @param workerIdentityStr string representation of a worker identity
+   * @return WorkerIdentity object
+   * @throws InvalidArgumentException if the string is null, lacks the "worker-" prefix,
+   *         or the rest is neither a UUID nor a long
+   */
+  public static WorkerIdentity fromString(String workerIdentityStr)
+      throws InvalidArgumentException {
+    String prefix = "worker-";
+    String errStr = "Unrecognized worker identity string";
+    if (workerIdentityStr == null || !workerIdentityStr.startsWith(prefix)) {
+      throw new InvalidArgumentException(errStr + ": " + workerIdentityStr);
+    }
+    String idStr = workerIdentityStr.substring(prefix.length());
+    try {
+      return ParserV1.INSTANCE.fromUUID(idStr);
+    } catch (java.lang.IllegalArgumentException ex) {
+      // Not a UUID-based (V1) id; fall through and try the long-based (V0) format.
+    }
+    try {
+      return ParserV0.INSTANCE.fromLong(Long.parseLong(idStr));
+    } catch (NumberFormatException ex) {
+      throw new InvalidArgumentException(errStr + ": " + workerIdentityStr);
+    }
+  }
+
+  /**
+   * [NOTE] Paired with {@link #fromString(String)}: if this format is modified,
+   * fromString must be changed as well.
+   * @return String representation of WorkerIdentity
+   */
   @Override
   public String toString() {
     return String.format("worker-%s",
diff --git a/dora/core/common/src/test/java/alluxio/wire/WorkerIdentityTest.java b/dora/core/common/src/test/java/alluxio/wire/WorkerIdentityTest.java
index f4afc716c784..3da8343f9ddd 100644
--- a/dora/core/common/src/test/java/alluxio/wire/WorkerIdentityTest.java
+++ b/dora/core/common/src/test/java/alluxio/wire/WorkerIdentityTest.java
@@ -16,6 +16,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import alluxio.exception.status.InvalidArgumentException;
 import alluxio.util.io.BufferUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,6 +27,7 @@
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.RandomUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -311,4 +313,31 @@ public void javaSerdeInvalidData() throws Exception {
       t.printStackTrace();
     }
   }
+
+  @Test
+  public void stringSerDeTest() throws InvalidArgumentException {
+    // V1 string SerDe
+    UUID uuid = UUID.nameUUIDFromBytes("uuid".getBytes(StandardCharsets.UTF_8));
+    WorkerIdentity identity = WorkerIdentity.ParserV1.INSTANCE.fromUUID(uuid);
+    String idStr = identity.toString();
+    WorkerIdentity deserializedId = WorkerIdentity.fromString(idStr);
+    Assert.assertEquals(identity, deserializedId);
+
+    // V0 string SerDe
+    Long longId = RandomUtils.nextLong();
+    identity = WorkerIdentity.ParserV0.INSTANCE.fromLong(longId);
+    idStr = identity.toString();
+    deserializedId = WorkerIdentity.fromString(idStr);
+    Assert.assertEquals(identity, deserializedId);
+
+    // Deserializing an unrecognized id string should throw InvalidArgumentException.
+    // String.format needs %s placeholders; "{}" would be emitted literally.
+    String invalidIdStr = String.format("%s-%s",
+        RandomUtils.nextLong(), RandomUtils.nextLong());
+    Assert.assertThrows(InvalidArgumentException.class,
+        () -> WorkerIdentity.fromString(invalidIdStr));
+    // A null input is rejected the same way instead of failing with a NullPointerException.
+    Assert.assertThrows(InvalidArgumentException.class,
+        () -> WorkerIdentity.fromString(null));
+  }
 }
diff --git a/dora/shell/src/main/java/alluxio/cli/fsadmin/command/NodesCommand.java b/dora/shell/src/main/java/alluxio/cli/fsadmin/command/NodesCommand.java
index 175709dc6e3f..fda4b4f959f5 100644
--- a/dora/shell/src/main/java/alluxio/cli/fsadmin/command/NodesCommand.java
+++ b/dora/shell/src/main/java/alluxio/cli/fsadmin/command/NodesCommand.java
@@ -12,6 +12,7 @@
 package alluxio.cli.fsadmin.command;
 
 import alluxio.cli.Command;
+import alluxio.cli.fsadmin.nodes.RemoveWorkerCommand;
 import alluxio.cli.fsadmin.nodes.WorkerStatusCommand;
 import alluxio.conf.AlluxioConfiguration;
 
@@ -29,6 +30,7 @@ public class NodesCommand extends AbstractFsAdminCommand {
 
   static {
     SUB_COMMANDS.put("status", WorkerStatusCommand::new);
+    SUB_COMMANDS.put("remove", RemoveWorkerCommand::new);
   }
 
   private Map mSubCommands = new HashMap<>();
diff --git a/dora/shell/src/main/java/alluxio/cli/fsadmin/nodes/RemoveWorkerCommand.java b/dora/shell/src/main/java/alluxio/cli/fsadmin/nodes/RemoveWorkerCommand.java
new file mode 100644
index 000000000000..8d7c3450e5ec
--- /dev/null
+++ 
b/dora/shell/src/main/java/alluxio/cli/fsadmin/nodes/RemoveWorkerCommand.java
@@ -0,0 +1,101 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.cli.fsadmin.nodes;
+
+import alluxio.cli.fsadmin.command.AbstractFsAdminCommand;
+import alluxio.cli.fsadmin.command.Context;
+import alluxio.conf.AlluxioConfiguration;
+import alluxio.membership.MembershipManager;
+import alluxio.wire.WorkerIdentity;
+import alluxio.wire.WorkerInfo;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.io.IOException;
+
+/**
+ * Remove a non-running worker from the membership.
+ */
+public class RemoveWorkerCommand extends AbstractFsAdminCommand {
+  public static final String WORKERNAME_OPTION_NAME = "n";
+  public static final String HELP_OPTION_NAME = "h";
+
+  private static final Option WORKERNAME_OPTION =
+      Option.builder(WORKERNAME_OPTION_NAME)
+          .required(true)
+          .hasArg(true)
+          .desc("ID of the worker to remove")
+          .build();
+
+  private static final Option HELP_OPTION =
+      Option.builder(HELP_OPTION_NAME)
+          .required(false)
+          .hasArg(false)
+          .desc("print help information.")
+          .build();
+
+  private final AlluxioConfiguration mAlluxioConf;
+
+  /**
+   * @param context the fsadmin command context, passed to the parent command
+   * @param alluxioConf the configuration used to create the membership manager
+   */
+  public RemoveWorkerCommand(Context context, AlluxioConfiguration alluxioConf) {
+    super(context);
+    mAlluxioConf = alluxioConf;
+  }
+
+  @Override
+  public String getCommandName() {
+    return "remove";
+  }
+
+  @Override
+  public Options getOptions() {
+    return new Options().addOption(WORKERNAME_OPTION)
+        .addOption(HELP_OPTION);
+  }
+
+  @Override
+  public String getUsage() {
+    return getCommandName() + " -n <WorkerId> | -h";
+  }
+
+  @Override
+  public String getDescription() {
+    return "Remove given worker from the cluster, so that clients and "
+        + "other workers will not consider the removed worker for services. "
+        + "The worker must have been stopped before it can be safely removed "
+        + "from the cluster.";
+  }
+
+  @Override
+  public int run(CommandLine cl) throws IOException {
+    if (cl.hasOption(HELP_OPTION_NAME)
+        || !cl.hasOption(WORKERNAME_OPTION_NAME)) {
+      // Use mPrintStream (not System.out) so help goes to the same stream as the result.
+      mPrintStream.println(getUsage());
+      mPrintStream.println(getDescription());
+      return 0;
+    }
+    MembershipManager membershipManager =
+        MembershipManager.Factory.create(mAlluxioConf);
+    String workerId = cl.getOptionValue(WORKERNAME_OPTION_NAME).trim();
+    membershipManager.decommission(
+        new WorkerInfo().setIdentity(WorkerIdentity.fromString(workerId)));
+    mPrintStream.println(String.format(
+        "Successfully removed worker: %s", workerId));
+    return 0;
+  }
+}
diff --git a/dora/tests/integration/src/test/java/alluxio/client/cli/fsadmin/command/NodesCommandIntegrationTest.java b/dora/tests/integration/src/test/java/alluxio/client/cli/fsadmin/command/NodesCommandIntegrationTest.java
index fb7e1de0385f..967cd3bbc979 100644
--- a/dora/tests/integration/src/test/java/alluxio/client/cli/fsadmin/command/NodesCommandIntegrationTest.java
+++ b/dora/tests/integration/src/test/java/alluxio/client/cli/fsadmin/command/NodesCommandIntegrationTest.java
@@ -21,9 +21,19 @@ public class NodesCommandIntegrationTest extends AbstractFsAdminShellTest {
    * alluxio mini cluster to start with for StaticMembershipManager
    * and EtcdMembershipManager. Currrently only add basic tests. 
*/
+
   @Test
   public void testNoopMemberManager() {
     int ret = mFsAdminShell.run("nodes", "status");
     Assert.assertEquals(0, ret);
   }
+
+  @Test
+  public void testRemoveMember() throws Exception {
+    int ret = mFsAdminShell.run("nodes", "remove");
+    Assert.assertNotEquals(0, ret);
+    Assert.assertTrue(mOutput.toString().contains("Missing required option"));
+    // TODO: if possible, start the local cluster with etcd membership and test the
+    // remaining logic.
+  }
 }
diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
index 6694282a92f1..9cb3c1a830bf 100644
--- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
+++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java
@@ -11,9 +11,12 @@
 
 package alluxio.membership;
 
+import static org.junit.Assert.assertThrows;
+
 import alluxio.conf.Configuration;
 import alluxio.conf.PropertyKey;
 import alluxio.exception.status.AlreadyExistsException;
+import alluxio.exception.status.InvalidArgumentException;
 import alluxio.util.CommonUtils;
 import alluxio.util.WaitForOptions;
 import alluxio.wire.WorkerIdentity;
@@ -33,7 +36,6 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -148,6 +150,11 @@ public MembershipManager getHealthyEtcdMemberMgr() {
     return new EtcdMembershipManager(Configuration.global(), getHealthyAlluxioEtcdClient());
   }
 
+  // Builds an EtcdMembershipManager on top of the client from getToxicAlluxioEtcdClient().
+  public MembershipManager getToxicEtcdMemberMgr() {
+    return new EtcdMembershipManager(Configuration.global(), getToxicAlluxioEtcdClient());
+  }
+
   public void enableEtcdAuthentication() throws ExecutionException, InterruptedException {
     // root has full permission and is first user to enable authentication
     // so just use root user for test etcd user/password connection.
@@ -288,7 +295,7 @@ public void testServiceRegistryMembershipManager() throws Exception {
     Assert.assertEquals(wkrs, allMembers);
     List<String> strs =
         client.getChildren("/").stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
-        .collect(Collectors.toList());
+            .collect(Collectors.toList());
     Assert.assertEquals(3, strs.size());
     for (String str : strs) {
       Assert.assertTrue(str.contains("/ServiceDiscovery/DefaultAlluxioCluster/worker"));
@@ -316,15 +323,9 @@ public void testServiceRegistryMembershipManager() throws Exception {
         .collect(Collectors.toList()), actualLiveMembers);
   }
 
-  // ignore due to flaky already exist address exception. This test only passes when it is the
-  // first test to run.
-  @Ignore
   @Test
   public void testFlakyNetwork() throws Exception {
-    Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
-    Configuration.set(PropertyKey.ETCD_ENDPOINTS, getProxiedClientEndpoints());
-    MembershipManager membershipManager = MembershipManager.Factory.create(Configuration.global());
-    Assert.assertTrue(membershipManager instanceof EtcdMembershipManager);
+    MembershipManager membershipManager = getToxicEtcdMemberMgr();
     WorkerInfo wkr1 = new WorkerInfo()
         .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
         .setAddress(new WorkerNetAddress()
@@ -511,4 +512,73 @@ public void testOptionalHttpPortChangeInWorkerAddress() throws Exception {
     Assert.assertEquals(wkr.getAddress().getHttpServerPort(),
         curWorkerInfo.get().getAddress().getHttpServerPort());
   }
+
+  @Test
+  public void testDecommission() throws Exception {
+    Configuration.set(PropertyKey.WORKER_MEMBERSHIP_MANAGER_TYPE, MembershipType.ETCD);
+    Configuration.set(PropertyKey.ETCD_ENDPOINTS, getClientEndpoints());
+    MembershipManager membershipManager = getHealthyEtcdMemberMgr();
+    WorkerInfo wkr1 = new WorkerInfo()
+        .setIdentity(WorkerIdentityTestUtils.randomUuidBasedId())
+        .setAddress(new WorkerNetAddress()
+            .setHost("worker1").setContainerHost("containerhostname1")
+            .setRpcPort(1000).setDataPort(1001).setWebPort(1011)
+            .setDomainSocketPath("/var/lib/domain.sock"));
+    WorkerIdentity wkr2Id = WorkerIdentityTestUtils.randomUuidBasedId();
+    WorkerInfo wkr2 = new WorkerInfo()
+        .setIdentity(wkr2Id)
+        .setAddress(new WorkerNetAddress()
+            .setHost("worker2").setContainerHost("containerhostname2")
+            .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
+            .setDomainSocketPath("/var/lib/domain.sock"));
+    membershipManager.join(wkr1);
+    membershipManager.join(wkr2);
+    List<WorkerInfo> wkrs = new ArrayList<>();
+    wkrs.add(new WorkerInfo(wkr1).setState(WorkerState.LIVE));
+    wkrs.add(new WorkerInfo(wkr2).setState(WorkerState.LIVE));
+    List<WorkerInfo> allMembers = membershipManager.getAllMembers().stream()
+        .sorted(Comparator.comparing(w -> w.getAddress().getHost()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(wkrs, allMembers);
+
+    // decommissioning a running worker is rejected
+    assertThrows(InvalidArgumentException.class, () -> membershipManager.decommission(wkr2));
+
+    // stop the worker, wait until it is reported as failed, then decommission it
+    membershipManager.stopHeartBeat(wkr2);
+    CommonUtils.waitFor("Worker to stop",
+        () -> {
+          try {
+            return membershipManager.getFailedMembers()
+                .getWorkerById(wkr2.getIdentity()).isPresent();
+          } catch (IOException e) {
+            // keep the IOException as the cause so its stack trace is not lost
+            throw new RuntimeException("Unexpected error while getting failed members", e);
+          }
+        }, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
+    membershipManager.decommission(wkr2);
+    Assert.assertFalse(membershipManager.getAllMembers()
+        .getWorkerById(wkr2.getIdentity()).isPresent());
+
+    // another worker with the same id can register again
+    WorkerInfo wkr2Replacement = new WorkerInfo()
+        .setIdentity(wkr2Id)
+        .setAddress(new WorkerNetAddress()
+            .setHost("worker3").setContainerHost("containerhostname3")
+            .setRpcPort(2000).setDataPort(2001).setWebPort(2011)
+            .setDomainSocketPath("/var/lib/domain.sock"));
+    membershipManager.join(wkr2Replacement);
+    CommonUtils.waitFor("Worker2 replacement to be up.",
+        () -> {
+          try {
+            return membershipManager.getLiveMembers().getWorkerById(wkr2Id).isPresent();
+          } catch (IOException e) {
+            throw new RuntimeException("Unexpected error while getting live members", e);
+          }
+        }, WaitForOptions.defaults().setTimeoutMs(TimeUnit.SECONDS.toMillis(10)));
+    Optional<WorkerInfo> newWkr2Entity = membershipManager.getAllMembers().getWorkerById(wkr2Id);
+    Assert.assertTrue(newWkr2Entity.isPresent());
+    wkr2Replacement.setState(WorkerState.LIVE);
+    Assert.assertEquals(wkr2Replacement, newWkr2Entity.get());
+  }
 }