Skip to content

Commit

Permalink
HDDS-11723. Tool to better micro benchmark hbase performance in Ozone (
Browse files Browse the repository at this point in the history
…#7463)

Co-authored-by: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com>
Co-authored-by: Siyao Meng <50227127+smengcl@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent cc1a374 commit 579a38e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,13 @@ public void test() throws IOException {
OzoneVolume volume = store.getVolume(volumeName);
volume.createBucket(bucketName);

String rootPath = String.format("%s://%s/%s/%s/",
OZONE_OFS_URI_SCHEME, cluster.getConf().get(OZONE_OM_ADDRESS_KEY),
volumeName, bucketName);
String rootPath = String.format("%s://%s/%s/%s/", OZONE_OFS_URI_SCHEME,
cluster.getConf().get(OZONE_OM_ADDRESS_KEY), volumeName, bucketName);

int exitCode = cmd.execute(
"--path", rootPath,
"--bytes-per-write", "1024",
"--number-of-files", "2",
"--bytes-per-write", "8",
"--writes-per-transaction", "64",
"-t", "5",
"-n", "100");
assertEquals(0, exitCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,39 @@
*/
package org.apache.hadoop.ozone.freon;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import com.codahale.metrics.Timer;
import org.apache.hadoop.ozone.util.PayloadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Data generator tool test hsync/write synchronization performance.
* This tool simulates the way HBase writes transaction logs (WAL) to a file in Ozone:
* - Transactions are written to the file's OutputStream by a single thread, each transaction is numbered by an
* increasing counter. Every transaction can be serialized to the OutputStream via multiple write calls.
* - Multiple threads checks and sync (hsync) the OutputStream to make it persistent.
*
* Example usage:
*
* To generate 1000 hsync calls with 10 threads on a single file:
* ozone freon hsync-generator -t 10 --bytes-per-write=1024 -n 1000
*
* To generate 1000 hsync calls with 10 threads on 3 files simultaneously:
* To simulate hlog that generates 1M hsync calls with 5 threads:
*
* ozone freon hsync-generator -t 10 --bytes-per-write=1024 --number-of-files=3 -n 1000
* ozone freon hsync-generator -t 5 --writes-per-transaction=32 --bytes-per-write=8 -n 1000000
*
*/
@Command(name = "hg",
Expand All @@ -53,32 +57,38 @@
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true,
showDefaultValues = true)
public class HsyncGenerator extends HadoopNestedDirGenerator implements Callable<Void> {
public class HsyncGenerator extends BaseFreonGenerator implements Callable<Void> {
private static final Logger LOG = LoggerFactory.getLogger(HsyncGenerator.class);

@CommandLine.ParentCommand
private Freon freon;

@Option(names = {"--path"},
description = "Hadoop FS file system path. Use full path.",
defaultValue = "o3fs://bucket1.vol1")
private String rootPath;

@Option(names = {"--bytes-per-write"},
description = "Size of each write",
defaultValue = "1024")
defaultValue = "8")
private int writeSize;

@Option(names = {"--number-of-files"},
description = "Number of files to run test.",
defaultValue = "1")
private int numberOfFiles;
@Option(names = {"--writes-per-transaction"},
description = "Size of each write",
defaultValue = "32")
private int writesPerTransaction;

private Timer timer;

private OzoneConfiguration configuration;
private FSDataOutputStream[] outputStreams;
private Path[] files;
private AtomicInteger[] callsPerFile;
private FSDataOutputStream outputStream;
private byte[] data;
private final BlockingQueue<Integer> writtenTransactions = new ArrayBlockingQueue<>(10_000);
private final AtomicInteger lastSyncedTransaction = new AtomicInteger();

public HsyncGenerator() {
}
private byte[] data;


@VisibleForTesting
HsyncGenerator(OzoneConfiguration ozoneConfiguration) {
Expand All @@ -87,55 +97,75 @@ public HsyncGenerator() {

@Override
public Void call() throws Exception {
super.init();
init();

if (configuration == null) {
configuration = freon.createOzoneConfiguration();
}
URI uri = URI.create(rootPath);

outputStreams = new FSDataOutputStream[numberOfFiles];
files = new Path[numberOfFiles];
callsPerFile = new AtomicInteger[numberOfFiles];
FileSystem fileSystem = getFileSystem();
for (int i = 0; i < numberOfFiles; i++) {
Path file = new Path(getRootPath() + "/" + generateObjectName(i));
fileSystem.mkdirs(file.getParent());
outputStreams[i] = fileSystem.create(file);
files[i] = file;
callsPerFile[i] = new AtomicInteger();

LOG.info("Created file for testing: {}", file);
}
FileSystem fileSystem = FileSystem.get(uri, configuration);
Path file = new Path(rootPath + "/" + generateObjectName(0));
fileSystem.mkdirs(file.getParent());
outputStream = fileSystem.create(file);

LOG.info("Created file for testing: {}", file);

timer = getMetrics().timer("hsync-generator");
data = PayloadUtils.generatePayload(writeSize);

startTransactionWriter();

try {
runTests(this::sendHsync);
} finally {
for (FSDataOutputStream outputStream : outputStreams) {
outputStream.close();
}
outputStream.close();
fileSystem.close();
}

StringBuilder distributionReport = new StringBuilder();
for (int i = 0; i < numberOfFiles; i++) {
distributionReport.append("\t").append(files[i]).append(": ").append(callsPerFile[i]).append("\n");
}
return null;
}

LOG.info("Hsync generator finished, calls distribution: \n {}", distributionReport);
private void startTransactionWriter() {
Thread transactionWriter = new Thread(this::generateTransactions);
transactionWriter.setDaemon(true);
transactionWriter.start();
}

return null;
private void generateTransactions() {
int transaction = 0;
while (true) {
for (int i = 0; i < writesPerTransaction; i++) {
try {
if (writeSize > 1) {
outputStream.write(data);
} else {
outputStream.write(i);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

try {
writtenTransactions.put(transaction++);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private void sendHsync(long counter) throws Exception {
timer.time(() -> {
int i = ((int) counter) % numberOfFiles;
FSDataOutputStream outputStream = outputStreams[i];
outputStream.write(data);
outputStream.hsync();
callsPerFile[i].incrementAndGet();
return null;
while (true) {
int transaction = writtenTransactions.take();
int lastSynced = lastSyncedTransaction.get();
if (transaction > lastSynced) {
outputStream.hsync();
lastSyncedTransaction.compareAndSet(lastSynced, transaction);
return null;
}
}
});
}
}

0 comments on commit 579a38e

Please sign in to comment.