Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192 Added operation counters in ServiceUnitStateChannelImpl #19410

Merged
merged 2 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Constructed;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.LeaderElectionServiceStarted;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.ChannelState.Started;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Assign;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Split;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.EventType.Unload;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Jittery;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
Expand All @@ -44,6 +47,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
Expand All @@ -63,7 +68,6 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
Expand Down Expand Up @@ -108,6 +112,40 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private long totalCleanupCancelledCnt = 0;
private volatile ChannelState channelState;

enum EventType {
Assign,
Split,
Unload
}

@Getter
@AllArgsConstructor
static class Counters {
private AtomicLong total;
private AtomicLong failure;
}

// operation metrics
final Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = Map.of(
Owned, new AtomicLong(),
Assigned, new AtomicLong(),
Released, new AtomicLong(),
Splitting, new AtomicLong(),
Free, new AtomicLong()
);
final Map<EventType, Counters> eventCounters = Map.of(
Assign, new Counters(new AtomicLong(), new AtomicLong()),
Split, new Counters(new AtomicLong(), new AtomicLong()),
Unload, new Counters(new AtomicLong(), new AtomicLong())
);
final Map<ServiceUnitState, Counters> handlerCounters = Map.of(
Owned, new Counters(new AtomicLong(), new AtomicLong()),
Assigned, new Counters(new AtomicLong(), new AtomicLong()),
Released, new Counters(new AtomicLong(), new AtomicLong()),
Splitting, new Counters(new AtomicLong(), new AtomicLong()),
Free, new Counters(new AtomicLong(), new AtomicLong())
);

enum ChannelState {
Closed(0),
Constructed(1),
Expand Down Expand Up @@ -151,7 +189,10 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) {
}

public synchronized void start() throws PulsarServerException {
validateChannelState(LeaderElectionServiceStarted, false);
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
}

try {
leaderElectionService.start();
this.channelState = LeaderElectionServiceStarted;
Expand Down Expand Up @@ -230,15 +271,24 @@ public synchronized void close() throws PulsarServerException {
}
}

private void validateChannelState(ChannelState targetState, boolean checkLowerIds) {
private boolean validateChannelState(ChannelState targetState, boolean checkLowerIds) {
int order = checkLowerIds ? -1 : 1;
if (Integer.compare(channelState.id, targetState.id) * order > 0) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
return false;
}
return true;
}

private boolean debug() {
return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
}

public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
validateChannelState(LeaderElectionServiceStarted, true);
if (!validateChannelState(LeaderElectionServiceStarted, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}

return leaderElectionService.readCurrentLeader().thenApply(leader -> {
//expecting http://broker-xyz:port
// TODO: discard this protocol prefix removal
Expand Down Expand Up @@ -278,27 +328,35 @@ private boolean isChannelOwner() {
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
validateChannelState(Started, true);
ServiceUnitStateData data = tableview.get(serviceUnit);
if (data == null) {
return CompletableFuture.completedFuture(Optional.empty());
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
switch (data.state()) {

ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = data == null ? Free : data.state();
ownerLookUpCounters.get(state).incrementAndGet();
switch (state) {
case Owned, Splitting -> {
return CompletableFuture.completedFuture(Optional.of(data.broker()));
}
case Assigned, Released -> {
return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
}
case Free -> {
return CompletableFuture.completedFuture(Optional.empty());
}
default -> {
String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data);
log.error(errorMsg);
return FutureUtil.failedFuture(new IllegalStateException(errorMsg));
return CompletableFuture.failedFuture(new IllegalStateException(errorMsg));
}
}
}

public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
EventType eventType = Assign;
eventCounters.get(eventType).getTotal().incrementAndGet();
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
.whenComplete((__, ex) -> {
Expand All @@ -307,42 +365,65 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
if (!getOwnerRequest.isCompletedExceptionally()) {
getOwnerRequest.completeExceptionally(ex);
}
eventCounters.get(eventType).getFailure().incrementAndGet();
}
});

return getOwnerRequest;
}

public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
EventType eventType = Unload;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = unload.serviceUnit();
CompletableFuture<MessageId> future;
if (isTransferCommand(unload)) {
ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
unload.destBroker().get(), unload.sourceBroker());
return pubAsync(serviceUnit, next).thenApply(__ -> null);
future = pubAsync(serviceUnit, next);
} else {
future = tombstoneAsync(serviceUnit);
}
return tombstoneAsync(serviceUnit).thenApply(__ -> null);

return future.whenComplete((__, ex) -> {
if (ex != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
}
}).thenApply(__ -> null);
}

