Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fix DataStore Tests #2625

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions aws-datastore/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ apply(from = rootProject.file("configuration/publishing.gradle"))

group = properties["POM_GROUP"].toString()

android {

defaultConfig {
testInstrumentationRunner = "androidx.test.runner.AndroidJUnitRunner"
}

testOptions {
execution = "ANDROIDX_TEST_ORCHESTRATOR"
}
}

dependencies {
implementation(project(":core"))
implementation(project(":aws-core"))
Expand Down Expand Up @@ -57,6 +68,8 @@ dependencies {
androidTestImplementation(libs.rxjava)
androidTestImplementation(libs.okhttp)
androidTestImplementation(libs.oauth2)

androidTestUtil(libs.test.androidx.orchestrator)
}

afterEvaluate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
import com.amplifyframework.datastore.appsync.SynchronousAppSync;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.hub.HubEventFilter;
import com.amplifyframework.logging.AndroidLoggingPlugin;
import com.amplifyframework.logging.LogLevel;
import com.amplifyframework.testmodels.commentsblog.AmplifyModelProvider;
Expand All @@ -53,7 +54,6 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -233,11 +233,21 @@ public void createThenUpdate() throws DataStoreException, ApiException {
BlogOwner updatedRichard = richard.copyOfBuilder()
.name("Richard McClellan")
.build();
String modelName = BlogOwner.class.getSimpleName();

// Expect at least 1 mutation to be published to AppSync.
HubEventFilter hubEventFilter = DataStoreHubEventFilters.filterOutboxEvent(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
model -> {
if (model instanceof BlogOwner) {
BlogOwner published = (BlogOwner) model;
return published.getId().equals(updatedRichard.getId()) &&
published.getName().equals(updatedRichard.getName());
}
return false;
}
);

HubAccumulator richardAccumulator =
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, richard.getId()), 1)
HubAccumulator.create(HubChannel.DATASTORE, hubEventFilter, 1)
.start();

// Create an item, then update it and save it again.
Expand Down Expand Up @@ -319,11 +329,23 @@ public void createThenUpdateDifferentField() throws DataStoreException, ApiExcep
BlogOwner updatedOwner = owner.copyOfBuilder()
.wea("pon")
.build();
String modelName = BlogOwner.class.getSimpleName();

// Expect at least 1 mutation to be published to AppSync.
HubEventFilter hubEventFilter = DataStoreHubEventFilters.filterOutboxEvent(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
model -> {
if (model instanceof BlogOwner) {
BlogOwner published = (BlogOwner) model;
return published.getId().equals(updatedOwner.getId()) &&
published.getWea() != null &&
published.getWea().equals(updatedOwner.getWea());
}
return false;
}
);

// Check for HubEvent on expected final state
HubAccumulator accumulator =
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1)
HubAccumulator.create(HubChannel.DATASTORE, hubEventFilter, 1)
.start();

// Create an item, then update it with different field and save it again.
Expand Down Expand Up @@ -391,7 +413,6 @@ public void createWaitThenUpdateDifferentField() throws DataStoreException, ApiE
* @throws DataStoreException On failure to save or query items from DataStore.
* @throws ApiException On failure to query the API.
*/
@Ignore("Test passes locally but fails inconsistently on CI. Ignoring the test pending further investigation.")
@Test
public void create1ThenCreate2ThenUpdate2() throws DataStoreException, ApiException {
// Setup
Expand All @@ -404,19 +425,31 @@ public void create1ThenCreate2ThenUpdate2() throws DataStoreException, ApiExcept
BlogOwner updatedOwner = anotherOwner.copyOfBuilder()
.wea("pon")
.build();
String modelName = BlogOwner.class.getSimpleName();

// Expect two mutations to be published to AppSync.
HubEventFilter hubEventFilter = DataStoreHubEventFilters.filterOutboxEvent(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
model -> {
if (model instanceof BlogOwner) {
BlogOwner published = (BlogOwner) model;
return published.getId().equals(updatedOwner.getId()) &&
published.getWea() != null &&
published.getWea().equals(updatedOwner.getWea());
}
return false;
}
);

// Verify final state accumulated
HubAccumulator accumulator =
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, anotherOwner.getId()), 2)
HubAccumulator.create(HubChannel.DATASTORE, hubEventFilter, 1)
.start();

// Create an item, then update it with different field and save it again.
dataStore.save(owner);
dataStore.save(anotherOwner);
dataStore.save(updatedOwner);

// Verify that 2 mutations were published.
// Verify that mutations were published.
accumulator.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);

