|
4 | 4 | import com.github.cloudyrock.mongock.ChangeSet;
|
5 | 5 | import com.github.cloudyrock.mongock.driver.mongodb.springdata.v4.decorator.impl.MongockTemplate;
|
6 | 6 | import com.github.f4b6a3.uuid.UuidCreator;
|
| 7 | +import com.mongodb.client.MongoCollection; |
| 8 | +import com.mongodb.client.MongoCursor; |
7 | 9 | import com.mongodb.client.result.DeleteResult;
|
8 | 10 | import lombok.extern.slf4j.Slf4j;
|
9 | 11 | import org.bson.Document;
|
@@ -315,47 +317,80 @@ private int getMongoDBVersion(MongockTemplate mongoTemplate) {
|
315 | 317 | @ChangeSet(order = "026", id = "add-time-series-snapshot-history", author = "")
|
316 | 318 | public void addTimeSeriesSnapshotHistory(MongockTemplate mongoTemplate, CommonConfig commonConfig) {
|
317 | 319 | int mongoVersion = getMongoDBVersion(mongoTemplate);
|
318 |
| - if (mongoVersion < 5) { |
319 |
| - log.warn("MongoDB version is below 5. Time-series collections are not supported. Upgrade the MongoDB version."); |
320 |
| - } |
321 |
| - |
322 |
| - // Create the time-series collection if it doesn't exist |
323 |
| - if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
324 |
| - if (mongoVersion < 5) { |
325 |
| - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); |
326 |
| - } else { |
327 |
| - mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, CollectionOptions.empty().timeSeries("createdAt")); |
328 |
| - } |
329 |
| - } |
330 | 320 |
|
331 | 321 | Instant thresholdDate = Instant.now().minus(commonConfig.getQuery().getAppSnapshotKeepDuration(), ChronoUnit.DAYS);
|
332 | 322 |
|
333 |
| - // Use aggregation to move and transform data |
334 |
| - Document match = new Document("$match", |
335 |
| - new Document("createdAt", new Document("$gte", thresholdDate))); |
| 323 | + if (mongoVersion >= 5) { |
| 324 | + // MongoDB version >= 5: Use manual insert query |
| 325 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 326 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class, |
| 327 | + CollectionOptions.empty().timeSeries("createdAt")); |
| 328 | + } |
336 | 329 |
|
337 |
| - Document project = new Document("$project", new Document() |
338 |
| - .append("applicationId", 1) |
339 |
| - .append("dsl", 1) |
340 |
| - .append("context", 1) |
341 |
| - .append("createdAt", 1) |
342 |
| - .append("createdBy", 1) |
343 |
| - .append("modifiedBy", 1) |
344 |
| - .append("updatedAt", 1) |
345 |
| - .append("id", "$_id")); // Map MongoDB's default `_id` to `id` if needed. |
| 330 | + // Aggregation pipeline to fetch the records |
| 331 | + List<Document> aggregationPipeline = Arrays.asList( |
| 332 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 333 | + new Document("$project", new Document() |
| 334 | + .append("applicationId", 1) |
| 335 | + .append("dsl", 1) |
| 336 | + .append("context", 1) |
| 337 | + .append("createdAt", 1) |
| 338 | + .append("createdBy", 1) |
| 339 | + .append("modifiedBy", 1) |
| 340 | + .append("updatedAt", 1) |
| 341 | + .append("id", "$_id")) // Map `_id` to `id` if needed |
| 342 | + ); |
| 343 | + |
| 344 | + MongoCollection<Document> sourceCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshot"); |
| 345 | + MongoCollection<Document> targetCollection = mongoTemplate.getDb().getCollection("applicationHistorySnapshotTS"); |
| 346 | + |
| 347 | + // Fetch results and insert them into the time-series collection |
| 348 | + try (MongoCursor<Document> cursor = sourceCollection.aggregate(aggregationPipeline).iterator()) { |
| 349 | + while (cursor.hasNext()) { |
| 350 | + Document document = cursor.next(); |
| 351 | + targetCollection.insertOne(document); // Insert into the time-series collection |
| 352 | + } |
| 353 | + } |
346 | 354 |
|
347 |
| - Document out = new Document("$out", "applicationHistorySnapshotTS"); // Target collection name |
| 355 | + // Delete the migrated records |
| 356 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 357 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
348 | 358 |
|
349 |
| - // Execute the aggregation pipeline |
350 |
| - mongoTemplate.getDb() |
351 |
| - .getCollection("applicationHistorySnapshot") // Original collection name |
352 |
| - .aggregate(Arrays.asList(match, project, out)) |
353 |
| - .toCollection(); |
| 359 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
| 360 | + } else { |
| 361 | + // MongoDB version < 5: Use aggregation with $out |
| 362 | + if (!mongoTemplate.collectionExists(ApplicationHistorySnapshotTS.class)) { |
| 363 | + mongoTemplate.createCollection(ApplicationHistorySnapshotTS.class); // Create a regular collection |
| 364 | + } |
354 | 365 |
|
355 |
| - // Delete the migrated records |
356 |
| - Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
357 |
| - DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 366 | + // Aggregation pipeline with $out |
| 367 | + List<Document> aggregationPipeline = Arrays.asList( |
| 368 | + new Document("$match", new Document("createdAt", new Document("$gte", thresholdDate))), |
| 369 | + new Document("$project", new Document() |
| 370 | + .append("applicationId", 1) |
| 371 | + .append("dsl", 1) |
| 372 | + .append("context", 1) |
| 373 | + .append("createdAt", 1) |
| 374 | + .append("createdBy", 1) |
| 375 | + .append("modifiedBy", 1) |
| 376 | + .append("updatedAt", 1) |
| 377 | + .append("id", "$_id")), // Map `_id` to `id` if needed |
| 378 | + new Document("$out", "applicationHistorySnapshotTS") // Write directly to the target collection |
| 379 | + ); |
| 380 | + |
| 381 | + mongoTemplate.getDb() |
| 382 | + .getCollection("applicationHistorySnapshot") |
| 383 | + .aggregate(aggregationPipeline) |
| 384 | + .toCollection(); |
| 385 | + |
| 386 | + // Delete the migrated records |
| 387 | + Query deleteQuery = new Query(Criteria.where("createdAt").gte(thresholdDate)); |
| 388 | + DeleteResult deleteResult = mongoTemplate.remove(deleteQuery, ApplicationHistorySnapshot.class); |
| 389 | + |
| 390 | + log.info("Deleted {} records from the source collection.", deleteResult.getDeletedCount()); |
| 391 | + } |
358 | 392 |
|
| 393 | + // Ensure indexes on the new collection |
359 | 394 | ensureIndexes(mongoTemplate, ApplicationHistorySnapshotTS.class,
|
360 | 395 | makeIndex("applicationId"),
|
361 | 396 | makeIndex("createdAt"));
|
|
0 commit comments