fixes sorting in bulk batch writer
BulkBatchWriter was sorting only on the mutation row.  Occasionally
continuous ingest would generate the same 64-bit id twice in the same
batch, and if that row's columns were not sorted properly then the write
to the RFile would fail.

This fixes the problem by sorting on the entire key instead of only the
row.
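
For context, a minimal standalone sketch of the ordering problem (illustrative Java only; FakeKey, SortOrderSketch, and the sample rows are stand-ins, not the project's classes): when two mutations in a batch share the same row, a stable sort on the row alone keeps their column updates in arrival order, which may not be key order, and a key-ordered file writer such as an RFile writer rejects keys appended out of order. Sorting on the entire key avoids this.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative stand-in for Accumulo's Key; not the real class.
record FakeKey(String row, String family, String qualifier) implements Comparable<FakeKey> {
  @Override
  public int compareTo(FakeKey o) {
    int c = row.compareTo(o.row);
    if (c != 0) {
      return c;
    }
    c = family.compareTo(o.family);
    if (c != 0) {
      return c;
    }
    return qualifier.compareTo(o.qualifier);
  }
}

public class SortOrderSketch {
  public static void main(String[] args) {
    // Two mutations that happen to target the same row (the duplicate 64-bit id case),
    // flattened to keys in arrival order; their columns are not in key order.
    List<FakeKey> arrival = List.of(
        new FakeKey("row1", "meta", "x"), // from the first mutation
        new FakeKey("row1", "data", "y")); // from the second mutation

    // Sorting only on the row is stable, so the two columns stay in arrival order
    // ("meta" before "data"), which a key-ordered writer would reject.
    List<FakeKey> byRowOnly = new ArrayList<>(arrival);
    byRowOnly.sort(Comparator.comparing(FakeKey::row));
    System.out.println("row-only sort:  " + byRowOnly);

    // Sorting on the entire key puts "data" before "meta", the order a key-ordered
    // file format requires.
    List<FakeKey> byWholeKey = new ArrayList<>(arrival);
    byWholeKey.sort(Comparator.naturalOrder());
    System.out.println("whole-key sort: " + byWholeKey);
  }
}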
keith-turner committed Dec 10, 2024
1 parent bc5379c commit f72630e
Showing 1 changed file with 32 additions and 17 deletions.
@@ -18,8 +18,11 @@
*/
package org.apache.accumulo.testing.continuous;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
@@ -32,6 +35,7 @@
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.fs.FileSystem;
@@ -47,7 +51,7 @@ public class BulkBatchWriter implements BatchWriter {

private static final Logger log = LoggerFactory.getLogger(BulkBatchWriter.class);

private final List<Mutation> mutations = new ArrayList<>();
private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
private final String tableName;
private final FileSystem fileSystem;
@@ -72,7 +76,7 @@ public BulkBatchWriter(AccumuloClient client, String tableName, FileSystem fileS
public synchronized void addMutation(Mutation mutation) throws MutationsRejectedException {
Preconditions.checkState(!closed);
mutation = new Mutation(mutation);
mutations.add(mutation);
mutations.addLast(mutation);
memUsed += mutation.estimatedMemoryUsed();
if (memUsed > memLimit) {
flush();
@@ -96,22 +100,43 @@ public synchronized void flush() throws MutationsRejectedException {

Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
fileSystem.mkdirs(tmpDir);
mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));

List<KeyValue> keysValues = new ArrayList<>(mutations.size());

// remove mutations from the deque as we convert them to Keys making the Mutation objects
// available for garbage collection
Mutation mutation;
while ((mutation = mutations.pollFirst()) != null) {
for (var columnUpdate : mutation.getUpdates()) {
var builder = Key.builder(false).row(mutation.getRow())
.family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
.visibility(columnUpdate.getColumnVisibility());
if (columnUpdate.hasTimestamp()) {
builder = builder.timestamp(columnUpdate.getTimestamp());
}
Key key = builder.deleted(columnUpdate.isDeleted()).build();
keysValues.add(new KeyValue(key, columnUpdate.getValue()));
}
}

Comparator<KeyValue> kvComparator = (kv1, kv2) -> kv1.getKey().compareTo(kv2.getKey());
keysValues.sort(kvComparator);

RFileWriter writer = null;
byte[] currEndRow = null;
int nextFileNameCounter = 0;

var loadPlanBuilder = LoadPlan.builder();

for (var mutation : mutations) {
for (var keyValue : keysValues) {
var key = keyValue.getKey();
if (writer == null
|| (currEndRow != null && Arrays.compare(mutation.getRow(), currEndRow) > 0)) {
|| (currEndRow != null && Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}

var row = new Text(mutation.getRow());
var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
@@ -126,17 +151,7 @@ public synchronized void flush() throws MutationsRejectedException {
log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
}

for (var colUpdate : mutation.getUpdates()) {
var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
if (colUpdate.hasTimestamp()) {
key.setTimestamp(colUpdate.getTimestamp());
}
if (colUpdate.isDeleted()) {
key.setDeleted(true);
}
writer.append(key, colUpdate.getValue());
}
writer.append(key, keyValue.getValue());
}

if (writer != null) {