|
4 | 4 | import com.github.cloudyrock.mongock.ChangeSet;
|
5 | 5 | import com.github.cloudyrock.mongock.driver.mongodb.springdata.v4.decorator.impl.MongockTemplate;
|
6 | 6 | import com.github.f4b6a3.uuid.UuidCreator;
|
| 7 | +import com.mongodb.client.MongoCollection; |
| 8 | +import com.mongodb.client.MongoCursor; |
| 9 | +import com.mongodb.client.result.DeleteResult; |
7 | 10 | import lombok.extern.slf4j.Slf4j;
|
8 | 11 | import org.bson.Document;
|
9 | 12 | import org.lowcoder.domain.application.model.Application;
|
|
44 | 47 |
|
45 | 48 | import java.time.Instant;
|
46 | 49 | import java.time.temporal.ChronoUnit;
|
| 50 | +import java.util.Arrays; |
47 | 51 | import java.util.List;
|
48 | 52 | import java.util.Set;
|
49 | 53 |
|
@@ -313,41 +317,86 @@ private int getMongoDBVersion(MongockTemplate mongoTemplate) {
|
313 | 317 | @ChangeSet(order = "026", id = "add-time-series-snapshot-history", author = "")
|
314 | 318 | public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonConfig commonConfig) {
|
315 | 319 | int mongoVersion = getMongoDBVersion(mongoTemplate);
|
316 |
| - if (mongoVersion < 5) { |
317 |
| - log.warn("MongoDB version is below 5. Time-series collections are not supported. Upgrade the MongoDB version."); |
318 |
| - } |
319 | 320 |
|
320 |
| - // Create the time-series collection if it doesn't exist |
321 |
| - if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
322 |
| - if(mongoVersion < 5) { |
323 |
| - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); |
324 |
| - } else { |
325 |
| - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, CollectionOptions.empty().timeSeries("createdAt")); |
| 321 | + Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); |
| 322 | + |
| 323 | + if (mongoVersion >= 5) { |
| 324 | + // MongoDB version >= 5: Use manual insert query |
| 325 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 326 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, |
| 327 | + CollectionOptions.empty().timeSeries("createdAt")); |
326 | 328 | }
|
| 329 | + |
| 330 | + // Aggregation pipeline to fetch the records |
| 331 | + List<Document> aggregationPipeline = Arrays.asList( |
| 332 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 333 | + new Document("$project", new Document() |
| 334 | + .append("applicationId", 1) |
| 335 | + .append("dsl", 1) |
| 336 | + .append("context", 1) |
| 337 | + .append("createdAt", 1) |
| 338 | + .append("createdBy", 1) |
| 339 | + .append("modifiedBy", 1) |
| 340 | + .append("updatedAt", 1) |
| 341 | + .append("id", "$_id")) // Map `_id` to `id` if needed |
| 342 | + ); |
| 343 | + |
| 344 | + MongoCollection<Document> sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshot"); |
| 345 | + MongoCollection<Document> targetCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS"); |
| 346 | + |
| 347 | + // Fetch results and insert them into the time-series collection |
| 348 | + try (MongoCursor<Document> cursor = sourceCollection.aggregate(aggregationPipeline).iterator()) { |
| 349 | + while (cursor.hasNext()) { |
| 350 | + Document document = cursor.next(); |
| 351 | + targetCollection.insertOne(document); // Insert into the time-series collection |
| 352 | + } |
| 353 | + } |
| 354 | + |
| 355 | + // Delete the migrated records |
| 356 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 357 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 358 | + |
| 359 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
| 360 | + } else { |
| 361 | + // MongoDB version < 5: Use aggregation with $out |
| 362 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 363 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); // Create a regular collection |
| 364 | + } |
| 365 | + |
| 366 | + // Aggregation pipeline with $out |
| 367 | + List<Document> aggregationPipeline = Arrays.asList( |
| 368 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 369 | + new Document("$project", new Document() |
| 370 | + .append("applicationId", 1) |
| 371 | + .append("dsl", 1) |
| 372 | + .append("context", 1) |
| 373 | + .append("createdAt", 1) |
| 374 | + .append("createdBy", 1) |
| 375 | + .append("modifiedBy", 1) |
| 376 | + .append("updatedAt", 1) |
| 377 | + .append("id", "$_id")), // Map `_id` to `id` if needed |
| 378 | + new Document("$out", "applicationHistorySnapshotTS") // Write directly to the target collection |
| 379 | + ); |
| 380 | + |
| 381 | + mongoTemplate.getDb() |
| 382 | + .getCollection("applicationHistorySnapshot") |
| 383 | + .aggregate(aggregationPipeline) |
| 384 | + .toCollection(); |
| 385 | + |
| 386 | + // Delete the migrated records |
| 387 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 388 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 389 | + |
| 390 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
327 | 391 | }
|
328 |
| - Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS); |
329 |
| - List<ApplicationHistorySnapshot> snapshots = mongoTemplate.find(new Query().addCriteria(Criteria.where("createdAt").gte(thresholdDate)), ApplicationHistorySnapshot.class); |
330 |
| - snapshots.forEach(snapshot -> { |
331 |
| - ApplicationHistorySnapshotTS applicationHistorySnapshotTS = new ApplicationHistorySnapshotTS(); |
332 |
| - applicationHistorySnapshotTS.setApplicationId(snapshot.getApplicationId()); |
333 |
| - applicationHistorySnapshotTS.setDsl(snapshot.getDsl()); |
334 |
| - applicationHistorySnapshotTS.setContext(snapshot.getContext()); |
335 |
| - applicationHistorySnapshotTS.setCreatedAt(snapshot.getCreatedAt()); |
336 |
| - applicationHistorySnapshotTS.setCreatedBy(snapshot.getCreatedBy()); |
337 |
| - applicationHistorySnapshotTS.setModifiedBy(snapshot.getModifiedBy()); |
338 |
| - applicationHistorySnapshotTS.setUpdatedAt(snapshot.getUpdatedAt()); |
339 |
| - applicationHistorySnapshotTS.setId(snapshot.getId()); |
340 |
| - mongoTemplate.insert(applicationHistorySnapshotTS); |
341 |
| - mongoTemplate.remove(snapshot); |
342 |
| - }); |
343 | 392 |
|
344 |
| - // Ensure indexes if needed |
| 393 | + // Ensure indexes on the new collection |
345 | 394 | ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class,
|
346 | 395 | makeIndex("applicationId"),
|
347 |
| - makeIndex("createdAt") |
348 |
| - ); |
| 396 | + makeIndex("createdAt")); |
349 | 397 | }
|
350 | 398 |
|
| 399 | + |
351 | 400 | private void addGidField(MongockTemplate mongoTemplate, String collectionName) {
|
352 | 401 | // Create a query to match all documents
|
353 | 402 | Query query = new Query();
|
|
0 commit comments