Skip to content

Commit 72930ea

Browse files
committed
first cut of ccv2
1 parent 4e42e93 commit 72930ea

19 files changed

+687
-59
lines changed
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.defaults.benchmarks;
18+
19+
import io.delta.kernel.commit.CommitFailedException;
20+
import io.delta.kernel.commit.CommitMetadata;
21+
import io.delta.kernel.commit.CommitResponse;
22+
import io.delta.kernel.commit.Committer;
23+
import io.delta.kernel.data.Row;
24+
import io.delta.kernel.defaults.benchmarks.models.CCv2Info;
25+
import io.delta.kernel.defaults.benchmarks.models.TableInfo;
26+
import io.delta.kernel.engine.Engine;
27+
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
28+
import io.delta.kernel.internal.files.ParsedLogData;
29+
import io.delta.kernel.utils.CloseableIterator;
30+
import io.delta.kernel.utils.FileStatus;
31+
import io.delta.storage.commit.Commit;
32+
import io.delta.storage.commit.uccommitcoordinator.UCClient;
33+
import java.io.IOException;
34+
import java.net.URI;
35+
import java.nio.file.Paths;
36+
import java.util.ArrayList;
37+
import java.util.List;
38+
import java.util.Optional;
39+
import java.util.UUID;
40+
import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCClient;
41+
import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCCommitCoordinator;
42+
import scala.Option;
43+
44+
/**
45+
* Context for CCv2 (Coordinated Commits v2) tables in benchmarks.
46+
*
47+
* <p>This class encapsulates all the infrastructure needed to work with CCv2 tables, including the
48+
* Unity Catalog client, commit coordinator, and custom committer. It's created from a {@link
49+
* TableInfo} that has CCv2 configuration.
50+
*
51+
* <p>The context pre-populates the commit coordinator with staged commits from the CCv2Info
52+
* configuration, allowing benchmarks to read from and write to CCv2 tables.
53+
*/
54+
public class CCv2Context {
55+
56+
private final InMemoryUCCommitCoordinator coordinator;
57+
private final InMemoryUCClient ucClient;
58+
private final String tableId;
59+
private final URI tableUri;
60+
private final BenchmarkingCCv2Committer committer;
61+
private final List<ParsedLogData> parsedLogData;
62+
private final String tableRoot;
63+
64+
/**
65+
* Private constructor. Use {@link #createFromTableInfo(TableInfo, Engine)} to create instances.
66+
*/
67+
private CCv2Context(
68+
InMemoryUCCommitCoordinator coordinator,
69+
InMemoryUCClient ucClient,
70+
String tableId,
71+
URI tableUri,
72+
BenchmarkingCCv2Committer committer,
73+
List<ParsedLogData> parsedLogData,
74+
String tableRoot) {
75+
this.coordinator = coordinator;
76+
this.ucClient = ucClient;
77+
this.tableId = tableId;
78+
this.tableUri = tableUri;
79+
this.committer = committer;
80+
this.parsedLogData = parsedLogData;
81+
this.tableRoot = tableRoot;
82+
}
83+
84+
/** @return the committer for CCv2 commits */
85+
public Committer getCommitter() {
86+
return committer;
87+
}
88+
89+
/** @return the list of parsed log data (staged commits) for SnapshotBuilder */
90+
public List<ParsedLogData> getParsedLogData() {
91+
return parsedLogData;
92+
}
93+
94+
/**
95+
* Creates a CCv2Context from a TableInfo that has CCv2 configuration.
96+
*
97+
* <p>This method:
98+
*
99+
* <ol>
100+
* <li>Loads CCv2Info from the TableInfo
101+
* <li>Creates an InMemoryUCCommitCoordinator
102+
* <li>Pre-populates the coordinator with staged commits from the log_tail
103+
* <li>Creates an InMemoryUCClient wrapping the coordinator
104+
* <li>Creates a BenchmarkingCCv2Committer for writes
105+
* <li>Converts staged commits to ParsedLogData for SnapshotBuilder
106+
* </ol>
107+
*
108+
* @param tableInfo the TableInfo containing CCv2 configuration
109+
* @param engine the Engine to use for filesystem operations
110+
* @return a CCv2Context ready for use
111+
* @throws IllegalArgumentException if the TableInfo is not a CCv2 table
112+
* @throws RuntimeException if there's an error setting up the CCv2 infrastructure
113+
*/
114+
public static CCv2Context createFromTableInfo(TableInfo tableInfo, Engine engine) {
115+
if (!tableInfo.isCCv2Table()) {
116+
throw new IllegalArgumentException("TableInfo does not have CCv2 configuration");
117+
}
118+
119+
try {
120+
// 1. Load CCv2Info
121+
CCv2Info ccv2Info = tableInfo.getCCv2Info();
122+
String tableRoot = tableInfo.getResolvedTableRoot();
123+
124+
// 2. Create coordinator
125+
InMemoryUCCommitCoordinator coordinator = new InMemoryUCCommitCoordinator();
126+
127+
// 3. Generate table ID and URI
128+
String tableId = UUID.randomUUID().toString();
129+
URI tableUri = Paths.get(tableRoot).toUri();
130+
131+
// 4. Pre-populate coordinator with log_tail commits
132+
List<ParsedLogData> parsedLogDataList = new ArrayList<>();
133+
for (CCv2Info.StagedCommit stagedCommit : ccv2Info.getLogTail()) {
134+
// Get file info for the staged commit using Engine's filesystem
135+
String stagedCommitPath = stagedCommit.getFullPath(tableRoot);
136+
FileStatus fileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath);
137+
138+
// Register with coordinator (use full path to the staged commit)
139+
coordinator.commitToCoordinator(
140+
tableId,
141+
tableUri,
142+
Option.apply(stagedCommitPath), // commitFileName (full path)
143+
Option.apply(stagedCommit.getVersion()), // commitVersion
144+
Option.apply(fileStatus.getSize()), // commitFileSize
145+
Option.apply(fileStatus.getModificationTime()), // commitFileModTime
146+
Option.apply(System.currentTimeMillis()), // commitTimestamp
147+
Option.empty(), // lastKnownBackfilledVersion
148+
false, // isDisownCommit
149+
Option.empty(), // protocolOpt
150+
Option.empty() // metadataOpt
151+
);
152+
153+
// Convert to ParsedLogData
154+
parsedLogDataList.add(ParsedCatalogCommitData.forFileStatus(fileStatus));
155+
}
156+
157+
// 5. Create UCClient
158+
String metastoreId = "benchmark-metastore-" + tableId;
159+
InMemoryUCClient ucClient = new InMemoryUCClient(metastoreId, coordinator);
160+
161+
// 6. Create committer
162+
BenchmarkingCCv2Committer committer =
163+
new BenchmarkingCCv2Committer(ucClient, tableId, tableUri, tableRoot);
164+
165+
// 7. Return context
166+
return new CCv2Context(
167+
coordinator, ucClient, tableId, tableUri, committer, parsedLogDataList, tableRoot);
168+
169+
} catch (Exception e) {
170+
throw new RuntimeException("Failed to create CCv2Context", e);
171+
}
172+
}
173+
174+
/**
175+
* Helper method to convert Kernel FileStatus to Hadoop FileStatus.
176+
*
177+
* @param kernelFileStatus Kernel FileStatus to convert
178+
* @return Hadoop FileStatus
179+
*/
180+
private static org.apache.hadoop.fs.FileStatus kernelFileStatusToHadoopFileStatus(
181+
io.delta.kernel.utils.FileStatus kernelFileStatus) {
182+
return new org.apache.hadoop.fs.FileStatus(
183+
kernelFileStatus.getSize() /* length */,
184+
false /* isDirectory */,
185+
1 /* blockReplication */,
186+
128 * 1024 * 1024 /* blockSize (128MB) */,
187+
kernelFileStatus.getModificationTime() /* modificationTime */,
188+
kernelFileStatus.getModificationTime() /* accessTime */,
189+
org.apache.hadoop.fs.permission.FsPermission.getFileDefault() /* permission */,
190+
"unknown" /* owner */,
191+
"unknown" /* group */,
192+
new org.apache.hadoop.fs.Path(kernelFileStatus.getPath()) /* path */);
193+
}
194+
195+
/**
196+
* Committer implementation for CCv2 benchmarks.
197+
*
198+
* <p>This committer writes staged commits to the `_staged_commits/` directory and registers them
199+
* with the Unity Catalog coordinator.
200+
*/
201+
static class BenchmarkingCCv2Committer implements Committer {
202+
private final UCClient ucClient;
203+
private final String tableId;
204+
private final URI tableUri;
205+
private final String tableRoot;
206+
207+
public BenchmarkingCCv2Committer(
208+
UCClient ucClient, String tableId, URI tableUri, String tableRoot) {
209+
this.ucClient = ucClient;
210+
this.tableId = tableId;
211+
this.tableUri = tableUri;
212+
this.tableRoot = tableRoot;
213+
}
214+
215+
@Override
216+
public CommitResponse commit(
217+
Engine engine, CloseableIterator<Row> finalizedActions, CommitMetadata commitMetadata)
218+
throws CommitFailedException {
219+
220+
long version = commitMetadata.getVersion();
221+
String stagedCommitsDir = Paths.get(tableRoot, "_delta_log", "_staged_commits").toString();
222+
223+
// Ensure _staged_commits directory exists using Engine's filesystem
224+
try {
225+
engine.getFileSystemClient().mkdirs(stagedCommitsDir);
226+
} catch (IOException e) {
227+
throw new CommitFailedException(
228+
true /* retryable */,
229+
false /* conflict */,
230+
"Failed to create _staged_commits directory: " + e.getMessage(),
231+
e);
232+
}
233+
234+
// 1. Write staged commit with UUID name
235+
String commitUuid = UUID.randomUUID().toString();
236+
String stagedCommitFileName = String.format("%020d.%s.json", version, commitUuid);
237+
String stagedCommitPath = Paths.get(stagedCommitsDir, stagedCommitFileName).toString();
238+
239+
try {
240+
// Write the staged commit file
241+
engine
242+
.getJsonHandler()
243+
.writeJsonFileAtomically(stagedCommitPath, finalizedActions, false /* overwrite */);
244+
245+
// Get file status
246+
FileStatus stagedFileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath);
247+
248+
// Convert to Hadoop FileStatus
249+
org.apache.hadoop.fs.FileStatus hadoopFileStatus =
250+
kernelFileStatusToHadoopFileStatus(stagedFileStatus);
251+
252+
// 2. Register with UCClient
253+
Commit commit =
254+
new Commit(
255+
version, hadoopFileStatus, System.currentTimeMillis() // commitTimestamp
256+
);
257+
258+
ucClient.commit(
259+
tableId,
260+
tableUri,
261+
Optional.of(commit),
262+
Optional.empty(), // lastKnownBackfilledVersion
263+
false, // disown
264+
Optional.empty(), // newMetadata
265+
Optional.empty() // newProtocol
266+
);
267+
268+
// Return commit response with the staged commit file
269+
return new CommitResponse(ParsedCatalogCommitData.forFileStatus(stagedFileStatus));
270+
271+
} catch (IOException e) {
272+
throw new CommitFailedException(
273+
true /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e);
274+
} catch (Exception e) {
275+
throw new CommitFailedException(
276+
false /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e);
277+
}
278+
}
279+
}
280+
}

kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public static void main(String[] args) throws RunnerException, IOException {
103103
// TODO: In the future, we can filter specific workloads using command line args here.
104104
List<WorkloadSpec> variants = spec.getWorkloadVariants();
105105
for (WorkloadSpec variant : variants) {
106-
if (variant.getType().equals("write")) {
106+
System.out.println("Variant: " + variant.getFullName());
107+
if (variant.getTableInfo().name.equals("basic_ccv2")) {
107108
filteredSpecs.add(variant);
108109
}
109110
}
@@ -113,6 +114,8 @@ public static void main(String[] args) throws RunnerException, IOException {
113114
String[] workloadSpecsArray =
114115
filteredSpecs.stream().map(WorkloadSpec::toJsonString).toArray(String[]::new);
115116

117+
System.out.println("Filtered specs list: " + Arrays.toString(workloadSpecsArray));
118+
116119
// Configure and run JMH benchmark with the loaded workload specs
117120
Options opt =
118121
new OptionsBuilder()
@@ -128,6 +131,6 @@ public static void main(String[] args) throws RunnerException, IOException {
128131
.addProfiler(KernelMetricsProfiler.class)
129132
.build();
130133

131-
new Runner(opt).run();
134+
new Runner(opt, new WorkloadOutputFormat()).run();
132135
}
133136
}

kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadOutputFormat.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@
2424
import java.util.HashMap;
2525
import org.openjdk.jmh.infra.BenchmarkParams;
2626
import org.openjdk.jmh.infra.IterationParams;
27-
import org.openjdk.jmh.results.BenchmarkResult;
28-
import org.openjdk.jmh.results.IterationResult;
29-
import org.openjdk.jmh.results.Result;
30-
import org.openjdk.jmh.results.RunResult;
27+
import org.openjdk.jmh.results.*;
3128
import org.openjdk.jmh.runner.format.OutputFormat;
3229
import org.openjdk.jmh.util.Statistics;
3330

@@ -313,33 +310,40 @@ public void endRun(Collection<RunResult> result) {
313310
HashMap<String, BenchmarkDetails> benchmarks = new HashMap<>();
314311

315312
for (RunResult res : result) {
316-
for (BenchmarkResult br : res.getBenchmarkResults()) {
317-
try {
318-
WorkloadSpec spec =
319-
WorkloadSpec.fromJsonString(br.getParams().getParam("workloadSpecJson"));
320-
HashMap<String, String> additionalParams = new HashMap<>();
321-
additionalParams.put("engine", br.getParams().getParam("engineName"));
322-
323-
HashMap<String, Object> secondaryMetrics = new HashMap<>();
324-
for (String resultKey : br.getSecondaryResults().keySet()) {
325-
Result r = br.getSecondaryResults().get(resultKey);
326-
if (r instanceof org.openjdk.jmh.results.SampleTimeResult) {
327-
secondaryMetrics.put(r.getLabel(), TimingMetric.fromResult(r));
328-
} else if (r instanceof org.openjdk.jmh.results.ScalarResult) {
313+
System.out.println("Run result: " + res.toString());
314+
BenchmarkResult br = res.getAggregatedResult();
315+
System.out.println("Benchmark results: " + br.toString());
316+
try {
317+
WorkloadSpec spec =
318+
WorkloadSpec.fromJsonString(br.getParams().getParam("workloadSpecJson"));
319+
HashMap<String, String> additionalParams = new HashMap<>();
320+
additionalParams.put("engine", br.getParams().getParam("engineName"));
321+
322+
HashMap<String, Object> secondaryMetrics = new HashMap<>();
323+
for (String resultKey : br.getSecondaryResults().keySet()) {
324+
Result r = br.getSecondaryResults().get(resultKey);
325+
if (r instanceof org.openjdk.jmh.results.SampleTimeResult) {
326+
secondaryMetrics.put(r.getLabel(), TimingMetric.fromResult(r));
327+
} else if (r instanceof org.openjdk.jmh.results.ScalarResult) {
328+
ScalarResult scalarResult = (ScalarResult) r;
329+
if (scalarResult.getScoreUnit().equals("count")) {
330+
// Treat as a long count metric
329331
secondaryMetrics.put(r.getLabel(), (long) r.getScore());
332+
} else {
333+
secondaryMetrics.put(r.getLabel(), r.getScore());
330334
}
331335
}
332-
333-
BenchmarkDetails details =
334-
new BenchmarkDetails(
335-
spec,
336-
additionalParams,
337-
TimingMetric.fromResult(br.getPrimaryResult()),
338-
secondaryMetrics);
339-
benchmarks.put(spec.getFullName(), details);
340-
} catch (IOException e) {
341-
throw new RuntimeException(e);
342336
}
337+
338+
BenchmarkDetails details =
339+
new BenchmarkDetails(
340+
spec,
341+
additionalParams,
342+
TimingMetric.fromResult(br.getPrimaryResult()),
343+
secondaryMetrics);
344+
benchmarks.put(spec.getFullName(), details);
345+
} catch (IOException e) {
346+
throw new RuntimeException(e);
343347
}
344348
}
345349

0 commit comments

Comments
 (0)