Skip to content

Commit

Permalink
Pulsar with bk 4.15.0 rc1
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed Apr 27, 2022
1 parent a0c2c59 commit 2a74154
Show file tree
Hide file tree
Showing 37 changed files with 2,745 additions and 772 deletions.
2 changes: 1 addition & 1 deletion buildtools/src/main/resources/pulsar/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ page at http://checkstyle.sourceforge.net/config.html -->

<module name="IllegalImport">
<property name="illegalPkgs"
value="autovalue.shaded, avro.shaded, bk-shade, com.google.api.client.repackaged, com.google.appengine.repackaged, io.netty.util.internal"/>
value="autovalue.shaded, avro.shaded, bk-shade, com.google.api.client.repackaged, com.google.appengine.repackaged" />
</module>

<module name="RedundantModifier">
Expand Down
30 changes: 30 additions & 0 deletions conf/default_rocksdb.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[DBOptions]
# set by jni: options.setCreateIfMissing
create_if_missing=true
# set by jni: options.setInfoLogLevel
info_log_level=INFO_LEVEL
# set by jni: options.setKeepLogFileNum
keep_log_file_num=30

[CFOptions "default"]
# set by jni: options.setLogFileTimeToRoll
log_file_time_to_roll=86400
70 changes: 70 additions & 0 deletions conf/entry_location_rocksdb.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[DBOptions]
# set by jni: options.setCreateIfMissing
create_if_missing=true
# set by jni: options.setInfoLogLevel
info_log_level=INFO_LEVEL
# set by jni: options.setKeepLogFileNum
keep_log_file_num=30
# set by jni: options.setLogFileTimeToRoll
log_file_time_to_roll=86400
# set by jni: options.setMaxBackgroundJobs or options.setIncreaseParallelism
max_background_jobs=2
# set by jni: options.setMaxSubcompactions
max_subcompactions=1
# set by jni: options.setMaxTotalWalSize
max_total_wal_size=536870912
# set by jni: options.setMaxOpenFiles
max_open_files=-1
# set by jni: options.setDeleteObsoleteFilesPeriodMicros
delete_obsolete_files_period_micros=3600000000

[CFOptions "default"]
# set by jni: options.setCompressionType
compression=kLZ4Compression
# set by jni: options.setWriteBufferSize
write_buffer_size=67108864
# set by jni: options.setMaxWriteBufferNumber
max_write_buffer_number=4
# set by jni: options.setNumLevels
num_levels=7
# set by jni: options.setLevelZeroFileNumCompactionTrigger
level0_file_num_compaction_trigger=4
# set by jni: options.setMaxBytesForLevelBase
max_bytes_for_level_base=268435456
# set by jni: options.setTargetFileSizeBase
target_file_size_base=67108864

[TableOptions/BlockBasedTable "default"]
# set by jni: tableOptions.setBlockSize
block_size=65536
# set by jni: tableOptions.setBlockCache
block_cache=206150041
# set by jni: tableOptions.setFormatVersion
format_version=2
# set by jni: tableOptions.setChecksumType
checksum=kxxHash
# set by jni: tableOptions.setFilterPolicy, bloomfilter:[bits_per_key]:[use_block_based_builder]
filter_policy=rocksdb.BloomFilter:10:false
# set by jni: tableOptions.setCacheIndexAndFilterBlocks
cache_index_and_filter_blocks=true
# set by jni: options.setLevelCompactionDynamicLevelBytes
level_compaction_dynamic_level_bytes=true
30 changes: 30 additions & 0 deletions conf/ledger_metadata_rocksdb.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[DBOptions]
# set by jni: options.setCreateIfMissing
create_if_missing=true
# set by jni: options.setInfoLogLevel
info_log_level=INFO_LEVEL
# set by jni: options.setKeepLogFileNum
keep_log_file_num=30

