Skip to content

Commit

Permalink
fix: validate connection to DB after migration (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Jun 16, 2023
1 parent 63a00b4 commit bbb5cfa
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void before() throws IOException {
database = new ShadowManagerDatabase(kernel);
database.install();
JsonUtil.loadSchema();
database.open();
dao = new ShadowManagerDAOImpl(database);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import com.aws.greengrass.shadowmanager.exception.ShadowManagerDataException;
import com.aws.greengrass.shadowmanager.model.ShadowDocument;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.apache.commons.io.FileUtils;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.h2.jdbcx.JdbcConnectionPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -66,7 +67,6 @@
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.params.provider.Arguments.arguments;

Expand All @@ -80,7 +80,6 @@ void initializeShadowManagerDatabase() throws IOException {
System.setProperty("aws.greengrass.scanSelfClasspath", "true");
db = new ShadowManagerDatabase(rootDir);
db.install();
db.open();
}

@AfterEach
Expand Down Expand Up @@ -126,7 +125,6 @@ void GIVEN_data_WHEN_restart_THEN_data_still_exists() throws Exception {
@Test
void GIVEN_migrations_WHEN_install_THEN_shadow_manager_database_installs_and_starts_successfully() throws Exception {
// GIVEN
db.open();
assertNotNull(db.getPool());

// WHEN
Expand Down Expand Up @@ -158,7 +156,6 @@ void GIVEN_corrupted_db_WHEN_install_THEN_shadow_manager_database_reinstalls_and
Path source = Paths.get(getClass().getResource("database/corrupted.mv.db").toURI());
Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING);
// GIVEN
db.open();
assertNotNull(db.getPool());

// WHEN
Expand Down Expand Up @@ -188,15 +185,14 @@ void GIVEN_shadow_manager_database_connected_WHEN_close_THEN_shadow_manager_data
assertNotNull(c, "connection should not be null");
assertThat("connection is not closed", c.isClosed(), is(false));
c.close();
JdbcConnectionPool pool = db.getPool();
db.close();
assertThat("active connections", db.getPool().getActiveConnections(), is(0));
assertThrows(IllegalStateException.class, () -> db.getPool().getConnection());
assertThat("active connections", pool.getActiveConnections(), is(0));
}

@Test
void GIVEN_shadow_manager_database_open_WHEN_closed_and_opened_THEN_shadow_manager_database_can_return_connections() throws Exception {
db.close();
db.open();
Connection c = assertDoesNotThrow(() -> db.getPool().getConnection());
assertNotNull(c, "connection should not be null");
assertThat("connection is not closed", c.isClosed(), is(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.iotdataplane.model.GetThingShadowResponse;

import javax.net.ssl.KeyManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -53,6 +52,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;

import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_THING_NAME;
Expand All @@ -66,7 +66,6 @@
import static com.aws.greengrass.shadowmanager.model.Constants.CONFIGURATION_SYNCHRONIZATION_TOPIC;
import static com.aws.greengrass.shadowmanager.model.Constants.CONFIGURATION_SYNC_DIRECTION_TOPIC;
import static com.aws.greengrass.shadowmanager.model.Constants.CONFIGURATION_THING_NAME_TOPIC;

import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
import static com.github.grantwest.eventually.EventuallyLambdaMatcher.eventuallyEval;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -170,6 +169,9 @@ void GIVEN_existing_sync_information_WHEN_config_updates_THEN_removed_sync_infor
.configFile(DEFAULT_CONFIG)
.mqttConnected(false)
.build());
// Validate that shadow manager will continue to work properly even if it is reinstalled.
shadowManager.requestReinstall();
Thread.sleep(1000);
shadowManager.startSyncingShadows(ShadowManager.StartSyncInfo.builder().build());
ShadowManagerDAOImpl impl = kernel.getContext().get(ShadowManagerDAOImpl.class);
createThingShadowSyncInfo(impl, THING_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,6 @@ public void postInject() {
@SuppressWarnings({"PMD.AvoidCatchingGenericException"})
public void startup() {
try {
database.open();

reportState(State.RUNNING);
setupSync();
} catch (Exception e) {
Expand All @@ -484,13 +482,14 @@ public void startup() {
}

@Override
@SuppressWarnings("PMD.AvoidCatchingGenericException")
protected void shutdown() throws InterruptedException {
try {
stopSyncingShadows(true);
pubSubIntegrator.unsubscribe();
inboundRateLimiter.clear();
database.close();
} catch (IOException e) {
} catch (Exception e) {
logger.atError()
.setEventType(LogEvents.DATABASE_CLOSE_ERROR.code())
.setCause(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.aws.greengrass.shadowmanager.model.ShadowDocument;
import com.aws.greengrass.shadowmanager.model.dao.SyncInformation;
import com.aws.greengrass.util.Pair;
import org.h2.jdbcx.JdbcConnectionPool;

import java.io.IOException;
import java.sql.Connection;
Expand Down Expand Up @@ -58,7 +59,7 @@ public Optional<ShadowDocument> getShadowThing(String thingName, String shadowNa
String sql = "SELECT document, version, updateTime FROM documents WHERE deleted = 0 AND "
+ "thingName = ? AND shadowName = ?";

try (Connection c = database.getPool().getConnection();
try (Connection c = getPool().getConnection();
PreparedStatement preparedStatement = c.prepareStatement(sql)) {
preparedStatement.setString(1, thingName);
preparedStatement.setString(2, shadowName);
Expand All @@ -75,6 +76,14 @@ public Optional<ShadowDocument> getShadowThing(String thingName, String shadowNa
}
}

private JdbcConnectionPool getPool() {
if (!database.isInitialized()) {
throw new ShadowManagerDataException("Database pool not initialized. Shadow manager most likely isn't "
+ "running yet. Wait for Shadow manager to be running.");
}
return database.getPool();
}

/**
* Attempts to delete the shadow document from the local shadow storage.
*
Expand Down Expand Up @@ -262,7 +271,7 @@ public List<Pair<String, String>> listSyncedShadows() {
public Optional<Long> getDeletedShadowVersion(String thingName, String shadowName) {
String sql = "SELECT version FROM documents WHERE deleted = 1 AND thingName = ? AND shadowName = ?";

try (Connection c = database.getPool().getConnection();
try (Connection c = getPool().getConnection();
PreparedStatement preparedStatement = c.prepareStatement(sql)) {
preparedStatement.setString(1, thingName);
preparedStatement.setString(2, shadowName);
Expand Down Expand Up @@ -359,7 +368,7 @@ public boolean insertSyncInfoIfNotExists(SyncInformation request) {
}

private <T> T execute(String sql, SQLExecution<T> thunk) {
try (Connection c = database.getPool().getConnection();
try (Connection c = getPool().getConnection();
PreparedStatement statement = c.prepareStatement(sql)) {
statement.setQueryTimeout(10);
return thunk.apply(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
Expand All @@ -46,14 +47,13 @@ public class ShadowManagerDatabase implements Closeable {
;
private final JdbcDataSource dataSource;

@Getter
private JdbcConnectionPool pool;

private boolean closed = true;
private static final Logger logger = LogManager.getLogger(ShadowManagerDatabase.class);
private final Path databasePath;
private ExecutorService dbWriteThreadPool;
@Getter
private final ExecutorService dbWriteThreadPool = Executors.newCachedThreadPool();
private boolean initialized = false;

/**
* Creates a database with a {@link javax.sql.DataSource} using the kernel config.
Expand Down Expand Up @@ -90,6 +90,7 @@ public void install() throws ShadowManagerDataException {
deleteDB(databasePath);
migrateDB();
}
initialized = true;
} catch (FlywayException | IOException e) {
throw new ShadowManagerDataException(e);
}
Expand All @@ -103,18 +104,40 @@ private void migrateDB() {
flyway.migrate();
}

@SuppressWarnings({"checkstyle:EmptyBlock", "checkstyle:WhitespaceAround", "PMD.AvoidCatchingGenericException"})
private boolean migrateAndGetResult() {
try {
migrateDB();
return true;
} catch (FlywaySqlException flywaySqlException) {
if (flywaySqlException.getCause() instanceof JdbcSQLNonTransientException
&& flywaySqlException.getCause().getCause() instanceof IllegalStateException) {
logger.atError().cause(flywaySqlException).log("Shadow manager DB is corrupted.");
logger.atError().cause(flywaySqlException).log("Shadow manager DB is corrupted");
return false;
}
throw flywaySqlException;
}

// Validate that after migration we're actually able to open and connect to the DB.
// This may fail if closing the DB after migration failed for some reason.
try {
try (Connection p = getPool().getConnection()) {}
return true;
} catch (Exception e) {
logger.atError().cause(e).log("Shadow manager DB could not be opened. Deleting and recreating it");
close();
return false;
}
}

/**
* Get a reference to the connection pool.
* @return JDBC connection pool
*/
public synchronized JdbcConnectionPool getPool() {
if (pool == null) {
pool = JdbcConnectionPool.create(dataSource);
}
return pool;
}

private void deleteDB(Path databasePath) throws IOException {
Expand All @@ -131,26 +154,29 @@ private void deleteDB(Path databasePath) throws IOException {
}
}

/**
* Open the database to allow connections.
*/
@Synchronized
public void open() {
if (closed) {
// defaults to 10 connections with a 30s timeout waiting for a connection
pool = JdbcConnectionPool.create(dataSource);
closed = false;
}
}

@Override
@Synchronized
@SuppressWarnings("PMD.NullAssignment")
@SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR", justification = "Field gated by flag")
public void close() throws IOException {
if (!closed) {
public void close() {
if (dbWriteThreadPool != null) {
dbWriteThreadPool.shutdown();
}
if (pool != null) {
pool.dispose();
closed = true;
pool = null;
}
}

/**
* Get the thread pool used for writing to the DB.
*
* @return a running thread pool
*/
public synchronized ExecutorService getDbWriteThreadPool() {
if (dbWriteThreadPool == null || dbWriteThreadPool.isShutdown()) {
dbWriteThreadPool = Executors.newCachedThreadPool();
}
return dbWriteThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,8 @@ public class ShadowManagerDataException extends RuntimeException {
public ShadowManagerDataException(final Throwable ex) {
super(ex);
}

public ShadowManagerDataException(final String s) {
super(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void setup() throws SQLException, IOException {
lenient().when(mockDatabase.getDbWriteThreadPool()).thenReturn(es);
when(mockDatabase.getPool()).thenReturn(mockPool);
when(mockPool.getConnection()).thenReturn(mockConnection);
when(mockDatabase.isInitialized()).thenReturn(true);
JsonUtil.loadSchema();
}
@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -617,7 +616,6 @@ void GIVEN_shadow_manager_WHEN_startup_THEN_updates_stored_config_and_starts_syn
verify(mockCloudDataClient, times(1)).updateSubscriptions(anySet());
verify(mockSyncHandler, times(1)).start(any(SyncContext.class), anyInt());

verify(mockDatabase, times(1)).open();
verify(mockDao, times(1)).deleteSyncInformation("foo", "bar");

ArgumentCaptor<SyncInformation> captor = ArgumentCaptor.forClass(SyncInformation.class);
Expand Down Expand Up @@ -648,13 +646,6 @@ void GIVEN_shadow_manager_WHEN_shutdown_THEN_shuts_down_gracefully() throws IOEx
verify(mockPubSubClientWrapper, atMostOnce()).unsubscribe(any());
}

@Test
void GIVEN_shadow_manager_db_WHEN_shutdown_throws_io_exception_THEN_catches_exception(ExtensionContext extensionContext) throws IOException {
ignoreExceptionOfType(extensionContext, IOException.class);
doThrow(IOException.class).when(mockDatabase).close();
assertDoesNotThrow(() -> shadowManager.shutdown());
}

@Test
void GIVEN_installed_WHEN_running_and_config_updated_THEN_sync_restarted() throws Exception {
when(mockMqttClient.connected()).thenReturn(true);
Expand Down
2 changes: 1 addition & 1 deletion uat/testing-features/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>com.aws.greengrass</groupId>
<artifactId>aws-greengrass-testing-standalone</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down

0 comments on commit bbb5cfa

Please sign in to comment.