Skip to content

Commit

Permalink
todo: figure out todo list
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 23, 2024
1 parent b5aac77 commit 2c11944
Showing 1 changed file with 33 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class DefaultMessageSender implements MessageSender {

Expand All @@ -66,7 +65,6 @@ public class DefaultMessageSender implements MessageSender {
private boolean isReport = false;
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private boolean sendSuccess = true;
private final int senderMaxRetry;

public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
Expand Down Expand Up @@ -195,80 +193,14 @@ public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
}

public void setSendSuccess(boolean sendSuccess) {
this.sendSuccess = sendSuccess;
}

public boolean isSendSuccess() {
return sendSuccess;
}

private SendResult retryWhenSendMessageFail(Function<DefaultMessageSender, SendResult> sendOperation,
DefaultMessageSender initialSender) {
int attempts = 0;
SendResult sendResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < this.senderMaxRetry) {
sendResult = sendOperation.apply(currentSender);
if (sendResult != null && sendResult.equals(SendResult.OK)) {
currentSender.setSendSuccess(true);
return sendResult;
}
currentSender.setSendSuccess(false);
// try to get another success sender
DefaultMessageSender anotherSender = getAnotherSuccessSender(currentSender);
if (anotherSender != null) {
currentSender = anotherSender;
} else {
break;
}
attempts++;
}

return sendResult;
}

private String retryWhenSendMessageIndexFail(Function<DefaultMessageSender, String> sendOperation,
DefaultMessageSender initialSender) {
int attempts = 0;
String sendIndexResult = null;
DefaultMessageSender currentSender = initialSender;
while (attempts < this.senderMaxRetry) {
sendIndexResult = sendOperation.apply(currentSender);
if (sendIndexResult != null && sendIndexResult.startsWith(SendResult.OK.toString())) {
currentSender.setSendSuccess(true);
return sendIndexResult;
}
currentSender.setSendSuccess(false);
// try to get success sender
DefaultMessageSender randomSuccessSender = getAnotherSuccessSender(currentSender);
if (randomSuccessSender != null) {
currentSender = randomSuccessSender;
} else {
break;
}
attempts++;
}

return sendIndexResult;
}

private DefaultMessageSender getAnotherSuccessSender(DefaultMessageSender currentSender) {
for (DefaultMessageSender sender : CACHE_SENDER.values()) {
if (sender != null && sender.isSendSuccess() && !sender.equals(currentSender)) {
return sender;
}
}
return null;
}
// TODO: 业务线程锁住对业务会有影响,再发送 到 退出 之间要退出,这块如何处理,确保数据不丢
// TODO: 仅仅对当前节点做重试

@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
long timeout, TimeUnit timeUnit) {
Function<DefaultMessageSender, SendResult> sendOperation =
(currentSender) -> currentSender.sender.syncSendMessage(
new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout,
timeUnit);
}

public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
Expand Down Expand Up @@ -308,28 +240,23 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
final String finalProxySend = proxySend;
final long finalDt = dt;
if (isCompressEnd) {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
+ streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body,
return sender.syncSendMessage(
new EncodeObject(body,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout,
timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
}
}

Expand Down Expand Up @@ -376,24 +303,16 @@ public SendResult sendMessage(byte[] body, String groupId, String streamId, long
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(body, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID,
timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
Expand Down Expand Up @@ -436,28 +355,23 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
final long finalDt = dt;
final String finalProxySend = proxySend;
if (isCompress) {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + bodyList.size() + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt="
+ bodyList.size() + finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
+ "&dt=" + finalDt + "&cnt=" + bodyList.size() + finalProxySend,
idGenerator.getNextId(),
this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size()
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
Expand Down Expand Up @@ -500,25 +414,17 @@ public SendResult sendMessage(List<byte[]> bodyList, String groupId, String stre
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, attrs.toString());
encodeObject.setSupportLF(isSupportLF);
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
} else {
Function<DefaultMessageSender, SendResult> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessage(new EncodeObject(bodyList, attrs.toString(),
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
msgUUID, timeout, timeUnit);
return retryWhenSendMessageFail(sendOperation, this);
return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
Expand Down Expand Up @@ -909,9 +815,7 @@ public String sendMessageData(List<byte[]> bodyList, String groupId, String stre
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
Function<DefaultMessageSender, String> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageIndexFail(sendOperation, this);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
}
return null;
}
Expand All @@ -926,9 +830,7 @@ private String sendMetric(byte[] body, String groupId, String streamId, long dt,
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip);
Function<DefaultMessageSender, String> sendOperation = (currentSender) -> currentSender.sender
.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
return retryWhenSendMessageIndexFail(sendOperation, this);
return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit);
}
return null;
}
Expand Down

0 comments on commit 2c11944

Please sign in to comment.