Skip to content

Commit f7e1736

Browse files
author
Jacob Maes
committed
Samza 1214: Allow users to set a default replication.factor for intermediate topics
* Add a new "systems.sysName.default.stream.*" config structure that allows users to set system-wide defaults for streams. * More thorough testing of system defaults and stream defaults * Removed the old migration config from the config table since there's no code to support it. * Moved 2 kafka-specific config accessors out of JobConfig and into KafkaConfig * Removed duplicate impl of getChangelogStream() Author: Jacob Maes <jmaes@linkedin.com> Reviewers: Jagadish <jvenkatr@linkedin.com> Closes apache#141 from jmakes/samza-1214
1 parent 05060ed commit f7e1736

File tree

11 files changed

+379
-138
lines changed

11 files changed

+379
-138
lines changed

docs/learn/documentation/versioned/jobs/configuration-table.html

Lines changed: 18 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -720,8 +720,19 @@ <h1>Samza Configuration Reference</h1>
720720
</tr>
721721

722722
<tr>
723-
<td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
724-
<td class="default" rowspan="2"></td>
723+
<td class="property" id="systems-default-stream">systems.<span class="system">system-name</span>.<br>default.stream.*</td>
724+
<td class="default"></td>
725+
<td class="description">
726+
A set of default properties for any stream associated with the system. For example, if
727+
"systems.kafka-system.default.stream.replication.factor"=2 was configured, then every Kafka stream
728+
created on the kafka-system will have a replication factor of 2 unless the property is explicitly
729+
overridden at the stream scope using <a href="#streams-properties">streams properties</a>.
730+
</td>
731+
</tr>
732+
733+
<tr>
734+
<td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.key.serde</td>
735+
<td class="default"></td>
725736
<td class="description">
726737
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
727738
<em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
@@ -734,17 +745,10 @@ <h1>Samza Configuration Reference</h1>
734745
the task and the output stream producer.
735746
</td>
736747
</tr>
737-
<tr>
738-
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
739-
<td class="description">
740-
This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
741-
streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
742-
</td>
743-
</tr>
744748

745749
<tr>
746-
<td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
747-
<td class="default" rowspan="2"></td>
750+
<td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.msg.serde</td>
751+
<td class="default"></td>
748752
<td class="description">
749753
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
750754
<em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
@@ -757,17 +761,10 @@ <h1>Samza Configuration Reference</h1>
757761
the task and the output stream producer.
758762
</td>
759763
</tr>
760-
<tr>
761-
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
762-
<td class="description">
763-
This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
764-
streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
765-
</td>
766-
</tr>
767764

768765
<tr>
769-
<td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
770-
<td class="default" rowspan="2">upcoming</td>
766+
<td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>default.stream.samza.offset.default</td>
767+
<td class="default">upcoming</td>
771768
<td class="description">
772769
If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
773770
this property determines where in the input stream we should start consuming. The value must be an
@@ -786,13 +783,6 @@ <h1>Samza Configuration Reference</h1>
786783
If both are defined, the stream-level definition takes precedence.
787784
</td>
788785
</tr>
789-
<tr>
790-
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
791-
<td class="description">
792-
This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
793-
streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
794-
</td>
795-
</tr>
796786

