Skip to content

Commit

Permalink
[AMORO-1810] Check the validity of the heatbeat interval when an opti… (
Browse files Browse the repository at this point in the history
#2432)

* [AMORO-1810] Check the validity of the heatbeat interval when an optimizer start

* adjust the import way

* resolve some logic code

* refactor code

* refactor code

* fix ut error

* resolve ut error

* fix ut error

* fix ut error

* fix ut error

* fix ci error
  • Loading branch information
tcodehuber authored Dec 20, 2023
1 parent 5f35ce2 commit 6e8bf52
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public final class ErrorCodes {
public static final int OBJECT_NOT_EXISTS_ERROR_CODE = 1001;
public static final int ALREADY_EXISTS_ERROR_CODE = 1002;
public static final int ILLEGAL_METADATA_ERROR_CODE = 1003;
public static final int FORBIDDEN_ERROR_CODE = 1004;

public static final int TASK_NOT_FOUND_ERROR_CODE = 2001;
public static final int DUPLICATED_TASK_ERROR_CODE = 2002;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netease.arctic.ams.api.ArcticException;
import com.netease.arctic.ams.api.ErrorCodes;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -54,6 +55,9 @@ private boolean checkToken() {
String token =
callAms(
client -> {
withRegisterProperty(
OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL,
String.valueOf(getConfig().getHeartBeat()));
OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
registerInfo.setThreadCount(getConfig().getExecutionParallel());
registerInfo.setMemoryMb(getConfig().getMemorySize());
Expand All @@ -71,6 +75,10 @@ private boolean checkToken() {
return true;
} catch (TException e) {
LOG.error("Register optimizer to ams failed", e);
if (e instanceof ArcticException
&& ErrorCodes.FORBIDDEN_ERROR_CODE == ((ArcticException) e).getErrorCode()) {
System.exit(1); // Don't need to try again
}
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.netease.arctic.optimizer.common;

import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import com.google.common.collect.*;
import com.netease.arctic.ams.api.*;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand All @@ -26,20 +26,19 @@ public void testRegisterOptimizer() throws InterruptedException {
tokenChangeListener.waitForTokenChange();
Assert.assertEquals(1, tokenChangeListener.tokenList().size());
Assert.assertEquals(1, TEST_AMS.getOptimizerHandler().getRegisteredOptimizers().size());
Map<String, String> optimizerProperties = Maps.newHashMap();
optimizerProperties.put("test_k", "test_v");
optimizerProperties.put(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL, "1000");
validateRegisteredOptimizer(
tokenChangeListener.tokenList().get(0),
optimizerConfig,
Collections.singletonMap("test_k", "test_v"));
tokenChangeListener.tokenList().get(0), optimizerConfig, optimizerProperties);

// clear all optimizer, toucher will register again
TEST_AMS.getOptimizerHandler().getRegisteredOptimizers().clear();
tokenChangeListener.waitForTokenChange();
Assert.assertEquals(2, tokenChangeListener.tokenList().size());
Assert.assertEquals(1, TEST_AMS.getOptimizerHandler().getRegisteredOptimizers().size());
validateRegisteredOptimizer(
tokenChangeListener.tokenList().get(1),
optimizerConfig,
Collections.singletonMap("test_k", "test_v"));
tokenChangeListener.tokenList().get(1), optimizerConfig, optimizerProperties);

optimizerToucher.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import com.netease.arctic.ams.api.OptimizingService;
import com.netease.arctic.ams.api.OptimizingTask;
Expand All @@ -28,6 +29,7 @@
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.ams.api.resource.Resource;
import com.netease.arctic.ams.api.resource.ResourceGroup;
import com.netease.arctic.server.exception.ForbiddenException;
import com.netease.arctic.server.exception.ObjectNotExistsException;
import com.netease.arctic.server.exception.PluginRetryAuthException;
import com.netease.arctic.server.exception.TaskNotFoundException;
Expand Down Expand Up @@ -229,6 +231,21 @@ public void completeTask(String authToken, OptimizingTaskResult taskResult) {
@Override
public String authenticate(OptimizerRegisterInfo registerInfo) {
LOG.info("Register optimizer {}.", registerInfo);
Optional.ofNullable(
registerInfo.getProperties().get(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL))
.ifPresent(
interval -> {
if (Long.parseLong(interval) >= optimizerTouchTimeout) {
throw new ForbiddenException(
String.format(
"The %s:%s configuration should be less than AMS's %s:%s",
OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL,
interval,
ArcticManagementConf.OPTIMIZER_HB_TIMEOUT.key(),
optimizerTouchTimeout));
}
});

OptimizingQueue queue = getQueueByGroup(registerInfo.getGroupName());
OptimizerInstance optimizer = new OptimizerInstance(registerInfo, queue.getContainerName());
registerOptimizer(optimizer, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class ArcticRuntimeException extends RuntimeException {
CODE_MAP.put(ObjectNotExistsException.class, ErrorCodes.OBJECT_NOT_EXISTS_ERROR_CODE);
CODE_MAP.put(AlreadyExistsException.class, ErrorCodes.ALREADY_EXISTS_ERROR_CODE);
CODE_MAP.put(IllegalMetadataException.class, ErrorCodes.ILLEGAL_METADATA_ERROR_CODE);
CODE_MAP.put(ForbiddenException.class, ErrorCodes.FORBIDDEN_ERROR_CODE);

CODE_MAP.put(TaskNotFoundException.class, ErrorCodes.TASK_NOT_FOUND_ERROR_CODE);
CODE_MAP.put(DuplicateRuntimeException.class, ErrorCodes.DUPLICATED_TASK_ERROR_CODE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@
package com.netease.arctic.server.exception;

/** forbiddenException */
public class ForbiddenException extends ArcticRuntimeException {}
public class ForbiddenException extends ArcticRuntimeException {
public ForbiddenException() {}

public ForbiddenException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package com.netease.arctic.server;

import com.google.common.collect.Maps;
import com.netease.arctic.BasicTableTestHelper;
import com.netease.arctic.TableTestHelper;
import com.netease.arctic.ams.api.OptimizerProperties;
import com.netease.arctic.ams.api.OptimizerRegisterInfo;
import com.netease.arctic.ams.api.OptimizingTask;
import com.netease.arctic.ams.api.OptimizingTaskId;
Expand Down Expand Up @@ -55,6 +57,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
public class TestDefaultOptimizingService extends AMSTableTestBase {
Expand Down Expand Up @@ -340,6 +343,9 @@ public void testReloadFailedTask() {

private OptimizerRegisterInfo buildRegisterInfo() {
OptimizerRegisterInfo registerInfo = new OptimizerRegisterInfo();
Map<String, String> registerProperties = Maps.newHashMap();
registerProperties.put(OptimizerProperties.OPTIMIZER_HEART_BEAT_INTERVAL, "100");
registerInfo.setProperties(registerProperties);
registerInfo.setThreadCount(1);
registerInfo.setMemoryMb(1024);
registerInfo.setGroupName(defaultResourceGroup().getName());
Expand Down

0 comments on commit 6e8bf52

Please sign in to comment.