Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

:fix: (metrics) add publish and subscribe time (metrics ot linked to the code) #4018

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public void afterDestroyConnection(RemotingConnection connection) throws ActiveM
*/
@Override
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
subscribeMetric.getAllowedMessages().inc();
ActiveMQServerPlugin.super.afterCreateConsumer(consumer);
Context subscribeContext = subscribeMetric.getTime().time();
try {
subscribeMetric.getAllowedMessages().inc();
ActiveMQServerPlugin.super.afterCreateConsumer(consumer);
}
finally {
subscribeContext.stop();
}
}

/**
Expand All @@ -220,41 +226,47 @@ public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQExceptio
@Override
public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
String address = message.getAddress();
int messageSize = message.getEncodeSize();
SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection()));
logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName());
message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession())));
message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond());
message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address));
if (!sessionContext.isInternal()) {
if (isLwt(address)) {
//handle the missing message case
logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId());
sessionContext.setMissing(true);
}
// FIX #164
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId())));
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false);
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}",
messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId());
}
publishMetric.getMessageSizeAllowed().update(messageSize);
} else {
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {}",
messageSize, address);
Context sendContext = publishMetric.getTime().time();
try {
String address = message.getAddress();
int messageSize = message.getEncodeSize();
SessionContext sessionContext = serverContext.getSecurityContext().getSessionContextWithCacheFallback(pluginUtility.getConnectionId(session.getRemotingConnection()));
logger.debug("Publishing message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CLIENT_ID, sessionContext.getClientId());
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTOR_NAME, sessionContext.getConnectorName());
message.putStringProperty(MessageConstants.HEADER_KAPUA_SESSION, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaSession())));
message.putLongProperty(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, KapuaDateUtils.getKapuaSysDate().getEpochSecond());
message.putStringProperty(MessageConstants.HEADER_KAPUA_MESSAGE_TYPE, getMessgeType(address));
if (!sessionContext.isInternal()) {
if (isLwt(address)) {
//handle the missing message case
logger.info("Detected missing message for client {}... Flag session to tell disconnector to avoid disconnect event sending", sessionContext.getClientId());
sessionContext.setMissing(true);
}
// FIX #164
message.putStringProperty(MessageConstants.HEADER_KAPUA_CONNECTION_ID, Base64.getEncoder().encodeToString(SerializationUtils.serialize(sessionContext.getKapuaConnectionId())));
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, false);
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {} - account id: {} - username: {} - clientId: {}",
messageSize, address, sessionContext.getAccountName(), sessionContext.getUsername(), sessionContext.getClientId());
}
publishMetric.getMessageSizeAllowed().update(messageSize);
} else {
if (publishInfoMessageSizeLimit < messageSize) {
logger.info("Published message size over threshold. size: {} - destination: {}",
messageSize, address);
}
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true);
publishMetric.getMessageSizeAllowedInternal().update(messageSize);
}
message.putBooleanProperty(MessageConstants.HEADER_KAPUA_BROKER_CONTEXT, true);
publishMetric.getMessageSizeAllowedInternal().update(messageSize);
message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address);
serverContext.getAddressAccessTracker().update(address);
logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue);
}
finally {
sendContext.stop();
}
message.putStringProperty(MessageConstants.PROPERTY_ORIGINAL_TOPIC, address);
serverContext.getAddressAccessTracker().update(address);
logger.debug("Published message on address {} from clientId: {} - clientIp: {}", address, sessionContext.getClientId(), sessionContext.getClientIp());
ActiveMQServerPlugin.super.beforeSend(session, tx, message, direct, noAutoCreateQueue);
}

private boolean isLwt(String originalTopic) {
Expand Down
Loading