-
Notifications
You must be signed in to change notification settings - Fork 40.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Pulsar config props for startup policy #42180
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -822,6 +822,11 @@ public static class Listener { | |
*/ | ||
private boolean observationEnabled; | ||
|
||
/** | ||
* Startup settings. | ||
*/ | ||
private final Startup startup = new Startup(); | ||
|
||
public SchemaType getSchemaType() { | ||
return this.schemaType; | ||
} | ||
|
@@ -846,6 +851,10 @@ public void setObservationEnabled(boolean observationEnabled) { | |
this.observationEnabled = observationEnabled; | ||
} | ||
|
||
public Startup getStartup() { | ||
return this.startup; | ||
} | ||
|
||
} | ||
|
||
public static class Reader { | ||
|
@@ -876,6 +885,11 @@ public static class Reader { | |
*/ | ||
private boolean readCompacted; | ||
|
||
/** | ||
* Startup settings. | ||
*/ | ||
private final Startup startup = new Startup(); | ||
|
||
public String getName() { | ||
return this.name; | ||
} | ||
|
@@ -916,6 +930,53 @@ public void setReadCompacted(boolean readCompacted) { | |
this.readCompacted = readCompacted; | ||
} | ||
|
||
public Startup getStartup() { | ||
return this.startup; | ||
} | ||
|
||
} | ||
|
||
public static class Startup { | ||
|
||
/** | ||
* The max time to wait for the container to start. | ||
*/ | ||
private Duration timeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a default for this property? If so I think we should use the opportunity to set the default so that it's documented, and very in a test that it's consistent in case it changes. |
||
|
||
/** | ||
* The action to take when the container fails to start. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same. I suggest |
||
*/ | ||
private FailurePolicy onFailure; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the previous behavior was to continue and log an exception. There's no default value here so it's hard to see if that's still the case. If that's the case, this field should have a default value that matches the default behavior. |
||
|
||
public Duration getTimeout() { | ||
return this.timeout; | ||
} | ||
|
||
public void setTimeout(Duration timeout) { | ||
this.timeout = timeout; | ||
} | ||
|
||
public FailurePolicy getOnFailure() { | ||
return this.onFailure; | ||
} | ||
|
||
public void setOnFailure(FailurePolicy onFailure) { | ||
this.onFailure = onFailure; | ||
} | ||
|
||
} | ||
|
||
public enum FailurePolicy { | ||
|
||
/** Stop the container and throw exception. */ | ||
STOP, | ||
|
||
/** Stop the container but do not throw exception. */ | ||
CONTINUE, | ||
|
||
/** Retry startup asynchronously. */ | ||
RETRY | ||
|
||
} | ||
|
||
public static class Template { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,8 +37,10 @@ | |
import org.apache.pulsar.client.api.ServiceUrlProvider; | ||
import org.apache.pulsar.client.impl.AutoClusterFailover.AutoClusterFailoverBuilderImpl; | ||
|
||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; | ||
import org.springframework.boot.context.properties.PropertyMapper; | ||
import org.springframework.boot.json.JsonWriter; | ||
import org.springframework.pulsar.config.StartupFailurePolicy; | ||
import org.springframework.pulsar.core.PulsarTemplate; | ||
import org.springframework.pulsar.listener.PulsarContainerProperties; | ||
import org.springframework.pulsar.reader.PulsarReaderContainerProperties; | ||
|
@@ -198,6 +200,17 @@ private void customizePulsarContainerListenerProperties(PulsarContainerPropertie | |
map.from(properties::getSchemaType).to(containerProperties::setSchemaType); | ||
map.from(properties::getConcurrency).to(containerProperties::setConcurrency); | ||
map.from(properties::isObservationEnabled).to(containerProperties::setObservationEnabled); | ||
customizeListenerStartupProperties(containerProperties); | ||
} | ||
|
||
private void customizeListenerStartupProperties(PulsarContainerProperties containerProperties) { | ||
PulsarProperties.Startup properties = this.properties.getListener().getStartup(); | ||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); | ||
map.from(properties::getTimeout).to(containerProperties::setConsumerStartTimeout); | ||
map.from(properties::getOnFailure) | ||
.as(FailurePolicy::name) | ||
.as(StartupFailurePolicy::valueOf) | ||
.to(containerProperties::setStartupFailurePolicy); | ||
} | ||
|
||
<T> void customizeReaderBuilder(ReaderBuilder<T> readerBuilder) { | ||
|
@@ -214,6 +227,17 @@ void customizeReaderContainerProperties(PulsarReaderContainerProperties readerCo | |
PulsarProperties.Reader properties = this.properties.getReader(); | ||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); | ||
map.from(properties::getTopics).to(readerContainerProperties::setTopics); | ||
customizeReaderStartupProperties(readerContainerProperties); | ||
} | ||
|
||
private void customizeReaderStartupProperties(PulsarReaderContainerProperties readerContainerProperties) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's unfortunate that this is the exact same code as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code to de-duplicate was more complicated than having this in multiple places. |
||
PulsarProperties.Startup properties = this.properties.getReader().getStartup(); | ||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); | ||
map.from(properties::getTimeout).to(readerContainerProperties::setReaderStartTimeout); | ||
map.from(properties::getOnFailure) | ||
.as(FailurePolicy::name) | ||
.as(StartupFailurePolicy::valueOf) | ||
.to(readerContainerProperties::setStartupFailurePolicy); | ||
} | ||
|
||
private Consumer<Duration> timeoutProperty(BiConsumer<Integer, TimeUnit> setter) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,9 @@ | |
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder; | ||
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder; | ||
|
||
import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.FailurePolicy; | ||
import org.springframework.boot.context.properties.PropertyMapper; | ||
import org.springframework.pulsar.config.StartupFailurePolicy; | ||
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties; | ||
|
||
/** | ||
|
@@ -96,6 +98,16 @@ private void customizePulsarContainerListenerProperties(ReactivePulsarContainerP | |
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); | ||
map.from(properties::getSchemaType).to(containerProperties::setSchemaType); | ||
map.from(properties::getConcurrency).to(containerProperties::setConcurrency); | ||
customizeListenerStartupProperties(containerProperties); | ||
} | ||
|
||
private void customizeListenerStartupProperties(ReactivePulsarContainerProperties<?> containerProperties) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here as well. |
||
PulsarProperties.Startup properties = this.properties.getListener().getStartup(); | ||
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); | ||
map.from(properties::getOnFailure) | ||
.as(FailurePolicy::name) | ||
.as(StartupFailurePolicy::valueOf) | ||
.to(containerProperties::setStartupFailurePolicy); | ||
} | ||
|
||
void customizeMessageReaderBuilder(ReactiveMessageReaderBuilder<?> builder) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Description of configuration properties do not start with
the
,a
, etc. I suggestTime to wait for the container to start.