Revert "KAFKA-15774: refactor windowed stores to use StoreFactory (#14708)" #14739

Closed
wants to merge 1 commit
@@ -535,7 +535,7 @@ public class StreamsConfig extends AbstractConfig {

public static final String ROCKS_DB = "rocksDB";
public static final String IN_MEMORY = "in_memory";
public static final String DEFAULT_DSL_STORE = ROCKS_DB;
public static final String DEFAULT_DSL_STORE_DEFAULT = ROCKS_DB;

/** {@code default.windowed.key.serde.inner} */
@SuppressWarnings("WeakerAccess")
@@ -1001,7 +1001,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(DEFAULT_DSL_STORE_CONFIG,
Type.STRING,
DEFAULT_DSL_STORE,
DEFAULT_DSL_STORE_DEFAULT,
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC)
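
For context, a hedged sketch (not part of this diff) of how an application would supply the default.dsl.store config whose definition is shown above; the application id and bootstrap servers are illustrative assumptions, the usual java.util.Properties and org.apache.kafka.streams.StreamsConfig imports are assumed, and the value is validated against in(ROCKS_DB, IN_MEMORY) as registered above.

// Hedged sketch: selecting the DSL store type through StreamsConfig properties.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // assumption
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
final StreamsConfig config = new StreamsConfig(props);
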
@@ -23,7 +23,6 @@
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;

import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
@@ -219,7 +218,7 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
}

public Materialized.StoreType parseStoreType() {
return MaterializedInternal.parse(storeType);
return Materialized.StoreType.parse(storeType);
}

public boolean isNamedTopology() {
@@ -18,6 +18,7 @@

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -70,6 +71,17 @@ public class Materialized<K, V, S extends StateStore> {
public enum StoreType {
ROCKS_DB,
IN_MEMORY;

public static StoreType parse(final String storeType) {
switch (storeType) {
case StreamsConfig.IN_MEMORY:
return StoreType.IN_MEMORY;
case StreamsConfig.ROCKS_DB:
return StoreType.ROCKS_DB;
default:
throw new IllegalStateException("Unexpected storeType: " + storeType);
}
}
}

private Materialized(final StoreSupplier<S> storeSupplier) {
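
A hedged usage sketch of the StoreType.parse helper added above; the inputs are the StreamsConfig constants from the first hunk, and this is the method that TopologyConfig#parseStoreType now delegates to.

// Hedged sketch: mapping the config strings to the enum, plus the failure case.
Materialized.StoreType rocks = Materialized.StoreType.parse(StreamsConfig.ROCKS_DB);   // StoreType.ROCKS_DB
Materialized.StoreType inMem = Materialized.StoreType.parse(StreamsConfig.IN_MEMORY);  // StoreType.IN_MEMORY
Materialized.StoreType bad   = Materialized.StoreType.parse("lsm_tree");               // hypothetical value -> IllegalStateException
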
@@ -17,7 +17,10 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -28,17 +31,38 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Materializes a key-value store as either a {@link TimestampedKeyValueStoreBuilder} or a
* {@link VersionedKeyValueStoreBuilder} depending on whether the store is versioned or not.
*/
public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K, V, KeyValueStore<Bytes, byte[]>> {
public class KeyValueStoreMaterializer<K, V> implements StoreFactory {
private static final Logger LOG = LoggerFactory.getLogger(KeyValueStoreMaterializer.class);

private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
private final Set<String> connectedProcessorNames = new HashSet<>();

private Materialized.StoreType defaultStoreType
= Materialized.StoreType.parse(StreamsConfig.DEFAULT_DSL_STORE_DEFAULT);

public KeyValueStoreMaterializer(
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized
) {
super(materialized);
this.materialized = materialized;

// this condition will never be false; in the next PR we will
// remove the initialization of storeType from MaterializedInternal
if (materialized.storeType() != null) {
defaultStoreType = materialized.storeType();
}
}

@Override
public void configure(final StreamsConfig config) {
// in a follow-up PR, this will set the defaultStoreType to the configured value
}

@Override
@@ -103,6 +127,21 @@ public long historyRetention() {
return ((VersionedBytesStoreSupplier) materialized.storeSupplier()).historyRetentionMs();
}

@Override
public Set<String> connectedProcessorNames() {
return connectedProcessorNames;
}

@Override
public boolean loggingEnabled() {
return materialized.loggingEnabled();
}

@Override
public String name() {
return materialized.storeName();
}

@Override
public boolean isWindowStore() {
return false;
@@ -113,4 +152,26 @@ public boolean isVersionedStore() {
return materialized.storeSupplier() instanceof VersionedBytesStoreSupplier;
}

@Override
public Map<String, String> logConfig() {
return materialized.logConfig();
}

@Override
public StoreFactory withCachingDisabled() {
materialized.withCachingDisabled();
return this;
}

@Override
public StoreFactory withLoggingDisabled() {
materialized.withLoggingDisabled();
return this;
}

@Override
public boolean isCompatibleWith(final StoreFactory storeFactory) {
return (storeFactory instanceof KeyValueStoreMaterializer)
&& ((KeyValueStoreMaterializer<?, ?>) storeFactory).materialized.equals(materialized);
}
}
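
To illustrate the javadoc above (timestamped vs. versioned materialization), a hedged sketch of the two kinds of Materialized instances this factory distinguishes via isVersionedStore(); the store names and retention period are illustrative assumptions, and the usual org.apache.kafka.streams.state.Stores and java.time.Duration imports are assumed.

// Plain key-value materialization: the supplier is not a VersionedBytesStoreSupplier,
// so isVersionedStore() returns false and a TimestampedKeyValueStoreBuilder is used.
final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> timestamped =
    Materialized.as("counts-store");                          // name is an assumption

// Versioned materialization: Stores.persistentVersionedKeyValueStore returns a
// VersionedBytesStoreSupplier, so isVersionedStore() returns true and a
// VersionedKeyValueStoreBuilder is used instead.
final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> versioned =
    Materialized.as(Stores.persistentVersionedKeyValueStore(
        "versioned-counts", Duration.ofDays(1)));             // name/retention are assumptions
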
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.TopologyConfig;
@@ -60,17 +59,6 @@ public MaterializedInternal(final Materialized<K, V, S> materialized,
}
}

public static StoreType parse(final String storeType) {
switch (storeType) {
case StreamsConfig.IN_MEMORY:
return StoreType.IN_MEMORY;
case StreamsConfig.ROCKS_DB:
return StoreType.ROCKS_DB;
default:
throw new IllegalStateException("Unexpected storeType: " + storeType);
}
}

public String queryableStoreName() {
return queryable ? storeName() : null;
}

This file was deleted.
