diff --git a/app/build.gradle b/app/build.gradle index 6897ccd5be..9908c5dba1 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -44,6 +44,7 @@ dependencies { testImplementation project(':it') implementation project(':common') + implementation project(':fs') implementation "org.apache.kafka:kafka-clients:${versions["kafka"]}" implementation "com.beust:jcommander:${versions["jcommander"]}" // we don't use slf4j actually, and it is used by kafka so we swallow the log. diff --git a/app/src/main/java/org/astraea/app/backup/Exporter.java b/app/src/main/java/org/astraea/app/backup/Exporter.java index 97d384c252..ce17c2be9e 100644 --- a/app/src/main/java/org/astraea/app/backup/Exporter.java +++ b/app/src/main/java/org/astraea/app/backup/Exporter.java @@ -20,10 +20,14 @@ import java.io.File; import java.nio.file.Path; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.LongAdder; +import org.astraea.common.Configuration; +import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.admin.TopicPartition; import org.astraea.common.argument.PathField; @@ -32,6 +36,7 @@ import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.common.consumer.IteratorLimit; +import org.astraea.fs.FileSystem; public class Exporter { @@ -42,38 +47,50 @@ public static void main(String[] args) { System.out.println(result); } - public static Result execute(Argument argument) { + public static Map execute(Argument argument) { if (!argument.output.toFile().isDirectory()) throw new IllegalArgumentException("--output must be a existent folder"); - var root = argument.output.toFile(); - var recordCount = new HashMap(); - for (var t : argument.topics) { + try (var fs = FileSystem.local(Configuration.EMPTY)) { var iter = - Consumer.forTopics(Set.of(t)) + Consumer.forTopics(Set.copyOf(argument.topics)) .bootstrapServers(argument.bootstrapServers()) .config( ConsumerConfigs.AUTO_OFFSET_RESET_CONFIG, ConsumerConfigs.AUTO_OFFSET_RESET_EARLIEST) .config(ConsumerConfigs.GROUP_ID_CONFIG, argument.group) .iterator(List.of(IteratorLimit.idle(Duration.ofSeconds(3)))); - // skip empty iter to avoid creating empty file - if (!iter.hasNext()) continue; - // TODO: we should create the folder for each partition - var file = new File(root, t); - var count = 0; - try (var writer = RecordWriter.builder(file).build()) { - while (iter.hasNext()) { - var record = iter.next(); - writer.append(record); - count++; - recordCount.compute( - TopicPartition.of(record.topic(), record.partition()), - (k, v) -> v == null ? 1 : v + 1); + var stats = new HashMap(); + var writers = new HashMap(); + while (iter.hasNext()) { + var record = iter.next(); + var writer = + writers.computeIfAbsent( + record.topicPartition(), + ignored -> { + var topicFolder = new File(argument.output.toFile(), record.topic()); + var partitionFolder = new File(topicFolder, String.valueOf(record.partition())); + var file = new File(partitionFolder, String.valueOf(record.offset())); + return RecordWriter.builder(fs.write(file.getAbsolutePath())).build(); + }); + writer.append(record); + // roll new writer in the future + if (writer.size().greaterThan(argument.size)) { + var stat = stats.computeIfAbsent(record.topicPartition(), Stat::new); + stat.count.add(writer.count()); + stat.size.add(writer.size().bytes()); + writers.remove(record.topicPartition()).close(); } } - System.out.println("read " + count + " records from " + t); + // close all writers + writers.forEach( + (tp, writer) -> { + var stat = stats.computeIfAbsent(tp, Stat::new); + stat.count.add(writer.count()); + stat.size.add(writer.size().bytes()); + writer.close(); + }); + return Collections.unmodifiableMap(stats); } - return new Result(recordCount); } static class Argument extends org.astraea.common.argument.Argument { @@ -98,22 +115,38 @@ static class Argument extends org.astraea.common.argument.Argument { description = "String: the group id used by this exporter. You can run multiples exporter with same id in parallel") String group = Utils.randomString(); + + @Parameter( + names = {"--archive.size"}, + description = "DataSize: the max size of a archive file", + converter = DataSize.Field.class) + DataSize size = DataSize.MB.of(100); } - public static class Result { - private final Map recordCount; + public static class Stat { + public TopicPartition partition() { + return partition; + } + + public long count() { + return count.sum(); + } - private Result(Map recordCount) { - this.recordCount = recordCount; + public DataSize size() { + return DataSize.Byte.of(size.sum()); } - public Map recordCount() { - return recordCount; + private final TopicPartition partition; + private final LongAdder count = new LongAdder(); + private final LongAdder size = new LongAdder(); + + public Stat(TopicPartition partition) { + this.partition = partition; } @Override public String toString() { - return "Result{" + "recordCount=" + recordCount + '}'; + return "Stat{" + "partition=" + partition + ", count=" + count + ", size=" + size + '}'; } } } diff --git a/app/src/main/java/org/astraea/app/backup/Importer.java b/app/src/main/java/org/astraea/app/backup/Importer.java index d8f9aeb7a0..383266d8c5 100644 --- a/app/src/main/java/org/astraea/app/backup/Importer.java +++ b/app/src/main/java/org/astraea/app/backup/Importer.java @@ -20,10 +20,12 @@ import java.io.File; import java.nio.file.Path; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.astraea.common.admin.TopicPartition; import org.astraea.common.argument.PathField; import org.astraea.common.backup.RecordReader; @@ -76,7 +78,14 @@ var record = iter.next(); if (current.isDirectory()) { var fs = current.listFiles(); if (fs == null) continue; - files.addAll(Arrays.asList(fs)); + // add files first + files.addAll( + Arrays.stream(fs) + .filter(File::isFile) + .sorted(Comparator.comparing(f -> Long.parseLong(f.getName()))) + .collect(Collectors.toList())); + // add folders + files.addAll(Arrays.stream(fs).filter(File::isDirectory).collect(Collectors.toList())); } if (current.isFile()) process.accept(current); } diff --git a/app/src/test/java/org/astraea/app/backup/TestImportExport.java b/app/src/test/java/org/astraea/app/backup/TestImportExport.java index 3b1964b84e..f5094bf933 100644 --- a/app/src/test/java/org/astraea/app/backup/TestImportExport.java +++ b/app/src/test/java/org/astraea/app/backup/TestImportExport.java @@ -76,20 +76,33 @@ void test() throws IOException { "--bootstrap.servers", bootstrapServers(), "--topics", String.join(",", topics), "--output", file.toString(), + "--archive.size", "10Byte", "--group", group }); - var exportResult = Exporter.execute(exportArg); - Assertions.assertEquals(topics.size(), exportResult.recordCount().size()); - exportResult.recordCount().values().forEach(v -> Assertions.assertEquals(records, v)); - // TODO: we should create the folder for each partition - Assertions.assertEquals( - topics.size(), + var stats = Exporter.execute(exportArg); + Assertions.assertEquals(topics.size(), stats.size()); + stats.values().forEach(stat -> Assertions.assertEquals(records, stat.count())); + var topicFolders = Arrays.stream(Objects.requireNonNull(file.toFile().listFiles())) - .filter(File::isFile) - .count()); + .filter(File::isDirectory) + .collect(Collectors.toList()); + Assertions.assertEquals(topics.size(), topicFolders.size()); + + var partitionFolders = + topicFolders.stream() + .flatMap(f -> Arrays.stream(Objects.requireNonNull(f.listFiles(File::isDirectory)))) + .collect(Collectors.toList()); + // each topic has single partition + Assertions.assertEquals(topics.size(), partitionFolders.size()); + // archive size is very small, so it should export many files + partitionFolders.forEach( + folder -> + Assertions.assertTrue( + Objects.requireNonNull(folder.listFiles(File::isFile)).length > 1, + "files: " + Objects.requireNonNull(folder.listFiles(File::isFile)).length)); // use the same group and there is no more records - Assertions.assertEquals(0, Exporter.execute(exportArg).recordCount().size()); + Assertions.assertEquals(0, Exporter.execute(exportArg).size()); // cleanup topics try (var admin = Admin.of(bootstrapServers())) { diff --git a/common/src/main/java/org/astraea/common/Configuration.java b/common/src/main/java/org/astraea/common/Configuration.java index 29db093bbf..6d4d80399a 100644 --- a/common/src/main/java/org/astraea/common/Configuration.java +++ b/common/src/main/java/org/astraea/common/Configuration.java @@ -28,6 +28,8 @@ public interface Configuration { + Configuration EMPTY = Configuration.of(Map.of()); + static Configuration of(Map configs) { return new Configuration() { @Override diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriter.java b/common/src/main/java/org/astraea/common/backup/RecordWriter.java index 42c4b3456e..0a929b72f6 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriter.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriter.java @@ -21,11 +21,22 @@ import java.io.FileOutputStream; import java.io.OutputStream; import java.io.UncheckedIOException; +import org.astraea.common.DataSize; import org.astraea.common.consumer.Record; public interface RecordWriter extends AutoCloseable { void append(Record record); + /** + * @return size of all records + */ + DataSize size(); + + /** + * @return count of all records + */ + int count(); + void flush(); @Override diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index bf55f0affd..d81b168a3f 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -22,8 +22,11 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import java.util.zip.GZIPOutputStream; +import org.astraea.common.DataSize; import org.astraea.common.consumer.Record; public class RecordWriterBuilder { @@ -31,7 +34,8 @@ public class RecordWriterBuilder { private static final Function V0 = outputStream -> new RecordWriter() { - private int recordCnt = 0; + private final AtomicInteger count = new AtomicInteger(); + private final LongAdder size = new LongAdder(); @Override public void append(Record record) { @@ -60,6 +64,7 @@ public void append(Record record) { + (h.value() == null ? 0 : h.value().length) // [header value] ) .sum(); + size.add(recordSize); // TODO reuse the recordBuffer var recordBuffer = ByteBuffer.allocate(4 + recordSize); recordBuffer.putInt(recordSize); @@ -82,7 +87,17 @@ public void append(Record record) { } catch (IOException e) { throw new UncheckedIOException(e); } - recordCnt++; + count.incrementAndGet(); + } + + @Override + public DataSize size() { + return DataSize.Byte.of(size.sum()); + } + + @Override + public int count() { + return count.get(); } @Override @@ -97,7 +112,7 @@ public void flush() { @Override public void close() { try { - outputStream.write(ByteUtils.of(recordCnt).array()); + outputStream.write(ByteUtils.of(count()).array()); outputStream.flush(); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/common/src/main/java/org/astraea/common/consumer/Record.java b/common/src/main/java/org/astraea/common/consumer/Record.java index 0b0209afc1..75ab6925b6 100644 --- a/common/src/main/java/org/astraea/common/consumer/Record.java +++ b/common/src/main/java/org/astraea/common/consumer/Record.java @@ -44,6 +44,10 @@ static Builder builder() { return new Builder<>(); } + default TopicPartition topicPartition() { + return TopicPartition.of(topic(), partition()); + } + String topic(); List
headers();