// Verify that the updatedOwner is saved in the DataStore.
Expand All @@ -431,11 +464,11 @@ public void create1ThenCreate2ThenUpdate2() throws DataStoreException, ApiExcept
/**
* Verify that creating a new item, then immediately deleting succeeds.
* @throws DataStoreException On failure to save or query items from DataStore.
* @throws InterruptedException If sleep interrupted
* @throws ApiException On failure to query the API.
*/
@Test
@Ignore("Inconsistent Test. Needs investigation")
public void createThenDelete() throws DataStoreException, ApiException {
public void createThenDelete() throws DataStoreException, InterruptedException {
// Setup
BlogOwner owner = BlogOwner.builder()
.name("Jean")
Expand All @@ -444,6 +477,12 @@ public void createThenDelete() throws DataStoreException, ApiException {
dataStore.save(owner);
dataStore.delete(owner);

// Sleeping isn't ideal here. However, we don't currently have a way to detect if there is
// still a pending event in the outbox. In a current scenario, if the save is being returned
// from appsync, but delete outbox event still pending send, we end up with a momentary
// state where owner re-exists to be quickly removed again
Thread.sleep(2000);

// Verify that the owner is deleted from the local data store.
assertThrows(NoSuchElementException.class, () -> dataStore.get(BlogOwner.class, owner.getId()));
}
Expand Down Expand Up @@ -587,21 +626,46 @@ public void createItemThenUpdateThenWaitThenUpdate() throws DataStoreException,
// Setup
BlogOwner owner = BlogOwner.builder().name("ownerName").build();
BlogOwner updatedOwner = owner.copyOfBuilder().wea("pon").build();
String modelName = BlogOwner.class.getSimpleName();

// Expect at least 1 update (2 is possible)
HubEventFilter hubEventFilter = DataStoreHubEventFilters.filterOutboxEvent(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
model -> {
if (model instanceof BlogOwner) {
BlogOwner published = (BlogOwner) model;
return published.getId().equals(updatedOwner.getId()) &&
published.getWea() != null &&
published.getWea().equals(updatedOwner.getWea());
}
return false;
}
);

// Check for HubEvent on expected final state
HubAccumulator accumulator =
HubAccumulator.create(HubChannel.DATASTORE, publicationOf(modelName, owner.getId()), 1)
HubAccumulator.create(HubChannel.DATASTORE, hubEventFilter, 1)
.start();
// Create new and then immediately update
dataStore.save(owner);
dataStore.save(updatedOwner);
accumulator.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);

// Update the field
BlogOwner diffFieldUpdated = updatedOwner.copyOfBuilder().name("ownerUpdatedName").build();
accumulator = HubAccumulator.create(HubChannel.DATASTORE,
publicationOf(modelName, diffFieldUpdated.getId()), 1).start();

HubEventFilter hubEventFilter2 = DataStoreHubEventFilters.filterOutboxEvent(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
model -> {
if (model instanceof BlogOwner) {
BlogOwner published = (BlogOwner) model;
return published.getId().equals(diffFieldUpdated.getId()) &&
published.getName().equals(diffFieldUpdated.getName());
}
return false;
}
);

accumulator = HubAccumulator.create(HubChannel.DATASTORE,
hubEventFilter2, 1).start();
dataStore.save(diffFieldUpdated);
accumulator.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.datastore

package com.amplifyframework.datastore;

import com.amplifyframework.core.model.Model;
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
import com.amplifyframework.datastore.events.NetworkStatusEvent;
import com.amplifyframework.datastore.syncengine.OutboxMutationEvent;
import com.amplifyframework.hub.HubEventFilter;
import com.amplifyframework.core.model.Model
import com.amplifyframework.datastore.appsync.ModelWithMetadata
import com.amplifyframework.datastore.events.NetworkStatusEvent
import com.amplifyframework.datastore.syncengine.OutboxMutationEvent
import com.amplifyframework.hub.HubEvent
import com.amplifyframework.hub.HubEventFilter

/**
* Utility to create some common filters that can be applied to hub subscriptions.
*/
public final class DataStoreHubEventFilters {
private DataStoreHubEventFilters() {}

object DataStoreHubEventFilters {
/**
* Watches for publication (out of mutation queue) of a given model.
* Creates a filter that catches events from the mutation processor.
Expand All @@ -36,12 +34,13 @@ private DataStoreHubEventFilters() {}
* @param modelId The ID of a model instance that might be published
* @return A filter that watches for publication of the provided model.
*/
public static HubEventFilter publicationOf(String modelName, String modelId) {
@JvmStatic
fun publicationOf(modelName: String, modelId: String): HubEventFilter {
return outboxEventOf(
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
modelName,
modelId
);
DataStoreChannelEventName.OUTBOX_MUTATION_PROCESSED,
modelName,
modelId
)
}

/**
Expand All @@ -53,12 +52,12 @@ public static HubEventFilter publicationOf(String modelName, String modelId) {
* @param modelId The ID of a model instance that might be published
* @return A filter that watches for publication of the provided model.
*/
public static HubEventFilter enqueueOf(String modelName, String modelId) {
fun enqueueOf(modelName: String, modelId: String): HubEventFilter {
return outboxEventOf(
DataStoreChannelEventName.OUTBOX_MUTATION_ENQUEUED,
modelName,
modelId
);
DataStoreChannelEventName.OUTBOX_MUTATION_ENQUEUED,
modelName,
modelId
)
}

/**
Expand All @@ -71,24 +70,42 @@ public static HubEventFilter enqueueOf(String modelName, String modelId) {
* @param modelId The ID of a model instance that might be published
* @return A filter that watches for publication of the provided model.
*/
private static HubEventFilter outboxEventOf(
DataStoreChannelEventName eventType,
String modelName,
String modelId
) {
return event -> {
if (!eventType.toString().equals(event.getName())) {
return false;
private fun outboxEventOf(
eventType: DataStoreChannelEventName,
modelName: String,
modelId: String
): HubEventFilter {
return HubEventFilter { event: HubEvent<*> ->
if (eventType.toString() != event.name) {
return@HubEventFilter false
}
if (!(event.getData() instanceof OutboxMutationEvent)) {
return false;
if (event.data !is OutboxMutationEvent<*>) {
return@HubEventFilter false
}
OutboxMutationEvent<? extends Model> outboxMutationEvent =
(OutboxMutationEvent<? extends Model>) event.getData();
val outboxMutationEvent = event.data as OutboxMutationEvent<out Model>
modelId == outboxMutationEvent.element.model
.primaryKeyString && modelName == outboxMutationEvent.modelName
}
}

return modelId.equals(outboxMutationEvent.getElement().getModel().getPrimaryKeyString()) &&
modelName.equals(outboxMutationEvent.getModelName());
};
@JvmStatic
fun <T : Model> filterOutboxEvent(
eventType: DataStoreChannelEventName,
filter: (model: Model) -> Boolean
): HubEventFilter {
return HubEventFilter { event: HubEvent<*> ->
if (eventType.toString() != event.name) {
return@HubEventFilter false
}
if (event.data !is OutboxMutationEvent<*>) {
return@HubEventFilter false
}
val outboxMutationEvent = event.data as OutboxMutationEvent<out Model>
val model = outboxMutationEvent.element.model
return@HubEventFilter (model)?.let {
filter(it)
} ?: false
}
}

/**
Expand All @@ -99,35 +116,35 @@ private static HubEventFilter outboxEventOf(
* @param modelId ID of the model instance that may be received
* @return A filter that watches for receive of the provided model
*/
public static HubEventFilter receiptOf(String modelId) {
return event -> {
if (!DataStoreChannelEventName.SUBSCRIPTION_DATA_PROCESSED.toString().equals(event.getName())) {
return false;
@JvmStatic
fun receiptOf(modelId: String): HubEventFilter {
return HubEventFilter { event: HubEvent<*> ->
if (DataStoreChannelEventName.SUBSCRIPTION_DATA_PROCESSED.toString() != event.name) {
return@HubEventFilter false
}
if (!(event.getData() instanceof ModelWithMetadata)) {
return false;
if (event.data !is ModelWithMetadata<*>) {
return@HubEventFilter false
}
ModelWithMetadata<? extends Model> modelWithMetadata =
(ModelWithMetadata<? extends Model>) event.getData();
return modelId.equals(modelWithMetadata.getModel().resolveIdentifier());
};
val modelWithMetadata = event.data as ModelWithMetadata<out Model>
modelId == modelWithMetadata.model.resolveIdentifier()
}
}

/**
* Expect a network status failure event to be emitted by the sync engione.
* @return A filter that checks for network failure messages.
*/
public static HubEventFilter networkStatusFailure() {
return event -> {
if (!DataStoreChannelEventName.NETWORK_STATUS.toString().equals(event.getName())) {
return false;
@JvmStatic
fun networkStatusFailure(): HubEventFilter {
return HubEventFilter { event: HubEvent<*> ->
if (DataStoreChannelEventName.NETWORK_STATUS.toString() != event.name) {
return@HubEventFilter false
}
if (!(event.getData() instanceof NetworkStatusEvent)) {
return false;
if (event.data !is NetworkStatusEvent) {
return@HubEventFilter false
}
NetworkStatusEvent outboxMutationEvent = (NetworkStatusEvent) event.getData();

return !outboxMutationEvent.getActive();
};
val outboxMutationEvent = event.data as NetworkStatusEvent?
!outboxMutationEvent!!.active
}
}
}
Loading
Loading