Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.1' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Dec 10, 2024
2 parents ed97a2f + 8a76d06 commit 9f597c6
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
8 changes: 8 additions & 0 deletions conf/accumulo-testing.properties
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ test.ci.ingest.bulk.workdir=
# When using bulk import to ingest data this determines how much memory can be used to buffer mutations before creating
# rfiles and importing them.
test.ci.ingest.bulk.memory.limit=512000000
# Enables Zipfian distribution for value size. If set to true, the value will have random bytes inserted into it with a size generated based on a Zipfian distribution.
test.ci.ingest.zipfian.enabled=true
# Minimum size to insert into the value when Zipfian distribution is enabled
test.ci.ingest.zipfian.min.size=0
# Maximum size to insert into the value when Zipfian distribution is enabled
test.ci.ingest.zipfian.max.size=10000
# Exponent of the Zipfian distribution
test.ci.ingest.zipfian.exponent=1.5

# Batch walker
# ------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.testing.TestProps;
import org.apache.accumulo.testing.util.FastFormat;
import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
Expand All @@ -66,6 +67,13 @@ public class ContinuousIngest {
private static int pauseMin;
private static int pauseMax;

private static boolean zipfianEnabled;
private static int minSize;
private static int maxSize;
private static double exponent;

private static RandomDataGenerator rnd;

public interface RandomGeneratorFactory extends Supplier<LongSupplier> {
static RandomGeneratorFactory create(ContinuousEnv env, AccumuloClient client,
Supplier<SortedSet<Text>> splitSupplier, Random random) {
Expand Down Expand Up @@ -321,6 +329,19 @@ protected static void doIngest(AccumuloClient client, RandomGeneratorFactory ran
String.format("%.02f", deleteProbability));

try (BatchWriter bw = batchWriterFactory.create(tableName)) {
zipfianEnabled =
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));

if (zipfianEnabled) {
minSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
maxSize = Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
exponent = Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
rnd = new RandomDataGenerator();

log.info("Zipfian distribution enabled with min size: {}, max size: {}, exponent: {}",
minSize, maxSize, exponent);
}

out: while (true) {
ColumnVisibility cv = getVisibility(random);

Expand Down Expand Up @@ -469,25 +490,54 @@ public static byte[] genRow(long rowLong) {

public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow,
Checksum cksum) {
int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
final int numOfSeparators = 4;
int dataLen =
ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + numOfSeparators;
if (cksum != null)
dataLen += 8;

int zipfLength = 0;
if (zipfianEnabled) {
// add the length of the zipfian data to the value
int range = maxSize - minSize;
zipfLength = rnd.nextZipf(range, exponent) + minSize;
dataLen += zipfLength;
}

byte[] val = new byte[dataLen];

// add the ingest instance id to the value
System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
int index = ingestInstanceId.length;

val[index++] = ':';

// add the count of entries written to the value
int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES);
if (added != 16)
throw new RuntimeException(" " + added);
index += 16;

val[index++] = ':';

// add the previous row to the value
if (prevRow != null) {
System.arraycopy(prevRow, 0, val, index, prevRow.length);
index += prevRow.length;
}

val[index++] = ':';

if (zipfianEnabled) {
// add random data to the value of length zipfLength
for (int i = 0; i < zipfLength; i++) {
val[index++] = (byte) rnd.nextInt(0, 256);
}

val[index++] = ':';
}

// add the checksum to the value
if (cksum != null) {
cksum.update(val, 0, index);
cksum.getValue();
Expand Down

0 comments on commit 9f597c6

Please sign in to comment.