Introduce alternate in-memory buffering event listener #2574
cc @adnanhemani
```kotlin
testImplementation("io.quarkus:quarkus-junit5-mockito")
testImplementation("io.quarkus:quarkus-rest-client")
testImplementation("io.quarkus:quarkus-rest-client-jackson")
testImplementation("io.quarkus:quarkus-jdbc-h2")
```
I did some cleanup in this file, which had a few duplicate dependencies.
adnanhemani left a comment:
Thanks for this @adutra!! I agree, this looks quite clean and we should probably merge this and the previously-merged listeners together.
A few questions/comments to go, but I think this makes sense overall!
```java
private void assertRows(String realmId, int expected, Duration timeout) {
  String query = "SELECT COUNT(*) FROM polaris_schema.events WHERE realm_id = '" + realmId + "'";
  await()
      .atMost(timeout)
```
Seems like we are testing with actual delays. Is there any way around this? Personally, I would prefer that we not increase the running time of the test suite any more than necessary.
Indeed, but then we need a separate test to cover just the timeouts. I will make the change.
Done. It's indeed much faster now 👍
```java
}

protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
  UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
```
I don't believe that UnicastProcessor is thread-safe for multiple producers by default. However, if we pass in a ConcurrentLinkedQueue during create, we may be able to make it thread-safe. Thoughts?
Very good question! But in fact, it is safe for multiple producers, as the onNext method is synchronized. No need to use an MPSC queue.
BTW, the tests cover this: if you look at the sendAsync method, it fires events from multiple threads.
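To see why a synchronized onNext is enough for multiple producers, here is a toy sketch in plain JDK code (this is NOT Mutiny's actual implementation; `UnicastSketch` and its event strings are made up for illustration). Eight producer threads share one single-consumer processor whose backing ArrayDeque is not thread-safe on its own; the synchronized method serializes the producers:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

// Toy sketch (not Mutiny's code): a single-consumer processor whose onNext is
// synchronized, so multiple producer threads can share it even though the
// backing queue (ArrayDeque) is not thread-safe by itself.
public class UnicastSketch {
  private final Queue<String> queue = new ArrayDeque<>();

  // synchronized serializes concurrent producers, like UnicastProcessor.onNext
  synchronized void onNext(String event) {
    queue.add(event);
  }

  synchronized int size() {
    return queue.size();
  }

  public static void main(String[] args) throws InterruptedException {
    UnicastSketch processor = new UnicastSketch();
    int threads = 8;
    int perThread = 1000;
    CountDownLatch done = new CountDownLatch(threads);
    for (int t = 0; t < threads; t++) {
      new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          processor.onNext("event");
        }
        done.countDown();
      }).start();
    }
    done.await();
    // Without `synchronized`, the ArrayDeque could be corrupted or lose events.
    if (processor.size() != threads * perThread) {
      throw new AssertionError("lost events: " + processor.size());
    }
    System.out.println("received=" + processor.size());
  }
}
```

The same serialization argument is what makes passing a concurrent queue unnecessary here.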
```java
protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
  UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
  processor
      .emitOn(Infrastructure.getDefaultWorkerPool())
```
Question on this: is this creating a separate executor service? Is it possible for this to become a bottleneck in terms of how fast events can be ingested?
Would it be better or worse to have the threads here managed by Quarkus instead?
On a Quarkus platform, Infrastructure.getDefaultWorkerPool() always returns the Quarkus default worker pool. So this method is not creating a separate executor service – which is also why we don't need to worry about shutting it down.
Is there a way to configure this worker pool? Let's say one of my instances is handling a lot of realms?
Yes, it's configurable through the quarkus.thread-pool.* options:
https://quarkus.io/guides/all-config#quarkus-core_quarkus-thread-pool-core-threads
The core thread pool is responsible for running all blocking tasks in Quarkus. I think this is appropriate for this use case. Note: this thread pool does NOT handle HTTP requests.
If this becomes problematic one day, the solution would be to declare a separate Executor CDI bean and inject it here.
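For reference, these options are set in `application.properties`; the property names are the real Quarkus core thread-pool options linked above, but the values below are purely illustrative, not recommendations:

```properties
# Size the shared Quarkus worker pool that emitOn() dispatches blocking work to.
# Values are illustrative only – tune them based on realm count and event volume.
quarkus.thread-pool.core-threads=8
quarkus.thread-pool.max-threads=64
quarkus.thread-pool.queue-size=1000
```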
```java
      .intoLists()
      .of(configuration.maxBufferSize(), configuration.bufferTime())
      .subscribe()
      .with(events -> flush(realmId, events));
```
Leaving here as a note for anyone else wondering:
Even though this pattern dequeues events one by one when building each list of events, I don't believe we will hit the race condition where events are added at a higher rate than they are drained. This is because the subscription can only request a fixed number of events at a time (which appears to be Long.MAX_VALUE). If such load is continuously sustained, it may overwhelm the service by overflowing the queue backing the UnicastProcessor – but I don't think there are many ways around that.
If I understand the concern correctly, what matters here is the UnicastProcessor queue. For now it's unbounded, so a BackPressureFailure should never be raised, but the small risk is that the queue could grow if the database cannot keep up with the rate of incoming events.
I think it's OK for now; if users report memory issues we can look into bounded queues. But I don't think the server can generate events faster than the database is able to write them.
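If a bounded queue is ever needed, the drop-on-overflow trade-off can be sketched with plain java.util.concurrent (a hypothetical illustration, not code from this PR; the capacity of 4 and the event names are made up). `offer()` returns false instead of blocking, so excess events are dropped rather than stalling producers:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a *bounded* event buffer: once the queue is full
// (e.g. the database can't keep up), new events are dropped – consistent
// with an at-most-once delivery guarantee.
public class BoundedBufferSketch {
  public static void main(String[] args) {
    BlockingQueue<String> buffer = new ArrayBlockingQueue<>(4); // capacity is an assumption
    int dropped = 0;
    for (int i = 0; i < 10; i++) {
      // offer() is non-blocking: it returns false when the queue is full
      if (!buffer.offer("event-" + i)) {
        dropped++;
      }
    }
    System.out.println("buffered=" + buffer.size() + " dropped=" + dropped);
  }
}
```

The alternative – a blocking `put()` – would instead apply backpressure to the request path, which is a different trade-off.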
I added a small safeguard: if the upstream terminates with a failure, a message is logged and the processor is evicted from the cache.
```diff
  public record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}

- abstract ContextSpecificInformation getContextSpecificInformation();
+ protected abstract ContextSpecificInformation getContextSpecificInformation();
```
Curious as to why these changes were made?
Because otherwise you cannot access this method from outside this package.
Generally speaking, since this class is meant to be subclassed, even by implementors outside Polaris, it's not recommended to use package-private members.
adnanhemani left a comment:
LGTM 👍
@adutra - I can ask others to review and merge this as well. Would you like me to work on gluing this together with the already existing Event Listener, or would you like to do it?
I can take care of that. Let's merge this one first if everyone agrees? I'll need re-approvals because of a merge conflict 😄
singhpk234 left a comment:
LGTM as well, thanks @adutra !
Added some questions for my better understanding
```java
LOGGER.error(
    "Unexpected error while processing events for realm '{}'; some events may have been dropped",
    realmId,
    error);
```
I wonder if it's just okay to drop events, considering they are deemed to be audit logs?
Yes, since the beginning the consistency guarantee has been "at most once", so it's understood that some events may be lost.
```java
protected ContextSpecificInformation getContextSpecificInformation() {
  var principal = securityContext.getUserPrincipal();
  var principalName = principal == null ? null : principal.getName();
  return new ContextSpecificInformation(clock.millis(), principalName);
```
Maybe this is not in the scope of this PR, but do we know why it's just the principal and not the activated roles?
This one is on me, I forgot to put it in the original PR - I will enhance this in the future to include activated roles.
Merging this now, I will tackle the remaining improvements next.
As promised, this is a proposal for an alternative implementation of `InMemoryBufferPolarisPersistenceEventListener`, leveraging Mutiny and SmallRye Fault Tolerance, and avoiding dealing with buffering and flushing manually. For now I'm proposing a separate CDI bean, to facilitate reviews and comparisons.
But if the approach taken in this PR is considered a better solution for buffering events, I would suggest replacing `InMemoryBufferPolarisPersistenceEventListener` completely.