Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [Kernel] Catalog Commits (CCv2) Prototype B: Bag of Properties #10

Open
wants to merge 2 commits into
base: kernel_ccv2_prototype_a
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.actions.SingleAction;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.*;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -82,10 +84,14 @@ public interface Transaction {

// CloseableIterator<Row> finalizeActions(Engine engine, CloseableIterator<Row> dataActions);

// TODO: this is for the old POJO approach .... we don't need this for bag of properties?
Optional<Protocol> getUpdatedProtocol();

// TODO: this is for the old POJO approach .... we don't need this for bag of properties?
Optional<Metadata> getUpdatedMetadata();

Iterator<Tuple2<String, String>> getMetaInfo();

long getCommitAsVersion();

void resolveConflictsAndRebase(Engine engine, List<FileStatus> unbackfilledCommits);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.ccv2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.FileStatus;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public abstract class BagOfPropertiesResolvedMetadata implements ResolvedMetadata {

//////////////////////
// static constants //
//////////////////////

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static final String PREFIX = "__delta__.";

/** __delta__.path */
public static final String PATH_KEY = PREFIX + "path";

/** __delta__.version */
public static final String VERSION_KEY = PREFIX + "version";

/** __delta__.__tracked_commit_files__.$filePath=json-encoded SIZE and MODIFICATION_TIME */
public static final String CATALOG_TRACKED_COMMIT_FILES_PREFIX =
PREFIX + "__tracked_commit_files__.";

// TODO: flatten this out
/** __delta__.protocol */
public static final String PROTOCOL_KEY = PREFIX + "protocol";

// TODO: flatten this out
/** __delta__.metadata */
public static final String METADATA_KEY = PREFIX + "metadata";

////////////////////////
// instance variables //
////////////////////////

public List<Tuple2<String, String>> propertiesList;
public Map<String, String> propertiesMap;

public BagOfPropertiesResolvedMetadata() {
this.propertiesList = null;
this.propertiesMap = null;
}

// TODO: this is janky / hacky
public void initialize(List<Tuple2<String, String>> propertiesList) {
this.propertiesList = propertiesList;
this.propertiesMap = propertiesList.stream().collect(Collectors.toMap(x -> x._1, x -> x._2));
}

////////////////////////////////
// ResolvedMetadata Overrides //
////////////////////////////////

@Override
public String getPath() {
return getOrThrow(PATH_KEY);
}

@Override
public long getVersion() {
return Long.parseLong(getOrThrow(VERSION_KEY));
}

@Override
public Optional<LogSegment> getLogSegment() {
return Optional.of(
new LogSegment(
new Path(getPath(), "_delta_log"),
getVersion(),
getCatalogTrackedFileStatuses() /* deltas */,
Collections.emptyList(),
100));
}

@Override
public Optional<Protocol> getProtocol() {
if (propertiesMap.containsKey(PROTOCOL_KEY)) {
return Optional.of(Protocol.fromJson(getOrThrow(PROTOCOL_KEY)));
}

return Optional.empty();
}

// TODO: JSON serializing the entire metadata will make small updates to large schemas
// unnecessarily expensive
@Override
public Optional<Metadata> getMetadata() {
if (propertiesMap.containsKey(METADATA_KEY)) {
return Optional.of(Metadata.fromJson(getOrThrow(METADATA_KEY)));
}

return Optional.empty();
}

@Override
public Optional<String> getSchemaString() {
return getMetadata().map(Metadata::getSchemaString);
}

// ===== Note: commit is *not* implemented ===== //

private String getOrThrow(String key) {
return propertiesMap.computeIfAbsent(
key,
k -> {
throw new RuntimeException(String.format("%s not found", k));
});
}

private List<FileStatus> getCatalogTrackedFileStatuses() {
return propertiesMap.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(CATALOG_TRACKED_COMMIT_FILES_PREFIX))
.map(
entry -> {
try {
final String filePath =
entry.getKey().substring(CATALOG_TRACKED_COMMIT_FILES_PREFIX.length());
final JsonNode jsonNode = OBJECT_MAPPER.readTree(entry.getValue());
final long size = jsonNode.get("size").asLong();
final long modificationTime = jsonNode.get("modificationTime").asLong();
return FileStatus.of(filePath, size, modificationTime);
} catch (Exception ex) {
throw new RuntimeException(
String.format(
"Failed to parse JSON entry %s -> %s", entry.getKey(), entry.getValue()),
ex);
}
})
.sorted(Comparator.comparing(FileStatus::getPath))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.delta.kernel.ccv2;

import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.internal.util.Tuple2;
import java.util.List;

public interface CommitResult {
Expand Down Expand Up @@ -52,8 +52,10 @@ default String resultString() {

String getMessage();

List<Tuple2<String, String>> properties();

// TODO: just call this catalog-registered commits? might be unbackfilled, might be backfilled,
// but we can all agree that they are registered in the catalog
List<FileStatus> unbackfilledCommits();
// List<FileStatus> unbackfilledCommits();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.CloseableIterator;
import java.util.Iterator;
import java.util.Optional;

public interface ResolvedMetadata {
Expand Down Expand Up @@ -60,10 +62,15 @@ public interface ResolvedMetadata {
// APIs for Kernel to interact with the invoking connector //
/////////////////////////////////////////////////////////////

// TODO: CommitInfo / timestamp
// TODO: metaInfo seems tightly coupled to catalog commits?
// why would a file system committer need this?

/**
* @param metaInfo ordered key value pairs of UPDATES and OVERRIDES and REMOVES (where the value
* is null) representing various Delta commit meta information
*/
CommitResult commit(
long commitAsVersion,
CloseableIterator<Row> finalizedActions,
Optional<Protocol> newProtocol,
Optional<Metadata> newMetadata);
Iterator<Tuple2<String, String>> metaInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

import io.delta.kernel.*;
import io.delta.kernel.ccv2.BagOfPropertiesResolvedMetadata;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
Expand Down Expand Up @@ -161,6 +162,31 @@ public Optional<Metadata> getUpdatedMetadata() {
return Optional.empty();
}

@Override
public Iterator<Tuple2<String, String>> getMetaInfo() {
final List<Tuple2<String, String>> output = new ArrayList<>();
getUpdatedProtocol()
.ifPresent(
newProtocol ->
output.add(
new Tuple2<>(
BagOfPropertiesResolvedMetadata.PROTOCOL_KEY,
JsonUtils.rowToJson(newProtocol.toRow()))));

getUpdatedMetadata()
.ifPresent(
newMetadata ->
output.add(
new Tuple2<>(
BagOfPropertiesResolvedMetadata.METADATA_KEY,
JsonUtils.rowToJson(newMetadata.toRow()))));

output.add(
new Tuple2<>(BagOfPropertiesResolvedMetadata.VERSION_KEY, Long.toString(commitAsVersion)));

return output.iterator();
}

@Override
public long getCommitAsVersion() {
return commitAsVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.delta.kernel.data.*;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.types.DataTypeJsonSerDe;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.*;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Metadata {

private static final Logger LOGGER = LoggerFactory.getLogger(Metadata.class);

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static Metadata fromColumnVector(ColumnVector vector, int rowId) {
if (vector.isNullAt(rowId)) {
return null;
Expand All @@ -54,6 +64,58 @@ public static Metadata fromColumnVector(ColumnVector vector, int rowId) {
vector.getChild(7).getMap(rowId));
}

public static Metadata fromJson(String json) {
LOGGER.info("Parsing Metadata from JSON: " + json);
try {
final JsonNode rootNode = OBJECT_MAPPER.readTree(json);

String id = rootNode.get("id").asText();
Optional<String> name =
rootNode.has("name") && !rootNode.get("name").isNull()
? Optional.of(rootNode.get("name").asText())
: Optional.empty();
Optional<String> description =
rootNode.has("description") && !rootNode.get("description").isNull()
? Optional.of(rootNode.get("description").asText())
: Optional.empty();

Format format = new Format("parquet", Collections.emptyMap());

String schemaString = rootNode.get("schemaString").asText();
StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaString);

List<String> partitionColumns =
rootNode.has("partitionColumns")
? OBJECT_MAPPER.convertValue(
rootNode.get("partitionColumns"), new TypeReference<List<String>>() {})
: Collections.emptyList();

Optional<Long> createdTime =
rootNode.has("createdTime") && !rootNode.get("createdTime").isNull()
? Optional.of(rootNode.get("createdTime").asLong())
: Optional.empty();

Map<String, String> configuration =
rootNode.has("configuration")
? OBJECT_MAPPER.convertValue(
rootNode.get("configuration"), new TypeReference<Map<String, String>>() {})
: Collections.emptyMap();

return new Metadata(
id,
name,
description,
format,
schemaString,
schema,
VectorUtils.stringArrayValue(partitionColumns),
createdTime,
VectorUtils.stringStringMapValue(configuration));
} catch (IOException e) {
throw new IllegalArgumentException("Failed to parse Metadata from JSON: " + json, e);
}
}

public static final StructType FULL_SCHEMA =
new StructType()
.add("id", StringType.STRING, false /* nullable */)
Expand Down
Loading
Loading