Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
* @see ConsumerConfig
* @see ProducerConfig
*/
@SuppressWarnings("deprecation")
public class StreamsConfig extends AbstractConfig {

private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
this.builder = builder;
}
@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ public KTableKTableJoinNodeBuilder<K, V1, V2, VR> withStoreBuilder(final StoreBu
return this;
}

@SuppressWarnings("unchecked")
public KTableKTableJoinNode<K, V1, V2, VR> build() {
return new KTableKTableJoinNode<>(
nodeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcess
return oldProcessorSupplier;
}

@SuppressWarnings("unchecked")
KTableSource<KIn, VIn> kTableSourceSupplier() {
// This cast always works because KTableSource hasn't been converted yet.
return oldProcessorSupplier == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public String toString() {
"} " + super.toString();
}

@SuppressWarnings("unchecked")
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder, final Properties props) {
final String processorName = processorParameters.processorName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ private ProcessorAdapter(final org.apache.kafka.streams.processor.Processor<KIn,
this.delegate = delegate;
}

@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
// It only makes sense to use this adapter internally to Streams, in which case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ public void commit() {
streamTask.requestCommit();
}

@SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
@Override
public Cancellable schedule(final Duration interval,
final PunctuationType type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class RecordDeserializer {
* {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL}
* or throws an exception itself
*/
@SuppressWarnings("deprecation")
ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
final ConsumerRecord<byte[], byte[]> rawRecord) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
public final class RecordConverters {
private static final RecordConverter IDENTITY_INSTANCE = record -> record;

@SuppressWarnings("deprecation")
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public byte[] fetch(final Bytes key, final long timestamp) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation")
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
throw new UnsupportedOperationException();
Expand All @@ -77,7 +76,6 @@ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long tim
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
Expand Down Expand Up @@ -105,7 +103,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
throw new UnsupportedOperationException();
}

@SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ public byte[] fetch(final Bytes key,
return wrapped.fetch(key, time);
}

@SuppressWarnings("deprecation")
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long timeFrom,
Expand All @@ -150,7 +149,6 @@ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
return wrapped.backwardFetch(key, timeFrom, timeTo);
}

@SuppressWarnings("deprecation")
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
Expand All @@ -167,7 +165,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFr
return wrapped.backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
}

@SuppressWarnings("deprecation")
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public byte[] fetch(final Bytes key,
}

@Override
@SuppressWarnings("deprecation")
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long timeFrom,
final long timeTo) {
Expand Down Expand Up @@ -83,7 +82,6 @@ public WindowStoreIterator<byte[]> backwardFetch(final Bytes key,
}

@Override
@SuppressWarnings("deprecation")
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
final long timeFrom,
Expand Down Expand Up @@ -126,7 +124,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
}

@Override
@SuppressWarnings("deprecation")
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,6 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedExc
}
}

@SuppressWarnings("unchecked")
@Deprecated // testing old PAPI
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,6 @@ private KafkaStreams getKafkaStreams(final String appDir,

final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
@SuppressWarnings("unchecked")
@Override
public Transformer<Long, Long, KeyValue<Long, Long>> get() {
return new Transformer<Long, Long, KeyValue<Long, Long>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() thr
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@SuppressWarnings("unchecked")
public class InternalStreamsBuilderTest {

private static final String APP_ID = "app-id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public void shouldReduceWithInternalStoreName() {
}
}

@SuppressWarnings("unchecked")
@Test
public void shouldReduceAndMaterializeResults() {
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
Expand Down Expand Up @@ -235,7 +234,6 @@ public void shouldReduceAndMaterializeResults() {
}
}

@SuppressWarnings("unchecked")
@Test
public void shouldCountAndMaterializeResults() {
builder
Expand Down Expand Up @@ -265,7 +263,6 @@ public void shouldCountAndMaterializeResults() {
}
}

@SuppressWarnings("unchecked")
@Test
public void shouldAggregateAndMaterializeResults() {
builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull
assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized) null));
}

@SuppressWarnings("unchecked")
@Test
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
assertThrows(NullPointerException.class, () -> stream.reduce(MockReducer.STRING_ADDER, (Named) null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
assertThrows(NullPointerException.class, () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
assertThrows(NullPointerException.class, () -> windowedStream.reduce(
MockReducer.STRING_ADDER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.MatcherAssert.assertThat;

@SuppressWarnings("unchecked")
public class TransformerSupplierAdapterTest extends EasyMockSupport {

private ProcessorContext context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public void shouldSerdeWithNullsTest() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
assertThrows(UnsupportedVersionException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void shouldSerdeNullHashTest() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullKeyTest() {
final String originalKey = null;
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
Expand All @@ -68,7 +67,6 @@ public void shouldThrowExceptionOnNullKeyTest() {
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullInstructionTest() {
final String originalKey = "originalKey";
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
}

@Test
@SuppressWarnings("deprecation")
public void globalWindowStoreShouldBeReadOnly() {
doTest("GlobalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
Expand All @@ -238,7 +237,6 @@ public void globalWindowStoreShouldBeReadOnly() {


@Test
@SuppressWarnings("deprecation")
public void globalTimestampedWindowStoreShouldBeReadOnly() {
doTest("GlobalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
Expand Down Expand Up @@ -325,7 +323,6 @@ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
}

@Test
@SuppressWarnings("deprecation")
public void localWindowStoreShouldNotAllowInitOrClose() {
doTest("LocalWindowStore", (Consumer<WindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
Expand All @@ -345,7 +342,6 @@ public void localWindowStoreShouldNotAllowInitOrClose() {
}

@Test
@SuppressWarnings("deprecation")
public void localTimestampedWindowStoreShouldNotAllowInitOrClose() {
doTest("LocalTimestampedWindowStore", (Consumer<TimestampedWindowStore<String, Long>>) store -> {
verifyStoreCannotBeInitializedOrClosed(store);
Expand Down Expand Up @@ -615,7 +611,7 @@ private TimestampedKeyValueStore<String, Long> timestampedKeyValueStoreMock() {
return timestampedKeyValueStoreMock;
}

@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings("unchecked")
private WindowStore<String, Long> windowStoreMock() {
final WindowStore<String, Long> windowStore = mock(WindowStore.class);

Expand All @@ -638,7 +634,7 @@ private WindowStore<String, Long> windowStoreMock() {
return windowStore;
}

@SuppressWarnings({"unchecked", "deprecation"})
@SuppressWarnings("unchecked")
private TimestampedWindowStore<String, Long> timestampedWindowStoreMock() {
final TimestampedWindowStore<String, Long> windowStore = mock(TimestampedWindowStore.class);

Expand Down
Loading