Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Remove Diamond and code code Alignment #8107

Merged
merged 2 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,

private boolean sendOldValues = false;

KStreamAggregate(final String storeName, final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator) {
KStreamAggregate(final String storeName,
final Initializer<T> initializer,
final Aggregator<? super K, ? super V, T> aggregator) {
this.storeName = storeName;
this.initializer = initializer;
this.aggregator = aggregator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
private final boolean outer;

KStreamKStreamJoin(final String otherWindowName, final long joinBeforeMs, final long joinAfterMs, final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, final boolean outer) {
KStreamKStreamJoin(final String otherWindowName,
final long joinBeforeMs,
final long joinAfterMs,
final ValueJoiner<? super V1, ? super V2, ? extends R> joiner,
final boolean outer) {
this.otherWindowName = otherWindowName;
this.joinBeforeMs = joinBeforeMs;
this.joinAfterMs = joinAfterMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@

class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {

private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
@Override
public K apply(final K key, final V1 value) {
return key;
}
};
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoiner<? super V1, ? super V2, R> joiner;
private final boolean leftJoin;

KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier, final ValueJoiner<? super V1, ? super V2, R> joiner, final boolean leftJoin) {
KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
final ValueJoiner<? super V1, ? super V2, R> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,15 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
}
return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
}

public MemoryLRUCacheBytesIterator all(final String namespace) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
return new MemoryLRUCacheBytesIterator(Collections.emptyIterator(), new NamedCache(namespace, this.metrics));
}
return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}
Expand Down