Create new test that creates lots of splits (#267)
Co-authored-by: Keith Turner <kturner@apache.org>
DomGarguilo and keith-turner authored Jan 5, 2024
1 parent 6611044 commit 3a4a09f
Showing 6 changed files with 474 additions and 121 deletions.
9 changes: 7 additions & 2 deletions bin/cingest
@@ -28,14 +28,16 @@ Usage: cingest <application> {-o test.<prop>=<value>}
Available applications:
createtable Creates Accumulo table for continous ingest
createtable Creates Accumulo table for continuous ingest
ingest Inserts data into Accumulo that will form random graph.
walk Randomly walks the graph using a scanner
batchwalk Randomly walks the graph using a batch scanner
scan Scans the graph
verify Verifies continous ingest test. Stop ingest before running.
verify Verifies continuous ingest test. Stop ingest before running.
moru Stresses Accumulo by reading and writing to the ingest table.
Stop ingest before running.
manysplits Repeatedly lowers the split threshold on a table to create
many splits in order to test split performance
bulk Create RFiles in a Map Reduce job and calls importDirectory if successful
EOF
}
@@ -69,6 +71,9 @@ case "$1" in
moru)
ci_main="${ci_package}.ContinuousMoru"
;;
manysplits)
ci_main="${ci_package}.ManySplits"
;;
bulk)
if [ "$#" -ne 2 ]; then
echo "Usage : $0 $1 <bulk dir>"
23 changes: 23 additions & 0 deletions conf/accumulo-testing.properties
@@ -140,6 +140,29 @@ test.ci.bulk.map.nodes=1000000
# produce a bulk import file.
test.ci.bulk.reducers.max=1024

# Splits Scaling
# -----------
# The number of tables to create
test.ci.split.table.count=3
# Minimum random row to generate
test.ci.split.ingest.row.min=0
# Maximum random row to generate
test.ci.split.ingest.row.max=9223372036854775807
# Maximum number of random column families to generate
test.ci.split.ingest.max.cf=32767
# Maximum number of random column qualifiers to generate
test.ci.split.ingest.max.cq=32767
# The number of tablets to create on each table on table creation
test.ci.split.initial.tablets=1
# The amount of data to write to each table
test.ci.split.write.size=10000000
# The split threshold to set for each table on creation
test.ci.split.threshold=1G
# The factor to reduce the split threshold by for each iteration of the test
test.ci.split.threshold.reduction.factor=10
# Number of rounds to run the test
test.ci.split.test.rounds=3

###############################
# Garbage Collection Simulation
###############################
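Read together, the new split-scaling properties above describe a schedule: each table starts at test.ci.split.threshold, and the threshold is divided by test.ci.split.threshold.reduction.factor on each of test.ci.split.test.rounds rounds. A minimal Java sketch of that schedule under the defaults above; the variable names and the commented setProperty call are illustrative assumptions, not the actual ManySplits code:

    // Illustrative only: how the split threshold shrinks per round with the defaults above.
    long thresholdBytes = 1L << 30;   // test.ci.split.threshold=1G (~1 GiB)
    final int reductionFactor = 10;   // test.ci.split.threshold.reduction.factor=10
    final int rounds = 3;             // test.ci.split.test.rounds=3
    for (int round = 1; round <= rounds; round++) {
      System.out.printf("round %d: table.split.threshold=%d bytes%n", round, thresholdBytes);
      // presumably applied to each of the test.ci.split.table.count tables, e.g.:
      // client.tableOperations().setProperty(tableName, "table.split.threshold", String.valueOf(thresholdBytes));
      thresholdBytes /= reductionFactor;
    }

With these defaults the thresholds work out to roughly 1 GiB, 100 MiB, and 10 MiB, so each round should force noticeably more splits than the previous one.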
14 changes: 14 additions & 0 deletions src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -41,6 +41,7 @@ public class TestProps {
private static final String CI_SCANNER = CI + "scanner.";
private static final String CI_VERIFY = CI + "verify.";
private static final String CI_BULK = CI + "bulk.";
private static final String CI_SPLIT = CI + "split.";
public static final String TERASORT = PREFIX + "terasort.";
public static final String ROWHASH = PREFIX + "rowhash.";

@@ -148,6 +149,19 @@ public class TestProps {
public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";

/** Split **/
public static final String CI_SPLIT_TABLE_COUNT = CI_SPLIT + "table.count";
public static final String CI_SPLIT_INGEST_ROW_MIN = CI_SPLIT + "ingest.row.min";
public static final String CI_SPLIT_INGEST_ROW_MAX = CI_SPLIT + "ingest.row.max";
public static final String CI_SPLIT_INGEST_MAX_CF = CI_SPLIT + "ingest.max.cf";
public static final String CI_SPLIT_INGEST_MAX_CQ = CI_SPLIT + "ingest.max.cq";
public static final String CI_SPLIT_INITIAL_TABLETS = CI_SPLIT + "initial.tablets";
public static final String CI_SPLIT_WRITE_SIZE = CI_SPLIT + "write.size";
public static final String CI_SPLIT_THRESHOLD = CI_SPLIT + "threshold";
public static final String CI_SPLIT_THRESHOLD_REDUCTION_FACTOR =
CI_SPLIT + "threshold.reduction.factor";
public static final String CI_SPLIT_TEST_ROUNDS = CI_SPLIT + "test.rounds";

/** TeraSort **/
public static final String TERASORT_TABLE = TERASORT + "table";
public static final String TERASORT_NUM_ROWS = TERASORT + "num.rows";
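The new CI_SPLIT_* constants follow the existing TestProps naming pattern, so a test would presumably read them from the loaded test properties the same way the ingest settings are read elsewhere in this commit. A hypothetical sketch, assuming the usual env.getTestProperties() accessor and straightforward parsing:

    Properties testProps = env.getTestProperties();
    int tableCount = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TABLE_COUNT));
    long writeSize = Long.parseLong(testProps.getProperty(TestProps.CI_SPLIT_WRITE_SIZE));
    String splitThreshold = testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD); // e.g. "1G"
    int reductionFactor =
        Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_THRESHOLD_REDUCTION_FACTOR));
    int rounds = Integer.parseInt(testProps.getProperty(TestProps.CI_SPLIT_TEST_ROUNDS));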
@@ -106,125 +106,134 @@ public static void main(String[] args) throws Exception {

final long rowMin = env.getRowMin();
final long rowMax = env.getRowMax();
Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
"Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");

String tableName = env.getAccumuloTableName();
if (!client.tableOperations().exists(tableName)) {
throw new TableNotFoundException(null, tableName,
"Consult the README and create the table before starting ingest.");
}
Properties testProps = env.getTestProperties();
final int maxColF = env.getMaxColF();
final int maxColQ = env.getMaxColQ();
Random random = env.getRandom();
final long numEntries =
Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
final boolean checksum =
Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));

byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
System.currentTimeMillis());
doIngest(client, rowMin, rowMax, tableName, testProps, maxColF, maxColQ, numEntries, checksum,
random);
}
}

Properties testProps = env.getTestProperties();
protected static void doIngest(AccumuloClient client, long rowMin, long rowMax, String tableName,
Properties testProps, int maxColF, int maxColQ, long numEntries, boolean checksum,
Random random)
throws TableNotFoundException, MutationsRejectedException, InterruptedException {
Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
"Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");

long entriesWritten = 0L;
long entriesDeleted = 0L;
final int flushInterval = getFlushEntries(testProps);
log.info("A flush will occur after every {} entries written", flushInterval);
final int maxDepth = 25;
if (!client.tableOperations().exists(tableName)) {
throw new TableNotFoundException(null, tableName,
"Consult the README and create the table before starting ingest.");
}

// always want to point back to flushed data. This way the previous item should
// always exist in accumulo when verifying data. To do this make insert N point
// back to the row from insert (N - flushInterval). The array below is used to keep
// track of all inserts.
MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
System.currentTimeMillis());

long lastFlushTime = System.currentTimeMillis();
long entriesWritten = 0L;
long entriesDeleted = 0L;
final int flushInterval = getFlushEntries(testProps);
log.info("A flush will occur after every {} entries written", flushInterval);
final int maxDepth = 25;

final int maxColF = env.getMaxColF();
final int maxColQ = env.getMaxColQ();
final boolean checksum =
Boolean.parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
final long numEntries =
Long.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
log.info("Total entries to be written: {}", numEntries);
// always want to point back to flushed data. This way the previous item should
// always exist in accumulo when verifying data. To do this make insert N point
// back to the row from insert (N - flushInterval). The array below is used to keep
// track of all inserts.
MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];

visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
long lastFlushTime = System.currentTimeMillis();

pauseEnabled = pauseEnabled(testProps);
log.info("Total entries to be written: {}", numEntries);

pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
"Bad pause wait min/max, must conform to: 0 < min <= max");
visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));

if (pauseEnabled) {
lastPauseNs = System.nanoTime();
pauseWaitSec = getPause(env.getRandom());
log.info("PAUSING enabled");
log.info("INGESTING for {}s", pauseWaitSec);
}
pauseEnabled = pauseEnabled(testProps);

final float deleteProbability = getDeleteProbability(testProps);
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));
pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
"Bad pause wait min/max, must conform to: 0 < min <= max");

if (pauseEnabled) {
lastPauseNs = System.nanoTime();
pauseWaitSec = getPause(random);
log.info("PAUSING enabled");
log.info("INGESTING for {}s", pauseWaitSec);
}

try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(env.getRandom());
final float deleteProbability = getDeleteProbability(testProps);
log.info("DELETES will occur with a probability of {}",
String.format("%.02f", deleteProbability));

// generate sets nodes that link to previous set of nodes
for (int depth = 0; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, env.getRandom());
try (BatchWriter bw = client.createBatchWriter(tableName)) {
out: while (true) {
ColumnVisibility cv = getVisibility(random);

byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
// generate sets nodes that link to previous set of nodes
for (int depth = 0; depth < maxDepth; depth++) {
for (int index = 0; index < flushInterval; index++) {
long rowLong = genLong(rowMin, rowMax, random);

int cfInt = env.getRandom().nextInt(maxColF);
int cqInt = env.getRandom().nextInt(maxColQ);
byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);

nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
prevRow, checksum);
entriesWritten++;
bw.addMutation(m);
}
int cfInt = random.nextInt(maxColF);
int cqInt = random.nextInt(maxColQ);

lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
if (entriesWritten >= numEntries)
break out;
pauseCheck(env.getRandom());
nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
prevRow, checksum);
entriesWritten++;
bw.addMutation(m);
}

// random chance that the entries will be deleted
final boolean delete = env.getRandom().nextFloat() < deleteProbability;

// if the previously written entries are scheduled to be deleted
if (delete) {
log.info("Deleting last portion of written entries");
// add delete mutations in the reverse order in which they were written
for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
MutationInfo currentNode = nodeMap[depth][index];
Mutation m = new Mutation(genRow(currentNode.row));
m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
entriesDeleted++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
pauseCheck(env.getRandom());
}
} else {
// create one big linked list, this makes all the first inserts point to something
for (int index = 0; index < flushInterval - 1; index++) {
MutationInfo firstEntry = nodeMap[0][index];
MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
entriesWritten++;
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
if (entriesWritten >= numEntries)
break out;
pauseCheck(random);
}

// random chance that the entries will be deleted
final boolean delete = random.nextFloat() < deleteProbability;

// if the previously written entries are scheduled to be deleted
if (delete) {
log.info("Deleting last portion of written entries");
// add delete mutations in the reverse order in which they were written
for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
MutationInfo currentNode = nodeMap[depth][index];
Mutation m = new Mutation(genRow(currentNode.row));
m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
entriesDeleted++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
pauseCheck(random);
}

if (entriesWritten >= numEntries)
break out;
pauseCheck(env.getRandom());
} else {
// create one big linked list, this makes all the first inserts point to something
for (int index = 0; index < flushInterval - 1; index++) {
MutationInfo firstEntry = nodeMap[0][index];
MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
entriesWritten++;
bw.addMutation(m);
}
lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
}

if (entriesWritten >= numEntries)
break out;
pauseCheck(random);
}
}
}
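The refactor above moves the ingest loop out of main() into a static doIngest(...) whose inputs (client, row range, table name, properties, column bounds, entry count, checksum flag, and Random) are all passed explicitly, which would let another application such as the new ManySplits drive ingest against an arbitrary table. A hypothetical call against the signature shown in the diff; the env accessors for the client, the table name, and the argument values are assumptions:

    // Hypothetical caller; only the doIngest signature comes from the diff above.
    // Caller must handle TableNotFoundException, MutationsRejectedException, InterruptedException.
    Properties testProps = env.getTestProperties();
    Random random = env.getRandom();
    try (AccumuloClient client = env.getAccumuloClient()) { // getAccumuloClient() assumed here
      ContinuousIngest.doIngest(client, 0L, Long.MAX_VALUE, "ci_split_0", testProps,
          32767, 32767, 1_000_000L, true, random);
    }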