Skip to content

Commit 020ac8a

Browse files
made changes as per code review comments
1 parent 56002ec commit 020ac8a

File tree

10 files changed

+24
-26
lines changed

10 files changed

+24
-26
lines changed

conf/broker.conf

+1-2
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,7 @@ maxNumPartitionsPerPartitionedTopic=0
555555
# There are two policies to apply when broker metadata session expires: session expired happens, "shutdown" or "reconnect".
556556
# With "shutdown", the broker will be restarted.
557557
# With "reconnect", the broker will keep serving the topics, while attempting to recreate a new session.
558-
# Node: the "reconnect" policy is an experiment feature
559-
zookeeperSessionExpiredPolicy=shutdown
558+
zookeeperSessionExpiredPolicy=reconnect
560559

561560
# Enable or disable system topic
562561
systemTopicEnabled=false

deployment/terraform-ansible/templates/broker.conf

+1-2
Original file line numberDiff line numberDiff line change
@@ -424,8 +424,7 @@ maxNumPartitionsPerPartitionedTopic=0
424424
# There are two policies when zookeeper session expired happens, "shutdown" and "reconnect".
425425
# If uses "shutdown" policy, shutdown the broker when zookeeper session expired happens.
426426
# If uses "reconnect" policy, try to reconnect to zookeeper server and re-register metadata to zookeeper.
427-
# Node: the "reconnect" policy is an experiment feature
428-
zookeeperSessionExpiredPolicy=shutdown
427+
zookeeperSessionExpiredPolicy=reconnect
429428

430429
# Enable or disable system topic
431430
systemTopicEnabled=false

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1176,7 +1176,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
11761176
+ " With \"shutdown\", the broker will be restarted.\n\n"
11771177
+ " With \"reconnect\", the broker will keep serving the topics, while attempting to recreate a new session."
11781178
)
1179-
private MetadataSessionExpiredPolicy zookeeperSessionExpiredPolicy = MetadataSessionExpiredPolicy.shutdown;
1179+
private MetadataSessionExpiredPolicy zookeeperSessionExpiredPolicy = MetadataSessionExpiredPolicy.reconnect;
11801180

