[fix][client] Fix getProxyConnection when the topic is migrated #22085
Conversation
final int randomKeyForSelectConnection) {

    LookupService lookup =
            redirectedClusterURI == null ? this.lookup : getLookup(redirectedClusterURI.toString());
Since these connections are cached and pooled, will this still work if the logicalAddress stays the same in the source and destination clusters? Or is that not possible? Otherwise, we might have to use an actually random key for connection selection.
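To make the concern concrete, here is a minimal sketch (hypothetical names throughout; not the actual `ConnectionPool` code) of how a pool keyed only by logical address plus a bounded random key can hand back a stale connection after migration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the real ClientCnx; only the target address matters here.
final class StaleConnectionSketch {
    record Cnx(String physicalAddr) {}

    // Key is logicalAddr + randomKey only; physicalAddr is not part of it.
    private final Map<String, Cnx> pool = new ConcurrentHashMap<>();

    Cnx getConnection(String logicalAddr, String physicalAddr, int randomKey) {
        // If the destination cluster reuses the same logical address, this
        // returns the cached pre-migration connection and ignores the new
        // physicalAddr entirely.
        return pool.computeIfAbsent(logicalAddr + "-" + randomKey,
                key -> new Cnx(physicalAddr));
    }
}
```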
I think that's a fair concern.
It appears that the connection cache key should be "logicalAddr + physicalAddr + random-key", and the pool can be a flat map, like the one below. I am not sure why physicalAddr is excluded now.
ConcurrentHashMap<String, CompletableFuture<ClientCnx>> pool
I wonder if we need to address this in a separate PR, as this is a design change for the connection cache. (We could isolate this PR to fix the wrong physical address after migration).
Also, the code below (and the other getConnection functions) might need to be revisited, since they generate a new random key for every getConnection call, which changes the connection cache access behavior after migration.
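For illustration, a minimal sketch of the flat-map shape suggested above, with hypothetical method names; the real pool would also need release/eviction logic that is omitted here:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class FlatPoolSketch {
    // Placeholder for org.apache.pulsar.client.impl.ClientCnx.
    static final class ClientCnx {}

    private final ConcurrentHashMap<String, CompletableFuture<ClientCnx>> pool =
            new ConcurrentHashMap<>();

    CompletableFuture<ClientCnx> getConnection(String logicalAddr,
                                               String physicalAddr,
                                               int randomKey) {
        // Keying on logicalAddr + physicalAddr + random-key distinguishes
        // connections that share a logical address but point at different
        // clusters, e.g. before and after a blue-green migration.
        String key = logicalAddr + "|" + physicalAddr + "|" + randomKey;
        return pool.computeIfAbsent(key, k -> connect(physicalAddr));
    }

    private CompletableFuture<ClientCnx> connect(String physicalAddr) {
        // The real client would asynchronously open a TCP/TLS connection to
        // physicalAddr; a completed future keeps the sketch self-contained.
        return CompletableFuture.completedFuture(new ClientCnx());
    }
}
```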
> It appears that the connection cache key should be "logicalAddr + physicalAddr + random-key", and the pool can be a flat map, like the one below. I am not sure why physicalAddr is excluded now.
The correct solution might not be so obvious. I think the commit and PR history reveals some important details for understanding it. IIRC, the connection pool has some gotchas that can easily trick you into making wrong assumptions.
At least for me, the connection pool and the various concepts around it in the Pulsar client are very misleading.
Raised a PR to add the physical address to the key: #22196
#22196 got merged. Please continue this PR review.
Since this is touching this area, I'm bringing up #22061 again. :) Besides advertised listeners, it's possible that the destination cluster has SNI proxy routing. I guess supporting that would be necessary to have full coverage of the different connectivity options that Pulsar provides?
It appears that a client restart is required, as per this code: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L327. I guess we could address this question in the PIP discussion to confirm how blue-green migration should support SNI proxy routing.
### Motivation

Migrate apache/pulsar#22085 and (parts of) apache/pulsar-client-cpp#411 over to the Go client. Context for this idea [here](https://github.com/apache/pulsar/pull/22085/files#r1497008116).

Golang client support for blue-green migration needs the connection pool to differentiate between connections with the same logical address but different physical addresses. Otherwise, the wrong connection might be used by the client, in effect pointing to the old cluster instead of the new one.

### Modifications

The connection pool maintains a map of connections, keyed by their logical address and a random connection id. This PR proposes also including the physical address in the key, thereby allowing the upper layer to differentiate between connections with identical logical addresses but different physical addresses.

In addition to this change, the test setup had to be fixed to address breakages in `TestRetryWithMultipleHosts` and `TestReaderWithMultiHosts`. All tests in the repository currently use a local standalone setup. This unusual configuration has broker lookup operations reply with flag `proxyThroughServiceUrl=true` ([ref](https://github.com/apache/pulsar/blob/e7c2a75473b545134a3b292ae0e87a79d65cb756/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L369)). This in turn has the Go lookup service attempt a name resolution of the configured service addresses ([ref](https://github.com/apache/pulsar-client-go/blob/c3e94e243a730ae22d59bf9d330c4539733b7eef/pulsar/internal/lookup_service.go#L124)). The resolver picks addresses in round-robin mode. Because these two tests use a correct (reachable) broker address _and_ an unreachable address, the resolved address can end up pointing to the unreachable one. The connection pool is then corrupted with a logically invalid entry, causing the tests to fail:

| Logical Address | Physical Address | Notes |
| --------------- | ---------------- | ----- |
| reachable-broker | reachable-broker | Valid |
| unreachable-broker | unreachable-broker | Valid, but currently unusable |
| reachable-broker | unreachable-broker | *Invalid entry* |

To address the issue:

- Switch the test setup to a more common cluster configuration. File `integration-tests/clustered/docker-compose.yml` shows how this setup should look.
- Migrate the tests to separate files and test suites. New test files `pulsar/client_impl_clustered_test.go` and `pulsar/reader_clustered_test.go` carry the Go build tag `clustered`, allowing the Go build process to exclude them from standalone test runs.
- Add script `run-ci-clustered.sh`, specifying the "clustered" tests to run.
- Change the `Makefile` to add targets `make test_clustered` and `make test_standalone`, which run the respective test suites independently, while `make test` still runs all the tests, as before.
- Modify `Dockerfile` and `run-ci.sh` to run the Go build during the container build, so it does not need to be run again in the new `run-ci-clustered.sh` script. The image is consumed locally by the tests only and is not published, so there is no risk of affecting users.
PIP: #20748
Motivation
When a topic is migrated to a cluster with a proxy, it needs to use the proxy in the new cluster.
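A minimal sketch of the intended behavior, assuming hypothetical names apart from `redirectedClusterURI`, `lookup`, `getLookup`, and `LookupService`, which appear in the diff quoted above: once a topic is migrated, the destination cluster's lookup service must resolve the proxy address, not the source cluster's.

```java
import java.net.URI;
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down lookup interface for this sketch.
interface LookupService {
    CompletableFuture<String> resolveServiceUrl(String topic);
}

abstract class MigrationAwareConnectionSketch {
    protected LookupService lookup; // lookup of the current (source) cluster

    // Returns a lookup service bound to the given cluster service URL.
    protected abstract LookupService getLookup(String serviceUrl);

    CompletableFuture<String> resolveProxyTarget(String topic, URI redirectedClusterURI) {
        // After migration, redirectedClusterURI points at the destination
        // cluster; using its lookup ensures the client connects through the
        // new cluster's proxy instead of the old one.
        LookupService effectiveLookup = redirectedClusterURI == null
                ? this.lookup
                : getLookup(redirectedClusterURI.toString());
        return effectiveLookup.resolveServiceUrl(topic);
    }
}
```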
Modifications
Verifying this change
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: