Skip to content

Commit

Permalink
fix: synchronize all DAO operations to avoid H2 threading problems (#202
Browse files Browse the repository at this point in the history
)
  • Loading branch information
MikeDombo authored Feb 27, 2024
1 parent ef1ceeb commit 0199edd
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 181 deletions.
1 change: 1 addition & 0 deletions .github/workflows/uat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
role-to-assume: ${{ env.AWS_ROLE_TO_ASSUME }}
role-session-name: shadowManagerCI
aws-region: ${{ env.AWS_REGION }}
role-duration-seconds: 7200
- name: Run UAT on linux
uses: aws-actions/aws-codebuild-run-build@v1
with:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void GIVEN_non_default_max_shadow_size_WHEN_update_shadow_THEN_throws_invalid_ar
ignoreExceptionOfType(context, InterruptedException.class);
ignoreExceptionOfType(context, InvalidRequestParametersException.class);

startNucleusWithConfig("shadow.yaml");
startNucleusWithConfig("shadowUnhappy.yaml");

int sizeLimit = 20 * 1024;
shadowManager.getConfig().lookup(CONFIGURATION_CONFIG_KEY, CONFIGURATION_MAX_DOC_SIZE_LIMIT_B_TOPIC).withValue(
Expand All @@ -112,7 +112,7 @@ void GIVEN_non_default_max_shadow_size_WHEN_update_shadow_document_size_and_upda
ignoreExceptionOfType(context, InterruptedException.class);
ignoreExceptionOfType(context, InvalidRequestParametersException.class);

startNucleusWithConfig("shadow.yaml");
startNucleusWithConfig("shadowUnhappy.yaml");

shadowManager.getConfig().lookup(CONFIGURATION_CONFIG_KEY, CONFIGURATION_MAX_DOC_SIZE_LIMIT_B_TOPIC).withValue(20 * 1024);
UpdateThingShadowRequestHandler updateHandler = shadowManager.getUpdateThingShadowRequestHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ services:
posixUser: nobody
aws.greengrass.ShadowManager:
configuration:
strategy:
type: realTime
synchronize:
direction: betweenDeviceAndCloud
shadowDocuments:
- thingName: "mockThing"
classic: false
namedShadows:
- testShadowName
rateLimits:
maxLocalRequestsPerSecondPerThing: 100
maxLocalRequestsPerSecondPerThing: 10000
maxTotalLocalRequestsRate: 10000
main:
dependencies:
- DoAll
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
services:
aws.greengrass.Nucleus:
configuration:
runWithDefault:
posixUser: nobody
aws.greengrass.ShadowManager:
configuration:
rateLimits:
maxLocalRequestsPerSecondPerThing: 100
main:
dependencies:
- DoAll
DoAll:
lifecycle:
startup: echo "Running"
shutdown: echo "Stopping"
dependencies:
- aws.greengrass.ShadowManager
configuration:
accessControl:
aws.greengrass.ShadowManager:
policyId1:
policyDescription: access to CRUD shadow operations
operations:
- 'aws.greengrass#GetThingShadow'
- 'aws.greengrass#UpdateThingShadow'
- 'aws.greengrass#DeleteThingShadow'
resources:
- '*'
policyId2:
policyDescription: access to list named shadows
operations:
- 'aws.greengrass#ListNamedShadowsForThing'
resources:
- '*'
aws.greengrass.ipc.pubsub:
policyId3:
policyDescription: access to pubsub topics
operations:
- 'aws.greengrass#SubscribeToTopic'
resources:
- '*'
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;

import static com.aws.greengrass.shadowmanager.model.Constants.LOG_CLOUD_VERSION_KEY;
Expand Down Expand Up @@ -55,7 +52,7 @@ public ShadowManagerDAOImpl(final ShadowManagerDatabase database) {
* @return The queried shadow from the local shadow store
*/
@Override
public Optional<ShadowDocument> getShadowThing(String thingName, String shadowName) {
public synchronized Optional<ShadowDocument> getShadowThing(String thingName, String shadowName) {
String sql = "SELECT document, version, updateTime FROM documents WHERE deleted = 0 AND "
+ "thingName = ? AND shadowName = ?";

Expand Down Expand Up @@ -92,7 +89,7 @@ private JdbcConnectionPool getPool() {
* @return The deleted shadow from the local shadow store
*/
@Override
public Optional<ShadowDocument> deleteShadowThing(String thingName, String shadowName) {
public synchronized Optional<ShadowDocument> deleteShadowThing(String thingName, String shadowName) {
// To be consistent with cloud, subsequent updates to the shadow should not start from version 0
// https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-data-flow.html
logger.atDebug()
Expand All @@ -103,7 +100,7 @@ public Optional<ShadowDocument> deleteShadowThing(String thingName, String shado
+ " WHERE thingName = ? AND shadowName = ?";
return getShadowThing(thingName, shadowName)
.flatMap(shadowDocument ->
executeWriteOperation(sql,
execute(sql,
preparedStatement -> {
preparedStatement.setLong(1, Instant.now().getEpochSecond());
preparedStatement.setLong(2, shadowDocument.getVersion() + 1);
Expand Down Expand Up @@ -134,7 +131,7 @@ public Optional<byte[]> updateShadowThing(String thingName, String shadowName, b
.log("Updating shadow");
String sql = "MERGE INTO documents(thingName, shadowName, document, version, deleted, updateTime) "
+ "KEY (thingName, shadowName) VALUES (?, ?, ?, ?, ?, ?)";
return executeWriteOperation(sql,
return execute(sql,
preparedStatement -> {
preparedStatement.setString(1, thingName);
preparedStatement.setString(2, shadowName);
Expand Down Expand Up @@ -193,7 +190,7 @@ public boolean updateSyncInformation(final SyncInformation request) {
String sql = "MERGE INTO sync(thingName, shadowName, lastSyncedDocument, cloudVersion, cloudDeleted, "
+ "cloudUpdateTime, lastSyncTime, localVersion) KEY (thingName, shadowName) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
return executeWriteOperation(sql,
return execute(sql,
preparedStatement -> {
preparedStatement.setString(1, request.getThingName());
preparedStatement.setString(2, request.getShadowName());
Expand Down Expand Up @@ -268,7 +265,7 @@ public List<Pair<String, String>> listSyncedShadows() {
* @return The deleted shadow version if it was deleted or exists; Else an empty optional
*/
@Override
public Optional<Long> getDeletedShadowVersion(String thingName, String shadowName) {
public synchronized Optional<Long> getDeletedShadowVersion(String thingName, String shadowName) {
String sql = "SELECT version FROM documents WHERE deleted = 1 AND thingName = ? AND shadowName = ?";

try (Connection c = getPool().getConnection();
Expand Down Expand Up @@ -300,7 +297,7 @@ public boolean deleteSyncInformation(String thingName, String shadowName) {
.kv(LOG_THING_NAME_KEY, thingName)
.kv(LOG_SHADOW_NAME_KEY, shadowName)
.log("Deleting sync info");
return executeWriteOperation("DELETE FROM sync WHERE thingName = ? AND shadowName = ?",
return execute("DELETE FROM sync WHERE thingName = ? AND shadowName = ?",
preparedStatement -> {
preparedStatement.setString(1, thingName);
preparedStatement.setString(2, shadowName);
Expand Down Expand Up @@ -349,7 +346,7 @@ public boolean insertSyncInfoIfNotExists(SyncInformation request) {
String sql = "INSERT INTO sync(thingName, shadowName, lastSyncedDocument, cloudVersion, cloudDeleted, "
+ "cloudUpdateTime, lastSyncTime, localVersion) SELECT ?, ?, ?, ?, ?, ?, ?, ? "
+ "WHERE NOT EXISTS(SELECT 1 FROM sync WHERE thingName = ? AND shadowName = ?)";
return executeWriteOperation(sql,
return execute(sql,
preparedStatement -> {
preparedStatement.setString(1, request.getThingName());
preparedStatement.setString(2, request.getShadowName());
Expand All @@ -367,7 +364,7 @@ public boolean insertSyncInfoIfNotExists(SyncInformation request) {

}

private <T> T execute(String sql, SQLExecution<T> thunk) {
private synchronized <T> T execute(String sql, SQLExecution<T> thunk) {
try (Connection c = getPool().getConnection();
PreparedStatement statement = c.prepareStatement(sql)) {
statement.setQueryTimeout(10);
Expand All @@ -376,18 +373,5 @@ private <T> T execute(String sql, SQLExecution<T> thunk) {
throw new ShadowManagerDataException(e);
}
}

private <T> T executeWriteOperation(String sql, SQLExecution<T> thunk) {
try {
return database.getDbWriteThreadPool().submit(() ->
execute(sql, thunk)).get(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.atDebug().log("Interrupted while executing the DB write operation");
Thread.currentThread().interrupt();
throw new ShadowManagerDataException(e);
} catch (ExecutionException | TimeoutException e) {
throw new ShadowManagerDataException(e);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -51,7 +49,6 @@ public class ShadowManagerDatabase implements Closeable {

private static final Logger logger = LogManager.getLogger(ShadowManagerDatabase.class);
private final Path databasePath;
private ExecutorService dbWriteThreadPool;
@Getter
private boolean initialized = false;

Expand Down Expand Up @@ -159,24 +156,9 @@ private void deleteDB(Path databasePath) throws IOException {
@SuppressWarnings("PMD.NullAssignment")
@SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR", justification = "Field gated by flag")
public void close() {
if (dbWriteThreadPool != null) {
dbWriteThreadPool.shutdown();
}
if (pool != null) {
pool.dispose();
pool = null;
}
}

/**
* Get the thread pool used for writing to the DB.
*
* @return a running thread pool
*/
public synchronized ExecutorService getDbWriteThreadPool() {
if (dbWriteThreadPool == null || dbWriteThreadPool.isShutdown()) {
dbWriteThreadPool = Executors.newCachedThreadPool();
}
return dbWriteThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public static ShadowDocSizeConfiguration from(Topics serviceTopics) {
private static int getMaxShadowDocSizeFromConfig(Topics topics) {
int newMaxShadowSize = Coerce.toInt(
topics.findOrDefault(DEFAULT_DOCUMENT_SIZE, CONFIGURATION_MAX_DOC_SIZE_LIMIT_B_TOPIC));
if (MAX_SHADOW_DOCUMENT_SIZE < newMaxShadowSize || newMaxShadowSize <= 0) {
if (newMaxShadowSize == 0) {
return DEFAULT_DOCUMENT_SIZE;
}
if (MAX_SHADOW_DOCUMENT_SIZE < newMaxShadowSize || newMaxShadowSize < 0) {
throw new InvalidConfigurationException(String.format(
"Maximum shadow size provided %d is either less than 0 "
+ "or exceeds default maximum shadow size of %d",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ boolean isUpdateNecessary(byte[] baseDocument, JsonNode update) throws SkipSyncR
protected Optional<Long> getUpdatedVersion(byte[] payload) {
try {
ShadowDocument document = new ShadowDocument(payload, false);
return Optional.of(document.getVersion());
return Optional.ofNullable(document.getVersion());
} catch (InvalidRequestParametersException | IOException e) {
logger.atDebug()
.kv(LOG_THING_NAME_KEY, getThingName())
Expand Down Expand Up @@ -377,7 +377,9 @@ Optional<ShadowDocument> getCloudShadowDocument() throws RetryableException,
try {
GetThingShadowResponse getThingShadowResponse = context.getIotDataPlaneClientWrapper()
.getThingShadow(getThingName(), getShadowName());
if (getThingShadowResponse != null && getThingShadowResponse.payload() != null) {
// Check asByteArray for null to account for mocking in tests
if (getThingShadowResponse != null && getThingShadowResponse.payload() != null
&& getThingShadowResponse.payload().asByteArray() != null) {
return Optional.of(new ShadowDocument(getThingShadowResponse.payload().asByteArray()));
}
} catch (ResourceNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.util.Pair;
import org.h2.jdbcx.JdbcConnectionPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -29,8 +28,6 @@
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -49,7 +46,6 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith({MockitoExtension.class, GGExtension.class})
Expand All @@ -58,8 +54,6 @@ class ShadowManagerDAOImplTest {
private static final String THING_NAME = "thingName";
private static final String SHADOW_NAME = "shadowName";

private ExecutorService es;

@Mock
private ShadowManagerDatabase mockDatabase;

Expand Down Expand Up @@ -89,17 +83,11 @@ class ShadowManagerDAOImplTest {
@BeforeEach
void setup() throws SQLException, IOException {
when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement);
es = Executors.newCachedThreadPool();
lenient().when(mockDatabase.getDbWriteThreadPool()).thenReturn(es);
when(mockDatabase.getPool()).thenReturn(mockPool);
when(mockPool.getConnection()).thenReturn(mockConnection);
when(mockDatabase.isInitialized()).thenReturn(true);
JsonUtil.loadSchema();
}
@AfterEach
void after() {
es.shutdownNow();
}

private void assertUpdateShadowStatementMocks(long epochNow) {
assertThat(stringArgumentCaptor.getAllValues().size(), is(2));
Expand Down

0 comments on commit 0199edd

Please sign in to comment.