Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions paimon-api/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class Snapshot implements Serializable {
protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
protected static final String FIELD_WATERMARK = "watermark";
protected static final String FIELD_STATISTICS = "statistics";
protected static final String FIELD_PROPERTIES = "properties";

// version of snapshot
// null for paimon <= 0.2
Expand Down Expand Up @@ -195,6 +196,13 @@ public class Snapshot implements Serializable {
@Nullable
protected final String statistics;

// properties
// null for paimon <= 1.1 or empty properties
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_PROPERTIES)
@Nullable
protected final Map<String, String> properties;

public Snapshot(
long id,
long schemaId,
Expand All @@ -214,7 +222,8 @@ public Snapshot(
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics) {
@Nullable String statistics,
@Nullable Map<String, String> properties) {
this(
CURRENT_VERSION,
id,
Expand All @@ -235,7 +244,8 @@ public Snapshot(
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
statistics,
properties);
}

@JsonCreator
Expand All @@ -260,7 +270,8 @@ public Snapshot(
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties) {
this.version = version;
this.id = id;
this.schemaId = schemaId;
Expand All @@ -281,6 +292,7 @@ public Snapshot(
this.changelogRecordCount = changelogRecordCount;
this.watermark = watermark;
this.statistics = statistics;
this.properties = properties;
}

@JsonGetter(FIELD_VERSION)
Expand Down Expand Up @@ -395,6 +407,12 @@ public String statistics() {
return statistics;
}

@JsonGetter(FIELD_PROPERTIES)
@Nullable
public Map<String, String> properties() {
return properties;
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand All @@ -421,7 +439,8 @@ public int hashCode() {
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
statistics,
properties);
}

@Override
Expand Down Expand Up @@ -452,7 +471,8 @@ public boolean equals(Object o) {
&& Objects.equals(deltaRecordCount, that.deltaRecordCount)
&& Objects.equals(changelogRecordCount, that.changelogRecordCount)
&& Objects.equals(watermark, that.watermark)
&& Objects.equals(statistics, that.statistics);
&& Objects.equals(statistics, that.statistics)
&& Objects.equals(properties, that.properties);
}

/** Type of changes in this snapshot. */
Expand Down
9 changes: 6 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public Changelog(Snapshot snapshot) {
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.statistics());
snapshot.statistics(),
snapshot.properties);
}

@JsonCreator
Expand All @@ -87,7 +88,8 @@ public Changelog(
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@JsonProperty(FIELD_PROPERTIES) Map<String, String> properties) {
super(
version,
id,
Expand All @@ -108,7 +110,8 @@ public Changelog(
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
statistics,
properties);
}

public static Changelog fromJson(String json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ManifestCommittable {
private final long identifier;
@Nullable private final Long watermark;
private final Map<Integer, Long> logOffsets;
private final Map<String, String> properties;
private final List<CommitMessage> commitMessages;

public ManifestCommittable(long identifier) {
Expand All @@ -45,17 +46,28 @@ public ManifestCommittable(long identifier, @Nullable Long watermark) {
this.watermark = watermark;
this.logOffsets = new HashMap<>();
this.commitMessages = new ArrayList<>();
this.properties = new HashMap<>();
}

public ManifestCommittable(
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages) {
this(identifier, watermark, logOffsets, commitMessages, new HashMap<>());
}

public ManifestCommittable(
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages,
Map<String, String> properties) {
this.identifier = identifier;
this.watermark = watermark;
this.logOffsets = logOffsets;
this.commitMessages = commitMessages;
this.properties = properties;
}

public void addFileCommittable(CommitMessage commitMessage) {
Expand All @@ -72,6 +84,10 @@ public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
logOffsets.put(bucket, newOffset);
}

public void addProperty(String key, String value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you show the example? How to add property? Add what property.

Copy link
Contributor Author

@luoyuxia luoyuxia Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like:

ManifestCommittable committable = new ManifestCommittable(COMMIT_IDENTIFIER);
committable. addProperty("fluss_offset_partition_1_bucket1", "14")
committable. addProperty("fluss_offset_partition_2_bucket2", "15")

properties.put(key, value);
}

public long identifier() {
return identifier;
}
Expand All @@ -89,6 +105,10 @@ public List<CommitMessage> fileCommittables() {
return commitMessages;
}

public Map<String, String> properties() {
return properties;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -101,12 +121,13 @@ public boolean equals(Object o) {
return Objects.equals(identifier, that.identifier)
&& Objects.equals(watermark, that.watermark)
&& Objects.equals(logOffsets, that.logOffsets)
&& Objects.equals(commitMessages, that.commitMessages);
&& Objects.equals(commitMessages, that.commitMessages)
&& Objects.equals(properties, that.properties);
}

@Override
public int hashCode() {
return Objects.hash(identifier, watermark, logOffsets, commitMessages);
return Objects.hash(identifier, watermark, logOffsets, commitMessages, properties);
}

@Override
Expand All @@ -116,7 +137,8 @@ public String toString() {
+ "identifier = %s, "
+ "watermark = %s, "
+ "logOffsets = %s, "
+ "commitMessages = %s",
identifier, watermark, logOffsets, commitMessages);
+ "commitMessages = %s, "
+ "properties = %s}",
identifier, watermark, logOffsets, commitMessages, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/** {@link VersionedSerializer} for {@link ManifestCommittable}. */
public class ManifestCommittableSerializer implements VersionedSerializer<ManifestCommittable> {

private static final int CURRENT_VERSION = 3;
private static final int CURRENT_VERSION = 4;

private final CommitMessageSerializer commitMessageSerializer;

Expand Down Expand Up @@ -62,6 +62,7 @@ public byte[] serialize(ManifestCommittable obj) throws IOException {
view.writeLong(watermark);
}
serializeOffsets(view, obj.logOffsets());
serializeProperties(view, obj.properties());
view.writeInt(commitMessageSerializer.getVersion());
commitMessageSerializer.serializeList(obj.fileCommittables(), view);
return out.toByteArray();
Expand All @@ -76,6 +77,15 @@ private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Lon
}
}

private void serializeProperties(
DataOutputViewStreamWrapper view, Map<String, String> properties) throws IOException {
view.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
view.writeUTF(entry.getKey());
view.writeUTF(entry.getValue());
}
}

@Override
public ManifestCommittable deserialize(int version, byte[] serialized) throws IOException {
if (version > CURRENT_VERSION) {
Expand All @@ -91,6 +101,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
long identifier = view.readLong();
Long watermark = view.readBoolean() ? null : view.readLong();
Map<Integer, Long> offsets = deserializeOffsets(view);
Map<String, String> properties =
version == CURRENT_VERSION ? deserializeProperties(view) : new HashMap<>();
int fileCommittableSerializerVersion = view.readInt();
List<CommitMessage> fileCommittables;
try {
Expand All @@ -116,7 +128,8 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
fileCommittables = legacyV2CommitMessageSerializer.deserializeList(view);
}

return new ManifestCommittable(identifier, watermark, offsets, fileCommittables);
return new ManifestCommittable(
identifier, watermark, offsets, fileCommittables, properties);
}

private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
Expand All @@ -127,4 +140,14 @@ private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws
}
return offsets;
}

private Map<String, String> deserializeProperties(DataInputDeserializer view)
throws IOException {
int size = view.readInt();
Map<String, String> properties = new HashMap<>(size);
for (int i = 0; i < size; i++) {
properties.put(view.readUTF(), view.readUTF());
}
return properties;
}
}
Loading