diff --git a/src/main/java/liquibase/ext/mongodb/statement/AbstractRunCommandStatement.java b/src/main/java/liquibase/ext/mongodb/statement/AbstractRunCommandStatement.java
index d1523137..d549a9b7 100644
--- a/src/main/java/liquibase/ext/mongodb/statement/AbstractRunCommandStatement.java
+++ b/src/main/java/liquibase/ext/mongodb/statement/AbstractRunCommandStatement.java
@@ -22,7 +22,10 @@
 
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoDatabase;
+import liquibase.Scope;
+import liquibase.executor.jvm.JdbcExecutor;
 import liquibase.ext.mongodb.database.MongoLiquibaseDatabase;
+import liquibase.nosql.executor.NoSqlExecutor;
 import liquibase.nosql.statement.NoSqlExecuteStatement;
 import lombok.AllArgsConstructor;
 import lombok.EqualsAndHashCode;
@@ -30,8 +33,10 @@
 import org.bson.Document;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Objects.nonNull;
+import static liquibase.executor.jvm.JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY;
 
 @AllArgsConstructor
 @EqualsAndHashCode(callSuper = true)
@@ -43,12 +48,17 @@ public abstract class AbstractRunCommandStatement extends AbstractMongoStatement
     public static final String OK = "ok";
     public static final String WRITE_ERRORS = "writeErrors";
 
