Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add decommission capability and removenode cmd #18530

Merged
merged 8 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cli/src/alluxio.org/cli/cmd/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ import (
"alluxio.org/cli/env"
)

var Service = &env.Service{
Name: "process",
Description: "Start or stop cluster processes",
Commands: []env.Command{
&env.StartProcessCommand{},
&env.StopProcessCommand{},
},
}
var (
Service = &env.Service{
Name: "process",
Description: "Start/stop cluster processes or remove workers",
Commands: []env.Command{
&env.StartProcessCommand{},
&env.StopProcessCommand{},
RemoveWorker,
},
}
)
57 changes: 57 additions & 0 deletions cli/src/alluxio.org/cli/cmd/process/remove_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing license header in new file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


import (
"github.com/spf13/cobra"

"alluxio.org/cli/cmd/names"
"alluxio.org/cli/env"
)

var RemoveWorker = &RemoveWorkerCommand{
BaseJavaCommand: &env.BaseJavaCommand{
CommandName: "remove-worker",
JavaClassName: names.FileSystemAdminShellJavaClass,
Parameters: []string{"nodes", "remove"},
},
}

type RemoveWorkerCommand struct {
*env.BaseJavaCommand
workerId string
}

func (c *RemoveWorkerCommand) Base() *env.BaseJavaCommand {
return c.BaseJavaCommand
}

func (c *RemoveWorkerCommand) ToCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "remove-worker",
Short: "Remove a worker from ETCD membership",
Long: `Remove given worker from the cluster, so that clients and other workers will not consider the removed worker for services.
The worker must have been stopped before it can be safely removed from the cluster.`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return c.Run(args)
},
}
const name = "name"
cmd.Flags().StringVarP(&c.workerId, name, "n", "", "Worker id")
cmd.MarkFlagRequired(name)
return cmd
}