797787
<tr>
798788
<td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
@@ -1352,7 +1342,7 @@ <h1>Samza Configuration Reference</h1>
13521342
This property defines a store, Samza's mechanism for efficient
13531343
<a href="../container/state-management.html">stateful stream processing</a>. You can give a
13541344
store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span>
1355-
<em>default</em> is reserved for defining default store parameters), and use that name to get a
1345+
<em>default</em> is reserved for defining default store parameters), and use that name to get a
13561346
reference to the store in your stream task (call
13571347
<a href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
13581348
in your task's
@@ -1983,29 +1973,6 @@ <h1>Samza Configuration Reference</h1>
19831973
<td class="default"></td>
19841974
<td class="description">Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you want explicitly use a separate location.</td>
19851975
</tr>
1986-
1987-
<tr>
1988-
<th colspan="3" class="section" id="task-migration">
1989-
Migrating from Samza 0.9.1 to 0.10.0<br>
1990-
<span class="subtitle">
1991-
(This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set
1992-
<a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> to anything <b> other than </b>
1993-
<code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>)
1994-
</span>
1995-
</th>
1996-
</tr>
1997-
1998-
<tr>
1999-
<td class="property" id="task-checkpoint-skip-migration">task.checkpoint.skip-migration</td>
2000-
<td class="default">false</td>
2001-
<td class="description">
2002-
When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream. <br />
2003-
If you are using a checkpoint manager other than kafka
2004-
(<code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>), you have to
2005-
manually migrate taskName-to-changelog partition mapping to the coordinator stream. <br />
2006-
This can be achieved with the assistance of the <code>checkpoint-tool.sh</code>.
2007-
</td>
2008-
</tr>
20091976
</tbody>
20101977
</table>
20111978
</body>

samza-api/src/main/java/org/apache/samza/config/MapConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.samza.config;
2121

22+
import java.util.Arrays;
2223
import java.util.Collection;
2324
import java.util.Collections;
2425
import java.util.HashMap;
@@ -41,11 +42,15 @@ public MapConfig(Map<String, String> map) {
4142
}
4243

4344
public MapConfig(List<Map<String, String>> maps) {
44-
this.map = new HashMap<String, String>();
45+
this.map = new HashMap<>();
4546
for (Map<String, String> m: maps)
4647
this.map.putAll(m);
4748
}
4849

50+
public MapConfig(Map<String, String>... maps) {
51+
this(Arrays.asList(maps));
52+
}
53+
4954
public String get(Object k) {
5055
return map.get(k);
5156
}

samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public String getChangelogStream(String storeName) {
5858
// If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
5959
// these values will be combined into <asystem>.<astream>
6060
String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null);
61-
String changelogSystem = get(CHANGELOG_SYSTEM, null);
61+
String changelogSystem = getChangelogSystem();
6262

6363
String systemStreamRes;
6464
if (systemStream != null && !systemStream.contains(".")) {
@@ -85,4 +85,21 @@ public String getStorageKeySerde(String storeName) {
8585
public String getStorageMsgSerde(String storeName) {
8686
return get(String.format(MSG_SERDE, storeName), null);
8787
}
88+
89+
/**
90+
* Gets the System to use for reading/writing checkpoints. Uses the following precedence.
91+
*
92+
* 1. If job.changelog.system is defined, that value is used.
93+
* 2. If job.default.system is defined, that value is used.
94+
* 3. null
95+
*
96+
* Note: Changelogs can be defined using
97+
* stores.storeName.changelog=systemName.streamName or
98+
* stores.storeName.changelog=streamName
99+
*
100+
* If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used.
101+
*/
102+
public String getChangelogSystem() {
103+
return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
104+
}
88105
}

samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
public class JavaSystemConfig extends MapConfig {
3636
private static final String SYSTEM_PREFIX = "systems.";
3737
private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
38-
private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
38+
private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX;
39+
private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream.";
3940
private static final String EMPTY = "";
4041

4142
public JavaSystemConfig(Config config) {
@@ -46,7 +47,7 @@ public String getSystemFactory(String name) {
4647
if (name == null) {
4748
return null;
4849
}
49-
String systemFactory = String.format(SYSTEM_FACTORY, name);
50+
String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name);
5051
return get(systemFactory, null);
5152
}
5253

@@ -99,4 +100,11 @@ public Map<String, SystemFactory> getSystemFactories() {
99100

100101
return systemFactories;
101102
}
103+
104+
/**
105+
* Gets the system-wide defaults for streams.
106+
*/
107+
public Config getDefaultStreamProperties(String systemName) {
108+
return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true);
109+
}
102110
}

samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ package org.apache.samza.config
2323
import java.io.File
2424

2525
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
26-
import org.apache.samza.system.{RegexSystemStreamPartitionMatcher, SystemStreamPartitionMatcher}
2726
import org.apache.samza.util.Logging
2827

2928
object JobConfig {
@@ -47,8 +46,6 @@ object JobConfig {
4746
val JOB_CONTAINER_COUNT = "job.container.count"
4847
val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
4948
val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
50-
val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
51-
val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
5249
val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions"
5350

5451
val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
@@ -104,8 +101,22 @@ object JobConfig {
104101
class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
105102
def getName = getOption(JobConfig.JOB_NAME)
106103

107-
def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
108-
throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
104+
def getCoordinatorSystemName = {
105+
val system = getCoordinatorSystemNameOrNull
106+
if (system == null) {
107+
throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")
108+
}
109+
system
110+
}
111+
112+
/**
113+
* Gets the System to use for reading/writing the coordinator stream. Uses the following precedence.
114+
*
115+
* 1. If job.coordinator.system is defined, that value is used.
116+
* 2. If job.default.system is defined, that value is used.
117+
* 3. None
118+
*/
119+
def getCoordinatorSystemNameOrNull = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(getDefaultSystem.orNull)
109120

110121
def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
111122

@@ -144,31 +155,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
144155

145156
def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY)
146157

147-
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
148-
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
149-
150-
def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
151-
case Some(rplFactor) => rplFactor
152-
case _ =>
153-
getOption(CHECKPOINT_REPLICATION_FACTOR) match {
154-
case Some(rplFactor) =>
155-
info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_REPLICATION_FACTOR, CHECKPOINT_REPLICATION_FACTOR, rplFactor))
156-
rplFactor
157-
case _ => "3"
158-
}
159-
}
160-
161-
def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
162-
case Some(segBytes) => segBytes
163-
case _ =>
164-
getOption(CHECKPOINT_SEGMENT_BYTES) match {
165-
case Some(segBytes) =>
166-
info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_SEGMENT_BYTES, CHECKPOINT_SEGMENT_BYTES, segBytes))
167-
segBytes
168-
case _ => "26214400"
169-
}
170-
}
171-
172158
def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)
173159

