Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Nov 11, 2020
1 parent 6809280 commit 7aadbc3
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 46 deletions.
5 changes: 3 additions & 2 deletions client/src/containers/Topic/TopicCopy/TopicCopy.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,12 @@ class TopicCopy extends Form {

async doSubmit() {
const { clusterId, topicId, formData, checked } = this.state;
await this.postApi(
const result = await this.postApi(
uriTopicsCopy(clusterId, topicId, formData.clusterListView, formData.topicListView),
this.createSubmitBody(formData, checked)
);
toast.success(`Copied to topic '${formData.topicListView}' successfully.`);

toast.success(`Copied ${result.data.records} records to topic '${formData.topicListView}' successfully.`);
}

checkedTopicOffset = (event) => {
Expand Down
54 changes: 31 additions & 23 deletions src/main/java/org/akhq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -392,38 +392,46 @@ public List<RecordRepository.TimeOffset> offsetsStart(String cluster, String top
@Secured(Role.ROLE_TOPIC_DATA_INSERT)
@Post("api/{fromCluster}/topic/{fromTopicName}/copy/{toCluster}/topic/{toTopicName}")
@Operation(tags = {"topic data"}, summary = "Copy from a topic to another topic")
public HttpResponse<?> copy(
public RecordRepository.CopyResult copy(
HttpRequest<?> request,
String fromCluster,
String fromTopicName,
String toCluster,
String toTopicName,
@Body List<OffsetCopy> offsets
) throws ExecutionException, InterruptedException {
Topic topic = this.topicRepository.findByName(fromCluster, fromTopicName);

if (CollectionUtils.isNotEmpty(offsets)) {
// after wait for next offset, so add - 1 to allow to have the current offset
String offsetsList = offsets.stream()
.filter(offsetCopy -> offsetCopy.offset - 1 >= 0)
.map(offsetCopy ->
String.join("-", String.valueOf(offsetCopy.partition), String.valueOf(offsetCopy.offset - 1)))
.collect(Collectors.joining("_"));

RecordRepository.Options options = dataSearchOptions(
fromCluster,
fromTopicName,
Optional.ofNullable(StringUtils.isNotEmpty(offsetsList) ? offsetsList : null),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

this.recordRepository.copy(topic, toCluster, toTopicName, offsets, options);
Topic fromTopic = this.topicRepository.findByName(fromCluster, fromTopicName);
Topic toTopic = this.topicRepository.findByName(toCluster, toTopicName);

if (!CollectionUtils.isNotEmpty(offsets)) {
throw new IllegalArgumentException("Empty collections");
}

return HttpResponse.noContent();
// after wait for next offset, so add - 1 to allow to have the current offset
String offsetsList = offsets.stream()
.filter(offsetCopy -> offsetCopy.offset - 1 >= 0)
.map(offsetCopy ->
String.join("-", String.valueOf(offsetCopy.partition), String.valueOf(offsetCopy.offset - 1)))
.collect(Collectors.joining("_"));

RecordRepository.Options options = dataSearchOptions(
fromCluster,
fromTopicName,
Optional.ofNullable(StringUtils.isNotEmpty(offsetsList) ? offsetsList : null),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty()
);

return this.recordRepository.copy(fromTopic, toCluster, toTopic, offsets, options);
}

@ToString
@EqualsAndHashCode
@Getter
// NOTE(review): this DTO appears unused now — copy() returns
// RecordRepository.CopyResult directly. Confirm no external consumer
// (OpenAPI clients, tests) before removing this leftover class.
public static class CopyResponse {
// Number of records copied; mirrors RecordRepository.CopyResult.records.
int records;
}

private RecordRepository.Options dataSearchOptions(
Expand Down
64 changes: 43 additions & 21 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -721,51 +721,73 @@ public Flowable<Event<TailEvent>> tail(String clusterId, TailOptions options) {
});
}

public void copy(Topic topic, String toClusterId, String toTopicName, List<TopicController.OffsetCopy> offsets, RecordRepository.Options options) {

KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(options.clusterId,
new Properties() {{ put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); }});
public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List<TopicController.OffsetCopy> offsets, RecordRepository.Options options) {
KafkaConsumer<byte[], byte[]> consumer = this.kafkaModule.getConsumer(
options.clusterId,
new Properties() {{
put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
}}
);

Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(topic, options, consumer);
Map<TopicPartition, Long> partitions = getTopicPartitionForSortOldest(fromTopic, options, consumer);

Map<TopicPartition, Long> filteredPartitions = partitions.entrySet().stream()
.filter(topicPartitionLongEntry -> offsets.stream().anyMatch(offsetCopy -> offsetCopy.getPartition() == topicPartitionLongEntry.getKey().partition()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.filter(topicPartitionLongEntry -> offsets.stream()
.anyMatch(offsetCopy -> offsetCopy.getPartition() == topicPartitionLongEntry.getKey().partition()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

if (filteredPartitions.size() > 0) {
int counter = 0;

if (filteredPartitions.size() > 0) {
consumer.assign(filteredPartitions.keySet());
filteredPartitions.forEach(consumer::seek);

if (log.isTraceEnabled()) {
filteredPartitions.forEach((topicPartition, first) ->
log.trace(
"Consume [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
log.trace(
"Consume [topic: {}] [partition: {}] [start: {}]",
topicPartition.topic(),
topicPartition.partition(),
first
)
);
}

boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size();

KafkaProducer<byte[], byte[]> producer = kafkaModule.getProducer(toClusterId);
ConsumerRecords<byte[], byte[]> records;
do {
records = this.poll(consumer);
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(record.offset() + "-" + record.partition());

counter++;
producer.send(new ProducerRecord<>(
toTopicName,
record.partition(),
record.timestamp(),
record.key(),
record.value(),
record.headers()
toTopic.getName(),
samePartition ? record.partition() : null,
record.timestamp(),
record.key(),
record.value(),
record.headers()
));
}

} while(records != null && !records.isEmpty());
} while (!records.isEmpty());

producer.flush();
}
consumer.close();

return new CopyResult(counter);
}

@ToString
@EqualsAndHashCode
@AllArgsConstructor
@Getter
// Result payload for copy(): serialized to the API client, which reads
// `records` to display how many records were copied.
public static class CopyResult {
// Number of records copied from the source topic to the destination topic.
int records;
}

@ToString
Expand Down

0 comments on commit 7aadbc3

Please sign in to comment.