From 2c11944399e5047f11b7133a66250edd8b2ef0c7 Mon Sep 17 00:00:00 2001 From: emptyOVO Date: Mon, 23 Sep 2024 21:05:04 +0800 Subject: [PATCH] todo: figure out todo list --- .../sdk/dataproxy/DefaultMessageSender.java | 164 ++++-------------- 1 file changed, 33 insertions(+), 131 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index d55b1a90c72..cf793bac6e8 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -43,7 +43,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; public class DefaultMessageSender implements MessageSender { @@ -66,7 +65,6 @@ public class DefaultMessageSender implements MessageSender { private boolean isReport = false; private boolean isSupportLF = false; private int cpsSize = ConfigConstants.COMPRESS_SIZE; - private boolean sendSuccess = true; private final int senderMaxRetry; public DefaultMessageSender(ProxyClientConfig configure) throws Exception { @@ -195,80 +193,14 @@ public String getSDKVersion() { return ConfigConstants.PROXY_SDK_VERSION; } - public void setSendSuccess(boolean sendSuccess) { - this.sendSuccess = sendSuccess; - } - - public boolean isSendSuccess() { - return sendSuccess; - } - - private SendResult retryWhenSendMessageFail(Function sendOperation, - DefaultMessageSender initialSender) { - int attempts = 0; - SendResult sendResult = null; - DefaultMessageSender currentSender = initialSender; - while (attempts < this.senderMaxRetry) { - sendResult = sendOperation.apply(currentSender); - if (sendResult != null && sendResult.equals(SendResult.OK)) { - currentSender.setSendSuccess(true); - return sendResult; - } - currentSender.setSendSuccess(false); - // try to get another success sender - DefaultMessageSender anotherSender = getAnotherSuccessSender(currentSender); - if (anotherSender != null) { - currentSender = anotherSender; - } else { - break; - } - attempts++; - } - - return sendResult; - } - - private String retryWhenSendMessageIndexFail(Function sendOperation, - DefaultMessageSender initialSender) { - int attempts = 0; - String sendIndexResult = null; - DefaultMessageSender currentSender = initialSender; - while (attempts < this.senderMaxRetry) { - sendIndexResult = sendOperation.apply(currentSender); - if (sendIndexResult != null && sendIndexResult.startsWith(SendResult.OK.toString())) { - currentSender.setSendSuccess(true); - return sendIndexResult; - } - currentSender.setSendSuccess(false); - // try to get success sender - DefaultMessageSender randomSuccessSender = getAnotherSuccessSender(currentSender); - if (randomSuccessSender != null) { - currentSender = randomSuccessSender; - } else { - break; - } - attempts++; - } - - return sendIndexResult; - } - - private DefaultMessageSender getAnotherSuccessSender(DefaultMessageSender currentSender) { - for (DefaultMessageSender sender : CACHE_SENDER.values()) { - if (sender != null && sender.isSendSuccess() && !sender.equals(currentSender)) { - return sender; - } - } - return null; - } + // TODO: 业务线程锁住对业务会有影响,再发送 到 退出 之间要退出,这块如何处理,确保数据不丢 + // TODO: 仅仅对当前节点做重试 @Deprecated public SendResult sendMessage(byte[] body, String attributes, String msgUUID, long timeout, TimeUnit timeUnit) { - Function sendOperation = - (currentSender) -> currentSender.sender.syncSendMessage( - new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout, + timeUnit); } public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, @@ -308,9 +240,7 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { proxySend = "&" + proxySend; @@ -318,18 +248,15 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long final String finalProxySend = proxySend; final long finalDt = dt; if (isCompressEnd) { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId=" - + streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(), - this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId=" + + streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(), + this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(body, + return sender.syncSendMessage( + new EncodeObject(body, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + finalProxySend, - idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, - timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + idGenerator.getNextId(), this.getMsgtype(), false, groupId), + msgUUID, timeout, timeUnit); } } @@ -376,24 +303,16 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); if (isCompressEnd) { attrs.append("&cp=snappy"); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(body, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), true, groupId), - msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(body, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, - timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } } return null; @@ -436,9 +355,7 @@ public SendResult sendMessage(List bodyList, String groupId, String stre isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { proxySend = "&" + proxySend; @@ -446,18 +363,15 @@ public SendResult sendMessage(List bodyList, String groupId, String stre final long finalDt = dt; final String finalProxySend = proxySend; if (isCompress) { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + bodyList.size() + finalProxySend, - idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(bodyList, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + + bodyList.size() + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + finalDt + "&cnt=" + bodyList.size() + finalProxySend, - idGenerator.getNextId(), - this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(bodyList, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size() + + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } } return null; @@ -500,25 +414,17 @@ public SendResult sendMessage(List bodyList, String groupId, String stre isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId) .append("&dt=").append(dt).append("&cnt=").append(bodyList.size()); if (isCompress) { attrs.append("&cp=snappy"); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), true, groupId), - msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), false, groupId), - msgUUID, timeout, timeUnit); - return retryWhenSendMessageFail(sendOperation, this); + return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } } return null; @@ -909,9 +815,7 @@ public String sendMessageData(List bodyList, String groupId, String stre isReport, isGroupIdTransfer, dt / 1000, sid, groupId, streamId, attrs.toString(), "data", ""); encodeObject.setSupportLF(isSupportLF); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageIndexFail(sendOperation, this); + return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); } return null; } @@ -926,9 +830,7 @@ private String sendMetric(byte[] body, String groupId, String streamId, long dt, if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport, isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip); - Function sendOperation = (currentSender) -> currentSender.sender - .syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); - return retryWhenSendMessageIndexFail(sendOperation, this); + return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); } return null; }