[CFOptions "default"]
# set by jni: options.setLogFileTimeToRoll
log_file_time_to_roll=86400
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testSimpleRead() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my-ledger", config);
ManagedLedger ledger = factory.open("my-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("c1");

int N = 1;
Expand All @@ -88,7 +88,7 @@ public void testBookieFailure() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
ManagedLedger ledger = factory.open("my-ledger", config);
ManagedLedger ledger = factory.open("my-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("my-cursor");
ledger.addEntry("entry-0".getBytes());

Expand Down Expand Up @@ -121,7 +121,7 @@ public void testBookieFailure() throws Exception {
factory.shutdown();

factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ledger = factory.open("my-ledger", config);
ledger = factory.open("my-ledger" + testName, config);
cursor = ledger.openCursor("my-cursor");

// Next add should succeed
Expand Down Expand Up @@ -159,7 +159,7 @@ public void verifyConcurrentUsage() throws Exception {
EntryCacheManager cacheManager = factory.getEntryCacheManager();
ManagedLedgerConfig conf = new ManagedLedgerConfig();
conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger", conf);
final ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my-ledger" + testName, conf);

int NumProducers = 1;
int NumConsumers = 1;
Expand Down Expand Up @@ -299,7 +299,7 @@ public void asyncMarkDeleteAndClose() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("my_test_ledger", config);
ManagedLedger ledger = factory.open("my_test_ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("c1");

List<Position> positions = Lists.newArrayList();
Expand Down Expand Up @@ -348,7 +348,7 @@ public void ledgerFencedByAutoReplication() throws Exception {
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger" + testName, config);
ManagedCursor c1 = ledger.openCursor("c1");

PositionImpl p1 = (PositionImpl) ledger.addEntry("entry-1".getBytes());
Expand Down Expand Up @@ -378,15 +378,15 @@ public void ledgerFencedByFailover() throws Exception {
ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory1.open("my_test_ledger", config);
ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory1.open("my_test_ledger" + testName, config);
ledger1.openCursor("c");

ledger1.addEntry("entry-1".getBytes());

// Open the ML from another factory
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory2.open("my_test_ledger", config);
ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory2.open("my_test_ledger" + testName, config);
ManagedCursor c2 = ledger2.openCursor("c");

try {
Expand Down Expand Up @@ -477,7 +477,7 @@ public void managedLedgerClosed() throws Exception {
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger", config);
ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger" + testName, config);

int N = 100;

Expand Down Expand Up @@ -517,7 +517,7 @@ public void testChangeCrcType() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
config.setDigestType(DigestType.CRC32);
ManagedLedger ledger = factory.open("my_test_ledger", config);
ManagedLedger ledger = factory.open("my_test_ledger" + testName, config);
ManagedCursor c1 = ledger.openCursor("c1");

ledger.addEntry("entry-0".getBytes());
Expand All @@ -527,7 +527,7 @@ public void testChangeCrcType() throws Exception {
ledger.close();

config.setDigestType(DigestType.CRC32C);
ledger = factory.open("my_test_ledger", config);
ledger = factory.open("my_test_ledger" + testName, config);
c1 = ledger.openCursor("c1");

ledger.addEntry("entry-3".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected String changeLedgerPath() {
return "/test";
}

@Test()
@Test(timeOut = 60000)
public void testChangeZKPath() throws Exception {
ClientConfiguration configuration = new ClientConfiguration();
String zkConnectString = zkUtil.getZooKeeperConnectString() + "/test";
Expand All @@ -61,8 +61,8 @@ public void testChangeZKPath() throws Exception {
.setAckQuorumSize(1)
.setMetadataAckQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("test-ledger", config);
ManagedCursor cursor = ledger.openCursor("test-c1");
ManagedLedger ledger = factory.open("test-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("test-c1" + testName);

for (int i = 0; i < 10; i++) {
String entry = "entry" + i;
Expand All @@ -77,7 +77,7 @@ public void testChangeZKPath() throws Exception {
Assert.assertEquals(("entry" + i).getBytes("UTF8"), entry.getData());
}
}
@Test()
@Test(timeOut = 60000)
public void testChangeZKPath2() throws Exception {
ClientConfiguration configuration = new ClientConfiguration();
String zkConnectString = zkUtil.getZooKeeperConnectString() + "/test";
Expand All @@ -98,8 +98,8 @@ public void testChangeZKPath2() throws Exception {
.setAckQuorumSize(1)
.setMetadataAckQuorumSize(1)
.setMetadataAckQuorumSize(1);
ManagedLedger ledger = factory.open("test-ledger", config);
ManagedCursor cursor = ledger.openCursor("test-c1");
ManagedLedger ledger = factory.open("test-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("test-c1" + testName);

for (int i = 0; i < 10; i++) {
String entry = "entry" + i;
Expand Down
Loading

0 comments on commit 2a74154

Please sign in to comment.