Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-8332][DataProxy] Return original content for MSG_ORIGINAL_RETURN type messages #8333

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,12 @@ public void fileMetricAddFailCnt(PackProfile packProfile, String topic, String r
if (packProfile instanceof SimplePackProfile) {
SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
StringBuilder statsKey = new StringBuilder(512)
.append(sinkName).append(AttrConstants.SEPARATOR)
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
.append(topic).append(AttrConstants.SEPARATOR)
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
.append(remoteId).append(AttrConstants.SEPARATOR)
.append(sinkName).append(AttrConstants.SEP_HASHTAG)
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEP_HASHTAG)
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEP_HASHTAG)
.append(topic).append(AttrConstants.SEP_HASHTAG)
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEP_HASHTAG)
.append(remoteId).append(AttrConstants.SEP_HASHTAG)
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
monitorIndex.addFailStats(statsKey.toString(), 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj, StringBuilder
if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
retData = buildBinMsgRspPackage(strBuff.toString(), msgObj.getUniq());
} else {
retData = buildTxtMsgRspPackage(msgType, strBuff.toString());
retData = buildTxtMsgRspPackage(msgType, strBuff.toString(), msgObj);
}
strBuff.delete(0, strBuff.length());
flushV0MsgPackage(source, channel, retData, msgObj.getAttr());
Expand Down Expand Up @@ -598,6 +598,43 @@ public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
return buffer;
}

/**
* Build default-msg response message ByteBuf
*
* @param msgType the message type
* @param attrs the return attribute
* @param msgObj the request message object
* @return ByteBuf
*/
private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCodec msgObj) {
int attrsLen = 0;
int bodyLen = 0;
byte[] backBody = null;
if (attrs != null) {
attrsLen = attrs.length();
}
if (MsgType.MSG_ORIGINAL_RETURN.equals(msgType)) {
backBody = msgObj.getOrigBody();
if (backBody != null) {
bodyLen = backBody.length;
}
}
// backTotalLen = mstType + bodyLen + body + attrsLen + attrs
int backTotalLen = 1 + 4 + bodyLen + 4 + attrsLen;
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
buffer.writeInt(backTotalLen);
buffer.writeByte(msgType.getValue());
buffer.writeInt(bodyLen);
if (bodyLen > 0) {
buffer.writeBytes(backBody);
}
buffer.writeInt(attrsLen);
if (attrsLen > 0) {
buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
}
return buffer;
}

/**
* Build heartbeat response message ByteBuf
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public abstract class AbsV0MsgCodec {
protected int msgCount;
protected String origAttr = "";
protected byte[] bodyData;
protected byte[] origBody = null;
protected long dataTimeMs;
protected String groupId;
protected String streamId = "";
Expand Down Expand Up @@ -144,6 +145,10 @@ public String getStrRemoteIP() {
return strRemoteIP;
}

public byte[] getOrigBody() {
return origBody;
}

public long getMsgRcvTime() {
return msgRcvTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public boolean descMsg(BaseSource source, ByteBuf cb) throws Exception {
// extract body bytes
this.bodyData = new byte[bodyLen];
cb.getBytes(msgHeadPos + TXT_MSG_BODY_OFFSET, this.bodyData, 0, bodyLen);
if (MsgType.MSG_ORIGINAL_RETURN.equals(MsgType.valueOf(msgType))) {
this.origBody = new byte[bodyLen];
System.arraycopy(this.bodyData, 0, this.origBody, 0, bodyLen);
}
// get attribute length
int attrLen = cb.getInt(msgHeadPos + TXT_MSG_BODY_OFFSET + bodyLen);
if (attrLen < 0) {
Expand Down