Skip to content

Commit

Permalink
HDDS-11465. Introducing Schema Versioning for Recon to Handle Fresh I…
Browse files Browse the repository at this point in the history
…nstalls and Upgrades. (apache#7213)
  • Loading branch information
ArafatKhan2198 authored Oct 28, 2024
1 parent 7a27db2 commit 91d41a0
Show file tree
Hide file tree
Showing 12 changed files with 1,000 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hadoop.ozone.recon.schema.ReconSchemaDefinition;
import org.hadoop.ozone.recon.schema.StatsSchemaDefinition;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.SchemaVersionTableDefinition;

import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
Expand All @@ -40,5 +41,6 @@ protected void configure() {
schemaBinder.addBinding().to(ContainerSchemaDefinition.class);
schemaBinder.addBinding().to(ReconTaskSchemaDefinition.class);
schemaBinder.addBinding().to(StatsSchemaDefinition.class);
schemaBinder.addBinding().to(SchemaVersionTableDefinition.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.hadoop.ozone.recon.schema;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

import static org.hadoop.ozone.recon.codegen.SqlDbUtils.TABLE_EXISTS_CHECK;

/**
* Class for managing the schema of the SchemaVersion table.
*/
@Singleton
public class SchemaVersionTableDefinition implements ReconSchemaDefinition {

public static final String SCHEMA_VERSION_TABLE_NAME = "RECON_SCHEMA_VERSION";
private final DataSource dataSource;
private DSLContext dslContext;

@Inject
public SchemaVersionTableDefinition(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public void initializeSchema() throws SQLException {
Connection conn = dataSource.getConnection();
dslContext = DSL.using(conn);

if (!TABLE_EXISTS_CHECK.test(conn, SCHEMA_VERSION_TABLE_NAME)) {
createSchemaVersionTable();
}
}

/**
* Create the Schema Version table.
*/
private void createSchemaVersionTable() throws SQLException {
dslContext.createTableIfNotExists(SCHEMA_VERSION_TABLE_NAME)
.column("version_number", SQLDataType.INTEGER.nullable(false))
.column("applied_on", SQLDataType.TIMESTAMP.defaultValue(DSL.currentTimestamp()))
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ public enum ErrorCode {
Arrays.asList("Overview (OM Data)", "OM DB Insights")),
GET_SCM_DB_SNAPSHOT_FAILED(
"SCM DB Snapshot sync failed !!!",
Arrays.asList("Containers", "Pipelines"));
Arrays.asList("Containers", "Pipelines")),
UPGRADE_FAILURE(
"Schema upgrade failed. Recon encountered an issue while finalizing the layout upgrade.",
Arrays.asList("Recon startup", "Metadata Layout Version"));

private final String message;
private final List<String> impacts;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.ozone.recon;

import com.google.inject.Inject;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.sql.SQLException;

import static org.jooq.impl.DSL.name;

/**
* Manager for handling the Recon Schema Version table.
* This class provides methods to get and update the current schema version.
*/
public class ReconSchemaVersionTableManager {

private static final Logger LOG = LoggerFactory.getLogger(ReconSchemaVersionTableManager.class);
public static final String RECON_SCHEMA_VERSION_TABLE_NAME = "RECON_SCHEMA_VERSION";
private final DSLContext dslContext;
private final DataSource dataSource;

@Inject
public ReconSchemaVersionTableManager(DataSource dataSource) throws SQLException {
this.dataSource = dataSource;
this.dslContext = DSL.using(dataSource.getConnection());
}

/**
* Get the current schema version from the RECON_SCHEMA_VERSION table.
* If the table is empty, or if it does not exist, it will return 0.
* @return The current schema version.
*/
public int getCurrentSchemaVersion() throws SQLException {
try {
return dslContext.select(DSL.field(name("version_number")))
.from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.fetchOptional()
.map(record -> record.get(
DSL.field(name("version_number"), Integer.class)))
.orElse(-1); // Return -1 if no version is found
} catch (Exception e) {
LOG.error("Failed to fetch the current schema version.", e);
throw new SQLException("Unable to read schema version from the table.", e);
}
}

/**
* Update the schema version in the RECON_SCHEMA_VERSION table after all tables are upgraded.
*
* @param newVersion The new version to set.
*/
public void updateSchemaVersion(int newVersion) throws SQLException {
try {
boolean recordExists = dslContext.fetchExists(dslContext.selectOne()
.from(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME)));

if (recordExists) {
// Update the existing schema version record
dslContext.update(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.set(DSL.field(name("version_number")), newVersion)
.set(DSL.field(name("applied_on")), DSL.currentTimestamp())
.execute();
LOG.info("Updated schema version to '{}'.", newVersion);
} else {
// Insert a new schema version record
dslContext.insertInto(DSL.table(RECON_SCHEMA_VERSION_TABLE_NAME))
.columns(DSL.field(name("version_number")),
DSL.field(name("applied_on")))
.values(newVersion, DSL.currentTimestamp())
.execute();
LOG.info("Inserted new schema version '{}'.", newVersion);
}
} catch (Exception e) {
LOG.error("Failed to update schema version to '{}'.", newVersion, e);
throw new SQLException("Unable to update schema version in the table.", e);
}
}

/**
* Provides the data source used by this manager.
* @return The DataSource instance.
*/
public DataSource getDataSource() {
return dataSource;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.upgrade.ReconLayoutVersionManager;
import org.apache.hadoop.ozone.util.OzoneNetUtils;
import org.apache.hadoop.ozone.util.OzoneVersionInfo;
import org.apache.hadoop.ozone.util.ShutdownHookManager;
Expand Down Expand Up @@ -105,6 +106,7 @@ public Void call() throws Exception {
ReconServer.class, originalArgs, LOG, configuration);
ConfigurationProvider.setConfiguration(configuration);


injector = Guice.createInjector(new ReconControllerModule(),
new ReconRestServletModule(configuration),
new ReconSchemaGenerationModule());
Expand Down Expand Up @@ -136,8 +138,11 @@ public Void call() throws Exception {
this.reconNamespaceSummaryManager =
injector.getInstance(ReconNamespaceSummaryManager.class);

ReconContext reconContext = injector.getInstance(ReconContext.class);

ReconSchemaManager reconSchemaManager =
injector.getInstance(ReconSchemaManager.class);

LOG.info("Creating Recon Schema.");
reconSchemaManager.createReconSchema();
LOG.debug("Recon schema creation done.");
Expand All @@ -153,6 +158,15 @@ public Void call() throws Exception {
this.reconTaskStatusMetrics =
injector.getInstance(ReconTaskStatusMetrics.class);

// Handle Recon Schema Versioning
ReconSchemaVersionTableManager versionTableManager =
injector.getInstance(ReconSchemaVersionTableManager.class);

ReconLayoutVersionManager layoutVersionManager =
new ReconLayoutVersionManager(versionTableManager, reconContext);
// Run the upgrade framework to finalize layout features if needed
layoutVersionManager.finalizeLayoutFeatures();

LOG.info("Initializing support of Recon Features...");
FeatureProvider.initFeatureSupport(configuration);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.ozone.recon.upgrade;

import org.reflections.Reflections;

import java.util.EnumMap;
import java.util.Optional;
import java.util.Set;

/**
* Enum representing Recon layout features with their version, description,
* and associated upgrade action to be executed during an upgrade.
*/
public enum ReconLayoutFeature {
// Represents the starting point for Recon's layout versioning system.
INITIAL_VERSION(0, "Recon Layout Versioning Introduction");

private final int version;
private final String description;
private final EnumMap<ReconUpgradeAction.UpgradeActionType, ReconUpgradeAction> actions =
new EnumMap<>(ReconUpgradeAction.UpgradeActionType.class);

ReconLayoutFeature(final int version, String description) {
this.version = version;
this.description = description;
}

public int getVersion() {
return version;
}

public String getDescription() {
return description;
}

/**
* Retrieves the upgrade action for the specified {@link ReconUpgradeAction.UpgradeActionType}.
*
* @param type The type of the upgrade action (e.g., FINALIZE).
* @return An {@link Optional} containing the upgrade action if present.
*/
public Optional<ReconUpgradeAction> getAction(ReconUpgradeAction.UpgradeActionType type) {
return Optional.ofNullable(actions.get(type));
}

/**
* Associates a given upgrade action with a specific upgrade phase for this feature.
*
* @param type The phase/type of the upgrade action.
* @param action The upgrade action to associate with this feature.
*/
public void addAction(ReconUpgradeAction.UpgradeActionType type, ReconUpgradeAction action) {
actions.put(type, action);
}

/**
* Scans the classpath for all classes annotated with {@link UpgradeActionRecon}
* and registers their upgrade actions for the corresponding feature and phase.
* This method dynamically loads and registers all upgrade actions based on their
* annotations.
*/
public static void registerUpgradeActions() {
Reflections reflections = new Reflections("org.apache.hadoop.ozone.recon.upgrade");
Set<Class<?>> actionClasses = reflections.getTypesAnnotatedWith(UpgradeActionRecon.class);

for (Class<?> actionClass : actionClasses) {
try {
ReconUpgradeAction action = (ReconUpgradeAction) actionClass.getDeclaredConstructor().newInstance();
UpgradeActionRecon annotation = actionClass.getAnnotation(UpgradeActionRecon.class);
annotation.feature().addAction(annotation.type(), action);
} catch (Exception e) {
throw new RuntimeException("Failed to register upgrade action: " + actionClass.getSimpleName(), e);
}
}
}

/**
* Returns the list of all layout feature values.
*
* @return An array of all {@link ReconLayoutFeature} values.
*/
public static ReconLayoutFeature[] getValues() {
return ReconLayoutFeature.values();
}
}
Loading

0 comments on commit 91d41a0

Please sign in to comment.