174160
def getSSPMatcherConfigRegex = getExcept(JobConfig.SSP_MATCHER_CONFIG_REGEX)

samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,8 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
4646
def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
4747

4848
def getChangelogStream(name: String) = {
49-
// If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
50-
// If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
51-
// these values will be combined into <asystem>.<astream>
52-
val systemStream = getOption(CHANGELOG_STREAM format name)
53-
val changelogSystem = getOption(CHANGELOG_SYSTEM)
54-
val systemStreamRes =
55-
if ( systemStream.isDefined && ! systemStream.getOrElse("").contains('.')) {
56-
// contains only stream name
57-
if (changelogSystem.isDefined) {
58-
Some(changelogSystem.get + "." + systemStream.get)
59-
}
60-
else {
61-
throw new SamzaException("changelog system is not defined:" + systemStream.get)
62-
}
63-
} else {
64-
systemStream
65-
}
66-
systemStreamRes
49+
val javaStorageConfig = new JavaStorageConfig(config)
50+
Option(javaStorageConfig.getChangelogStream(name))
6751
}
6852

6953
def getChangeLogDeleteRetentionInMs(storeName: String) = {

samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
8181
* Returns a list of all SystemStreams that have a serde defined from the config file.
8282
*/
8383
def getSerdeStreams(systemName: String) = {
84+
val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName)
85+
val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE)
86+
8487
val subConf = config.subset("systems.%s.streams." format systemName, true)
8588
val legacySystemStreams = subConf
8689
.asScala
8790
.keys
88-
.filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
91+
.filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
8992
.map(k => {
9093
val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
9194
new SystemStream(systemName, streamName)
@@ -94,7 +97,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
9497
val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
9598
.asScala
9699
.keys
97-
.filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
100+
.filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde)
98101
.map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
99102
.filter(streamId => systemName.equals(getSystem(streamId)))
100103
.map(streamId => streamIdToSystemStream(streamId)).toSet
@@ -220,10 +223,13 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
220223
* @return the map of properties for the stream
221224
*/
222225
private def getSystemStreamProperties(systemName: String, streamName: String) = {
223-
if (systemName == null || streamName == null) {
226+
if (systemName == null) {
224227
Map()
225228
}
226-
config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
229+
val systemConfig = new JavaSystemConfig(config);
230+
val defaults = systemConfig.getDefaultStreamProperties(systemName);
231+
val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
232+
new MapConfig(defaults, explicitConfigs)
227233
}
228234

229235
/**

0 commit comments

Comments
 (0)