[AMS-Refactor] fix optimizing integration test (#1487)
* fix optimizing integration test

* fix optimizing integration test

* fix optimizing integration test

* fix optimizing integration test and fix runtime bug

* fix optimizing integration test
hameizi authored May 31, 2023
1 parent 8e7865f commit 4e45907
Showing 18 changed files with 246 additions and 337 deletions.
@@ -4,7 +4,6 @@ public class PropertyNames {

public static final String RESOURCE_ID = "resource-id";
public static final String EXPORT_PROPERTY_PREFIX = "export.";
public static final String JAVA_PARAMETERS_PROPERTY = "jvm-args";

public static final String OPTIMIZER_AMS_URL = "ams-url";
public static final String AMS_HOME = "ams-home";
@@ -71,9 +71,6 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
.append(resource.getProperties().get(PropertyNames.OPTIMIZER_MEMORY_STORAGE_SIZE));
}
}
if (containerProperties.containsKey(PropertyNames.JAVA_PARAMETERS_PROPERTY)) {
stringBuilder.append(containerProperties.get(PropertyNames.JAVA_PARAMETERS_PROPERTY));
}
if (resource.getResourceId() != null && resource.getResourceId().length() > 0) {
stringBuilder.append(" -id ").append(resource.getResourceId());
}
@@ -72,9 +72,9 @@ protected Map<String, String> doScaleOut(String startUpArgs) {

@Override
protected String buildOptimizerStartupArgsString(Resource resource) {
String taskManagerMemory = PropertyUtil.checkAndGetProperty(getContainerProperties(),
String taskManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(),
TASK_MANAGER_MEMORY_PROPERTY);
String jobManagerMemory = PropertyUtil.checkAndGetProperty(getContainerProperties(), JOB_MANAGER_MEMORY_PROPERTY);
String jobManagerMemory = PropertyUtil.checkAndGetProperty(resource.getProperties(), JOB_MANAGER_MEMORY_PROPERTY);
String jobPath = getAMSHome() + "/plugin/optimize/OptimizeJob.jar";
return String.format("%s/bin/flink run -m yarn-cluster -ytm %s -yjm %s -c %s %s -m %s %s",
getFlinkHome(), taskManagerMemory, jobManagerMemory,
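The hunk above switches the Flink optimizer's memory lookups from the container-level properties to the per-resource properties, so the -ytm and -yjm values now come from the resource being scaled out. The snippet below is a minimal, self-contained sketch of what a fail-fast lookup like PropertyUtil.checkAndGetProperty is assumed to do; the class name, signature, and the "taskmanager.memory" key are illustrative assumptions, not the project's actual code.

    import java.util.Map;

    public class PropertyLookupSketch {
      // Hypothetical stand-in for PropertyUtil.checkAndGetProperty: fail fast when a
      // required key is missing instead of passing null into the startup arguments.
      static String checkAndGetProperty(Map<String, String> properties, String key) {
        if (properties == null || !properties.containsKey(key)) {
          throw new IllegalArgumentException("Required property missing: " + key);
        }
        return properties.get(key);
      }

      public static void main(String[] args) {
        Map<String, String> resourceProperties = Map.of("taskmanager.memory", "2048");
        System.out.println(checkAndGetProperty(resourceProperties, "taskmanager.memory"));
      }
    }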
@@ -100,7 +100,7 @@ private OptimizingTaskResult executeTask(OptimizingTask task) {
ByteBuffer outputByteBuffer = SerializationUtil.simpleSerialize(output);
OptimizingTaskResult result = new OptimizingTaskResult(task.getTaskId(), threadId);
result.setTaskOutput(outputByteBuffer);
result.setSummary(result.summary);
result.setSummary(output.summary());
LOG.info("Optimizer executor[{}] executed task[{}]", threadId, task.getTaskId());
return result;
} catch (Throwable t) {
7 changes: 7 additions & 0 deletions ams/server/pom.xml
@@ -380,6 +380,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

</dependencies>

@@ -20,7 +20,6 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.annotations.VisibleForTesting;
import com.netease.arctic.ams.api.ArcticTableMetastore;
import com.netease.arctic.ams.api.Constants;
import com.netease.arctic.ams.api.Environments;
@@ -34,9 +33,7 @@
import com.netease.arctic.server.resource.ContainerMetadata;
import com.netease.arctic.server.resource.ResourceContainers;
import com.netease.arctic.server.table.DefaultTableService;
import com.netease.arctic.server.table.TableService;
import com.netease.arctic.server.table.executor.AsyncTableExecutors;
import com.netease.arctic.server.utils.ConfigOption;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.server.utils.ThriftServiceProxy;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -146,25 +143,6 @@ public void dispose() {
optimizingService = null;
}

public boolean isStarted() {
return server != null && server.isServing();
}

@VisibleForTesting
void setConfig(ConfigOption option, Object value) {
serviceConfig.set(option, value);
}

@VisibleForTesting
TableService getTableService() {
return this.tableService;
}

@VisibleForTesting
DefaultOptimizingService getOptimizingService() {
return this.optimizingService;
}

private void initConfig() throws IOException {
LOG.info("initializing configurations...");
new ConfigurationHelper().init();
@@ -254,8 +232,10 @@ private void initResourceGroupConfig() {
"can not find such container config named" +
groupBuilder.getContainer());
}
if (groupConfig.containsKey(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES)) {
groupBuilder.addProperties(groupConfig.getObject(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES,
if (groupConfig.containsKey(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES) &&
groupConfig.get(ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES) != null) {
groupBuilder.addProperties(groupConfig.getObject(
ArcticManagementConf.OPTIMIZER_GROUP_PROPERTIES,
Map.class));
}
resourceGroups.add(groupBuilder.build());
@@ -68,7 +68,7 @@ public class DefaultOptimizingService extends DefaultResourceManager
private final Map<String, OptimizingQueue> optimizingQueueByGroup = new ConcurrentHashMap<>();
private final Map<String, OptimizingQueue> optimizingQueueByToken = new ConcurrentHashMap<>();
private final TableManager tableManager;
private RuntimeHandlerChain tableHandlerChain;
private final RuntimeHandlerChain tableHandlerChain;
private Timer optimizerMonitorTimer;

public DefaultOptimizingService(DefaultTableService tableService, List<ResourceGroup> resourceGroups) {
@@ -108,9 +108,12 @@ public void touch(String authToken) {

@Override
public OptimizingTask pollTask(String authToken, int threadId) {
LOG.info("Optimizer {} polling task", authToken);
OptimizingQueue queue = getQueueByToken(authToken);
return queue.pollTask(authToken, threadId);
OptimizingTask task = queue.pollTask(authToken, threadId);
if (task != null) {
LOG.info("Optimizer {} polling task", authToken);
}
return task;
}

@Override
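The hunk above makes the server log a poll only when a task is actually handed out. From the optimizer's side the reason is straightforward: an idle optimizer keeps polling and most polls return nothing, so a log line per poll would be mostly noise. The loop below is a hypothetical, self-contained illustration of that pattern; the blocking queue merely stands in for DefaultOptimizingService.pollTask and none of the names come from the repository.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class PollingLoopSketch {
      public static void main(String[] args) throws InterruptedException {
        // The queue stands in for the server-side task queue; only one task exists.
        BlockingQueue<String> tasks = new ArrayBlockingQueue<>(4);
        tasks.add("task-1");

        for (int i = 0; i < 10; i++) {
          String task = tasks.poll(100, TimeUnit.MILLISECONDS); // null when idle
          if (task == null) {
            continue; // nothing to do; logging every empty poll would flood the log
          }
          System.out.println("executing " + task); // log only real hand-outs
        }
      }
    }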
@@ -30,7 +30,7 @@ public enum SqlExample {
" primary key (id)\n" +
") using arctic \n" +
"partitioned by (days(ts)) \n" +
"tblproperties (\"table.props\" = \"val\");"),
"tblproperties ('table.props' = 'val');"),
DELETE_TABLE("DeleteTable",
"drop table db_name.table_name;"),
EDIT_TABLE("EditTable",
@@ -374,9 +374,9 @@ public void acceptResult(TaskRuntime taskRuntime) {
throw new OptimizingClosedException(processId);
}
if (taskRuntime.getStatus() == TaskRuntime.Status.SUCCESS && allTasksPrepared()) {
tableRuntime.beginCommitting();
this.metricsSummary.addNewFileCnt(taskRuntime.getSummary().getNewFileCnt());
this.metricsSummary.addNewFileSize(taskRuntime.getSummary().getNewFileSize());
tableRuntime.beginCommitting();
} else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) {
if (taskRuntime.getRetry() <= tableRuntime.getMaxExecuteRetryCount()) {
retryTask(taskRuntime, true);
@@ -46,7 +46,6 @@ public class DefaultTableService extends PersistentBase implements TableService
private final Map<String, InternalCatalog> internalCatalogMap = new ConcurrentHashMap<>();
private final Map<String, ExternalCatalog> externalCatalogMap = new ConcurrentHashMap<>();
private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();
private volatile boolean started = false;
private RuntimeHandlerChain headHandler;
private Timer tableExplorerTimer;

@@ -7,17 +7,20 @@
import com.netease.arctic.catalog.ArcticCatalog;
import com.netease.arctic.catalog.CatalogLoader;
import com.netease.arctic.catalog.CatalogTestHelpers;
import com.netease.arctic.hive.TestHMS;
import com.netease.arctic.hive.HMSMockServer;
import com.netease.arctic.optimizer.local.LocalOptimizer;
import com.netease.arctic.server.resource.ResourceContainers;
import com.netease.arctic.server.table.DefaultTableService;
import com.netease.arctic.server.utils.Configurations;
import com.netease.arctic.table.TableIdentifier;
import org.apache.commons.io.FileUtils;
import org.apache.curator.shaded.com.google.common.io.MoreFiles;
import org.apache.curator.shaded.com.google.common.io.RecursiveDeleteOption;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;
import org.junit.rules.TemporaryFolder;
import org.kohsuke.args4j.CmdLineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +34,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class AmsEnvironment {
@@ -39,10 +43,12 @@ public class AmsEnvironment {
private final String rootPath;
private static final String DEFAULT_ROOT_PATH = "/tmp/arctic_integration";
private static final String OPTIMIZE_GROUP = "default";
private ArcticServiceContainer arcticService;
private AtomicBoolean amsExit;
private final ArcticServiceContainer arcticService;
private Configurations serviceConfig;
private DefaultTableService tableService;
private final AtomicBoolean amsExit;
private int thriftBindPort;
private final TestHMS testHMS;
private final HMSMockServer testHMS;
private final Map<String, ArcticCatalog> catalogs = new HashMap<>();
public static final String ICEBERG_CATALOG = "iceberg_catalog";
public static String ICEBERG_CATALOG_DIR = "/iceberg/warehouse";
@@ -54,8 +60,6 @@ public static void main(String[] args) throws Exception {
AmsEnvironment amsEnvironment = new AmsEnvironment();
amsEnvironment.start();
amsEnvironment.startOptimizer();
Thread.sleep(2 * 60 * 1000);
amsEnvironment.stopOptimizer();
}

public AmsEnvironment() throws Exception {
@@ -71,22 +75,34 @@ public AmsEnvironment(String rootPath) throws Exception {
System.setProperty("derby.init.sql.dir", path + "../classes/sql/derby/");
amsExit = new AtomicBoolean(false);
arcticService = new ArcticServiceContainer();
testHMS = new TestHMS();
TemporaryFolder hiveDir = new TemporaryFolder();
hiveDir.create();
testHMS = new HMSMockServer(hiveDir.newFile());
testHMS.start();
}

public void start() throws Exception {
startAms();
DynFields.UnboundField<Boolean> field =
DynFields.builder().hiddenImpl(DefaultTableService.class, "started").build();
boolean tableServiceIsStart = field.bind(arcticService.getTableService()).get();
DynFields.UnboundField<DefaultTableService> amsTableServiceField =
DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "tableService").build();
tableService = amsTableServiceField.bind(arcticService).get();
DynFields.UnboundField<CompletableFuture<Boolean>> tableServiceField =
DynFields.builder().hiddenImpl(DefaultTableService.class, "initialized").build();
boolean tableServiceIsStart = false;
long startTime = System.currentTimeMillis();
while (!tableServiceIsStart) {
if (System.currentTimeMillis() - startTime > 10000) {
throw new RuntimeException("table service not start yet after 10s");
}
try {
tableServiceField.bind(tableService).get().get();
tableServiceIsStart = true;
} catch (RuntimeException e) {
LOG.info("table service not start yet");
}
Thread.sleep(1000);
tableServiceIsStart = field.bind(arcticService.getTableService()).get();
}

initCatalog();
}
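With the @VisibleForTesting accessors removed from ArcticServiceContainer, the test environment above reaches private state such as the tableService and serviceConfig fields through Iceberg's DynFields. Below is a small, self-contained sketch of that reflection pattern under an assumed Service class; only the DynFields calls mirror what the diff does.

    import org.apache.iceberg.common.DynFields;

    public class DynFieldsSketch {
      // A made-up class with a private field, standing in for ArcticServiceContainer.
      static class Service {
        private final String name = "ams";
      }

      public static void main(String[] args) {
        // Build an accessor for the hidden (private) field once, then bind it per instance.
        DynFields.UnboundField<String> nameField =
            DynFields.builder().hiddenImpl(Service.class, "name").build();
        System.out.println(nameField.bind(new Service()).get()); // prints "ams"
      }
    }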

@@ -95,6 +111,7 @@ public void stop() throws IOException {
if (this.arcticService != null) {
this.arcticService.dispose();
}
testHMS.stop();
MoreFiles.deleteRecursively(Paths.get(rootPath), RecursiveDeleteOption.ALLOW_INSECURE);
}

@@ -103,10 +120,10 @@ public ArcticCatalog catalog(String name) {
}

public boolean tableExist(TableIdentifier tableIdentifier) {
return arcticService.getTableService().tableExist(tableIdentifier.buildTableIdentifier());
return tableService.tableExist(tableIdentifier.buildTableIdentifier());
}

public TestHMS getTestHMS() {
public HMSMockServer getTestHMS() {
return testHMS;
}

@@ -123,7 +140,7 @@ private void createIcebergCatalog() {
properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir);
CatalogMeta catalogMeta = CatalogTestHelpers.buildCatalogMeta(ICEBERG_CATALOG,
CatalogMetaProperties.CATALOG_TYPE_HADOOP, properties, TableFormat.ICEBERG);
arcticService.getTableService().createCatalog(catalogMeta);
tableService.createCatalog(catalogMeta);
catalogs.put(ICEBERG_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + ICEBERG_CATALOG));
}

@@ -134,15 +151,15 @@ private void createMixIcebergCatalog() {
properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir);
CatalogMeta catalogMeta = CatalogTestHelpers.buildCatalogMeta(MIXED_ICEBERG_CATALOG,
CatalogMetaProperties.CATALOG_TYPE_AMS, properties, TableFormat.MIXED_ICEBERG);
arcticService.getTableService().createCatalog(catalogMeta);
tableService.createCatalog(catalogMeta);
catalogs.put(MIXED_ICEBERG_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + MIXED_ICEBERG_CATALOG));
}

private void createMixHiveCatalog() {
Map<String, String> properties = Maps.newHashMap();
CatalogMeta catalogMeta = CatalogTestHelpers.buildHiveCatalogMeta(MIXED_HIVE_CATALOG,
properties, testHMS.getHiveConf(), TableFormat.MIXED_HIVE);
arcticService.getTableService().createCatalog(catalogMeta);
properties, testHMS.hiveConf(), TableFormat.MIXED_HIVE);
tableService.createCatalog(catalogMeta);
catalogs.put(MIXED_HIVE_CATALOG, CatalogLoader.load(getAmsUrl() + "/" + MIXED_HIVE_CATALOG));
}

@@ -167,7 +184,9 @@ public void startOptimizer() {
}

public void stopOptimizer() {
arcticService.getOptimizingService().listOptimizers()
DynFields.UnboundField<DefaultOptimizingService> field =
DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "optimizingService").build();
field.bind(arcticService).get().listOptimizers()
.forEach(resource -> {
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
});
@@ -185,8 +204,11 @@ private void startAms() throws Exception {
try {
LOG.info("start ams");
genThriftBindPort();
arcticService.setConfig(ArcticManagementConf.THRIFT_BIND_PORT, thriftBindPort);
arcticService.setConfig(ArcticManagementConf.EXTERNAL_CATALOG_REFRESH_INTERVAL, 5 * 1000L);
DynFields.UnboundField<Configurations> field =
DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "serviceConfig").build();
serviceConfig = field.bind(arcticService).get();
serviceConfig.set(ArcticManagementConf.THRIFT_BIND_PORT, thriftBindPort);
serviceConfig.set(ArcticManagementConf.EXTERNAL_CATALOG_REFRESH_INTERVAL, 1000L);
// when AMS is successfully running, this thread will wait here
arcticService.startService();
break;
@@ -213,12 +235,15 @@
}, "ams-runner");
amsRunner.start();

DynFields.UnboundField<TServer> amsServerField =
DynFields.builder().hiddenImpl(ArcticServiceContainer.class, "server").build();
while (true) {
if (amsExit.get()) {
LOG.error("ams exit");
break;
}
if (arcticService.isStarted()) {
TServer thriftServer = amsServerField.bind(arcticService).get();
if (thriftServer != null && thriftServer.isServing()) {
LOG.info("ams start");
break;
}
