[BACKUP] export topic data to multi-files by limiting size of archive… #1152

Merged
merged 1 commit on Nov 21, 2022
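In short, this change makes the exporter write each topic partition into its own folder and roll to a new archive file whenever the current one grows past the configurable --archive.size limit (100MB by default). A minimal sketch of invoking the exporter with the new flag; the broker address, topics, output path, and group name are placeholders, and the "100MB" literal is assumed to be parsed by the DataSize converter the same way the test's "10Byte" is:

// hypothetical invocation; every value below is a placeholder
Exporter.main(
    new String[] {
      "--bootstrap.servers", "192.168.50.10:9092",
      "--topics", "myTopicA,myTopicB",
      "--output", "/tmp/backup",
      "--group", "backup-group",
      "--archive.size", "100MB"
    });
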
1 change: 1 addition & 0 deletions app/build.gradle
@@ -44,6 +44,7 @@ dependencies {
testImplementation project(':it')

implementation project(':common')
implementation project(':fs')
implementation "org.apache.kafka:kafka-clients:${versions["kafka"]}"
implementation "com.beust:jcommander:${versions["jcommander"]}"
// we don't use slf4j actually, and it is used by kafka so we swallow the log.
87 changes: 60 additions & 27 deletions app/src/main/java/org/astraea/app/backup/Exporter.java
@@ -20,10 +20,14 @@
import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import org.astraea.common.Configuration;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.argument.PathField;
@@ -32,6 +36,7 @@
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.consumer.IteratorLimit;
import org.astraea.fs.FileSystem;

public class Exporter {

@@ -42,38 +47,50 @@ public static void main(String[] args) {
System.out.println(result);
}

public static Result execute(Argument argument) {
public static Map<TopicPartition, Stat> execute(Argument argument) {
if (!argument.output.toFile().isDirectory())
throw new IllegalArgumentException("--output must be an existing folder");
var root = argument.output.toFile();
var recordCount = new HashMap<TopicPartition, Long>();
for (var t : argument.topics) {
try (var fs = FileSystem.local(Configuration.EMPTY)) {
var iter =
Consumer.forTopics(Set.of(t))
Consumer.forTopics(Set.copyOf(argument.topics))
.bootstrapServers(argument.bootstrapServers())
.config(
ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG,
ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST)
.config(ConsumerConfigs.GROUP_ID_CONFIG, argument.group)
.iterator(List.of(IteratorLimit.idle(Duration.ofSeconds(3))));
// skip empty iter to avoid creating empty file
if (!iter.hasNext()) continue;
// TODO: we should create the folder for each partition
var file = new File(root, t);
var count = 0;
try (var writer = RecordWriter.builder(file).build()) {
while (iter.hasNext()) {
var record = iter.next();
writer.append(record);
count++;
recordCount.compute(
TopicPartition.of(record.topic(), record.partition()),
(k, v) -> v == null ? 1 : v + 1);
var stats = new HashMap<TopicPartition, Stat>();
var writers = new HashMap<TopicPartition, RecordWriter>();
while (iter.hasNext()) {
var record = iter.next();
var writer =
writers.computeIfAbsent(
record.topicPartition(),
ignored -> {
var topicFolder = new File(argument.output.toFile(), record.topic());
var partitionFolder = new File(topicFolder, String.valueOf(record.partition()));
var file = new File(partitionFolder, String.valueOf(record.offset()));
return RecordWriter.builder(fs.write(file.getAbsolutePath())).build();
});
writer.append(record);
// close the current writer so the next record for this partition rolls to a new archive file
if (writer.size().greaterThan(argument.size)) {
var stat = stats.computeIfAbsent(record.topicPartition(), Stat::new);
stat.count.add(writer.count());
stat.size.add(writer.size().bytes());
writers.remove(record.topicPartition()).close();
}
}
System.out.println("read " + count + " records from " + t);
// close all writers
writers.forEach(
(tp, writer) -> {
var stat = stats.computeIfAbsent(tp, Stat::new);
stat.count.add(writer.count());
stat.size.add(writer.size().bytes());
writer.close();
});
return Collections.unmodifiableMap(stats);
}
return new Result(recordCount);
}

static class Argument extends org.astraea.common.argument.Argument {
@@ -98,22 +115,38 @@ static class Argument extends org.astraea.common.argument.Argument {
description =
"String: the group id used by this exporter. You can run multiples exporter with same id in parallel")
String group = Utils.randomString();

@Parameter(
names = {"--archive.size"},
description = "DataSize: the max size of a archive file",
converter = DataSize.Field.class)
DataSize size = DataSize.MB.of(100);
}

public static class Result {
private final Map<TopicPartition, Long> recordCount;
public static class Stat {
public TopicPartition partition() {
return partition;
}

public long count() {
return count.sum();
}

private Result(Map<TopicPartition, Long> recordCount) {
this.recordCount = recordCount;
public DataSize size() {
return DataSize.Byte.of(size.sum());
}

public Map<TopicPartition, Long> recordCount() {
return recordCount;
private final TopicPartition partition;
private final LongAdder count = new LongAdder();
private final LongAdder size = new LongAdder();

public Stat(TopicPartition partition) {
this.partition = partition;
}

@Override
public String toString() {
return "Result{" + "recordCount=" + recordCount + '}';
return "Stat{" + "partition=" + partition + ", count=" + count + ", size=" + size + '}';
}
}
}
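Note that execute now returns an unmodifiable Map<TopicPartition, Stat> instead of the old Result wrapper, so callers read per-partition totals straight from the map. A small fragment, assuming arg is an already-parsed Exporter.Argument:

var stats = Exporter.execute(arg);
stats.forEach(
    (tp, stat) ->
        System.out.println(tp + ": " + stat.count() + " records, " + stat.size()));
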
11 changes: 10 additions & 1 deletion app/src/main/java/org/astraea/app/backup/Importer.java
@@ -20,10 +20,12 @@
import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.argument.PathField;
import org.astraea.common.backup.RecordReader;
@@ -76,7 +78,14 @@ var record = iter.next();
if (current.isDirectory()) {
var fs = current.listFiles();
if (fs == null) continue;
files.addAll(Arrays.asList(fs));
// add files first
files.addAll(
Arrays.stream(fs)
.filter(File::isFile)
.sorted(Comparator.comparing(f -> Long.parseLong(f.getName())))
.collect(Collectors.toList()));
// add folders
files.addAll(Arrays.stream(fs).filter(File::isDirectory).collect(Collectors.toList()));
}
if (current.isFile()) process.accept(current);
}
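Since each archive file is named after the offset of its first record, the importer now replays a partition folder's plain files in ascending numeric order before descending into sub-folders. A tiny standalone illustration of that ordering; the file names here are hypothetical:

var names = java.util.List.of("1024", "0", "512");
var ordered =
    names.stream()
        .sorted(Comparator.comparing(Long::parseLong))
        .collect(Collectors.toList());
// ordered = ["0", "512", "1024"], so replay starts from the smallest offset
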
31 changes: 22 additions & 9 deletions app/src/test/java/org/astraea/app/backup/TestImportExport.java
@@ -76,20 +76,33 @@ void test() throws IOException {
"--bootstrap.servers", bootstrapServers(),
"--topics", String.join(",", topics),
"--output", file.toString(),
"--archive.size", "10Byte",
"--group", group
});
var exportResult = Exporter.execute(exportArg);
Assertions.assertEquals(topics.size(), exportResult.recordCount().size());
exportResult.recordCount().values().forEach(v -> Assertions.assertEquals(records, v));
// TODO: we should create the folder for each partition
Assertions.assertEquals(
topics.size(),
var stats = Exporter.execute(exportArg);
Assertions.assertEquals(topics.size(), stats.size());
stats.values().forEach(stat -> Assertions.assertEquals(records, stat.count()));
var topicFolders =
Arrays.stream(Objects.requireNonNull(file.toFile().listFiles()))
.filter(File::isFile)
.count());
.filter(File::isDirectory)
.collect(Collectors.toList());
Assertions.assertEquals(topics.size(), topicFolders.size());

var partitionFolders =
topicFolders.stream()
.flatMap(f -> Arrays.stream(Objects.requireNonNull(f.listFiles(File::isDirectory))))
.collect(Collectors.toList());
// each topic has a single partition
Assertions.assertEquals(topics.size(), partitionFolders.size());
// archive size is very small, so it should export many files
partitionFolders.forEach(
folder ->
Assertions.assertTrue(
Objects.requireNonNull(folder.listFiles(File::isFile)).length > 1,
"files: " + Objects.requireNonNull(folder.listFiles(File::isFile)).length));

// reuse the same group, so there are no more records to export
Assertions.assertEquals(0, Exporter.execute(exportArg).recordCount().size());
Assertions.assertEquals(0, Exporter.execute(exportArg).size());

// cleanup topics
try (var admin = Admin.of(bootstrapServers())) {
2 changes: 2 additions & 0 deletions common/src/main/java/org/astraea/common/Configuration.java
@@ -28,6 +28,8 @@

public interface Configuration {

Configuration EMPTY = Configuration.of(Map.of());

static Configuration of(Map<String, String> configs) {
return new Configuration() {
@Override
11 changes: 11 additions & 0 deletions common/src/main/java/org/astraea/common/backup/RecordWriter.java
@@ -21,11 +21,22 @@
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import org.astraea.common.DataSize;
import org.astraea.common.consumer.Record;

public interface RecordWriter extends AutoCloseable {
void append(Record<byte[], byte[]> record);

/**
* @return size of all records
*/
DataSize size();

/**
* @return count of all records
*/
int count();

void flush();

@Override
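These two accessors are what the exporter uses to decide when to roll an archive: keep appending until size() crosses a threshold, then close the writer and start a new file. A hedged fragment mirroring that pattern; the fs usage follows the exporter, while path (a String) and records (an Iterable of Record<byte[], byte[]>) are placeholders, and the 100MB limit simply matches the exporter's default --archive.size:

try (var fs = FileSystem.local(Configuration.EMPTY)) {
  var writer = RecordWriter.builder(fs.write(path)).build();
  for (var record : records) {
    writer.append(record);
    // stop once the archive exceeds the assumed 100MB limit
    if (writer.size().greaterThan(DataSize.MB.of(100))) break;
  }
  System.out.println("wrote " + writer.count() + " records (" + writer.size() + ")");
  writer.close();
}
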
common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
@@ -22,16 +22,20 @@
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.zip.GZIPOutputStream;
import org.astraea.common.DataSize;
import org.astraea.common.consumer.Record;

public class RecordWriterBuilder {

private static final Function<OutputStream, RecordWriter> V0 =
outputStream ->
new RecordWriter() {
private int recordCnt = 0;
private final AtomicInteger count = new AtomicInteger();
private final LongAdder size = new LongAdder();

@Override
public void append(Record<byte[], byte[]> record) {
@@ -60,6 +64,7 @@ public void append(Record<byte[], byte[]> record) {
+ (h.value() == null ? 0 : h.value().length) // [header value]
)
.sum();
size.add(recordSize);
// TODO reuse the recordBuffer
var recordBuffer = ByteBuffer.allocate(4 + recordSize);
recordBuffer.putInt(recordSize);
@@ -82,7 +87,17 @@ public void append(Record<byte[], byte[]> record) {
} catch (IOException e) {
throw new UncheckedIOException(e);
}
recordCnt++;
count.incrementAndGet();
}

@Override
public DataSize size() {
return DataSize.Byte.of(size.sum());
}

@Override
public int count() {
return count.get();
}

@Override
@@ -97,7 +112,7 @@ public void flush() {
@Override
public void close() {
try {
outputStream.write(ByteUtils.of(recordCnt).array());
outputStream.write(ByteUtils.of(count()).array());
outputStream.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
4 changes: 4 additions & 0 deletions common/src/main/java/org/astraea/common/consumer/Record.java
@@ -44,6 +44,10 @@ static Builder<byte[], byte[]> builder() {
return new Builder<>();
}

default TopicPartition topicPartition() {
return TopicPartition.of(topic(), partition());
}

String topic();

List<Header> headers();