|
| 1 | +/* |
| 2 | + * Copyright (2025) The Delta Lake Project Authors. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package io.delta.kernel.defaults.benchmarks; |
| 18 | + |
| 19 | +import io.delta.kernel.commit.CommitFailedException; |
| 20 | +import io.delta.kernel.commit.CommitMetadata; |
| 21 | +import io.delta.kernel.commit.CommitResponse; |
| 22 | +import io.delta.kernel.commit.Committer; |
| 23 | +import io.delta.kernel.data.Row; |
| 24 | +import io.delta.kernel.defaults.benchmarks.models.CCv2Info; |
| 25 | +import io.delta.kernel.defaults.benchmarks.models.TableInfo; |
| 26 | +import io.delta.kernel.engine.Engine; |
| 27 | +import io.delta.kernel.internal.files.ParsedCatalogCommitData; |
| 28 | +import io.delta.kernel.internal.files.ParsedLogData; |
| 29 | +import io.delta.kernel.utils.CloseableIterator; |
| 30 | +import io.delta.kernel.utils.FileStatus; |
| 31 | +import io.delta.storage.commit.Commit; |
| 32 | +import io.delta.storage.commit.uccommitcoordinator.UCClient; |
| 33 | +import java.io.IOException; |
| 34 | +import java.net.URI; |
| 35 | +import java.nio.file.Paths; |
| 36 | +import java.util.ArrayList; |
| 37 | +import java.util.List; |
| 38 | +import java.util.Optional; |
| 39 | +import java.util.UUID; |
| 40 | +import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCClient; |
| 41 | +import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCCommitCoordinator; |
| 42 | +import scala.Option; |
| 43 | + |
| 44 | +/** |
| 45 | + * This class encapsulates all the infrastructure needed to work with CCv2 tables. It's created from |
| 46 | + * a {@link TableInfo} that has CCv2 configuration. |
| 47 | + * |
| 48 | + * <p>The context pre-populates the commit coordinator with staged commits from the CCv2Info |
| 49 | + * configuration, allowing benchmarks to read from and write to CCv2 tables. |
| 50 | + */ |
| 51 | +public class CCv2Context { |
| 52 | + |
| 53 | + private final BenchmarkingCCv2Committer committer; |
| 54 | + private final List<ParsedLogData> parsedLogData; |
| 55 | + |
| 56 | + /** |
| 57 | + * Private constructor. Use {@link #createFromTableInfo(TableInfo, Engine)} to create instances. |
| 58 | + */ |
| 59 | + private CCv2Context(BenchmarkingCCv2Committer committer, List<ParsedLogData> parsedLogData) { |
| 60 | + this.committer = committer; |
| 61 | + this.parsedLogData = parsedLogData; |
| 62 | + } |
| 63 | + |
| 64 | + /** @return the committer for CCv2 commits */ |
| 65 | + public Committer getCommitter() { |
| 66 | + return committer; |
| 67 | + } |
| 68 | + |
| 69 | + /** @return the list of parsed log data (staged commits) for SnapshotBuilder */ |
| 70 | + public List<ParsedLogData> getParsedLogData() { |
| 71 | + return parsedLogData; |
| 72 | + } |
| 73 | + |
| 74 | + /** |
| 75 | + * Creates a CCv2Context from a TableInfo that has CCv2 configuration. |
| 76 | + * |
| 77 | + * @param tableInfo the TableInfo containing CCv2 configuration |
| 78 | + * @param engine the Engine to use for filesystem operations |
| 79 | + * @return a CCv2Context ready for use |
| 80 | + * @throws IllegalArgumentException if the TableInfo is not a CCv2 table |
| 81 | + * @throws RuntimeException if there's an error setting up the CCv2 infrastructure |
| 82 | + */ |
| 83 | + public static CCv2Context createFromTableInfo(TableInfo tableInfo, Engine engine) { |
| 84 | + if (!tableInfo.isCCv2Enabled()) { |
| 85 | + throw new IllegalArgumentException("TableInfo does not have CCv2 configuration"); |
| 86 | + } |
| 87 | + |
| 88 | + try { |
| 89 | + CCv2Info ccv2Info = tableInfo.getCCv2Info(); |
| 90 | + String tableRoot = tableInfo.getResolvedTableRoot(); |
| 91 | + |
| 92 | + // Since this isn't a real UC, generate dummy tableId and tableUri |
| 93 | + String tableId = UUID.randomUUID().toString(); |
| 94 | + URI tableUri = Paths.get(tableRoot).toUri(); |
| 95 | + |
| 96 | + // Create in-memory commit coordinator and pre-populate with staged commits from the |
| 97 | + // CCv2Info log_tail |
| 98 | + InMemoryUCCommitCoordinator coordinator = new InMemoryUCCommitCoordinator(); |
| 99 | + List<ParsedLogData> parsedLogDataList = new ArrayList<>(); |
| 100 | + for (CCv2Info.StagedCommit stagedCommit : ccv2Info.getLogTail()) { |
| 101 | + // Get file info for the staged commit using Engine's filesystem. We do this to |
| 102 | + // get accurate file size and modification time. |
| 103 | + String stagedCommitPath = stagedCommit.getFullPath(tableRoot); |
| 104 | + FileStatus fileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath); |
| 105 | + |
| 106 | + coordinator.commitToCoordinator( |
| 107 | + tableId, |
| 108 | + tableUri, |
| 109 | + Option.apply(stagedCommitPath), // commitFileName (full path) |
| 110 | + Option.apply(stagedCommit.getVersion()), // commitVersion |
| 111 | + Option.apply(fileStatus.getSize()), // commitFileSize |
| 112 | + Option.apply(fileStatus.getModificationTime()), // commitFileModTime |
| 113 | + Option.apply(System.currentTimeMillis()), // commitTimestamp |
| 114 | + Option.empty(), // lastKnownBackfilledVersion |
| 115 | + false, // isDisownCommit |
| 116 | + Option.empty(), // protocolOpt |
| 117 | + Option.empty() // metadataOpt |
| 118 | + ); |
| 119 | + |
| 120 | + parsedLogDataList.add(ParsedCatalogCommitData.forFileStatus(fileStatus)); |
| 121 | + } |
| 122 | + |
| 123 | + // Create a committer using an in-memory UC client. We don't need real UC integration for |
| 124 | + // benchmarks. |
| 125 | + String metastoreId = "benchmark-metastore-" + tableId; |
| 126 | + InMemoryUCClient ucClient = new InMemoryUCClient(metastoreId, coordinator); |
| 127 | + BenchmarkingCCv2Committer committer = |
| 128 | + new BenchmarkingCCv2Committer(ucClient, tableId, tableUri, tableRoot); |
| 129 | + |
| 130 | + return new CCv2Context(committer, parsedLogDataList); |
| 131 | + |
| 132 | + } catch (IOException e) { |
| 133 | + throw new RuntimeException( |
| 134 | + "Failed to create CCv2Context: I/O error reading staged commits", e); |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + /** |
| 139 | + * Helper method to convert Kernel FileStatus to Hadoop FileStatus. |
| 140 | + * |
| 141 | + * @param kernelFileStatus Kernel FileStatus to convert |
| 142 | + * @return Hadoop FileStatus |
| 143 | + */ |
| 144 | + private static org.apache.hadoop.fs.FileStatus kernelFileStatusToHadoopFileStatus( |
| 145 | + FileStatus kernelFileStatus) { |
| 146 | + return new org.apache.hadoop.fs.FileStatus( |
| 147 | + kernelFileStatus.getSize() /* length */, |
| 148 | + false /* isDirectory */, |
| 149 | + 1 /* blockReplication */, |
| 150 | + 128 * 1024 * 1024 /* blockSize (128MB) */, |
| 151 | + kernelFileStatus.getModificationTime() /* modificationTime */, |
| 152 | + kernelFileStatus.getModificationTime() /* accessTime */, |
| 153 | + org.apache.hadoop.fs.permission.FsPermission.getFileDefault() /* permission */, |
| 154 | + "unknown" /* owner */, |
| 155 | + "unknown" /* group */, |
| 156 | + new org.apache.hadoop.fs.Path(kernelFileStatus.getPath()) /* path */); |
| 157 | + } |
| 158 | + |
| 159 | + /** |
| 160 | + * Committer implementation for CCv2 benchmarks. |
| 161 | + * |
| 162 | + * <p>This committer writes staged commits to the `_staged_commits/` directory and registers them |
| 163 | + * with the Unity Catalog coordinator. |
| 164 | + */ |
| 165 | + static class BenchmarkingCCv2Committer implements Committer { |
| 166 | + private final UCClient ucClient; |
| 167 | + private final String tableId; |
| 168 | + private final URI tableUri; |
| 169 | + private final String tableRoot; |
| 170 | + |
| 171 | + BenchmarkingCCv2Committer(UCClient ucClient, String tableId, URI tableUri, String tableRoot) { |
| 172 | + this.ucClient = ucClient; |
| 173 | + this.tableId = tableId; |
| 174 | + this.tableUri = tableUri; |
| 175 | + this.tableRoot = tableRoot; |
| 176 | + } |
| 177 | + |
| 178 | + @Override |
| 179 | + public CommitResponse commit( |
| 180 | + Engine engine, CloseableIterator<Row> finalizedActions, CommitMetadata commitMetadata) |
| 181 | + throws CommitFailedException { |
| 182 | + |
| 183 | + long version = commitMetadata.getVersion(); |
| 184 | + String stagedCommitsDir = Paths.get(tableRoot, "_delta_log", "_staged_commits").toString(); |
| 185 | + |
| 186 | + String commitUuid = UUID.randomUUID().toString(); |
| 187 | + String stagedCommitFileName = String.format("%020d.%s.json", version, commitUuid); |
| 188 | + String stagedCommitPath = Paths.get(stagedCommitsDir, stagedCommitFileName).toString(); |
| 189 | + |
| 190 | + try { |
| 191 | + // Write the staged commit file |
| 192 | + engine |
| 193 | + .getJsonHandler() |
| 194 | + .writeJsonFileAtomically(stagedCommitPath, finalizedActions, false /* overwrite */); |
| 195 | + |
| 196 | + FileStatus stagedFileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath); |
| 197 | + Commit commit = |
| 198 | + new Commit( |
| 199 | + version, |
| 200 | + kernelFileStatusToHadoopFileStatus(stagedFileStatus), |
| 201 | + System.currentTimeMillis() // commitTimestamp |
| 202 | + ); |
| 203 | + |
| 204 | + ucClient.commit( |
| 205 | + tableId, |
| 206 | + tableUri, |
| 207 | + Optional.of(commit), |
| 208 | + Optional.empty(), // lastKnownBackfilledVersion |
| 209 | + false, // disown |
| 210 | + Optional.empty(), // newMetadata |
| 211 | + Optional.empty() // newProtocol |
| 212 | + ); |
| 213 | + |
| 214 | + // Return commit response with the staged commit file |
| 215 | + return new CommitResponse(ParsedCatalogCommitData.forFileStatus(stagedFileStatus)); |
| 216 | + |
| 217 | + } catch (IOException e) { |
| 218 | + throw new CommitFailedException( |
| 219 | + true /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e); |
| 220 | + } catch (Exception e) { |
| 221 | + throw new CommitFailedException( |
| 222 | + false /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e); |
| 223 | + } |
| 224 | + } |
| 225 | + } |
| 226 | +} |
0 commit comments