-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
added new entrypoint for reader to websocket proxy #620
Conversation
// /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic | ||
checkArgument(parts.size() == 8, "Invalid topic name format"); | ||
checkArgument(parts.get(1).equals("ws")); | ||
checkArgument(parts.get(3).equals("persistent")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about non-persistent
topics? I think we should support them as well in the WebSocket interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
support non-persistent
topics and delete ReaderHandler#checkRequestURI() because
AbstractWebSocketHandler
class check the same.
retest this please |
retest this please |
1 similar comment
retest this please |
service.getExecutor().execute(() -> receiveMessage()); | ||
} | ||
}).exceptionally(exception -> { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we add a debug log here?
} | ||
updateDeliverMsgStat(msgSize); | ||
int pending = pendingMessages.getAndDecrement(); | ||
if (pending >= maxPendingMessages) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would be tricky.. here, in reader we are keep pushing messages to the client which may blow up client-memory. so, should we follow the same semantic as normal consumer where initially proxy will send queue-size messages to client and on every acknowledgement/readNext
proxy send one more message.??
@rdhabalia |
service.getExecutor().execute(() -> receiveMessage()); | ||
} else { | ||
// Resume delivery | ||
receiveMessage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should't we stop delivery if pending
reached to maxPendingMessages
else it will keep sending messages to the client ?
Should we do the similar like ConsumerHandler, after reaching maxPendingMessages
, proxy-reader will send more message when it receives a new readNext
request from client, same way consumer receives ack-request
.? If that seems feasible solution then we can document the semantic into websocket-reader-doc as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdhabalia
Please let me confirm.
I will try to change readerHandler
like the following codes.
Does this match what you mean ?
public void writeSuccess() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] message is delivered successfully to {} ", reader.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
}
updateDeliverMsgStat(msgSize);
- pendingMessages.getAndDecrement();
}
.
.
.
int pending = pendingMessages.incrementAndGet();
if (pending < maxPendingMessages) {
// Start next read in a separate thread to avoid recursion
service.getExecutor().execute(() -> receiveMessage());
- } else {
- // Resume delivery
- receiveMessage();
}
.
.
.
@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);
+ int pending = pendingMessages.getAndDecrement();
+ if (pending >= maxPendingMessages) {
+ // Resume delivery
+ receiveMessage();
+ }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, this would control message-delivery to the client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your confirmation.
I will try to implement it.
@rdhabalia |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivation
no reader entrypoint on websocket proxy .
Modifications
added reader entrypoint to websocket proxy.
Result
we can use reader through websocket proxy