Skip to content

[ci][fix][broker] fix getChildren in memoryMetadata and etcdMetadata #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dcfd40d
[improve][misc] Highlight change to threading (#18025)
dave2wave Oct 13, 2022
e365afe
[fix][test] flaky AdminApi2Test.cleanup (#17861)
poorbarcode Oct 14, 2022
1ecb9c7
[improve][test] Improve KeySharedSubscriptionTest to reduce the execu…
poorbarcode Oct 14, 2022
230e7dc
[fix][broker] Update the log print content of createSubscriptions (#1…
Pomelongan Oct 14, 2022
bcf94b1
[improve][test] Improve SimpleSchemaTest to reduce the execution time…
poorbarcode Oct 14, 2022
e960a65
[fix][doc] Fix the grammar error in docs (#18033)
Huanli-Meng Oct 14, 2022
3dc48d1
[fix][io] Remove unused import statement in the MariadbJdbcAutoSchema…
coderzc Oct 14, 2022
97035ba
[fix][connectors] Fix builtin sink transformation on k8s (#18019)
cbornet Oct 14, 2022
fc61d52
[fix][test] Fix flaky test AdminApi2Test.resetClusters (#18043)
poorbarcode Oct 14, 2022
b451880
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when cha…
Oct 14, 2022
28d8514
[improve][doc] Update workflow of submitting release note (#18047)
Anonymitaet Oct 14, 2022
7dc9a5f
[cleanup][proxy] Use correct address for HAProxyMessage destination (…
michaeljmarshall Oct 14, 2022
6d86a2f
[fix][io] Fix builtin connector/function download filename (#18044)
cbornet Oct 14, 2022
71fb9eb
Bump commons-text to 1.10.0 (#18053)
coderzc Oct 14, 2022
fc7262a
Add link to CPP/Python client to README.md (#18054)
coderzc Oct 14, 2022
da0db33
[fix][ci] Fix deprecation warnings about set-output (#18048)
lhotari Oct 14, 2022
09f5eeb
Make BookieId work with PulsarRegistrationDriver (second take) (#17922)
eolivelli Oct 15, 2022
9c8a6ad
[Improve][doc] Insist about default behaviour for retention. (#17958)
AlvaroStream Oct 17, 2022
a990822
[fix][test] Fix TransactionTest failure due to clean up after class (…
liangyepianzhou Oct 17, 2022
b530184
[fix][test] AdvertisedListenersTest.setup (#17869)
congbobo184 Oct 17, 2022
96cc240
[improve][test] Add test case for system topic schema not compatible …
dragonls Oct 17, 2022
796afff
[improve][test] Improve SubscriptionMessageDispatchThrottlingTest to …
coderzc Oct 17, 2022
4421501
[fix][sec] Upgrade JacksonXML to 2.13.4 (#18020)
nicoloboschi Oct 17, 2022
ffbcdc0
[improve][doc] Add options to stats and partitioned-stats (#17734)
AlvaroStream Oct 17, 2022
4b5de98
[cleanup][broker] Delete unuse metrics of zk_write_latency and zk_rea…
liangyuanpeng Oct 17, 2022
574f784
[improve][fn] Run connectors extraction in parallel (#17902)
Oct 17, 2022
8f44c1a
[fix][admin] Fix NPE when get OffloadThreshold on namespace (#18061)
Technoboy- Oct 17, 2022
63cdb54
[fix][ci] Upload to codecov only upstream pull requests (#18018)
nicoloboschi Oct 17, 2022
0c7a0d1
[feat][doc] Separate CLI docs for doc gen automation (#18051)
Anonymitaet Oct 17, 2022
9acafc9
[fix][sec] File tiered storage: upgrade jettison to get rid of CVE-20…
nicoloboschi Oct 17, 2022
83e830e
[improve][doc] Redirect dangling standlaone to getting-started-standa…
tisonkun Oct 18, 2022
40d4456
fix getChildren in memoryMetadata and etcdMetadata
coderzc Oct 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ This change added tests and can be verified as follows:
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-go-functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
- name: Check changed files
id: check_changes
run: |
echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"
echo "docs_only=${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}" >> $GITHUB_OUTPUT

- name: Check if the PR has been approved for testing
if: ${{ steps.check_changes.outputs.docs_only != 'true' && github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pulsar-ci-flaky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- name: Check changed files
id: check_changes
run: |
echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"
echo "docs_only=${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}" >> $GITHUB_OUTPUT

- name: Check if the PR has been approved for testing
if: ${{ steps.check_changes.outputs.docs_only != 'true' && github.repository == 'apache/pulsar' && github.event_name == 'pull_request' }}
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
- name: Check changed files
id: check_changes
run: |
echo "::set-output name=docs_only::${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}"
echo "docs_only=${{ fromJSON(steps.changes.outputs.all_count) == fromJSON(steps.changes.outputs.docs_count) && fromJSON(steps.changes.outputs.docs_count) > 0 }}" >> $GITHUB_OUTPUT


- name: Check if the PR has been approved for testing
Expand Down Expand Up @@ -218,6 +218,7 @@ jobs:
uses: ./.github/actions/copy-test-reports

- name: Upload to Codecov
if: ${{ github.repository == 'apache/pulsar' }}
uses: codecov/codecov-action@v3
continue-on-error: true
with:
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la

### Clients

- [C++ Client](https://github.com/apache/pulsar-client-cpp)
- [Python Client](https://github.com/apache/pulsar-client-python)
- [.NET/C# Client](https://github.com/apache/pulsar-dotpulsar)
- [Go Client](https://github.com/apache/pulsar-client-go)
- [NodeJS Client](https://github.com/apache/pulsar-client-node)
Expand Down
18 changes: 9 additions & 9 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,14 @@ The Apache Software License, Version 2.0
* JCommander -- com.beust-jcommander-1.82.jar
* High Performance Primitive Collections for Java -- com.carrotsearch-hppc-0.9.1.jar
* Jackson
- com.fasterxml.jackson.core-jackson-annotations-2.13.3.jar
- com.fasterxml.jackson.core-jackson-core-2.13.3.jar
- com.fasterxml.jackson.core-jackson-databind-2.13.3.jar
- com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.13.3.jar
- com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.13.3.jar
- com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.13.3.jar
- com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.13.3.jar
- com.fasterxml.jackson.module-jackson-module-jsonSchema-2.13.3.jar
- com.fasterxml.jackson.core-jackson-annotations-2.13.4.jar
- com.fasterxml.jackson.core-jackson-core-2.13.4.jar
- com.fasterxml.jackson.core-jackson-databind-2.13.4.jar
- com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.13.4.jar
- com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.13.4.jar
- com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.13.4.jar
- com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.13.4.jar
- com.fasterxml.jackson.module-jackson-module-jsonSchema-2.13.4.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.0.1.jar
Expand Down Expand Up @@ -284,7 +284,7 @@ The Apache Software License, Version 2.0
- org.apache.commons-commons-collections4-4.4.jar
- org.apache.commons-commons-compress-1.21.jar
- org.apache.commons-commons-lang3-3.11.jar
- org.apache.commons-commons-text-1.9.jar
- org.apache.commons-commons-text-1.10.0.jar
* Netty
- io.netty-netty-buffer-4.1.77.Final.jar
- io.netty-netty-codec-4.1.77.Final.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.pulsar.common.util.PortManager.releaseLockedPort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testBookieFailure() throws Exception {
metadataStore.unsetAlwaysFail();

bkc = new BookKeeperTestClient(baseClientConf);
startNewBookie();
int port = startNewBookie();

// Reconnect a new bk client
factory.shutdown();
Expand Down Expand Up @@ -147,6 +148,7 @@ public void testBookieFailure() throws Exception {
assertEquals("entry-2", new String(entries.get(0).getData()));
entries.forEach(Entry::release);
factory.shutdown();
releaseLockedPort(port);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package org.apache.bookkeeper.test;

import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertFalse;

import com.google.common.base.Stopwatch;
Expand Down Expand Up @@ -62,7 +63,7 @@
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
Expand Down Expand Up @@ -113,6 +114,7 @@ public void handleTestMethodName(Method method) {

private boolean isAutoRecoveryEnabled;
protected ExecutorService executor;
private final List<Integer> bookiePorts = new ArrayList<>();

SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
Expand Down Expand Up @@ -264,7 +266,7 @@ protected void startBKCluster(String metadataServiceUri) throws Exception {

// Create Bookie Servers (B1, B2, B3)
for (int i = 0; i < numBookies; i++) {
startNewBookie();
bookiePorts.add(startNewBookie());
}
}

Expand All @@ -283,14 +285,15 @@ protected void stopBKCluster() throws Exception {
t.shutdown();
}
servers.clear();
bookiePorts.removeIf(PortManager::releaseLockedPort);
}

protected ServerConfiguration newServerConfiguration() throws Exception {
File f = tmpDirs.createNew("bookie", "test");

int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = PortManager.nextFreePort();
port = nextLockedFreePort();
} else {
port = 0;
}
Expand Down
12 changes: 10 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ flexible messaging model and an intuitive client API.</description>
<bookkeeper.version>4.15.1</bookkeeper.version>
<zookeeper.version>3.8.0</zookeeper.version>
<commons-cli.version>1.5.0</commons-cli.version>
<commons-text.version>1.9</commons-text.version>
<commons-text.version>1.10.0</commons-text.version>
<snappy.version>1.1.8.4</snappy.version> <!-- ZooKeeper server -->
<dropwizardmetrics.version>4.1.12.1</dropwizardmetrics.version> <!-- ZooKeeper server -->
<curator.version>5.1.0</curator.version>
Expand All @@ -139,7 +139,7 @@ flexible messaging model and an intuitive client API.</description>
<log4j2.version>2.18.0</log4j2.version>
<bouncycastle.version>1.69</bouncycastle.version>
<bouncycastlefips.version>1.0.2</bouncycastlefips.version>
<jackson.version>2.13.3</jackson.version>
<jackson.version>2.13.4</jackson.version>
<reflections.version>0.9.11</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
Expand Down Expand Up @@ -247,6 +247,7 @@ flexible messaging model and an intuitive client API.</description>
<objenesis.version>3.1</objenesis.version>
<awaitility.version>4.2.0</awaitility.version>
<reload4j.version>1.2.22</reload4j.version>
<jettison.version>1.5.1</jettison.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down Expand Up @@ -798,6 +799,13 @@ flexible messaging model and an intuitive client API.</description>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>${jettison.version}</version>
</dependency>


<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4524,7 +4524,7 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
});

FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> {
log.info("[{}] Successfully created new partitions {}", clientAppId(), topicName);
log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName);
result.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to create subscriptions on new partitions for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,8 @@ public void getOffloadThreshold(
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.OFFLOAD, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> {
if (policies.offload_policies == null) {
if (policies.offload_policies == null
|| policies.offload_policies.getManagedLedgerOffloadThresholdInBytes() == null) {
asyncResponse.resume(policies.offload_threshold);
} else {
asyncResponse.resume(policies.offload_policies.getManagedLedgerOffloadThresholdInBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public class BrokerOperabilityMetrics {
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
private final DimensionStats zkWriteLatencyStats;
private final DimensionStats zkReadLatencyStats;
private final String brokerName;
private final LongAdder connectionTotalCreatedCount;
private final LongAdder connectionCreateSuccessCount;
Expand All @@ -45,8 +43,6 @@ public BrokerOperabilityMetrics(String localCluster, String brokerName) {
this.metricsList = new ArrayList<>();
this.localCluster = localCluster;
this.topicLoadStats = new DimensionStats("topic_load_times", 60);
this.zkWriteLatencyStats = new DimensionStats("zk_write_latency", 60);
this.zkReadLatencyStats = new DimensionStats("zk_read_latency", 60);
this.brokerName = brokerName;
this.connectionTotalCreatedCount = new LongAdder();
this.connectionCreateSuccessCount = new LongAdder();
Expand All @@ -62,8 +58,6 @@ public List<Metrics> getMetrics() {

private void generate() {
metricsList.add(getTopicLoadMetrics());
metricsList.add(getZkWriteLatencyMetrics());
metricsList.add(getZkReadLatencyMetrics());
metricsList.add(getConnectionMetrics());
}

Expand Down Expand Up @@ -93,14 +87,6 @@ Metrics getTopicLoadMetrics() {
return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
}

Metrics getZkWriteLatencyMetrics() {
return getDimensionMetrics("zk_write_latency", "zk_write", zkWriteLatencyStats);
}

Metrics getZkReadLatencyMetrics() {
return getDimensionMetrics("zk_read_latency", "zk_read", zkReadLatencyStats);
}

Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) {
Metrics dMetrics = Metrics.create(getDimensionMap(metricsName));

Expand All @@ -120,22 +106,12 @@ Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionS
public void reset() {
metricsList.clear();
topicLoadStats.reset();
zkWriteLatencyStats.reset();
zkReadLatencyStats.reset();
}

public void recordTopicLoadTimeValue(long topicLoadLatencyMs) {
topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
}

public void recordZkWriteLatencyTimeValue(long topicLoadLatencyMs) {
zkWriteLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
}

public void recordZkReadLatencyTimeValue(long topicLoadLatencyMs) {
zkReadLatencyStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
}

public void recordConnectionCreate() {
this.connectionTotalCreatedCount.increment();
this.connectionActive.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -124,6 +126,9 @@ protected void additionalBrokersCleanup() {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarService.close();
pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort);
pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort);
} catch (PulsarServerException e) {
// ignore
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public void resetClusters() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
for (String tenant : admin.tenants().getTenants()) {
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
deleteNamespaceGraceFully(namespace, true);
deleteNamespaceGraceFullyByMultiPulsars(namespace, true, admin, pulsar,
mockPulsarSetup.getPulsar());
}
admin.tenants().deleteTenant(tenant, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,9 @@ public void testSetOffloadThreshold() throws Exception {
admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));
admin.topics().createNonPartitionedTopic(topicName.toString());

admin.namespaces().setOffloadDeleteLag(namespace, 10000, TimeUnit.SECONDS);
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));

// assert we get the default which indicates it will fall back to default
assertEquals(-1, admin.namespaces().getOffloadThreshold(namespace));
// the ledger config should have the expected value
Expand Down
Loading