Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-121: Pulsar cluster level auto failover on client side #13316

Merged
merged 18 commits into from
Jan 29, 2022

Conversation

hangc0276
Copy link
Contributor

Related to #13315

Modification

  1. add Pulsar cluster level auto failover

@github-actions
Copy link

@hangc0276:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@hangc0276 hangc0276 changed the title [PIP-121] Pulsar cluster level auto failover [WIP] PIP-121: Pulsar cluster level auto failover [WIP] Dec 15, 2021
@hangc0276 hangc0276 added the doc-required Your PR changes impact docs and you will update later. label Dec 15, 2021
@github-actions
Copy link

@hangc0276:Thanks for providing doc info!

* Close the resource that the provider allocated.
*
*/
default void close() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to make the interface inherit AutoCloseable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we can remove the default empty implementation here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other serviceUrlProvider implementation also need to implement close() method, so I add the default implementation.

this.primaryFailedTimestamp = -1;
this.primaryRecoverTimestamp = -1;
this.secondaryFailedTimestamp = -1;
this.timer = new Timer("pulsar-service-provider");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use ScheduledThreadPoolExecutor here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

if (primaryFailedTimestamp == -1) {
primaryFailedTimestamp = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to use System.nanoTime() to calculate the time locally to avoid time drifts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public void run() {
// current pulsar serviceUrl is primary
if (currentPulsarServiceUrl.equals(primary)) {
if (probeAvailable(primary, timeout)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about failover after a succession of failures (3 for example, and should be configurable) to avoid unnecessary switches?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation is switch by failure delay time, it can control the client failover time. If we use failure times, the failover time will out of control.

try {
URLConnection conn = pulsarUrlProvider.openConnection();
inputStream = conn.getInputStream();
return new String(IOUtils.toByteArray(inputStream), StandardCharsets.UTF_8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the service url as the response body might not be very extensible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We should use a JSON response here

}

Socket socket = new Socket();
socket.connect(new InetSocketAddress(parseHost(hostAndPort), parsePort(hostAndPort)), timeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to make a request to the server to ensure it is actually in a good state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation is probe whether the service port is open or not. If we want to probe the broker state, it's better to provide a health check command for broker service on broker side. I can use another PR to provide this feature on broker side. @merlimat Do you have any other ideas?

Copy link
Member

@zymap zymap left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, we use geo-replication to support cluster failover, that means there might be have more than two cluster. In the implementation, we only can configure two clusters for the failover. I am thinking of, do we need to support more clusters to failover to make the cluster is more robust?

Comment on lines 171 to 258
private static String parseHostAndPort(String url) {
if (Strings.isNullOrEmpty(url) || !url.startsWith("pulsar")) {
throw new IllegalArgumentException("'" + url + "' isn't an Pulsar service URL");
}

int uriSeparatorPos = url.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + url + "' isn't an URI.");
}
return url.substring(uriSeparatorPos + 3);
}

private static String parseHost(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return hostAndPort.substring(0, portSeparatorPos);
}

private static Integer parsePort(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return Integer.valueOf(hostAndPort.substring(portSeparatorPos+1));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use URL class to parse a URL and you can get the host and port easily.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think URI could be better here, because URL assumes a "known" protocol scheme.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangc0276 please address this comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

currentPulsarServiceUrl, System.currentTimeMillis() - primaryFailedTimestamp);
}
} else { // current pulsar service URL is secondary, probe whether we need to switch back to primary.
if (!probeAvailable(currentPulsarServiceUrl, timeout)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the url check and switch as a method? I think most of the following logic is duplicated with the primary service url check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@hpvd
Copy link

hpvd commented Dec 16, 2021

As you mentioned, we use geo-replication to support cluster failover, that means there might be have more than two cluster. In the implementation, we only can configure two clusters for the failover. I am thinking of, do we need to support more clusters to failover to make the cluster is more robust?

Maybe one could even go further and automatically choose from all of the replicated clusters, the nearest one (via latency probe/from pre-probed list)?
(as follow-up step of this feature)

@hangc0276 hangc0276 changed the title PIP-121: Pulsar cluster level auto failover [WIP] PIP-121: Pulsar cluster level auto failover Dec 16, 2021
@hangc0276
Copy link
Contributor Author

As you mentioned, we use geo-replication to support cluster failover, that means there might be have more than two cluster. In the implementation, we only can configure two clusters for the failover. I am thinking of, do we need to support more clusters to failover to make the cluster is more robust?

@zymap if we have multi secondary clusters, different clients will switch to different secondary clusters. There are some problems.

  1. In geo-replication case, it depends on geo-replication to ensure data integrity. It can't ensure message order in one topic. If all the producer just produce to the same cluster, we can ensure message order in one topic.
  2. If the user doesn't enable geo-replication, we can't ensure the consumer get the whole messages from the topic. Because the producer may switched to cluster B, and the consumer may switched to cluster C.

So, in current implementation, we can support only one secondary cluster first.

@hangc0276
Copy link
Contributor Author

As you mentioned, we use geo-replication to support cluster failover, that means there might be have more than two cluster. In the implementation, we only can configure two clusters for the failover. I am thinking of, do we need to support more clusters to failover to make the cluster is more robust?

Maybe one could even go further and automatically choose from all of the replicated clusters, the nearest one (via latency probe/from pre-probed list)? (as follow-up step of this feature)

@hpvd Please refer to #13316 (comment)

* @param secondary
* @return
*/
AutoClusterFailoverBuilder secondary(String secondary);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could take a list/set of serviceUrls, just to make it more general

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could take a list/set of serviceUrls, just to make it more general

done

}

Socket socket = new Socket();
socket.connect(new InetSocketAddress(parseHost(hostAndPort), parsePort(hostAndPort)), TIMEOUT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will check for TCP connectivity, which is a good start, though there can be many cases in which it will give the false impression that cluster is healthy:

  • We're connecting to a Pulsar proxy, but there are no available brokers
  • Using Istio on server side, which always accept the connection even if the broker is in a bad state
  • We might have deadlocks in (all) brokers and while the connections get accepted, the brokers are not able to serve them.

We should consider to have a more in depth test to:

  1. Check that we can authenticate with brokers
  2. (Maybe) Estimate how many brokers are up & healthy in the cluster

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the current probe port method has many disadvantages. We can use this method first, and in next step, we'd better provide a health check command, just like Zookeeper ruok command, to probe the healthy of the target Pulsar cluster.

Comment on lines 171 to 258
private static String parseHostAndPort(String url) {
if (Strings.isNullOrEmpty(url) || !url.startsWith("pulsar")) {
throw new IllegalArgumentException("'" + url + "' isn't an Pulsar service URL");
}

int uriSeparatorPos = url.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + url + "' isn't an URI.");
}
return url.substring(uriSeparatorPos + 3);
}

private static String parseHost(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return hostAndPort.substring(0, portSeparatorPos);
}

private static Integer parsePort(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return Integer.valueOf(hostAndPort.substring(portSeparatorPos+1));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think URI could be better here, because URL assumes a "known" protocol scheme.

try {
URLConnection conn = pulsarUrlProvider.openConnection();
inputStream = conn.getInputStream();
return new String(IOUtils.toByteArray(inputStream), StandardCharsets.UTF_8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 We should use a JSON response here

private volatile String currentPulsarServiceUrl;
private final URL pulsarUrlProvider;
private final ScheduledExecutorService executor;
private final int interval = 30_000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should take the poll interval as a config in the builder

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should take the poll interval as a config in the builder

done

@hangc0276 hangc0276 requested review from merlimat and zymap January 4, 2022 09:12
private final long intervalMs;
private static final int TIMEOUT = 30_000;

private AutoClusterFailover(String primary, List<String> secondary, long failoverDelayNs, long switchBackDelayNs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the arguments as configuration data or something else to avoid passing so many configurations into the constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* @param authentication
* @return
*/
AutoClusterFailoverBuilder secondaryAuthentication(List<Authentication> authentication);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to use a map to keep the secondary URLs and the authentication? I think we should consider users will not add the authentication in order with the secondary URL lists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* @param tlsTrustCertsFilePath
* @return
*/
AutoClusterFailoverBuilder secondaryTlsTrustCertsFilePath(List<String> tlsTrustCertsFilePath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same above.

* @param tlsTrustStorePath
* @return
*/
AutoClusterFailoverBuilder secondaryTlsTrustStorePath(List<String> tlsTrustStorePath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same above.

* @param tlsTrustStorePassword
* @return
*/
AutoClusterFailoverBuilder secondaryTlsTrustStorePassword(List<String> tlsTrustStorePassword);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same above

Comment on lines 171 to 258
private static String parseHostAndPort(String url) {
if (Strings.isNullOrEmpty(url) || !url.startsWith("pulsar")) {
throw new IllegalArgumentException("'" + url + "' isn't an Pulsar service URL");
}

int uriSeparatorPos = url.indexOf("://");
if (uriSeparatorPos == -1) {
throw new IllegalArgumentException("'" + url + "' isn't an URI.");
}
return url.substring(uriSeparatorPos + 3);
}

private static String parseHost(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return hostAndPort.substring(0, portSeparatorPos);
}

private static Integer parsePort(String hostAndPort) {
int portSeparatorPos = hostAndPort.indexOf(":");
if (portSeparatorPos == -1) {
throw new IllegalArgumentException("'" + hostAndPort + "' isn't an URI.");
}
return Integer.valueOf(hostAndPort.substring(portSeparatorPos+1));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangc0276 please address this comment

@@ -746,7 +754,7 @@ public void shutdown() throws PulsarClientException {
} catch (PulsarClientException e) {
throwable = e;
}
if (conf != null && conf.getAuthentication() != null) {
if (conf.getAuthentication() != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the conf != null ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add it back.

String authParamsString = controlledConfiguration.getAuthParamsString();
String token = controlledConfiguration.getToken();

switch (authPluginClassName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered that we can configure the authPlugin and authParams for all auth plugins, why do we need to case the auth plugins? Does the AuthenticationFactory.create(authPlugin, authParam) can not handle all things?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@hangc0276
Copy link
Contributor Author

As you mentioned, we use geo-replication to support cluster failover, that means there might be have more than two cluster. In the implementation, we only can configure two clusters for the failover. I am thinking of, do we need to support more clusters to failover to make the cluster is more robust?

Maybe one could even go further and automatically choose from all of the replicated clusters, the nearest one (via latency probe/from pre-probed list)? (as follow-up step of this feature)

@hpvd I added the SecondaryChoosePolicy interface to support different choose policy. Currently we support ORDER first, and we can support more choose policy in the following PRs.

@hangc0276 hangc0276 force-pushed the chenhang/service_url_provider branch from 967ace6 to 5d9b48b Compare January 29, 2022 05:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-complete Your PR changes impact docs and the related docs have been already added.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants