diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java index fa4f527acc0b..23d73b285dea 100644 --- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java @@ -87,6 +87,7 @@ public class Snapshot implements Serializable { protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount"; protected static final String FIELD_WATERMARK = "watermark"; protected static final String FIELD_STATISTICS = "statistics"; + protected static final String FIELD_PROPERTIES = "properties"; // version of snapshot // null for paimon <= 0.2 @@ -195,6 +196,13 @@ public class Snapshot implements Serializable { @Nullable protected final String statistics; + // properties + // null for paimon <= 1.1 or empty properties + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_PROPERTIES) + @Nullable + protected final Map properties; + public Snapshot( long id, long schemaId, @@ -214,7 +222,8 @@ public Snapshot( @Nullable Long deltaRecordCount, @Nullable Long changelogRecordCount, @Nullable Long watermark, - @Nullable String statistics) { + @Nullable String statistics, + @Nullable Map properties) { this( CURRENT_VERSION, id, @@ -235,7 +244,8 @@ public Snapshot( deltaRecordCount, changelogRecordCount, watermark, - statistics); + statistics, + properties); } @JsonCreator @@ -260,7 +270,8 @@ public Snapshot( @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, - @JsonProperty(FIELD_STATISTICS) @Nullable String statistics) { + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_PROPERTIES) @Nullable Map properties) { this.version = version; this.id = id; this.schemaId = schemaId; @@ -281,6 +292,7 @@ public Snapshot( this.changelogRecordCount = changelogRecordCount; this.watermark = watermark; this.statistics = statistics; + this.properties = properties; } @JsonGetter(FIELD_VERSION) @@ -395,6 +407,12 @@ public String statistics() { return statistics; } + @JsonGetter(FIELD_PROPERTIES) + @Nullable + public Map properties() { + return properties; + } + public String toJson() { return JsonSerdeUtil.toJson(this); } @@ -421,7 +439,8 @@ public int hashCode() { deltaRecordCount, changelogRecordCount, watermark, - statistics); + statistics, + properties); } @Override @@ -452,7 +471,8 @@ public boolean equals(Object o) { && Objects.equals(deltaRecordCount, that.deltaRecordCount) && Objects.equals(changelogRecordCount, that.changelogRecordCount) && Objects.equals(watermark, that.watermark) - && Objects.equals(statistics, that.statistics); + && Objects.equals(statistics, that.statistics) + && Objects.equals(properties, that.properties); } /** Type of changes in this snapshot. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 672c57a21af7..c0865cdd3456 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -62,7 +62,8 @@ public Changelog(Snapshot snapshot) { snapshot.deltaRecordCount(), snapshot.changelogRecordCount(), snapshot.watermark(), - snapshot.statistics()); + snapshot.statistics(), + snapshot.properties); } @JsonCreator @@ -87,7 +88,8 @@ public Changelog( @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, - @JsonProperty(FIELD_STATISTICS) @Nullable String statistics) { + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_PROPERTIES) Map properties) { super( version, id, @@ -108,7 +110,8 @@ public Changelog( deltaRecordCount, changelogRecordCount, watermark, - statistics); + statistics, + properties); } public static Changelog fromJson(String json) { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java index b4abd0e9ec0e..e8e12a668d25 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java @@ -34,6 +34,7 @@ public class ManifestCommittable { private final long identifier; @Nullable private final Long watermark; private final Map logOffsets; + private final Map properties; private final List commitMessages; public ManifestCommittable(long identifier) { @@ -45,6 +46,7 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) { this.watermark = watermark; this.logOffsets = new HashMap<>(); this.commitMessages = new ArrayList<>(); + this.properties = new HashMap<>(); } public ManifestCommittable( @@ -52,10 +54,20 @@ public ManifestCommittable( @Nullable Long watermark, Map logOffsets, List commitMessages) { + this(identifier, watermark, logOffsets, commitMessages, new HashMap<>()); + } + + public ManifestCommittable( + long identifier, + @Nullable Long watermark, + Map logOffsets, + List commitMessages, + Map properties) { this.identifier = identifier; this.watermark = watermark; this.logOffsets = logOffsets; this.commitMessages = commitMessages; + this.properties = properties; } public void addFileCommittable(CommitMessage commitMessage) { @@ -72,6 +84,10 @@ public void addLogOffset(int bucket, long offset, boolean allowDuplicate) { logOffsets.put(bucket, newOffset); } + public void addProperty(String key, String value) { + properties.put(key, value); + } + public long identifier() { return identifier; } @@ -89,6 +105,10 @@ public List fileCommittables() { return commitMessages; } + public Map properties() { + return properties; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -101,12 +121,13 @@ public boolean equals(Object o) { return Objects.equals(identifier, that.identifier) && Objects.equals(watermark, that.watermark) && Objects.equals(logOffsets, that.logOffsets) - && Objects.equals(commitMessages, that.commitMessages); + && Objects.equals(commitMessages, that.commitMessages) + && Objects.equals(properties, that.properties); } @Override public int hashCode() { - return Objects.hash(identifier, watermark, logOffsets, commitMessages); + return Objects.hash(identifier, watermark, logOffsets, commitMessages, properties); } @Override @@ -116,7 +137,8 @@ public String toString() { + "identifier = %s, " + "watermark = %s, " + "logOffsets = %s, " - + "commitMessages = %s", - identifier, watermark, logOffsets, commitMessages); + + "commitMessages = %s, " + + "properties = %s}", + identifier, watermark, logOffsets, commitMessages, properties); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java index c0ae59017987..67845099e424 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java @@ -34,7 +34,7 @@ /** {@link VersionedSerializer} for {@link ManifestCommittable}. */ public class ManifestCommittableSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 3; + private static final int CURRENT_VERSION = 4; private final CommitMessageSerializer commitMessageSerializer; @@ -62,6 +62,7 @@ public byte[] serialize(ManifestCommittable obj) throws IOException { view.writeLong(watermark); } serializeOffsets(view, obj.logOffsets()); + serializeProperties(view, obj.properties()); view.writeInt(commitMessageSerializer.getVersion()); commitMessageSerializer.serializeList(obj.fileCommittables(), view); return out.toByteArray(); @@ -76,6 +77,15 @@ private void serializeOffsets(DataOutputViewStreamWrapper view, Map properties) throws IOException { + view.writeInt(properties.size()); + for (Map.Entry entry : properties.entrySet()) { + view.writeUTF(entry.getKey()); + view.writeUTF(entry.getValue()); + } + } + @Override public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException { if (version > CURRENT_VERSION) { @@ -91,6 +101,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO long identifier = view.readLong(); Long watermark = view.readBoolean() ? null : view.readLong(); Map offsets = deserializeOffsets(view); + Map properties = + version == CURRENT_VERSION ? deserializeProperties(view) : new HashMap<>(); int fileCommittableSerializerVersion = view.readInt(); List fileCommittables; try { @@ -116,7 +128,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO fileCommittables = legacyV2CommitMessageSerializer.deserializeList(view); } - return new ManifestCommittable(identifier, watermark, offsets, fileCommittables); + return new ManifestCommittable( + identifier, watermark, offsets, fileCommittables, properties); } private Map deserializeOffsets(DataInputDeserializer view) throws IOException { @@ -127,4 +140,14 @@ private Map deserializeOffsets(DataInputDeserializer view) throws } return offsets; } + + private Map deserializeProperties(DataInputDeserializer view) + throws IOException { + int size = view.readInt(); + Map properties = new HashMap<>(size); + for (int i = 0; i < size; i++) { + properties.put(view.readUTF(), view.readUTF()); + } + return properties; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 852776d2feac..a8084eb32501 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -331,6 +331,7 @@ public void commit( committable.identifier(), committable.watermark(), committable.logOffsets(), + committable.properties(), Snapshot.CommitKind.APPEND, noConflictCheck(), null); @@ -365,6 +366,7 @@ public void commit( committable.identifier(), committable.watermark(), committable.logOffsets(), + committable.properties(), Snapshot.CommitKind.COMPACT, hasConflictChecked(safeLatestSnapshotId), null); @@ -502,7 +504,8 @@ public void overwrite( appendHashIndexFiles, committable.identifier(), committable.watermark(), - committable.logOffsets()); + committable.logOffsets(), + committable.properties()); generatedSnapshot += 1; } @@ -515,6 +518,7 @@ public void overwrite( committable.identifier(), committable.watermark(), committable.logOffsets(), + committable.properties(), Snapshot.CommitKind.COMPACT, mustConflictCheck(), null); @@ -568,12 +572,25 @@ public void dropPartitions(List> partitions, long commitIden } tryOverwrite( - partitionFilter, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>()); + partitionFilter, + emptyList(), + emptyList(), + commitIdentifier, + null, + new HashMap<>(), + new HashMap<>()); } @Override public void truncateTable(long commitIdentifier) { - tryOverwrite(null, emptyList(), emptyList(), commitIdentifier, null, new HashMap<>()); + tryOverwrite( + null, + emptyList(), + emptyList(), + commitIdentifier, + null, + new HashMap<>(), + new HashMap<>()); } @Override @@ -610,6 +627,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) { commitIdentifier, null, Collections.emptyMap(), + Collections.emptyMap(), Snapshot.CommitKind.ANALYZE, noConflictCheck(), statsFileName); @@ -752,6 +770,7 @@ private int tryCommit( long identifier, @Nullable Long watermark, Map logOffsets, + Map properties, Snapshot.CommitKind commitKind, ConflictCheck conflictCheck, @Nullable String statsFileName) { @@ -769,6 +788,7 @@ private int tryCommit( identifier, watermark, logOffsets, + properties, commitKind, latestSnapshot, conflictCheck, @@ -800,7 +820,8 @@ private int tryOverwrite( List indexFiles, long identifier, @Nullable Long watermark, - Map logOffsets) { + Map logOffsets, + Map properties) { // collect all files with overwrite Snapshot latestSnapshot = snapshotManager.latestSnapshot(); List changesWithOverwrite = new ArrayList<>(); @@ -845,6 +866,7 @@ private int tryOverwrite( identifier, watermark, logOffsets, + properties, Snapshot.CommitKind.OVERWRITE, mustConflictCheck(), null); @@ -859,6 +881,7 @@ CommitResult tryCommitOnce( long identifier, @Nullable Long watermark, Map logOffsets, + Map properties, Snapshot.CommitKind commitKind, @Nullable Snapshot latestSnapshot, ConflictCheck conflictCheck, @@ -1030,7 +1053,9 @@ CommitResult tryCommitOnce( deltaRecordCount, recordCount(changelogFiles), currentWatermark, - statsFileName); + statsFileName, + // if empty properties, just set to null + properties.isEmpty() ? null : properties); } catch (Throwable e) { // fails when preparing for commit, we should clean up if (retryResult != null) { @@ -1178,7 +1203,8 @@ private ManifestCompactResult compactManifest(@Nullable ManifestCompactResult la 0L, 0L, latestSnapshot.watermark(), - latestSnapshot.statistics()); + latestSnapshot.statistics(), + latestSnapshot.properties()); if (!commitSnapshotImpl(newSnapshot, emptyList())) { return new ManifestCompactResult( diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index 9280f0006f2e..c7db185068b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -79,6 +79,7 @@ public Tag( @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_PROPERTIES) Map properties, @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { super( @@ -101,7 +102,8 @@ public Tag( deltaRecordCount, changelogRecordCount, watermark, - statistics); + statistics, + properties); this.tagCreateTime = tagCreateTime; this.tagTimeRetained = tagTimeRetained; } @@ -144,6 +146,7 @@ public static Tag fromSnapshotAndTagTtl( snapshot.changelogRecordCount(), snapshot.watermark(), snapshot.statistics(), + snapshot.properties(), tagCreateTime, tagTimeRetained); } @@ -169,7 +172,8 @@ public Snapshot trimToSnapshot() { deltaRecordCount, changelogRecordCount, watermark, - statistics); + statistics, + properties); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index b33055e9d5dc..6ed112de136a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -45,6 +45,80 @@ /** Compatibility Test for {@link ManifestCommittableSerializer}. */ public class ManifestCommittableSerializerCompatibilityTest { + @Test + public void testCompatibilityToV4CommitV7() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file"); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + 16, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + manifestCommittable.addProperty("k1", "v1"); + manifestCommittable.addProperty("k2", "v2"); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + + byte[] oldBytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v4-v7"), + true); + deserialized = serializer.deserialize(4, oldBytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + @Test public void testCompatibilityToV3CommitV7() throws IOException { SimpleStats keyStats = @@ -104,7 +178,7 @@ public void testCompatibilityToV3CommitV7() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] oldBytes = @@ -176,7 +250,7 @@ public void testCompatibilityToV3CommitV6() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] oldBytes = @@ -248,7 +322,7 @@ public void testCompatibilityToV3CommitV5() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] oldBytes = IOUtils.readFully( @@ -319,7 +393,7 @@ public void testCompatibilityToV3CommitV4() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] v2Bytes = @@ -391,7 +465,7 @@ public void testCompatibilityToV3CommitV3() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] oldBytes = @@ -463,7 +537,7 @@ public void testCompatibilityToV2CommitV2() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(3, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] v2Bytes = @@ -532,7 +606,7 @@ public void testCompatibilityToVersion2PaimonV07() throws IOException { ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); byte[] bytes = serializer.serialize(manifestCommittable); - ManifestCommittable deserialized = serializer.deserialize(2, bytes); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); assertThat(deserialized).isEqualTo(manifestCommittable); byte[] v2Bytes = diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index 6b2212d983ff..2372c7bb5f6a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -46,7 +46,8 @@ public void testCommittableSerDe() throws IOException { ManifestCommittableSerializer serializer = serializer(); ManifestCommittable committable = create(); byte[] serialized = serializer.serialize(committable); - assertThat(serializer.deserialize(2, serialized)).isEqualTo(committable); + assertThat(serializer.deserialize(serializer.getVersion(), serialized)) + .isEqualTo(committable); } public static ManifestCommittableSerializer serializer() { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 537a0ef774cc..b369beff43d4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -929,6 +929,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { commitIdentifier++, null, Collections.emptyMap(), + Collections.emptyMap(), Snapshot.CommitKind.APPEND, store.snapshotManager().latestSnapshot(), mustConflictCheck(), diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 8d97983a8b45..3f6c2aba1177 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -87,6 +87,7 @@ import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.apache.paimon.utils.HintFileUtils.LATEST; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -1014,6 +1015,34 @@ public void testManifestCompactFull() throws Exception { .isEqualTo(0); } + @Test + public void testCommitManifestWithProperties() throws Exception { + TestFileStore store = createStore(false); + + try (FileStoreCommit fileStoreCommit = store.newCommit()) { + fileStoreCommit.ignoreEmptyCommit(false); + + // commit with empty properties, the properties in snapshot should be null + ManifestCommittable manifestCommittable = new ManifestCommittable(0); + fileStoreCommit.commit(manifestCommittable, Collections.emptyMap()); + Snapshot snapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(snapshot.properties()).isNull(); + + // commit with non-empty properties + manifestCommittable = new ManifestCommittable(0); + manifestCommittable.addProperty("k1", "v1"); + manifestCommittable.addProperty("k2", "v2"); + fileStoreCommit.commit(manifestCommittable, Collections.emptyMap()); + snapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + Map expectedProps = new HashMap<>(); + expectedProps.put("k1", "v1"); + expectedProps.put("k2", "v2"); + Map snapshotProps = snapshot.properties(); + assertThat(snapshotProps).isNotNull(); + assertThat(snapshotProps).isEqualTo(expectedProps); + } + } + private TestFileStore createStore(boolean failing, Map options) throws Exception { return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, options); diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index 5acd26cf10da..3794c8dc8aaa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -407,6 +407,7 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, null); tagManager.createTag( snapshot1, @@ -435,6 +436,7 @@ public void testExpireTagsByTimeRetained() throws Exception { null, null, null, + null, null); tagManager.createTag( snapshot2, diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java index af7ef7ce7b57..0096238f72ec 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -50,6 +50,7 @@ public class TagTest { null, null, null, + null, null); @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index cf3d77feee63..b33227cad6b2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -262,6 +262,7 @@ public static Snapshot createSnapshotWithMillis(long id, long millis) { null, null, null, + null, null); } @@ -285,6 +286,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis, long watermark) null, null, watermark, + null, null); } @@ -309,6 +311,7 @@ private Changelog createChangelogWithMillis(long id, long millis) { null, null, null, + null, null)); } @@ -339,6 +342,7 @@ public void testLatestSnapshotOfUser() throws IOException, InterruptedException null, null, null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } @@ -391,6 +395,7 @@ public void testTraversalSnapshotsFromLatestSafely() throws IOException, Interru null, null, null, + null, null); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v4-v7 b/paimon-core/src/test/resources/compatibility/manifest-committable-v4-v7 new file mode 100644 index 000000000000..25db5463be15 Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v4-v7 differ