11811181
@FieldContext(
11821182
category = CATEGORY_SERVER,

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -674,7 +674,8 @@ public void start() throws PulsarServerException {
674674
this.startNamespaceService();
675675

676676
schemaStorage = createAndStartSchemaStorage();
677-
schemaRegistryService = SchemaRegistryService.create(schemaStorage, config.getSchemaRegistryClassName());
677+
setSchemaRegistryName(schemaStorage);
678+
schemaRegistryService = SchemaRegistryService.create(config.getSchemaRegistryClassName());
678679
schemaRegistryService.initialize(config, schemaStorage);
679680

680681
this.defaultOffloader = createManagedLedgerOffloader(
@@ -1266,6 +1267,12 @@ private SchemaStorage createAndStartSchemaStorage() throws Exception {
12661267
return schemaStorage;
12671268
}
12681269

1270+
private void setSchemaRegistryName(SchemaStorage schemaStorage) {
1271+
if (schemaStorage == null) {
1272+
this.config.setSchemaRegistryClassName("org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService");
1273+
}
1274+
}
1275+
12691276
public ScheduledExecutorService getExecutor() {
12701277
return executor;
12711278
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,7 @@ public SchemaVersion versionFromBytes(byte[] version) {
108108

109109
@Override
110110
public void initialize(ServiceConfiguration configuration, SchemaStorage schemaStorage)
111-
throws PulsarServerException {
112-
113-
}
111+
throws PulsarServerException {}
114112

115113
@Override
116114
public void close() throws Exception {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,14 @@ public interface SchemaRegistryService extends SchemaRegistry {
2929
Logger LOG = LoggerFactory.getLogger(SchemaRegistryService.class);
3030
long NO_SCHEMA_VERSION = -1L;
3131

32-
static SchemaRegistryService create(SchemaStorage schemaStorage, String schemaRegistryClassName) {
33-
if (schemaStorage != null) {
34-
try {
35-
SchemaRegistryService schemaRegistryService = (SchemaRegistryService) Class
36-
.forName(schemaRegistryClassName).getDeclaredConstructor()
37-
.newInstance();
32+
static SchemaRegistryService create(String schemaRegistryClassName) {
33+
try {
34+
SchemaRegistryService schemaRegistryService = (SchemaRegistryService) Class.forName(schemaRegistryClassName)
35+
.getDeclaredConstructor().newInstance();
3836

39-
return SchemaRegistryServiceWithSchemaDataValidator.of(schemaRegistryService);
40-
} catch (Exception e) {
41-
LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e);
42-
}
37+
return SchemaRegistryServiceWithSchemaDataValidator.of(schemaRegistryService);
38+
} catch (Exception e) {
39+
LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e);
4340
}
4441
return new DefaultSchemaRegistryService();
4542
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
2626
import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
2727
import static org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
28-
import com.google.common.annotations.VisibleForTesting;
2928
import com.google.common.collect.Maps;
3029
import com.google.common.hash.HashFunction;
3130
import com.google.common.hash.Hashing;
@@ -53,7 +52,6 @@
5352
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
5453
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
5554
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
56-
import org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator;
5755
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5856
import org.apache.pulsar.common.protocol.schema.SchemaData;
5957
import org.apache.pulsar.common.protocol.schema.SchemaHash;
@@ -218,7 +216,6 @@ public SchemaVersion versionFromBytes(byte[] version) {
218216
public void initialize(ServiceConfiguration configuration, SchemaStorage schemaStorage) throws PulsarServerException {
219217
try {
220218
Map<SchemaType, SchemaCompatibilityCheck> checkers = getCheckers(configuration.getSchemaRegistryCompatibilityCheckers());
221-
checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
222219
this.schemaStorage = schemaStorage;
223220
this.compatibilityChecks = checkers;
224221
this.clock = Clock.systemUTC();
@@ -237,6 +234,7 @@ private Map<SchemaType, SchemaCompatibilityCheck> getCheckers(Set<String> checke
237234
.getDeclaredConstructor().newInstance();
238235
checkers.put(instance.getSchemaType(), instance);
239236
}
237+
checkers.put(SchemaType.KEY_VALUE, new KeyValueSchemaCompatibilityCheck(checkers));
240238
return checkers;
241239
}
242240

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,18 @@ public void setup() {
4545
@Test
4646
void createCustomRegistry() {
4747
SchemaRegistryService actual = SchemaRegistryService
48-
.create(mockSchemaStorage, "org.apache.pulsar.broker.service.schema.TestSchemaRegistryService");
48+
.create("org.apache.pulsar.broker.service.schema.TestSchemaRegistryService");
4949
Assert.assertSame(actual.getClass(), SchemaRegistryServiceWithSchemaDataValidator.class);
5050
}
5151

5252
@Test
5353
void createEmptyRegistry() {
54-
SchemaRegistryService actual = SchemaRegistryService.create(null, null);
54+
SchemaRegistryService actual = SchemaRegistryService.create(null);
5555
Assert.assertSame(actual.getClass(), DefaultSchemaRegistryService.class);
5656
}
5757

5858
}
5959

6060
class TestSchemaRegistryService extends DefaultSchemaRegistryService {
61-
public TestSchemaRegistryService(SchemaStorage schemaStorage, Map checkers) {}
61+
public TestSchemaRegistryService() {}
6262
}

site2/website-next/versioned_docs/version-2.7.3/reference-configuration.md

-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,6 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
235235
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
236236
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
237237
|schemaRegistryStorageClassName|The schema storage implementation used by this broker.|org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory|
238-
|schemaRegistryClassName|Specify the schema registry to be used in Pulsar. If it is not set, the default schema registry is used.|org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl|
239238
|isSchemaValidationEnforced|Enforce schema validation on following cases: if a producer without a schema attempts to produce to a topic with schema, the producer will be failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema. If this setting is enabled, then non-java clients fail to produce.|false|
240239
|offloadersDirectory|The directory for all the offloader implementations.|./offloaders|
241240
|bookkeeperMetadataServiceUri| Metadata service uri that bookkeeper is used for loading corresponding metadata driver and resolving its metadata service location. This value can be fetched using `bookkeeper shell whatisinstanceid` command in BookKeeper cluster. For example: zk+hierarchical://localhost:2181/ledgers. The metadata service uri list can also be semicolon separated values like below: zk+hierarchical://zk1:2181;zk2:2181;zk3:2181/ledgers ||

site2/website/versioned_docs/version-2.9.1/reference-configuration.md

+1
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
241241
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
242242
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
243243
|schemaRegistryStorageClassName|The schema storage implementation used by this broker.|org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory|
244+
|schemaRegistryClassName|Specify the schema registry to be used in Pulsar. If it is not set, the default schema registry is used.|org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl|
244245
|isSchemaValidationEnforced|Enforce schema validation on following cases: if a producer without a schema attempts to produce to a topic with schema, the producer will be failed to connect. PLEASE be carefully on using this, since non-java clients don't support schema. If this setting is enabled, then non-java clients fail to produce.|false|
245246
| topicFencingTimeoutSeconds | If a topic remains fenced for a certain time period (in seconds), it is closed forcefully. If set to 0 or a negative number, the fenced topic is not closed. | 0 |
246247
|offloadersDirectory|The directory for all the offloader implementations.|./offloaders|

0 commit comments

Comments
 (0)