-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP28][PROXY] Pulsar Proxy Gateway Improvement #3915
Conversation
Work in progress I am still working on this feature, no need to review currently. |
547e23c
to
9fe61ce
Compare
@foreversunyao I prepended |
sure, still testing if it can cover all scenarios |
2e65a09
to
e562a50
Compare
run cpp tests |
cb6e924
to
67aff21
Compare
53a0c06
to
345fc3a
Compare
@sijie , should be ready to review, though there are two minor differences with original PIP-28.
|
run java8 tests |
2 similar comments
run java8 tests |
run java8 tests |
tested it locally, and passed
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall looks good. left some comments, most of them are around coding styles (i.e. spaces).
also it would be good to have some unit tests or integration tests for this feature. or at least make sure the proxy is working well when proxyLogLevel > 0
as when proxyLogLevel == 0
.
|
||
private Channel channel; | ||
//inbound | ||
protected static final String frontendConn = "frontendconn"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use upper case for CONSTANTS.
protected static final String frontendConn = "frontendconn"; | |
protected static final String FRONTEND_CONN = "frontendconn"; |
//inbound | ||
protected static final String frontendConn = "frontendconn"; | ||
//outbound | ||
protected static final String backendConn = "backendconn"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protected static final String backendConn = "backendconn"; | |
protected static final String BACKEND_CONN = "backendconn"; |
@@ -148,6 +148,15 @@ | |||
doc = "Path for the file used to determine the rotation status for the proxy instance" | |||
+ " when responding to service discovery health checks" | |||
) | |||
private Integer proxyLogLevel = 0; | |||
|
|||
@FieldContext( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the @FieldContext
was added to the wrong place. it should be for proxyLogLevel
, and the one for proxyLogLevel
should be for statusFilePath
.
this.connType=type; | ||
} | ||
|
||
private void logging (Channel conn,PulsarApi.BaseCommand.Type cmdtype,String info,List<RawMessage> messages) throws Exception{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private void logging (Channel conn,PulsarApi.BaseCommand.Type cmdtype,String info,List<RawMessage> messages) throws Exception{ | |
private void logging(Channel conn, PulsarApi.BaseCommand.Type cmdtype, String info, List<RawMessage> messages) throws Exception { |
|
||
private void logging (Channel conn,PulsarApi.BaseCommand.Type cmdtype,String info,List<RawMessage> messages) throws Exception{ | ||
|
||
if (messages !=null){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (messages !=null){ | |
if (messages !=null) { |
if (messages !=null){ | ||
// lag | ||
for (int i=0;i <messages.size();i++){ | ||
info = info + "["+(System.currentTimeMillis() - messages.get(i).getPublishTime())+"] "+new String(ByteBufUtil.getBytes((messages.get(i)).getData()),"UTF8"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
info = info + "["+(System.currentTimeMillis() - messages.get(i).getPublishTime())+"] "+new String(ByteBufUtil.getBytes((messages.get(i)).getData()),"UTF8"); | |
info = info + "["+ (System.currentTimeMillis() - messages.get(i).getPublishTime()) + "] " + new String(ByteBufUtil.getBytes((messages.get(i)).getData()), "UTF8"); |
|
||
switch (this.connType){ | ||
case ParserProxyHandler.frontendConn: | ||
log.info("conn:{} cmd:{} msg:{}","["+ conn.remoteAddress().toString()+""+conn.localAddress()+channel.localAddress()+channel.remoteAddress()+"]",cmdtype,info); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see conn.localAddress()
is used twice in this logging statement. is that done intentionally?
log.info("conn:{} cmd:{} msg:{}","["+ conn.remoteAddress().toString()+""+conn.localAddress()+channel.localAddress()+channel.remoteAddress()+"]",cmdtype,info); | |
log.info("conn:{} cmd:{} msg:{}", "[" + conn.remoteAddress().toString() + conn.localAddress() + channel.localAddress() + channel.remoteAddress() +"]", cmdtype,info); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, actually I was trying to add the feautre(whitelist for client as well as broker side), this need get broker ip/port from inbound channel, but right now it would be in next PR. I will update this.
case ParserProxyHandler.frontendConn: | ||
log.info("conn:{} cmd:{} msg:{}","["+ conn.remoteAddress().toString()+""+conn.localAddress()+channel.localAddress()+channel.remoteAddress()+"]",cmdtype,info); | ||
break; | ||
case ParserProxyHandler.backendConn: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comments as above
ByteBuf buffer = (ByteBuf)(msg); | ||
|
||
try { | ||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line
|
||
log.error("{},{},{}",e.getMessage(),e.getStackTrace(),e.getCause()); | ||
|
||
}finally { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}finally { | |
} finally { |
@sijie , thanks for reviewing, will update soon. |
5b8c218
to
a7d2ced
Compare
@sijie , I updated the code according to what you mentioned, and also added a unitest ProxyParserTest.java |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
import java.util.Map; | ||
|
||
|
||
public class ParserProxyHandler extends ChannelInboundHandlerAdapter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any intention to extend the functionality for this class?
Seems currently it is mainly for proxy logging handling. How about rename this class name to make it clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for reviewing.
yes, I would add some whitelist feature on client ip/broker ip ,topic name and msg content.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parsing protocol should be the first step for other future funtions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, thanks for the work. left a minor comments.
Motivation
https://github.com/apache/pulsar/wiki/PIP-28%3A-Pulsar-Proxy-Gateway-Improvement
Modifications
added a new handler ParserProxyHandler.java to parse requests independently and output
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Still working on this feature, will make a document after it's almost done