func (c *RemoveWorkerCommand) Run(_ []string) error {
return c.Base().Run([]string{"-n", c.workerId})
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;
Expand Down Expand Up @@ -256,7 +257,24 @@ public void stopHeartBeat(WorkerInfo worker) throws IOException {

@Override
public void decommission(WorkerInfo worker) throws IOException {
// TO BE IMPLEMENTED
Optional<WorkerInfo> targetWorker = getAllMembers().getWorkerById(worker.getIdentity());
if (!targetWorker.isPresent()) {
throw new InvalidArgumentException(
String.format("Unrecognized or non-existing worker: %s", worker.getIdentity()));
}
// Worker should already be offline
if (targetWorker.get().getState() != WorkerState.LOST) {
throw new InvalidArgumentException(
String.format("Can't remove running worker: %s, stop the worker"
+ " before removing", worker.getIdentity()));
}
// stop heartbeat if it is an existing service discovery tab(although unlikely)
stopHeartBeat(worker);
String pathOnRing = new StringBuffer()
.append(getRingPathPrefix())
.append(worker.getIdentity()).toString();
mAlluxioEtcdClient.deleteForPath(pathOnRing, false);
LOG.info("Successfully removed worker:{}", worker.getIdentity());
}

@Override
Expand Down
34 changes: 34 additions & 0 deletions dora/core/common/src/main/java/alluxio/wire/WorkerIdentity.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package alluxio.wire;

import alluxio.exception.status.InvalidArgumentException;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
Expand Down Expand Up @@ -112,6 +114,38 @@ public int hashCode() {
return hashCode;
}

/**
* Construct from a workerid representation from registration.
*
* @param workerIdentityStr
* @return WorkerIdentity object
* @throws InvalidArgumentException
*/
public static WorkerIdentity fromString(String workerIdentityStr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this method close to toString so that hopefully people will remember to keep the two implementations compatible when they change either of them in the future. Also please add a unit test to ensure the string representation generated by toString can be correctly parsed by fromString.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done and done

throws InvalidArgumentException {
String prefix = "worker-";
String errStr = "Unrecognized worker identity string";
if (!workerIdentityStr.startsWith(prefix)) {
throw new InvalidArgumentException(errStr + ": " + workerIdentityStr);
}
String idStr = workerIdentityStr.substring(prefix.length());
try {
return ParserV1.INSTANCE.fromUUID(idStr);
} catch (java.lang.IllegalArgumentException ex) {
// DO NOTHING
}
try {
return ParserV0.INSTANCE.fromLong(Long.parseLong(idStr));
} catch (NumberFormatException ex) {
throw new InvalidArgumentException(errStr + ": " + workerIdentityStr);
}
}

/**
* [NOTE] paired with fromString.
* if modified, change fromString as well
* @return String representation of WorkerIdentity
*/
@Override
public String toString() {
return String.format("worker-%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import alluxio.exception.status.InvalidArgumentException;
import alluxio.util.io.BufferUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -26,6 +27,7 @@
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -311,4 +313,27 @@ public void javaSerdeInvalidData() throws Exception {
t.printStackTrace();
}
}

@Test
public void stringSerDeTest() throws InvalidArgumentException {
// V1 string SerDe
UUID uuid = UUID.nameUUIDFromBytes("uuid".getBytes(StandardCharsets.UTF_8));
WorkerIdentity identity = WorkerIdentity.ParserV1.INSTANCE.fromUUID(uuid);
String idStr = identity.toString();
WorkerIdentity deserializedId = WorkerIdentity.fromString(idStr);
Assert.assertEquals(identity, deserializedId);

// V0 string SerDe
Long longId = RandomUtils.nextLong();
identity = WorkerIdentity.ParserV0.INSTANCE.fromLong(longId);
idStr = identity.toString();
deserializedId = WorkerIdentity.fromString(idStr);
Assert.assertEquals(identity, deserializedId);

// Deserialize an unrecognized id str should get a InvalidArgumentException exception
String invalidIdStr = String.format("{}-{}",
RandomUtils.nextLong(), RandomUtils.nextLong());
Assert.assertThrows(InvalidArgumentException.class,
() -> WorkerIdentity.fromString(invalidIdStr));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.cli.fsadmin.command;

import alluxio.cli.Command;
import alluxio.cli.fsadmin.nodes.RemoveWorkerCommand;
import alluxio.cli.fsadmin.nodes.WorkerStatusCommand;
import alluxio.conf.AlluxioConfiguration;

Expand All @@ -29,6 +30,7 @@ public class NodesCommand extends AbstractFsAdminCommand {

static {
SUB_COMMANDS.put("status", WorkerStatusCommand::new);
SUB_COMMANDS.put("remove", RemoveWorkerCommand::new);
}

private Map<String, Command> mSubCommands = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also make changes to nodes.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes Rico is making the change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added now with the running format:
bin/alluxio process remove-worker -n <worker_id>

* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.cli.fsadmin.nodes;

import alluxio.cli.fsadmin.command.AbstractFsAdminCommand;
import alluxio.cli.fsadmin.command.Context;
import alluxio.conf.AlluxioConfiguration;
import alluxio.membership.MembershipManager;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import java.io.IOException;

/**
* Remove a non-running worker from the membership.
*/
public class RemoveWorkerCommand extends AbstractFsAdminCommand {
public static final String WORKERNAME_OPTION_NAME = "n";
public static final String HELP_OPTION_NAME = "h";

private static final Option WORKERNAME_OPTION =
Option.builder(WORKERNAME_OPTION_NAME)
.required(true)
.hasArg(true)
.desc("ID of the worker to remove")
.build();

private static final Option HELP_OPTION =
Option.builder(HELP_OPTION_NAME)
.required(false)
.hasArg(false)
.desc("print help information.")
.build();

private final AlluxioConfiguration mAlluxioConf;

/**
* @param context
* @param alluxioConf
*/
public RemoveWorkerCommand(Context context, AlluxioConfiguration alluxioConf) {
super(context);
mAlluxioConf = alluxioConf;
}

@Override
public String getCommandName() {
return "remove";
}

@Override
public Options getOptions() {
return new Options().addOption(WORKERNAME_OPTION)
.addOption(HELP_OPTION);
}

@Override
public String getUsage() {
return getCommandName() + " -n <WorkerId> | -h";
}

@Override
public String getDescription() {
return "Remove given worker from the cluster, so that clients and "
+ "other workers will not consider the removed worker for services. "
+ "The worker must have been stopped before it can be safely removed "
+ "from the cluster.";
}

@Override
public int run(CommandLine cl) throws IOException {
if (cl.hasOption(HELP_OPTION_NAME)
|| !cl.hasOption(WORKERNAME_OPTION_NAME)) {
System.out.println(getUsage());
System.out.println(getDescription());
return 0;
}
MembershipManager membershipManager =
MembershipManager.Factory.create(mAlluxioConf);
String workerId = cl.getOptionValue(WORKERNAME_OPTION_NAME).trim();
membershipManager.decommission(
new WorkerInfo().setIdentity(WorkerIdentity.fromString(workerId)));
mPrintStream.println(String.format(
"Successfully removed worker: %s", workerId));
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,19 @@ public class NodesCommandIntegrationTest extends AbstractFsAdminShellTest {
* alluxio mini cluster to start with for StaticMembershipManager
* and EtcdMembershipManager. Currrently only add basic tests.
*/

@Test
public void testNoopMemberManager() {
int ret = mFsAdminShell.run("nodes", "status");
Assert.assertEquals(0, ret);
}

@Test
public void testRemoveMember() throws Exception {
int ret = mFsAdminShell.run("nodes", "remove");
Assert.assertNotEquals(0, ret);
Assert.assertTrue(mOutput.toString().contains("Missing required option"));
/* TODO() if possible start the local cluster with etcdmembership and then test
further logics */
}
}
Loading
Loading