+    public static final String N = "n";
+    public static final String N_MODIFIED = "nModified";
+    public static final String N_REMOVED = "nRemoved";
+
     @Getter
     protected final Document command;
 
     @Override
     public void execute(final MongoLiquibaseDatabase database) {
-        run(database);
+        Document response = run(database);
+        updateRowsAffected(response);
     }
 
     public Document run(final MongoLiquibaseDatabase database) {
@@ -74,16 +84,50 @@ public Document run(final MongoDatabase mongoDatabase) {
      * Check the response and throw an appropriate exception if the command was not successful
      */
     protected void checkResponse(final Document responseDocument) throws MongoException {
-        final Double ok = responseDocument.get(OK) instanceof Integer ? (double) responseDocument.getInteger(OK) :
-                responseDocument.getDouble(OK);
+        final double ok = responseDocument.get(OK) instanceof Integer
+                ? (double) responseDocument.getInteger(OK)
+                : responseDocument.getDouble(OK);
+
         final List writeErrors = responseDocument.getList(WRITE_ERRORS, Document.class);
-        if (nonNull(ok) && !ok.equals(1.0d)
-                || nonNull(writeErrors) && !writeErrors.isEmpty()) {
+        if (ok != 1.0d || nonNull(writeErrors) && !writeErrors.isEmpty()) {
The full response is " + responseDocument.toJson()); } } + protected void updateRowsAffected(Document response) { + int affectedCount = extractAffectedCount(response); + if (affectedCount <= 0) { + return; + } + + AtomicInteger scopeRowsAffected = Scope.getCurrentScope().get(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, AtomicInteger.class); + if (scopeRowsAffected == null) { + scopeRowsAffected = NoSqlExecutor.GLOBAL_ROWS_AFFECTED; + } + Boolean shouldUpdate = Scope.getCurrentScope().get(SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, Boolean.TRUE); + if (Boolean.TRUE.equals(shouldUpdate)) { + scopeRowsAffected.addAndGet(affectedCount); + Scope.getCurrentScope().getLog(getClass()).fine("Added " + affectedCount + " to ROWS_AFFECTED_SCOPE_KEY; new total=" + scopeRowsAffected.get()); + } + } + + protected int extractAffectedCount(Document response) { + if (response.containsKey(N)) { + return response.getInteger(N, 0); + } + // Then 'nModified' (some updates) + if (response.containsKey(N_MODIFIED)) { + return response.getInteger(N_MODIFIED, 0); + } + // Then 'nRemoved' + if (response.containsKey(N_REMOVED)) { + return response.getInteger(N_REMOVED, 0); + } + // or 0 for commands that do not return anything about affected docs + return 0; + } + @Override public String getCommandName() { return COMMAND_NAME; @@ -105,4 +149,4 @@ public String toJs() { + BsonUtils.toJson(command) + ");"; } -} +} \ No newline at end of file diff --git a/src/main/java/liquibase/nosql/executor/NoSqlExecutor.java b/src/main/java/liquibase/nosql/executor/NoSqlExecutor.java index 5e2e4dee..8f41cd1a 100644 --- a/src/main/java/liquibase/nosql/executor/NoSqlExecutor.java +++ b/src/main/java/liquibase/nosql/executor/NoSqlExecutor.java @@ -26,6 +26,7 @@ import liquibase.database.Database; import liquibase.exception.DatabaseException; import liquibase.executor.AbstractExecutor; +import liquibase.executor.jvm.JdbcExecutor; import liquibase.ext.mongodb.changelog.MongoHistoryService; import liquibase.ext.mongodb.database.MongoLiquibaseDatabase; import liquibase.logging.Logger; @@ -40,6 +41,7 @@ import liquibase.statement.core.UpdateStatement; import lombok.NoArgsConstructor; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -51,6 +53,8 @@ @NoArgsConstructor public class NoSqlExecutor extends AbstractExecutor { + public static final AtomicInteger GLOBAL_ROWS_AFFECTED = new AtomicInteger(0); + public static final String EXECUTOR_NAME = "jdbc"; private final Logger log = Scope.getCurrentScope().getLog(getClass()); @@ -166,7 +170,7 @@ public List queryForList(final SqlStatement sql, final Class elementType * @throws DatabaseException in case of a failure */ public void execute(final UpdateStatement updateStatement) throws DatabaseException { - if(updateStatement.getNewColumnValues().containsKey("MD5SUM") + if (updateStatement.getNewColumnValues().containsKey("MD5SUM") && updateStatement.getNewColumnValues().get("MD5SUM") == null) { try { Scope.getCurrentScope().getSingleton(ChangeLogHistoryServiceFactory.class) @@ -181,8 +185,22 @@ public void execute(final UpdateStatement updateStatement) throws DatabaseExcept @Override public void execute(final SqlStatement sql) throws DatabaseException { - disableRowAffectedMessage(); - this.execute(sql, emptyList()); + try { + Map scopeValues = new HashMap<>(); + scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, GLOBAL_ROWS_AFFECTED); + scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true); + + 
+            Scope.child(scopeValues, () -> {
+                this.execute(sql, emptyList());
+                return null;
+            });
+
+        } catch (Exception e) {
+            if (e instanceof DatabaseException) {
+                throw (DatabaseException) e;
+            }
+            throw new DatabaseException(e);
+        }
     }
 
     @Override
@@ -199,7 +217,7 @@ public void execute(final SqlStatement sql, final List sqlVisitors)
             ChangeLogHistoryService changeLogHistoryService = Scope.getCurrentScope().getSingleton(ChangeLogHistoryServiceFactory.class)
                     .getChangeLogService(getDatabase());
             if (changeLogHistoryService instanceof MongoHistoryService) {
-                ((MongoHistoryService)changeLogHistoryService).updateCheckSum(((UpdateChangeSetChecksumStatement) sql).getChangeSet());
+                ((MongoHistoryService) changeLogHistoryService).updateCheckSum(((UpdateChangeSetChecksumStatement) sql).getChangeSet());
             } else {
                 throw new DatabaseException("Could not execute as we are not in a MongoDB");
             }
diff --git a/src/test/java/liquibase/ext/mongodb/statement/RowsAffectedStatementIT.java b/src/test/java/liquibase/ext/mongodb/statement/RowsAffectedStatementIT.java
new file mode 100644
index 00000000..760f9ad9
--- /dev/null
+++ b/src/test/java/liquibase/ext/mongodb/statement/RowsAffectedStatementIT.java
@@ -0,0 +1,146 @@
+package liquibase.ext.mongodb.statement;
+
+import liquibase.Scope;
+import liquibase.executor.jvm.JdbcExecutor;
+import liquibase.ext.AbstractMongoIntegrationTest;
+import lombok.SneakyThrows;
+import org.bson.Document;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static liquibase.ext.mongodb.TestUtils.COLLECTION_NAME_1;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class RowsAffectedStatementIT extends AbstractMongoIntegrationTest {
+
+    private String collectionName;
+    private AtomicInteger rowsAffected;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        collectionName = COLLECTION_NAME_1 + System.nanoTime();
+        rowsAffected = new AtomicInteger(0);
+
+        // Set up scope values with the rowsAffected counter
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        // Run the shared setup within this scope; each test opens its own scope for its statements
+        Scope.child(scopeValues, () -> {
+            super.setUpEach();
+            collectionName = COLLECTION_NAME_1 + System.nanoTime();
+        });
+    }
+
+    @Override
+    @SneakyThrows
+    protected void tearDownEach() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            executor.execute(new DropAllCollectionsStatement());
+            connection.close();
+        });
+    }
+
+    @Test
+    @SneakyThrows
+    void shouldTrackRowsAffectedForCreateCollection() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
+            createStatement.execute(database);
+            assertThat(rowsAffected.get()).isEqualTo(1);
+        });
+    }
+
+    @Test
+    @SneakyThrows
+    void shouldTrackRowsAffectedForInsertOne() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
+            createStatement.execute(database);
+            rowsAffected.set(0);
+
+            Document doc = new Document("test", "value");
+            final InsertOneStatement insertStatement = new InsertOneStatement(collectionName, doc);
+            insertStatement.execute(database);
+
+            assertThat(rowsAffected.get()).isEqualTo(1);
+        });
+    }
+
+    @Test
+    @SneakyThrows
+    void shouldTrackRowsAffectedForInsertMany() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
+            createStatement.execute(database);
+            rowsAffected.set(0);
+
+            Document doc1 = new Document("test", "value1");
+            Document doc2 = new Document("test", "value2");
+            final InsertManyStatement insertStatement = new InsertManyStatement(collectionName, Arrays.asList(doc1, doc2));
+            insertStatement.execute(database);
+
+            assertThat(rowsAffected.get()).isEqualTo(2);
+        });
+    }
+
+    @Test
+    @SneakyThrows
+    void shouldTrackRowsAffectedForDropCollection() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
+            createStatement.execute(database);
+            rowsAffected.set(0);
+
+            final DropCollectionStatement dropStatement = new DropCollectionStatement(collectionName);
+            dropStatement.execute(database);
+
+            assertThat(rowsAffected.get()).isEqualTo(1);
+        });
+    }
+
+    @Test
+    @SneakyThrows
+    void shouldTrackTotalRowsAffectedForMultipleOperations() {
+        Map<String, Object> scopeValues = new HashMap<>();
+        scopeValues.put(JdbcExecutor.ROWS_AFFECTED_SCOPE_KEY, rowsAffected);
+        scopeValues.put(JdbcExecutor.SHOULD_UPDATE_ROWS_AFFECTED_SCOPE_KEY, true);
+
+        Scope.child(scopeValues, () -> {
+            final CreateCollectionStatement createStatement = new CreateCollectionStatement(collectionName);
+            createStatement.execute(database);
+
+            Document doc1 = new Document("test", "value1");
+            Document doc2 = new Document("test", "value2");
+            final InsertManyStatement insertStatement = new InsertManyStatement(collectionName, Arrays.asList(doc1, doc2));
+            insertStatement.execute(database);
+            assertThat(rowsAffected.get()).isEqualTo(3); // 1 to create + 2 to insert
+        });
+    }
+}
\ No newline at end of file