diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
index 42f7ae608d..bac42de85b 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
@@ -36,9 +36,11 @@
 import io.delta.kernel.internal.actions.Protocol;
 import io.delta.kernel.internal.actions.SingleAction;
 import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.util.Tuple2;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.*;
 import java.net.URI;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -82,10 +84,14 @@ public interface Transaction {
   // CloseableIterator<Row> finalizeActions(Engine engine, CloseableIterator<Row> dataActions);
 
+  // TODO: this is for the old POJO approach .... we don't need this for bag of properties?
   Optional<Protocol> getUpdatedProtocol();
 
+  // TODO: this is for the old POJO approach .... we don't need this for bag of properties?
   Optional<Metadata> getUpdatedMetadata();
 
+  Iterator<Tuple2<String, String>> getMetaInfo();
+
   long getCommitAsVersion();
 
   void resolveConflictsAndRebase(Engine engine, List<FileStatus> unbackfilledCommits);
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/BagOfPropertiesResolvedMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/BagOfPropertiesResolvedMetadata.java
new file mode 100644
index 0000000000..fef46a8020
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/BagOfPropertiesResolvedMetadata.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.ccv2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.snapshot.LogSegment;
+import io.delta.kernel.internal.util.Tuple2;
+import io.delta.kernel.utils.FileStatus;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public abstract class BagOfPropertiesResolvedMetadata implements ResolvedMetadata {
+
+  //////////////////////
+  // static constants //
+  //////////////////////
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  public static final String PREFIX = "__delta__.";
+
+  /** __delta__.path */
+  public static final String PATH_KEY = PREFIX + "path";
+
+  /** __delta__.version */
+  public static final String VERSION_KEY = PREFIX + "version";
+
+  /** __delta__.__tracked_commit_files__.$filePath=json-encoded SIZE and MODIFICATION_TIME */
+  public static final String CATALOG_TRACKED_COMMIT_FILES_PREFIX =
+      PREFIX + "__tracked_commit_files__.";
+
+  // TODO: flatten this out
+  /** __delta__.protocol */
+  public static final String PROTOCOL_KEY = PREFIX + "protocol";
+
+  // TODO: flatten this out
+  /** __delta__.metadata */
+  public static final String METADATA_KEY = PREFIX + "metadata";
+
+  ////////////////////////
+  // instance variables //
+  ////////////////////////
+
+  public List<Tuple2<String, String>> propertiesList;
+  public Map<String, String> propertiesMap;
+
+  public BagOfPropertiesResolvedMetadata() {
+    this.propertiesList = null;
+    this.propertiesMap = null;
+  }
+
+  // TODO: this is janky / hacky
+  public void initialize(List<Tuple2<String, String>> propertiesList) {
+    this.propertiesList = propertiesList;
+    this.propertiesMap = propertiesList.stream().collect(Collectors.toMap(x -> x._1, x -> x._2));
+  }
+
+  ////////////////////////////////
+  // ResolvedMetadata Overrides //
+  ////////////////////////////////
+
+  @Override
+  public String getPath() {
+    return getOrThrow(PATH_KEY);
+  }
+
+  @Override
+  public long getVersion() {
+    return Long.parseLong(getOrThrow(VERSION_KEY));
+  }
+
+  @Override
+  public Optional<LogSegment> getLogSegment() {
+    return Optional.of(
+        new LogSegment(
+            new Path(getPath(), "_delta_log"),
+            getVersion(),
+            getCatalogTrackedFileStatuses() /* deltas */,
+            Collections.emptyList(),
+            100));
+  }
+
+  @Override
+  public Optional<Protocol> getProtocol() {
+    if (propertiesMap.containsKey(PROTOCOL_KEY)) {
+      return Optional.of(Protocol.fromJson(getOrThrow(PROTOCOL_KEY)));
+    }
+
+    return Optional.empty();
+  }
+
+  // TODO: JSON serializing the entire metadata will make small updates to large schemas
+  // unnecessarily expensive
+  @Override
+  public Optional<Metadata> getMetadata() {
+    if (propertiesMap.containsKey(METADATA_KEY)) {
+      return Optional.of(Metadata.fromJson(getOrThrow(METADATA_KEY)));
+    }
+
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<String> getSchemaString() {
+    return getMetadata().map(Metadata::getSchemaString);
+  }
+
+  // ===== Note: commit is *not* implemented ===== //
+
+  private String getOrThrow(String key) {
+    return propertiesMap.computeIfAbsent(
+        key,
+        k -> {
+          throw new RuntimeException(String.format("%s not found", k));
+        });
+  }
+
+  private List<FileStatus> getCatalogTrackedFileStatuses() {
+    return propertiesMap.entrySet().stream()
+        .filter(entry -> entry.getKey().startsWith(CATALOG_TRACKED_COMMIT_FILES_PREFIX))
+        .map(
+            entry -> {
+              try {
+                final String filePath =
+                    entry.getKey().substring(CATALOG_TRACKED_COMMIT_FILES_PREFIX.length());
+                final JsonNode jsonNode = OBJECT_MAPPER.readTree(entry.getValue());
+                final long size = jsonNode.get("size").asLong();
+                final long modificationTime = jsonNode.get("modificationTime").asLong();
+                return FileStatus.of(filePath, size, modificationTime);
+              } catch (Exception ex) {
+                throw new RuntimeException(
+                    String.format(
+                        "Failed to parse JSON entry %s -> %s", entry.getKey(), entry.getValue()),
+                    ex);
+              }
+            })
+        .sorted(Comparator.comparing(FileStatus::getPath))
+        .collect(Collectors.toList());
+  }
+}
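The key scheme above is easiest to see with a concrete bag. A hedged sketch — the path, size, and timestamp values are invented for illustration; only the key names come from the constants in this class:

```scala
// Sketch only: the shape of a property bag for a table at version 1 whose
// latest commit is still catalog-tracked (not yet backfilled).
import io.delta.kernel.internal.util.{Tuple2 => Tuple2J}

val bag: List[Tuple2J[String, String]] = List(
  new Tuple2J("__delta__.path", "/tmp/in_memory_catalog/tbl_abc"),
  new Tuple2J("__delta__.version", "1"),
  new Tuple2J("__delta__.protocol", """{"minReaderVersion":1,"minWriterVersion":2}"""),
  new Tuple2J(
    // one entry per tracked commit file; the value is the JSON that
    // getCatalogTrackedFileStatuses parses back into a FileStatus
    "__delta__.__tracked_commit_files__./tmp/in_memory_catalog/tbl_abc/_delta_log/_commits/00000000000000000001.uuid.json",
    """{"size":1024,"modificationTime":1700000000000}"""))
```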
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/CommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/CommitResult.java
index eb30fe847f..f3fff7e001 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/CommitResult.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/CommitResult.java
@@ -16,7 +16,7 @@
 
 package io.delta.kernel.ccv2;
 
-import io.delta.kernel.utils.FileStatus;
+import io.delta.kernel.internal.util.Tuple2;
 import java.util.List;
 
 public interface CommitResult {
@@ -52,8 +52,10 @@ default String resultString() {
 
     String getMessage();
 
+    List<Tuple2<String, String>> properties();
+
     // TODO: just call this catalog-registered commits? might be unbackfilled, might be backfilled,
     // but we can all agree that they are registered in the catalog
-    List<FileStatus> unbackfilledCommits();
+    // List<FileStatus> unbackfilledCommits();
   }
 }
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/ResolvedMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/ResolvedMetadata.java
index 9d77e8ba3e..dd22eb3d59 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/ResolvedMetadata.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/ccv2/ResolvedMetadata.java
@@ -20,7 +20,9 @@
 import io.delta.kernel.internal.actions.Metadata;
 import io.delta.kernel.internal.actions.Protocol;
 import io.delta.kernel.internal.snapshot.LogSegment;
+import io.delta.kernel.internal.util.Tuple2;
 import io.delta.kernel.utils.CloseableIterator;
+import java.util.Iterator;
 import java.util.Optional;
 
 public interface ResolvedMetadata {
@@ -60,10 +62,15 @@ public interface ResolvedMetadata {
   /////////////////////////////////////////////////////////////
   // APIs for Kernel to interact with the invoking connector //
   /////////////////////////////////////////////////////////////
 
-  // TODO: CommitInfo / timestamp
+  // TODO: metaInfo seems tightly coupled to catalog commits?
+  // why would a file system committer need this?
+
+  /**
+   * @param metaInfo ordered key value pairs of UPDATES and OVERRIDES and REMOVES (where the value
+   *     is null) representing various Delta commit meta information
+   */
   CommitResult commit(
       long commitAsVersion,
       CloseableIterator<Row> finalizedActions,
-      Optional<Protocol> newProtocol,
-      Optional<Metadata> newMetadata);
+      Iterator<Tuple2<String, String>> metaInfo);
 }
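To make the UPDATE/REMOVE contract in that javadoc concrete, here is a minimal sketch, assuming a mutable map on the catalog side; the helper name `applyMetaInfo` is hypothetical and not part of the API:

```scala
// Sketch only: fold the ordered metaInfo pairs into a property bag.
// A null value means "remove this key"; anything else is an upsert.
def applyMetaInfo(
    bag: scala.collection.mutable.Map[String, String],
    metaInfo: java.util.Iterator[io.delta.kernel.internal.util.Tuple2[String, String]]): Unit = {
  metaInfo.forEachRemaining { t =>
    if (t._2 == null) bag -= t._1 else bag(t._1) = t._2
  }
}
```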
!rootNode.get("name").isNull() + ? Optional.of(rootNode.get("name").asText()) + : Optional.empty(); + Optional description = + rootNode.has("description") && !rootNode.get("description").isNull() + ? Optional.of(rootNode.get("description").asText()) + : Optional.empty(); + + Format format = new Format("parquet", Collections.emptyMap()); + + String schemaString = rootNode.get("schemaString").asText(); + StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaString); + + List partitionColumns = + rootNode.has("partitionColumns") + ? OBJECT_MAPPER.convertValue( + rootNode.get("partitionColumns"), new TypeReference>() {}) + : Collections.emptyList(); + + Optional createdTime = + rootNode.has("createdTime") && !rootNode.get("createdTime").isNull() + ? Optional.of(rootNode.get("createdTime").asLong()) + : Optional.empty(); + + Map configuration = + rootNode.has("configuration") + ? OBJECT_MAPPER.convertValue( + rootNode.get("configuration"), new TypeReference>() {}) + : Collections.emptyMap(); + + return new Metadata( + id, + name, + description, + format, + schemaString, + schema, + VectorUtils.stringArrayValue(partitionColumns), + createdTime, + VectorUtils.stringStringMapValue(configuration)); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse Metadata from JSON: " + json, e); + } + } + public static final StructType FULL_SCHEMA = new StructType() .add("id", StringType.STRING, false /* nullable */) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java index e71036935d..a479d4b376 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java @@ -17,6 +17,9 @@ import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.delta.kernel.data.*; import io.delta.kernel.internal.TableFeatures; import io.delta.kernel.internal.data.GenericRow; @@ -26,10 +29,17 @@ import io.delta.kernel.types.IntegerType; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; +import java.io.IOException; import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Protocol { + private static final Logger LOGGER = LoggerFactory.getLogger(Protocol.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static Protocol fromColumnVector(ColumnVector vector, int rowId) { if (vector.isNullAt(rowId)) { return null; @@ -46,6 +56,26 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) { : VectorUtils.toJavaList(vector.getChild(3).getArray(rowId))); } + public static Protocol fromJson(String json) { + LOGGER.info("Parsing Protocol from JSON: " + json); + try { + final JsonNode jsonNode = OBJECT_MAPPER.readTree(json); + return new Protocol( + jsonNode.get("minReaderVersion").asInt(), + jsonNode.get("minWriterVersion").asInt(), + jsonNode.has("readerFeatures") + ? OBJECT_MAPPER.convertValue( + jsonNode.get("readerFeatures"), new TypeReference>() {}) + : Collections.emptyList(), + jsonNode.has("writerFeatures") + ? 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java
index dac74b4f8a..9a74821679 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java
@@ -19,17 +19,27 @@
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.delta.kernel.data.*;
 import io.delta.kernel.internal.data.GenericRow;
 import io.delta.kernel.internal.lang.Lazy;
 import io.delta.kernel.internal.types.DataTypeJsonSerDe;
 import io.delta.kernel.internal.util.VectorUtils;
 import io.delta.kernel.types.*;
+import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Metadata {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(Metadata.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   public static Metadata fromColumnVector(ColumnVector vector, int rowId) {
     if (vector.isNullAt(rowId)) {
       return null;
@@ -54,6 +64,58 @@ public static Metadata fromColumnVector(ColumnVector vector, int rowId) {
         vector.getChild(7).getMap(rowId));
   }
 
+  public static Metadata fromJson(String json) {
+    LOGGER.info("Parsing Metadata from JSON: " + json);
+    try {
+      final JsonNode rootNode = OBJECT_MAPPER.readTree(json);
+
+      String id = rootNode.get("id").asText();
+      Optional<String> name =
+          rootNode.has("name") && !rootNode.get("name").isNull()
+              ? Optional.of(rootNode.get("name").asText())
+              : Optional.empty();
+      Optional<String> description =
+          rootNode.has("description") && !rootNode.get("description").isNull()
+              ? Optional.of(rootNode.get("description").asText())
+              : Optional.empty();
+
+      // NOTE: format is not read from the JSON here; the default parquet format is assumed
+      Format format = new Format("parquet", Collections.emptyMap());
+
+      String schemaString = rootNode.get("schemaString").asText();
+      StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaString);
+
+      List<String> partitionColumns =
+          rootNode.has("partitionColumns")
+              ? OBJECT_MAPPER.convertValue(
+                  rootNode.get("partitionColumns"), new TypeReference<List<String>>() {})
+              : Collections.emptyList();
+
+      Optional<Long> createdTime =
+          rootNode.has("createdTime") && !rootNode.get("createdTime").isNull()
+              ? Optional.of(rootNode.get("createdTime").asLong())
+              : Optional.empty();
+
+      Map<String, String> configuration =
+          rootNode.has("configuration")
+              ? OBJECT_MAPPER.convertValue(
+                  rootNode.get("configuration"), new TypeReference<Map<String, String>>() {})
+              : Collections.emptyMap();
+
+      return new Metadata(
+          id,
+          name,
+          description,
+          format,
+          schemaString,
+          schema,
+          VectorUtils.stringArrayValue(partitionColumns),
+          createdTime,
+          VectorUtils.stringStringMapValue(configuration));
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse Metadata from JSON: " + json, e);
+    }
+  }
+
   public static final StructType FULL_SCHEMA =
       new StructType()
           .add("id", StringType.STRING, false /* nullable */)
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java
index e71036935d..a479d4b376 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java
@@ -17,6 +17,9 @@
 
 import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.delta.kernel.data.*;
 import io.delta.kernel.internal.TableFeatures;
 import io.delta.kernel.internal.data.GenericRow;
@@ -26,10 +29,17 @@
 import io.delta.kernel.types.IntegerType;
 import io.delta.kernel.types.StringType;
 import io.delta.kernel.types.StructType;
+import java.io.IOException;
 import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Protocol {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(Protocol.class);
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
     if (vector.isNullAt(rowId)) {
       return null;
@@ -46,6 +56,26 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
         : VectorUtils.toJavaList(vector.getChild(3).getArray(rowId)));
   }
 
+  public static Protocol fromJson(String json) {
+    LOGGER.info("Parsing Protocol from JSON: " + json);
+    try {
+      final JsonNode jsonNode = OBJECT_MAPPER.readTree(json);
+      return new Protocol(
+          jsonNode.get("minReaderVersion").asInt(),
+          jsonNode.get("minWriterVersion").asInt(),
+          jsonNode.has("readerFeatures")
+              ? OBJECT_MAPPER.convertValue(
+                  jsonNode.get("readerFeatures"), new TypeReference<List<String>>() {})
+              : Collections.emptyList(),
+          jsonNode.has("writerFeatures")
+              ? OBJECT_MAPPER.convertValue(
+                  jsonNode.get("writerFeatures"), new TypeReference<List<String>>() {})
+              : Collections.emptyList());
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse Protocol from JSON: " + json, e);
+    }
+  }
+
   public static final StructType FULL_SCHEMA =
       new StructType()
           .add("minReaderVersion", IntegerType.INTEGER, false /* nullable */)
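A quick usage sketch for the new parser — the JSON literal is invented, but it only uses fields the parser reads; an absent readerFeatures defaults to an empty list:

```scala
// Sketch only: Protocol.fromJson accepts the same shape that
// JsonUtils.rowToJson(protocol.toRow()) produces (see below).
import io.delta.kernel.internal.actions.Protocol

val protocol = Protocol.fromJson(
  """{"minReaderVersion":1,"minWriterVersion":2,"writerFeatures":["appendOnly"]}""")
```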
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/JsonUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/JsonUtils.java
index 832e2b6193..27e5ce97b6 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/JsonUtils.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/JsonUtils.java
@@ -16,9 +16,34 @@
 
 package io.delta.kernel.internal.util;
 
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import io.delta.kernel.data.ArrayValue;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.MapValue;
+import io.delta.kernel.data.Row;
 import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -50,4 +75,161 @@ public static Map<String, String> parseJSONKeyValueMap(String jsonString) {
       throw new KernelException(String.format("Failed to parse JSON string: %s", jsonString), e);
     }
   }
+
+  static {
+    MAPPER.registerModule(new SimpleModule().addSerializer(Row.class, new RowSerializer()));
+  }
+
+  /**
+   * Converts a {@link Row} to a single line JSON string. This is currently used just in tests.
+   * Will be used as part of the refactoring planned in #2929
+   *
+   * @param row the row to convert
+   * @return JSON string
+   */
+  public static String rowToJson(Row row) {
+    try {
+      return MAPPER.writeValueAsString(row);
+    } catch (JsonProcessingException ex) {
+      throw new RuntimeException("Could not serialize row object to JSON", ex);
+    }
+  }
+
+  public static class RowSerializer extends StdSerializer<Row> {
+    public RowSerializer() {
+      super(Row.class);
+    }
+
+    @Override
+    public void serialize(Row row, JsonGenerator gen, SerializerProvider provider)
+        throws IOException {
+      writeRow(gen, row, row.getSchema());
+    }
+
+    private void writeRow(JsonGenerator gen, Row row, StructType schema) throws IOException {
+      gen.writeStartObject();
+      for (int columnOrdinal = 0; columnOrdinal < schema.length(); columnOrdinal++) {
+        StructField field = schema.at(columnOrdinal);
+        if (!row.isNullAt(columnOrdinal)) {
+          gen.writeFieldName(field.getName());
+          writeValue(gen, row, columnOrdinal, field.getDataType());
+        }
+      }
+      gen.writeEndObject();
+    }
+
+    private void writeStruct(JsonGenerator gen, ColumnVector vector, StructType type, int rowId)
+        throws IOException {
+      gen.writeStartObject();
+      for (int columnOrdinal = 0; columnOrdinal < type.length(); columnOrdinal++) {
+        StructField field = type.at(columnOrdinal);
+        ColumnVector childVector = vector.getChild(columnOrdinal);
+        if (!childVector.isNullAt(rowId)) {
+          gen.writeFieldName(field.getName());
+          writeValue(gen, childVector, rowId, field.getDataType());
+        }
+      }
+      gen.writeEndObject();
+    }
+
+    private void writeArrayValue(JsonGenerator gen, ArrayValue arrayValue, ArrayType arrayType)
+        throws IOException {
+      gen.writeStartArray();
+      ColumnVector arrayElems = arrayValue.getElements();
+      for (int i = 0; i < arrayValue.getSize(); i++) {
+        if (arrayElems.isNullAt(i)) {
+          // Jackson serializes the null values in the array, but not in the map
+          gen.writeNull();
+        } else {
+          writeValue(gen, arrayValue.getElements(), i, arrayType.getElementType());
+        }
+      }
+      gen.writeEndArray();
+    }
+
+    private void writeMapValue(JsonGenerator gen, MapValue mapValue, MapType mapType)
+        throws IOException {
+      assertSupportedMapType(mapType);
+      gen.writeStartObject();
+      ColumnVector keys = mapValue.getKeys();
+      ColumnVector values = mapValue.getValues();
+      for (int i = 0; i < mapValue.getSize(); i++) {
+        gen.writeFieldName(keys.getString(i));
+        if (!values.isNullAt(i)) {
+          writeValue(gen, values, i, mapType.getValueType());
+        } else {
+          gen.writeNull();
+        }
+      }
+      gen.writeEndObject();
+    }
+
+    private void writeValue(JsonGenerator gen, Row row, int columnOrdinal, DataType type)
+        throws IOException {
+      checkArgument(!row.isNullAt(columnOrdinal), "value should not be null");
+      if (type instanceof BooleanType) {
+        gen.writeBoolean(row.getBoolean(columnOrdinal));
+      } else if (type instanceof ByteType) {
+        gen.writeNumber(row.getByte(columnOrdinal));
+      } else if (type instanceof ShortType) {
+        gen.writeNumber(row.getShort(columnOrdinal));
+      } else if (type instanceof IntegerType) {
+        gen.writeNumber(row.getInt(columnOrdinal));
+      } else if (type instanceof LongType) {
+        gen.writeNumber(row.getLong(columnOrdinal));
+      } else if (type instanceof FloatType) {
+        gen.writeNumber(row.getFloat(columnOrdinal));
+      } else if (type instanceof DoubleType) {
+        gen.writeNumber(row.getDouble(columnOrdinal));
+      } else if (type instanceof StringType) {
+        gen.writeString(row.getString(columnOrdinal));
+      } else if (type instanceof StructType) {
+        writeRow(gen, row.getStruct(columnOrdinal), (StructType) type);
+      } else if (type instanceof ArrayType) {
+        writeArrayValue(gen, row.getArray(columnOrdinal), (ArrayType) type);
+      } else if (type instanceof MapType) {
+        writeMapValue(gen, row.getMap(columnOrdinal), (MapType) type);
+      } else {
+        // `binary` type is not supported according to the Delta Protocol
+        throw new UnsupportedOperationException("unsupported data type: " + type);
+      }
+    }
+
+    private void writeValue(JsonGenerator gen, ColumnVector vector, int rowId, DataType type)
+        throws IOException {
+      checkArgument(!vector.isNullAt(rowId), "value should not be null");
+      if (type instanceof BooleanType) {
+        gen.writeBoolean(vector.getBoolean(rowId));
+      } else if (type instanceof ByteType) {
+        gen.writeNumber(vector.getByte(rowId));
+      } else if (type instanceof ShortType) {
+        gen.writeNumber(vector.getShort(rowId));
+      } else if (type instanceof IntegerType) {
+        gen.writeNumber(vector.getInt(rowId));
+      } else if (type instanceof LongType) {
+        gen.writeNumber(vector.getLong(rowId));
+      } else if (type instanceof FloatType) {
+        gen.writeNumber(vector.getFloat(rowId));
+      } else if (type instanceof DoubleType) {
+        gen.writeNumber(vector.getDouble(rowId));
+      } else if (type instanceof StringType) {
+        gen.writeString(vector.getString(rowId));
+      } else if (type instanceof StructType) {
+        writeStruct(gen, vector, (StructType) type, rowId);
+      } else if (type instanceof ArrayType) {
+        writeArrayValue(gen, vector.getArray(rowId), (ArrayType) type);
+      } else if (type instanceof MapType) {
+        writeMapValue(gen, vector.getMap(rowId), (MapType) type);
+      } else {
+        throw new UnsupportedOperationException("unsupported data type: " + type);
+      }
+    }
+  }
+
+  private static void assertSupportedMapType(MapType keyType) {
+    checkArgument(
+        keyType.getKeyType() instanceof StringType,
+        "Only STRING type keys are supported in MAP type in JSON serialization");
+  }
 }
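A minimal usage sketch for rowToJson, mirroring what TransactionImpl.getMetaInfo does with a Protocol action (the four-argument Protocol constructor is the same one fromJson calls above):

```scala
import io.delta.kernel.internal.actions.Protocol
import io.delta.kernel.internal.util.JsonUtils
import java.util.Collections

// Serialize a Protocol action row to a single-line JSON string, e.g. something like
// {"minReaderVersion":1,"minWriterVersion":2,"readerFeatures":[],"writerFeatures":[]}
val protocol = new Protocol(1, 2, Collections.emptyList(), Collections.emptyList())
val json = JsonUtils.rowToJson(protocol.toRow())
```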
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/InMemoryCCv2Suite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/InMemoryCCv2Suite.scala
index 92c2ce3628..615a92cc22 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/InMemoryCCv2Suite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/InMemoryCCv2Suite.scala
@@ -11,7 +11,6 @@ import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
 import io.delta.kernel.defaults.utils.TestRow
 import io.delta.kernel.test.VectorTestUtils
 import io.delta.kernel.utils.{CloseableIterable, CloseableIterator}
-import org.datanucleus.ExecutionContext
 import org.scalatest.funsuite.AnyFunSuite
 
 // scalastyle:off println
@@ -129,8 +128,9 @@ class InMemoryCCv2Suite extends AnyFunSuite
       val result = rm.commit(
         txn.getCommitAsVersion,
         txn.getFinalizedActions(defaultEngine),
-        txn.getUpdatedProtocol,
-        txn.getUpdatedMetadata
+        txn.getMetaInfo
+//        txn.getUpdatedProtocol,
+//        txn.getUpdatedMetadata
       )
 
       attempt += 1
@@ -146,16 +146,17 @@ class InMemoryCCv2Suite extends AnyFunSuite
           println(s"Commit failed (non-retryable) with: ${fail.getMessage}")
           throw new RuntimeException(s"Commit failed (non-retryable): ${fail.getMessage}")
         case retryable: CommitResult.RetryableFailure =>
-          println(s"Commit failed (retryable) with: ${retryable.getMessage}. " +
-            s"Unbackfilled commits: ${retryable.unbackfilledCommits()}")
-
-          txn.resolveConflictsAndRebase(defaultEngine, retryable.unbackfilledCommits())
-
-          if (attempt < MAX_ATTEMPTS) {
-            println(s"Retrying in $sleepMillis ms...")
-            Thread.sleep(sleepMillis)
-            sleepMillis *= 2 // Exponential backoff (doubles each time)
-          }
+          // TODO: get the unbackfilled commits from the properties returned by the catalog
+//          println(s"Commit failed (retryable) with: ${retryable.getMessage}. " +
+//            s"Unbackfilled commits: ${retryable.unbackfilledCommits()}")
+//
+//          txn.resolveConflictsAndRebase(defaultEngine, retryable.unbackfilledCommits())
+//
+//          if (attempt < MAX_ATTEMPTS) {
+//            println(s"Retrying in $sleepMillis ms...")
+//            Thread.sleep(sleepMillis)
+//            sleepMillis *= 2 // Exponential backoff (doubles each time)
+//          }
       }
     }
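One possible shape for the retry path left as a TODO above. This is a sketch only, assuming the catalog echoes the full latest property bag in RetryableFailure.properties() and that tracked-commit-file values use the same JSON shape that getCatalogTrackedFileStatuses parses:

```scala
// Sketch only: rebuild the unbackfilled commits from the returned properties,
// then rebase. Key names come from BagOfPropertiesResolvedMetadata.
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import io.delta.kernel.ccv2.BagOfPropertiesResolvedMetadata.CATALOG_TRACKED_COMMIT_FILES_PREFIX
import io.delta.kernel.utils.FileStatus

val mapper = new ObjectMapper()
val unbackfilled = retryable.properties().asScala
  .filter(_._1.startsWith(CATALOG_TRACKED_COMMIT_FILES_PREFIX))
  .map { t =>
    val path = t._1.stripPrefix(CATALOG_TRACKED_COMMIT_FILES_PREFIX)
    val node = mapper.readTree(t._2)
    FileStatus.of(path, node.get("size").asLong, node.get("modificationTime").asLong)
  }
  .toList
txn.resolveConflictsAndRebase(defaultEngine, unbackfilled.asJava)
```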
" + - s"Unbackfilled commits: ${retryable.unbackfilledCommits()}") - - txn.resolveConflictsAndRebase(defaultEngine, retryable.unbackfilledCommits()) - - if (attempt < MAX_ATTEMPTS) { - println(s"Retrying in $sleepMillis ms...") - Thread.sleep(sleepMillis) - sleepMillis *= 2 // Exponential backoff (doubles each time) - } + // TODO: get the unbackfilled commits from the properties returned by the catalog +// println(s"Commit failed (retryable) with: ${retryable.getMessage}. " + +// s"Unbackfilled commits: ${retryable.unbackfilledCommits()}") +// +// txn.resolveConflictsAndRebase(defaultEngine, retryable.unbackfilledCommits()) +// +// if (attempt < MAX_ATTEMPTS) { +// println(s"Retrying in $sleepMillis ms...") +// Thread.sleep(sleepMillis) +// sleepMillis *= 2 // Exponential backoff (doubles each time) +// } } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/CCv2Client.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/CCv2Client.scala index 4ef7efdacc..61c074b255 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/CCv2Client.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/CCv2Client.scala @@ -1,17 +1,15 @@ package io.delta.kernel.defaults.ccv2.setup -import java.util -import java.util.{Collections, Optional, UUID} +import java.util.{Collections, UUID, Iterator => IteratorJ} import scala.collection.JavaConverters._ -import io.delta.kernel.ccv2.{CommitResult, ResolvedMetadata} +import com.fasterxml.jackson.databind.ObjectMapper +import io.delta.kernel.ccv2.{BagOfPropertiesResolvedMetadata, CommitResult, ResolvedMetadata} import io.delta.kernel.data.Row import io.delta.kernel.engine.Engine -import io.delta.kernel.internal.actions.{Metadata, Protocol} import io.delta.kernel.internal.fs.Path -import io.delta.kernel.internal.snapshot.LogSegment -import io.delta.kernel.internal.util.FileNames +import io.delta.kernel.internal.util.{FileNames, Tuple2 => Tuple2J} import io.delta.kernel.utils.{CloseableIterator, FileStatus} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path => HadoopPath} @@ -35,8 +33,9 @@ class CCv2Client(engine: Engine, catalogClient: CatalogClient) { // ResolvedCatalogMetadataCommitter // ////////////////////////////////////// -trait ResolvedCatalogMetadataCommitter extends { self: ResolvedMetadata => - +trait ResolvedCatalogMetadataCommitter extends { self: BagOfPropertiesResolvedMetadata => + import ResolvedCatalogMetadataCommitter._ + import BagOfPropertiesResolvedMetadata._ import JavaScalaUtils._ // lazy so the child can finish initializing before we use `getPath` @@ -48,14 +47,11 @@ trait ResolvedCatalogMetadataCommitter extends { self: ResolvedMetadata => def logger: Logger def tableName: String - /** The *potentially* unbackfilled commits. Some may actually be backfilled. 
@@ -65,8 +61,7 @@
     logger.info(s"dataPath: $getPath")
     logger.info(s"commitAsVersion: $commitAsVersion")
     logger.info(s"commitFilePath: $commitFilePath")
-    logger.info(s"newProtocol: $newProtocol")
-    logger.info(s"newMetadata: $newMetadata")
+    logger.info(s"metaInfo: ${metaInfoList.map(x => (x._1, x._2)).mkString(", ")}")
 
     logger.info("Write UUID commit file: START")
     engine
@@ -79,31 +74,60 @@
     val kernelFs =
       FileStatus.of(hadoopFs.getPath.toString, hadoopFs.getLen, hadoopFs.getModificationTime)
 
+    val fsMapData = Map(
+      "size" -> hadoopFs.getLen.toString,
+      "modificationTime" -> hadoopFs.getModificationTime.toString)
+
+    val commitKey = CATALOG_TRACKED_COMMIT_FILES_PREFIX +
+      hadoopFs.getPath.toString
+    val commitJsonValue = OBJECT_MAPPER.writeValueAsString(fsMapData.asJava)
+
+    val requirement = Requirement(
+      name = "commitAsVersion",
+      f = latestProperties => {
+        val latestVersion = latestProperties.getOrElse(VERSION_KEY, "-1").toLong
+
+        latestVersion == commitAsVersion - 1
+      }
+    )
+
+    var allProperties = List((commitKey, commitJsonValue)) ++
+      metaInfoList.map(x => (x._1, x._2))
+
+    if (commitAsVersion == 0) {
+      allProperties = allProperties :+ (PATH_KEY, getPath)
+    }
+
     logger.info(s"hadoopFS: $hadoopFs")
     logger.info(s"kernelFs: $kernelFs")
-
+    logger.info(s"File status Entry: $commitKey -> $commitJsonValue")
+    logger.info(s"allProperties to write to catalog:\n${allProperties.mkString("\n")}")
     logger.info("Commit to catalog: START")
 
     val result = catalogClient
-      .commit(tableName, kernelFs, newProtocol.asScala, newMetadata.asScala) match {
-      case CommitResponse.Success =>
+      .setProperties(tableName, allProperties, List(requirement)) match {
+      case SetPropertiesResponse.Success =>
        logger.info("Commit to catalog: SUCCESS")
         new CommitResult.Success {
           override def getCommitAttemptVersion: Long = commitAsVersion
         }
-      case CommitResponse.TableDoesNotExist(tableName) =>
+      case SetPropertiesResponse.TableDoesNotExist =>
         logger.info("Commit to catalog: TABLE DOES NOT EXIST")
         new CommitResult.NonRetryableFailure {
           override def getMessage: String = s"Table $tableName does not exist"
+
           override def getCommitAttemptVersion: Long = commitAsVersion
         }
-      case CommitResponse.CommitVersionConflict(attempted, expected, commits) =>
+      case SetPropertiesResponse.RequirementFailed(
+          requirement, latestProperties: List[(String, String)]) =>
+        logger.info(s"Commit to catalog: REQUIREMENT FAILED ${requirement.name}")
         new CommitResult.RetryableFailure {
-          override def getMessage: String =
-            s"Commit version conflict: attempted=$attempted, expected=$expected"
+          override def getMessage: String = s"Requirement failed: ${requirement.name}"
+
           override def getCommitAttemptVersion: Long = commitAsVersion
-          override def unbackfilledCommits(): util.List[FileStatus] = commits.toList.asJava
+
+          override def properties(): java.util.List[Tuple2J[String, String]] = {
+            latestProperties.map { case (k, v) => new Tuple2J(k, v) }.asJava
+          }
         }
     }
@@ -122,15 +146,20 @@
 
   private def backfill(commitAsVersion: Long, committedFileStatus: FileStatus): Unit = {
     logger.info(s"Backfilling: START. commitAsVersion=$commitAsVersion")
 
-    val allCandidateUnbackfilledFiles = unbackfilledCommits ++ Seq(committedFileStatus)
+    val allCandidateUnbackfilledFilePaths = Seq(committedFileStatus.getPath) ++ propertiesMap
+      .asScala
+      .filter { case (k, _) => k.startsWith(CATALOG_TRACKED_COMMIT_FILES_PREFIX) }
+      .map { case (k, _) => k.stripPrefix(CATALOG_TRACKED_COMMIT_FILES_PREFIX) }
+
+    logger.info(s"allCandidateUnbackfilledFilePaths: $allCandidateUnbackfilledFilePaths")
 
-    allCandidateUnbackfilledFiles
+    allCandidateUnbackfilledFilePaths
       // e.g. perhaps some of the deltas we got back from the catalog were in fact backfilled
-      .filter(fs => FileNames.isUnbackfilledDeltaFile(fs.getPath))
-      .foreach { fs =>
-        val fsVersion = FileNames.uuidCommitDeltaVersion(fs.getPath)
+      .filter(path => FileNames.isUnbackfilledDeltaFile(path))
+      .foreach { path =>
+        val fsVersion = FileNames.uuidCommitDeltaVersion(path)
         val backfilledFilePath = FileNames.deltaFile(logPath, fsVersion)
 
-        logger.info(s"Unbackfilled fs: ${fs.getPath}")
+        logger.info(s"Unbackfilled fs: ${path}")
         logger.info(s"Unbackfilled version: $fsVersion")
         logger.info(s"Backfilled file path: $backfilledFilePath")
 
@@ -138,7 +167,7 @@
           logger.info(s"Backfilled file already exists: $backfilledFilePath")
         } else {
           logger.info(s"Backfilling: $backfilledFilePath")
-          val sourceUnbackfilledPath = new HadoopPath(fs.getPath)
+          val sourceUnbackfilledPath = new HadoopPath(path)
           val targetBackfilledPath = new HadoopPath(backfilledFilePath)
 
           logger.info(s"Copying $sourceUnbackfilledPath to $targetBackfilledPath")
@@ -155,11 +184,22 @@
     }
 
     logger.info(s"Invoking catalog with latest backfilled version: $commitAsVersion")
-    catalogClient.setLatestBackfilledVersion(tableName, commitAsVersion)
+    val backfillProperties = allCandidateUnbackfilledFilePaths.map { path =>
+      (CATALOG_TRACKED_COMMIT_FILES_PREFIX + path, null)
+    }.toList
+    logger.info(s"backfillProperties: $backfillProperties")
+    catalogClient
+      .setProperties(tableName, backfillProperties)
 
     logger.info("Backfilling: END")
   }
 }
 
+object ResolvedCatalogMetadataCommitter {
+  // create object mapper
+  private val OBJECT_MAPPER = new ObjectMapper
+}
+
 ////////////////////////////////////
 // StagingCatalogResolvedMetadata //
 ////////////////////////////////////
 
@@ -168,37 +208,26 @@ class StagingCatalogResolvedMetadata(
     override val tableName: String,
     override val engine: Engine,
     override val catalogClient: CatalogClient)
-  extends ResolvedMetadata with ResolvedCatalogMetadataCommitter {
+  extends BagOfPropertiesResolvedMetadata with ResolvedCatalogMetadataCommitter {
+  import BagOfPropertiesResolvedMetadata._
   import StagingCatalogResolvedMetadata._
 
-  val stagingTablePath = catalogClient.createStagingTable(tableName) match {
-    case CreateStagingTableResponse.Success(path) => path
-    case CreateStagingTableResponse.TableAlreadyExists(tableName) =>
+  catalogClient.createStagingTable(tableName) match {
+    case CreateStagingTableResponse.Success(path) =>
+      super.initialize(
+        Seq(
+          new Tuple2J(PATH_KEY, path),
+          new Tuple2J(VERSION_KEY, "-1"),
+        ).toList.asJava
+      )
+    case CreateStagingTableResponse.TableAlreadyExists =>
RuntimeException(s"Table $tableName already exists") } - _logger.info(s"stagingTablePath: $stagingTablePath") - // ===== ResolvedCatalogMetadataCommitter overrides ===== // override def logger: Logger = _logger - - override def unbackfilledCommits: Seq[FileStatus] = Seq.empty - - // ===== ResolvedMetadata overrides ===== // - - override def getPath: String = stagingTablePath - - override def getVersion: Long = -1 - - override def getLogSegment: Optional[LogSegment] = Optional.empty() - - override def getProtocol: Optional[Protocol] = Optional.empty() - - override def getMetadata: Optional[Metadata] = Optional.empty() - - override def getSchemaString: Optional[String] = Optional.empty() } object StagingCatalogResolvedMetadata { @@ -213,64 +242,22 @@ class ResolvedCatalogMetadata( override val tableName: String, override val engine: Engine, override val catalogClient: CatalogClient) - extends ResolvedMetadata with ResolvedCatalogMetadataCommitter { + extends BagOfPropertiesResolvedMetadata with ResolvedCatalogMetadataCommitter { + import JavaScalaUtils._ import ResolvedCatalogMetadata._ - private val resolvedTableResponse: ResolveTableResponse.Success = - catalogClient.resolveTable(tableName) match { - case success: ResolveTableResponse.Success => - _logger.info(s"Received success ResolveTableResponse: $success") - success - case ResolveTableResponse.TableDoesNotExist(tableName) => - throw new RuntimeException(s"Table $tableName does not exist") - } - - private val getCommitsResponse: GetCommitsResponse.Success = - catalogClient.getCommits(tableName) match { - case success: GetCommitsResponse.Success => - _logger.info(s"Received success GetCommitsResponse: $success") - success - case GetCommitsResponse.TableDoesNotExist(tableName) => - throw new RuntimeException(s"Table $tableName does not exist") - } - - private val dataPath = resolvedTableResponse.path + catalogClient.getProperties(tableName) match { + case GetPropertiesResponse.Success(properties) => + super.initialize(properties.map { case (k, v) => new Tuple2J(k, v) }.asJava) + case GetPropertiesResponse.TableDoesNotExist => + throw new RuntimeException(s"Table $tableName does not exists") + } // ===== ResolvedCatalogMetadataCommitter overrides ===== // override def logger: Logger = _logger - override def unbackfilledCommits: Seq[FileStatus] = - getLogSegment - .map[List[io.delta.kernel.utils.FileStatus]]( - logSegment => logSegment.getDeltas.asScala.toList) - .asScala - .getOrElse(Seq.empty) - - // ===== ResolvedMetadata overrides ===== // - - override def getPath: String = dataPath - - override def getVersion: Long = resolvedTableResponse.version - - override def getLogSegment: Optional[LogSegment] = - Optional.of( - new LogSegment( - new Path(resolvedTableResponse.path, "_delta_log"), - resolvedTableResponse.version, - getCommitsResponse.commits.toList.asJava, - Collections.emptyList(), - 100 - ) - ) - - override def getProtocol: Optional[Protocol] = resolvedTableResponse.protocol.asJava - - override def getMetadata: Optional[Metadata] = resolvedTableResponse.metadata.asJava - - override def getSchemaString: Optional[String] = resolvedTableResponse.schemaString.asJava - } object ResolvedCatalogMetadata { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala index c27125b400..6249053a44 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala +++ 
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala
index c27125b400..6249053a44 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/Catalog.scala
@@ -10,81 +10,51 @@ import io.delta.kernel.utils.FileStatus
 
 trait CatalogClient {
   def createStagingTable(tableName: String): CreateStagingTableResponse
 
-  def resolveTable(tableName: String): ResolveTableResponse
+  def getProperties(tableName: String): GetPropertiesResponse
 
-  def getCommits(tableName: String): GetCommitsResponse
-
-  // TODO: we might want to pass the commit timestamp? For the case of ICT?
-  // TODO: can we avoid passing in Kernel/Delta types to the catalog?
-  def commit(
+  // e.g. COMMIT --> add new properties for UUID file status, protocol, metadata
+  // e.g. SET LAST BACKFILLED VERSION --> ???? remove all properties which we parse and determine
+  // the version is less than the incoming version ???
+  def setProperties(
       tableName: String,
-      commitFile: FileStatus,
-      updatedProtocol: Option[Protocol] = None,
-      updatedMetadata: Option[Metadata] = None): CommitResponse
+      properties: List[(String, String)],
+      requirements: List[Requirement] = List.empty
+  ): SetPropertiesResponse
 
-  def setLatestBackfilledVersion(
-      tableName: String,
-      latestBackfilledVersion: Long): SetLatestBackfilledVersionResponse
 }
 
-// ===== CreateStagingTableResponse =====
+case class Requirement(name: String, f: Map[String, String] => Boolean)
+
+// ===== CreateStagingTableResponse ===== //
 
 sealed trait CreateStagingTableResponse
 
 object CreateStagingTableResponse {
   final case class Success(path: String) extends CreateStagingTableResponse
 
-  final case class TableAlreadyExists(tableName: String) extends CreateStagingTableResponse
-}
-
-// ===== ResolveTableResponse =====
-
-sealed trait ResolveTableResponse
-
-object ResolveTableResponse {
-
-  final case class Success(
-      path: String,
-      version: Long,
-      protocol: Option[Protocol],
-      metadata: Option[Metadata],
-      schemaString: Option[String]) extends ResolveTableResponse
-
-  final case class TableDoesNotExist(tableName: String) extends ResolveTableResponse
+  final case object TableAlreadyExists extends CreateStagingTableResponse
 }
 
-// ===== GetCommitsResponse =====
+// ===== GetPropertiesResponse ===== //
 
-sealed trait GetCommitsResponse
+sealed trait GetPropertiesResponse
 
-object GetCommitsResponse {
-  final case class Success(
-      commits: scala.collection.immutable.Seq[FileStatus]) extends GetCommitsResponse
+object GetPropertiesResponse {
+  final case class Success(properties: List[(String, String)]) extends GetPropertiesResponse
 
-  final case class TableDoesNotExist(tableName: String) extends GetCommitsResponse
+  final case object TableDoesNotExist extends GetPropertiesResponse
 }
 
-// ===== CommitResponse =====
-
-sealed trait CommitResponse
-
-object CommitResponse {
-  case object Success extends CommitResponse
-
-  final case class TableDoesNotExist(tableName: String) extends CommitResponse
-
-  final case class CommitVersionConflict(
-      attemptedCommitVersion: Long,
-      expectedVersion: Long,
-      commits: scala.collection.immutable.Seq[FileStatus]) extends CommitResponse
-}
+// ===== SetPropertiesResponse ===== //
 
-// ===== SetLatestBackfilledVersionResponse =====
+sealed trait SetPropertiesResponse
 
-sealed trait SetLatestBackfilledVersionResponse
+object SetPropertiesResponse {
+  final case object Success extends SetPropertiesResponse
 
-object SetLatestBackfilledVersionResponse {
-  case object Success extends SetLatestBackfilledVersionResponse
+  final case object TableDoesNotExist extends SetPropertiesResponse
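With this API, a commit reduces to a conditional batch write: evaluate requirements against the latest bag, then apply updates atomically. A usage sketch — the table name and values are hypothetical, and `client` is any CatalogClient:

```scala
// Sketch only: a CAS-style commit against the property bag.
val requirement = Requirement(
  name = "versionIsLatest",
  f = latest => latest.getOrElse("__delta__.version", "-1").toLong == 0L)

client.setProperties(
  tableName = "my_table",
  properties = List(("__delta__.version", "1")),
  requirements = List(requirement)) match {
  case SetPropertiesResponse.Success => println("committed version 1")
  case SetPropertiesResponse.RequirementFailed(req, latest) =>
    println(s"lost the race (${req.name}); latest bag: $latest")
  case SetPropertiesResponse.TableDoesNotExist => println("no such table")
}
```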
-  final case class TableDoesNotExist(tableName: String) extends SetLatestBackfilledVersionResponse
+  final case class RequirementFailed(
+      requirement: Requirement,
+      properties: List[(String, String)]) extends SetPropertiesResponse
 }
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/InMemoryCatalogClient.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/InMemoryCatalogClient.scala
index d5f566bfab..b7cf58d863 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/InMemoryCatalogClient.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ccv2/setup/InMemoryCatalogClient.scala
@@ -3,169 +3,96 @@ package io.delta.kernel.defaults.ccv2.setup
 
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import io.delta.kernel.internal.actions.{Metadata, Protocol}
 import io.delta.kernel.internal.fs.Path
-import io.delta.kernel.internal.util.FileNames
-import io.delta.kernel.utils.FileStatus
 
 class InMemoryCatalogClient(workspace: Path = new Path("/tmp/in_memory_catalog/"))
   extends CatalogClient {
   import InMemoryCatalogClient._
 
+  java.nio.file.Files.createDirectories(java.nio.file.Paths.get(workspace.toString))
 
-  case class CatalogTableData(
-      path: String,
-      var maxCommitVersion: Long = -1L,
-      var commits: scala.collection.mutable.ArrayBuffer[FileStatus],
-      var latestBackfilledVersion: Option[Long] = None,
-      var latestProtocol: Option[Protocol],
-      var latestMetadata: Option[Metadata]) {
-
-    def latestSchemaString: Option[String] = latestMetadata.map(_.getSchemaString)
-  }
+  ///////////////////
+  // Member values //
+  ///////////////////
 
-  java.nio.file.Files.createDirectories(java.nio.file.Paths.get(workspace.toString))
+  case class CatalogTableData(properties: scala.collection.mutable.TreeMap[String, String])
 
   /** Map from tableName -> CatalogTableData */
   val catalogTables = new ConcurrentHashMap[String, CatalogTableData]()
 
   /** Map from tableName -> stagingTablePath */
-  // TODO: support concurrent staging tables
+  // TODO: support concurrent staging tables, probably using some UUID?
   val stagingTables = new ConcurrentHashMap[String, String]()
 
+  ////////////////
+  // Public API //
+  ////////////////
+
   override def createStagingTable(tableName: String): CreateStagingTableResponse = {
     catalogTables.get(tableName) match {
       case null =>
+        // TODO: support concurrent staging tables, probably using some UUID?
        val uuid = UUID.randomUUID().toString.replace("-", "").take(15)
         val tablePath = s"${workspace.toString}/${tableName}_$uuid"
-        logger.info(s"creating new staging table $tableName -> $tablePath")
+        logger.info(s"CREATE STAGING TABLE :: NEW :: $tableName -> $tablePath")
         stagingTables.put(tableName, tablePath)
         CreateStagingTableResponse.Success(tablePath)
-      case data =>
-        logger.info(s"table already exists $tableName, cannot create a new staging table")
-        CreateStagingTableResponse.TableAlreadyExists(tableName)
-    }
-  }
-
-  override def resolveTable(tableName: String): ResolveTableResponse = {
-    catalogTables.get(tableName) match {
-      case null =>
-        ResolveTableResponse.TableDoesNotExist(tableName)
-      case data =>
-        ResolveTableResponse.Success(
-          path = data.path,
-          version = data.maxCommitVersion,
-          protocol = data.latestProtocol,
-          metadata = data.latestMetadata,
-          schemaString = data.latestSchemaString
-        )
+      case tableData =>
+        logger.info(s"CREATE STAGING TABLE :: ALREADY EXISTS :: $tableName")
+        CreateStagingTableResponse.TableAlreadyExists
     }
   }
 
-  override def getCommits(tableName: String): GetCommitsResponse = {
+  override def getProperties(tableName: String): GetPropertiesResponse = {
     catalogTables.get(tableName) match {
       case null =>
-        logger.info(s"table does not exist :: $tableName")
-        GetCommitsResponse.TableDoesNotExist(tableName)
+        logger.info(s"GET PROPERTIES > TABLE $tableName DOES NOT EXIST")
+        GetPropertiesResponse.TableDoesNotExist
       case tableData =>
-        tableData.synchronized {
-          logger.info(s"table exists :: $tableName")
-          val tableCommitsStr = tableData.commits.map(f => s" - ${f.getPath}").mkString("\n")
-          logger.info(s"tableData.commits (size=${tableData.commits.size}):\n$tableCommitsStr")
-
-          GetCommitsResponse.Success(tableData.commits.toList) // return an IMMUTABLE COPY
-        }
+        logger.info(s"GET PROPERTIES > TABLE $tableName EXISTS")
+        GetPropertiesResponse.Success(tableData.properties.toList)
     }
   }
 
-  override def commit(
+  override def setProperties(
       tableName: String,
-      commitFile: FileStatus,
-      updatedProtocol: Option[Protocol],
-      updatedMetadata: Option[Metadata]): CommitResponse = {
+      properties: List[(String, String)],
+      requirements: List[Requirement] = List.empty): SetPropertiesResponse = {
     catalogTables.get(tableName) match {
       case null =>
+        logger.info(s"SET PROPERTIES > TABLE $tableName DOES NOT EXIST")
         stagingTables.get(tableName) match {
           case null =>
-            logger.info(s"table does not exist :: $tableName")
-            CommitResponse.TableDoesNotExist(tableName)
+            logger.info(s"SET PROPERTIES > STAGING TABLE $tableName DOES NOT EXIST, EITHER")
+            SetPropertiesResponse.TableDoesNotExist
           case stagingTablePath =>
-            logger.info(s"Committing to a staging table :: $tableName")
-            val commitVersion = FileNames.uuidCommitDeltaVersion(commitFile.getPath)
-            logger.info(s"[staging table] commitVersion: $commitVersion")
-
-            if (commitVersion != 0) {
-              // TODO: support converting a fs table to a ccv2 table ???
-              throw new RuntimeException(
-                s"[staging table] Expected first commit version 0 but got version $commitVersion")
-            }
-            val tableData = CatalogTableData(
-              path = stagingTablePath,
-              maxCommitVersion = commitVersion,
-              commits = scala.collection.mutable.ArrayBuffer(commitFile),
-              latestProtocol = updatedProtocol,
-              latestMetadata = updatedMetadata
-            )
-            catalogTables.put(tableName, tableData)
-            stagingTables.remove(tableName)
-            CommitResponse.Success
+            logger.info(s"SET PROPERTIES > WRITING TO STAGING TABLE $stagingTablePath")
+            val data = CatalogTableData(scala.collection.mutable.TreeMap(properties: _*))
+            catalogTables.put(tableName, data)
+            SetPropertiesResponse.Success
         }
       case tableData =>
-        tableData.synchronized {
-          logger.info(s"table exists :: $tableName")
-          logger.info(s"commitFile: ${commitFile.getPath}")
-
-          val expectedCommitVersion = tableData.maxCommitVersion + 1
-          val commitVersion = FileNames.uuidCommitDeltaVersion(commitFile.getPath)
-
-          logger.info(s"expectedCommitVersion: $expectedCommitVersion")
-          logger.info(s"commitVersion: $commitVersion")
-
-          if (commitVersion != expectedCommitVersion) {
-            return CommitResponse.CommitVersionConflict(
-              commitVersion,
-              expectedCommitVersion,
-              tableData.commits.toList // return an IMMUTABLE COPY
-            )
+        logger.info(s"SET PROPERTIES > TABLE $tableName EXISTS")
+
+        requirements.foreach { requirement =>
+          requirement.f.apply(tableData.properties.toMap) match {
+            case false =>
+              logger.info(s"REQUIREMENT FAILED: ${requirement.name}")
+              return SetPropertiesResponse.RequirementFailed(
+                requirement, tableData.properties.toList)
+            case true =>
+              logger.info(s"REQUIREMENT SUCCEEDED: ${requirement.name}")
           }
-
-          tableData.maxCommitVersion = commitVersion
-          tableData.commits += commitFile
-          updatedProtocol.foreach(newP => tableData.latestProtocol = Some(newP))
-          updatedMetadata.foreach(newM => tableData.latestMetadata = Some(newM))
-
-          val tableCommitsStr = tableData.commits.map(f => s" - ${f.getPath}").mkString("\n")
-          logger.info(s"tableData.commits:\n$tableCommitsStr")
-
-          CommitResponse.Success
         }
-    }
-  }
-
-  override def setLatestBackfilledVersion(
-      tableName: String,
-      latestBackfilledVersion: Long): SetLatestBackfilledVersionResponse = {
-    logger.info(s"tableName: $tableName, latestBackfilledVersion: $latestBackfilledVersion")
-    catalogTables.get(tableName) match {
-      case null => SetLatestBackfilledVersionResponse.TableDoesNotExist(tableName)
-      case tableData =>
-        tableData.synchronized {
-          if (latestBackfilledVersion > tableData.latestBackfilledVersion.getOrElse(-1L)) {
-            tableData.latestBackfilledVersion = Some(latestBackfilledVersion)
-            logger.info(s"Updated latestBackfilledVersion to $latestBackfilledVersion")
-
-            tableData.commits = tableData
-              .commits
-              .filter(f => FileNames.isUnbackfilledDeltaFile(f.getPath))
-              .dropWhile(f => {
-                val doDrop = FileNames.uuidCommitDeltaVersion(f.getPath) <= latestBackfilledVersion
-                logger.info(s"Checking if we should drop ${f.getPath}: $doDrop")
-                doDrop
-              })
-            logger.info(s"Removed catalog commits older than or equal to $latestBackfilledVersion")
-          }
-          SetLatestBackfilledVersionResponse.Success
+        properties.foreach {
+          case (key, null) =>
+            logger.info(s"REMOVE KEY $key. Key exists? ${tableData.properties.contains(key)}")
+            tableData.properties -= key
+          case (key, value) =>
+            tableData.properties.update(key, value)
         }
+        logger.info(s"PROPERTIES IS NOW:\n${tableData.properties.mkString("\n")}")
+        SetPropertiesResponse.Success
     }
  }
}
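Finally, a sketch of the null-value remove semantics that backfill() relies on; the client and key here are hypothetical:

```scala
// Sketch only: a null value removes the key, which is how backfill() drops
// catalog-tracked commit files once they are copied into the _delta_log.
val key =
  "__delta__.__tracked_commit_files__./t/_delta_log/_commits/00000000000000000001.uuid.json"
client.setProperties("my_table", List((key, null)))
// A subsequent getProperties(...) no longer returns `key`, so the next
// ResolvedCatalogMetadata builds its LogSegment without that delta file.
```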