Skip to content

Commit

Permalink
HDDS-11615. Add Upgrade Action for Initial Schema Constraints for Unh…
Browse files Browse the repository at this point in the history
…ealthy Container Table in Recon. (apache#7372)
  • Loading branch information
ArafatKhan2198 authored Nov 15, 2024
1 parent 4066c7c commit dd22dbe
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;

/**
* Class used to create tables that are required for tracking containers.
Expand Down Expand Up @@ -70,39 +69,11 @@ public enum UnHealthyContainerStates {
public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
dslContext = DSL.using(conn);

if (TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
// Drop the existing constraint if it exists
String constraintName = UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1";
dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME)
.dropConstraint(constraintName)
.execute();

// Add the updated constraint with all enum states
addUpdatedConstraint();
} else {
// Create the table if it does not exist
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
createUnhealthyContainersTable();
}
}

/**
* Add the updated constraint to the table.
*/
private void addUpdatedConstraint() {
// Get all enum values as a list of strings
String[] enumStates = Arrays.stream(UnHealthyContainerStates.values())
.map(Enum::name)
.toArray(String[]::new);

// Alter the table to add the updated constraint
dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME)
.add(DSL.constraint(UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1")
.check(field(name("container_state"))
.in(enumStates)))
.execute();
}

/**
* Create the Missing Containers table.
*/
Expand All @@ -126,4 +97,8 @@ private void createUnhealthyContainersTable() {
public DSLContext getDSLContext() {
return dslContext;
}

public DataSource getDataSource() {
return dataSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public Void call() throws Exception {
ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(versionTableManager, reconContext);
// Run the upgrade framework to finalize layout features if needed
layoutVersionManager.finalizeLayoutFeatures();
ReconStorageContainerManagerFacade reconStorageContainerManagerFacade =
(ReconStorageContainerManagerFacade) this.getReconStorageContainerManager();
layoutVersionManager.finalizeLayoutFeatures(reconStorageContainerManagerFacade);

LOG.info("Initializing support of Recon Features...");
FeatureProvider.initFeatureSupport(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;

/**
* Recon's 'lite' version of SCM.
*/
Expand All @@ -156,6 +158,7 @@ public class ReconStorageContainerManagerFacade
private final SCMHAManager scmhaManager;
private final SequenceIdGenerator sequenceIdGen;
private final ContainerHealthTask containerHealthTask;
private final DataSource dataSource;

private DBStore dbStore;
private ReconNodeManager nodeManager;
Expand Down Expand Up @@ -188,7 +191,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
ReconContainerMetadataManager reconContainerMetadataManager,
ReconUtils reconUtils,
ReconSafeModeManager safeModeManager,
ReconContext reconContext) throws IOException {
ReconContext reconContext,
DataSource dataSource) throws IOException {
reconNodeDetails = reconUtils.getReconNodeDetails(conf);
this.threadNamePrefix = reconNodeDetails.threadNamePrefix();
this.eventQueue = new EventQueue(threadNamePrefix);
Expand Down Expand Up @@ -285,6 +289,8 @@ public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
containerCountBySizeDao,
utilizationSchemaDefinition);

this.dataSource = dataSource;

StaleNodeHandler staleNodeHandler =
new ReconStaleNodeHandler(nodeManager, pipelineManager, conf,
pipelineSyncTask);
Expand Down Expand Up @@ -754,4 +760,8 @@ public ContainerCountBySizeDao getContainerCountBySizeDao() {
public ReconContext getReconContext() {
return reconContext;
}

public DataSource getDataSource() {
return dataSource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.ozone.recon.upgrade;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;

import static org.apache.hadoop.ozone.recon.upgrade.ReconLayoutFeature.INITIAL_VERSION;
import static org.apache.hadoop.ozone.recon.upgrade.ReconUpgradeAction.UpgradeActionType.FINALIZE;
import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;
import static org.hadoop.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;

/**
* Upgrade action for the INITIAL schema version, which manages constraints
* for the UNHEALTHY_CONTAINERS table.
*/
@UpgradeActionRecon(feature = INITIAL_VERSION, type = FINALIZE)
public class InitialConstraintUpgradeAction implements ReconUpgradeAction {

private static final Logger LOG = LoggerFactory.getLogger(InitialConstraintUpgradeAction.class);
private DataSource dataSource;
private DSLContext dslContext;

@Override
public void execute(ReconStorageContainerManagerFacade scmFacade) throws SQLException {
this.dataSource = scmFacade.getDataSource();
try (Connection conn = dataSource.getConnection()) {
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
return;
}
dslContext = DSL.using(conn);
// Drop the existing constraint
dropConstraint();
// Add the updated constraint with all enum states
addUpdatedConstraint();
} catch (SQLException e) {
throw new SQLException("Failed to execute InitialConstraintUpgradeAction", e);
}
}

/**
* Drops the existing constraint from the UNHEALTHY_CONTAINERS table.
*/
private void dropConstraint() {
String constraintName = UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1";
dslContext.alterTable(UNHEALTHY_CONTAINERS_TABLE_NAME)
.dropConstraint(constraintName)
.execute();
LOG.debug("Dropped the existing constraint: {}", constraintName);
}

/**
* Adds the updated constraint directly within this class.
*/
private void addUpdatedConstraint() {
String[] enumStates = Arrays
.stream(ContainerSchemaDefinition.UnHealthyContainerStates.values())
.map(Enum::name)
.toArray(String[]::new);

dslContext.alterTable(ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME)
.add(DSL.constraint(ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME + "ck1")
.check(field(name("container_state"))
.in(enumStates)))
.execute();

LOG.info("Added the updated constraint to the UNHEALTHY_CONTAINERS table for enum state values: {}",
Arrays.toString(enumStates));
}

@Override
public UpgradeActionType getType() {
return FINALIZE;
}

@VisibleForTesting
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}

@VisibleForTesting
public void setDslContext(DSLContext dslContext) {
this.dslContext = dslContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hadoop.ozone.recon.ReconContext;
import org.apache.hadoop.ozone.recon.ReconSchemaVersionTableManager;
import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,7 +79,7 @@ private int determineSLV() {
* Finalizes the layout features that need to be upgraded, by executing the upgrade action for each
* feature that is registered for finalization.
*/
public void finalizeLayoutFeatures() {
public void finalizeLayoutFeatures(ReconStorageContainerManagerFacade scmFacade) {
// Get features that need finalization, sorted by version
List<ReconLayoutFeature> featuresToFinalize = getRegisteredFeatures();

Expand All @@ -88,7 +89,7 @@ public void finalizeLayoutFeatures() {
Optional<ReconUpgradeAction> action = feature.getAction(ReconUpgradeAction.UpgradeActionType.FINALIZE);
if (action.isPresent()) {
// Execute the upgrade action & update the schema version in the DB
action.get().execute();
action.get().execute(scmFacade);
updateSchemaVersion(feature.getVersion());
LOG.info("Feature versioned {} finalized successfully.", feature.getVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.ozone.recon.upgrade;

import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade;

/**
* ReconUpgradeAction is an interface for executing upgrade actions in Recon.
*/
Expand All @@ -40,7 +42,7 @@ enum UpgradeActionType {
/**
* Execute the upgrade action.
*/
void execute() throws Exception;
void execute(ReconStorageContainerManagerFacade scmFacade) throws Exception;

/**
* Provides the type of upgrade phase (e.g., FINALIZE).
Expand Down
Loading

0 comments on commit dd22dbe

Please sign in to comment.