public CompletableFuture<Void> publishSplitEventAsync(Split split) {
EventType eventType = Split;
eventCounters.get(eventType).getTotal().incrementAndGet();
String serviceUnit = split.serviceUnit();
ServiceUnitStateData next = new ServiceUnitStateData(Splitting, split.sourceBroker());
return pubAsync(serviceUnit, next).thenApply(__ -> null);
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
if (ex != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
}
}).thenApply(__ -> null);
}

private void handle(String serviceUnit, ServiceUnitStateData data) {
long totalHandledRequests = getHandlerTotalCounter(data).incrementAndGet();
Copy link
Contributor

@gaoran10 gaoran10 Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to record all events for all brokers? Or record event metrics only for the target broker.

Copy link
Contributor Author

@heesung-sn heesung-sn Feb 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we want to record all events for all brokers.

The goal is to monitor the consistency of the tableviews across all brokers.

We need to monitor if the rate of the increase of HandlerTotalCounters( increase / x sec) is the same across all brokers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks for your explanation.

if (log.isDebugEnabled()) {
log.info("{} received a handle request for serviceUnit:{}, data:{}",
lookupServiceAddress, serviceUnit, data);
log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
lookupServiceAddress, serviceUnit, data, totalHandledRequests);
}

ServiceUnitState state = data == null ? Free : data.state();
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
case Assigned -> handleAssignEvent(serviceUnit, data);
case Released -> handleReleaseEvent(serviceUnit, data);
case Splitting -> handleSplitEvent(serviceUnit, data);
case Free -> handleFreeEvent(serviceUnit);
default -> throw new IllegalStateException("Failed to handle channel data:" + data);
try {
switch (state) {
case Owned -> handleOwnEvent(serviceUnit, data);
case Assigned -> handleAssignEvent(serviceUnit, data);
case Released -> handleReleaseEvent(serviceUnit, data);
case Splitting -> handleSplitEvent(serviceUnit, data);
case Free -> handleFreeEvent(serviceUnit);
default -> throw new IllegalStateException("Failed to handle channel data:" + data);
}
} catch (Throwable e){
log.error("Failed to handle the event. serviceUnit:{}, data:{}, handlerFailureCount:{}",
serviceUnit, data, getHandlerFailureCounter(data).incrementAndGet(), e);
throw e;
}
}

Expand All @@ -362,19 +443,46 @@ private static String getLogEventTag(ServiceUnitStateData data) {
isTransferCommand(data) ? "Transfer:" + data.state() : data.state().toString();
}

private AtomicLong getHandlerTotalCounter(ServiceUnitStateData data) {
return getHandlerCounter(data, true);
}

private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
return getHandlerCounter(data, false);
}

private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {
var state = data == null ? Free : data.state();
var counter = total
? handlerCounters.get(state).getTotal() : handlerCounters.get(state).getFailure();
if (counter == null) {
throw new IllegalStateException("Unknown state:" + state);
}
return counter;
}

private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
if (e == null) {
if (log.isDebugEnabled() || isTransferCommand(data)) {
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}",
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).get();
log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, "
+ "totalHandledRequests{}, totalFailedRequests:{}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the log level is debug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is meant for info. If this is a transfer command, we want to log it at the info level.

In other PRs, I will clean the debug logs based on isLoadBalancerDebugModeEnabled.

private boolean debug() {
return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
}

lookupServiceAddress, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next);
next == null ? "" : next,
handlerTotalCount, handlerFailureCount
);
}
} else {
log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}",
long handlerTotalCount = getHandlerTotalCounter(data).get();
long handlerFailureCount = getHandlerFailureCounter(data).incrementAndGet();
log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, "
+ "totalHandledRequests{}, totalFailedRequests:{}",
lookupServiceAddress, getLogEventTag(data), serviceUnit,
data == null ? "" : data,
next == null ? "" : next,
handlerTotalCount, handlerFailureCount,
e);
}
}
Expand All @@ -387,7 +495,6 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.broker())) {
log(null, serviceUnit, data, null);
}

}

private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
Expand All @@ -401,7 +508,6 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
}

private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {

if (isTargetBroker(data.sourceBroker())) {
ServiceUnitStateData next = new ServiceUnitStateData(Owned, data.broker(), data.sourceBroker());
// TODO: when close, pass message to clients to connect to the new broker
Expand Down Expand Up @@ -432,7 +538,10 @@ private void handleFreeEvent(String serviceUnit) {
}

private CompletableFuture<MessageId> pubAsync(String serviceUnit, ServiceUnitStateData data) {
validateChannelState(Started, true);
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Invalid channel state:" + channelState.name()));
}
CompletableFuture<MessageId> future = new CompletableFuture<>();
producer.newMessage()
.key(serviceUnit)
Expand Down
Loading