Skip to content

Commit

Permalink
feat: version bump kafka-clients to 3.1.0 and clean up assorted warni…
Browse files Browse the repository at this point in the history
…ngs (#312)

* feat: version bump kafka-clients to 3.1.0 and clean up assorted warnings

* feat: version bump kafka-clients to 3.1.0 and clean up assorted warnings
  • Loading branch information
dpcollins-google authored May 24, 2022
1 parent 0b27e83 commit 85b4f67
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public abstract class ConsumerSettings {
abstract Optional<TopicPath> topicPathOverride();

public static Builder newBuilder() {
return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false);
return new AutoValue_ConsumerSettings.Builder().setAutocommit(false);
}

@AutoValue.Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ static KafkaException toKafkaException(CheckedApiException source) {
return new KafkaException("Unimplemented.", source);
case UNKNOWN:
return new KafkaException("Unknown.", source);
default:
return new KafkaException("No case.", source);
}
return new KafkaException("No case.", source);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.protobuf.ByteString;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

Expand All @@ -48,12 +46,6 @@ public byte[] value() {
};
}

private static List<Header> toHeaders(String key, Collection<ByteString> values) {
ImmutableList.Builder<Header> headersBuilder = ImmutableList.builder();
values.forEach(value -> headersBuilder.add(toHeader(key, value)));
return headersBuilder.build();
}

@Override
public Headers add(Header header) throws IllegalStateException {
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -250,10 +251,6 @@ public void unsubscribe() {
consumer = Optional.empty();
}

private static Duration toDuration(long l, TimeUnit timeUnit) {
return Duration.ofMillis(TimeUnit.MILLISECONDS.convert(l, timeUnit));
}

@Override
public ConsumerRecords<byte[], byte[]> poll(long l) {
return poll(Duration.ofMillis(l));
Expand Down Expand Up @@ -433,11 +430,10 @@ public Map<TopicPartition, OffsetAndMetadata> committed(
.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> output = ImmutableMap.builder();
targets.forEach(
partition -> {
output.put(
toTopicPartition(partition),
new OffsetAndMetadata(full_map.getOrDefault(partition, Offset.of(0)).value()));
});
partition ->
output.put(
toTopicPartition(partition),
new OffsetAndMetadata(full_map.getOrDefault(partition, Offset.of(0)).value())));
return output.build();
} catch (Throwable t) {
throw toKafka(t);
Expand Down Expand Up @@ -483,7 +479,7 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
map.entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey(),
Map.Entry::getKey,
entry ->
topicStatsClient.computeCursorForEventTime(
topicPath,
Expand Down Expand Up @@ -561,11 +557,6 @@ public void close() {
close(INFINITE_DURATION);
}

@Override
public void close(long l, TimeUnit timeUnit) {
close(toDuration(l, timeUnit));
}

@Override
public void close(Duration timeout) {
unsubscribe();
Expand Down Expand Up @@ -606,6 +597,12 @@ public void enforceRebalance() {
logger.atWarning().log("Calling enforceRebalance on a Pub/Sub Lite Consumer is a no-op.");
}

@Override
public OptionalLong currentLag(TopicPartition topicPartition) {
logger.atWarning().log("Calling currentLag on a Pub/Sub Lite Consumer always returns empty.");
return OptionalLong.empty();
}

@Override
public void wakeup() {
requireValidConsumer().wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private static <T> T example(Class<T> klass) {
.put(TopicPartition.class, exampleTopicPartition())
.put(OffsetAndMetadata.class, exampleOffsetAndMetadata())
.build();
T instance = (T) map.getInstance(klass);
T instance = map.getInstance(klass);
if (instance != null) return instance;
return UnitTestExamples.example(klass);
}
Expand Down

0 comments on commit 85b4f67

Please sign in to comment.