Skip to content

Commit

Permalink
[Dataflow Streaming] Add Channelz status page exporting GRPC channelz …
Browse files Browse the repository at this point in the history
…data (#30211)

Co-authored-by: Arun Pandian <pandiana@google.com>
  • Loading branch information
arunpandianp and arunpandianp authored Feb 16, 2024
1 parent 1bbe07e commit 3693174
Show file tree
Hide file tree
Showing 11 changed files with 454 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version",
truth : "com.google.truth:truth:1.1.5",
threetenbp : "org.threeten:threetenbp:1.6.8",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.1",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
vendored_guava_32_1_2_jre : "org.apache.beam:beam-vendor-guava-32_1_2-jre:0.1",
vendored_calcite_1_28_0 : "org.apache.beam:beam-vendor-calcite-1_28_0:0.2",
woodstox_core_asl : "org.codehaus.woodstox:woodstox-core-asl:4.4.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ public Dataflow create(PipelineOptions options) {

void setWindmillGetDataStreamCount(int value);

/**
 * Whether the /channelz status page should be restricted to windmill service channels.
 *
 * <p>Defaults to {@code true}.
 */
@Description("If true, will only show windmill service channels on /channelz")
@Default.Boolean(true)
boolean getChannelzShowOnlyWindmillServiceChannels();

/** Sets whether the /channelz status page only shows windmill service channels. */
void setChannelzShowOnlyWindmillServiceChannels(boolean value);

/**
* The amount of time before UnboundedReaders are considered idle and closed during streaming
* execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -211,6 +212,7 @@ public class StreamingDataflowWorker {

private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5);
private static final Random clientIdGenerator = new Random();
private static final String CHANNELZ_PATH = "/channelz";
final WindmillStateCache stateCache;
// Maps from computation ids to per-computation state.
private final ConcurrentMap<String, ComputationState> computationMap;
Expand Down Expand Up @@ -735,6 +737,13 @@ public void startStatusPages() {
if (debugCaptureManager != null) {
debugCaptureManager.start();
}

if (windmillServiceEnabled) {
ChannelzServlet channelzServlet = new ChannelzServlet(CHANNELZ_PATH, options, windmillServer);
statusPages.addServlet(channelzServlet);
statusPages.addCapturePage(channelzServlet);
}

statusPages.addServlet(stateCache.statusServlet());
statusPages.addServlet(new SpecsServlet());

Expand Down Expand Up @@ -2081,6 +2090,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) thro
}

private class MetricsDataProvider implements StatusDataProvider {

@Override
public void appendSummaryHtml(PrintWriter writer) {
writer.println(workUnitExecutor.summaryHtml());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

/**
Expand Down Expand Up @@ -53,6 +54,11 @@ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws IOExc
// This class is used for windmill appliance and local runner tests.
}

/** Always returns an empty set of endpoints. */
@Override
public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
return ImmutableSet.of();
}

@Override
public boolean isReady() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;

/** Stub for communicating with a Windmill server. */
Expand All @@ -40,6 +41,11 @@ public abstract class WindmillServerStub implements StatusDataProvider {
*/
public abstract void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) throws IOException;

/** Returns the windmill service endpoints set by {@link #setWindmillServiceEndpoints}. */
public abstract ImmutableSet<HostAndPort> getWindmillServiceEndpoints();

/** Returns true iff this WindmillServerStub is ready for making API calls. */
public abstract boolean isReady();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.channelz.v1.*;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.protobuf.services.ChannelzService;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;

/**
 * Status servlet that responds at the configured path with the GRPC channelz data rendered as
 * HTML.
 *
 * <p>The page walks the channelz graph starting from the top level channels and renders every
 * channel, subchannel and socket reachable from them as nested HTML tables. The same HTML is
 * produced for {@link #doGet} and for {@link #captureData}.
 *
 * <p>When {@code channelzShowOnlyWindmillServiceChannels} is enabled in the options, only top
 * level channels whose target contains the host of one of the windmill service endpoints are
 * shown.
 */
@Internal
public class ChannelzServlet extends BaseStatusServlet implements DebugCapture.Capturable {

  private static final int MAX_TOP_CHANNELS_TO_RETURN = 500;

  private final ChannelzService channelzService;
  private final WindmillServerStub windmillServerStub;
  private final boolean showOnlyWindmillServiceChannels;

  /**
   * Creates the servlet.
   *
   * @param path the path this servlet is served at; also used as the capture page name
   * @param options source of the {@code channelzShowOnlyWindmillServiceChannels} setting
   * @param windmillServerStub queried for the current windmill service endpoints when filtering
   */
  public ChannelzServlet(
      String path, StreamingDataflowWorkerOptions options, WindmillServerStub windmillServerStub) {
    super(path);
    channelzService = ChannelzService.newInstance(MAX_TOP_CHANNELS_TO_RETURN);
    this.windmillServerStub = windmillServerStub;
    showOnlyWindmillServiceChannels = options.getChannelzShowOnlyWindmillServiceChannels();
  }

  /** Writes the channelz HTML page to the response with status 200. */
  @Override
  protected void doGet(HttpServletRequest request, HttpServletResponse response)
      throws IOException, ServletException {
    // The content type (and its charset) must be set before the writer is obtained, otherwise
    // the charset does not apply to the writer.
    response.setContentType("text/html;charset=utf-8");
    response.setStatus(HttpServletResponse.SC_OK);
    captureData(response.getWriter());
  }

  /** Returns the servlet path, which doubles as the name of the capture page. */
  @Override
  public String pageName() {
    return getPath();
  }

  /** Writes the complete channelz HTML document to {@code writer}. */
  @Override
  public void captureData(PrintWriter writer) {
    writer.println("<html>");
    writer.println("<h1>Channelz</h1>");
    appendTopChannels(writer);
    writer.println("</html>");
  }

  // channelz proto says there won't be cycles in the ref graph.
  // we track visited ids to be defensive and prevent any accidental cycles.
  private static class VisitedSets {

    final Set<Long> channels = new HashSet<>();
    final Set<Long> subchannels = new HashSet<>();
  }

  /**
   * Fetches the top level channels and renders each of them, together with everything reachable
   * from it, as a row of the "Top Level Channels" table. On failure a single error line is
   * written instead.
   */
  private void appendTopChannels(PrintWriter writer) {
    SettableFuture<GetTopChannelsResponse> future = SettableFuture.create();
    // IDEA: If there are more than MAX_TOP_CHANNELS_TO_RETURN top channels
    // in the worker, we might not return all the windmill channels. If we run into
    // such situations, this logic can be modified to loop till we see an empty
    // GetTopChannelsResponse response with the end bit set.
    channelzService.getTopChannels(
        GetTopChannelsRequest.newBuilder().build(), getStreamObserver(future));
    GetTopChannelsResponse topChannelsResponse;
    try {
      topChannelsResponse = awaitResponse(future);
    } catch (Exception e) {
      // Best effort: a status page must not fail the request, so report the problem inline.
      writer.println("Failed to get channelz: " + e.getMessage());
      return;
    }

    List<Channel> topChannels = topChannelsResponse.getChannelList();
    if (showOnlyWindmillServiceChannels) {
      topChannels = filterWindmillChannels(topChannels);
    }
    writer.println("<h2>Top Level Channels</h2>");
    writer.println("<table border='1'>");
    VisitedSets visitedSets = new VisitedSets();
    for (Channel channel : topChannels) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("TopChannelId: " + channel.getRef().getChannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendChannel(channel, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
    writer.println("</table>");
  }

  /**
   * Returns the channels whose target contains the host of at least one of the endpoints
   * currently reported by the windmill server stub, preserving the input order.
   */
  private List<Channel> filterWindmillChannels(List<Channel> channels) {
    ImmutableSet<HostAndPort> windmillServiceEndpoints =
        windmillServerStub.getWindmillServiceEndpoints();
    Set<String> windmillServiceHosts =
        windmillServiceEndpoints.stream().map(HostAndPort::getHost).collect(Collectors.toSet());
    List<Channel> windmillChannels = new ArrayList<>();
    for (Channel channel : channels) {
      String target = channel.getData().getTarget();
      if (windmillServiceHosts.stream().anyMatch(target::contains)) {
        windmillChannels.add(channel);
      }
    }
    return windmillChannels;
  }

  /** Renders one table row per referenced channel. */
  private void appendChannels(
      List<ChannelRef> channelRefs, PrintWriter writer, VisitedSets visitedSets) {
    for (ChannelRef channelRef : channelRefs) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Channel: " + channelRef.getChannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendChannel(channelRef, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /**
   * Resolves {@code channelRef} through the channelz service and renders the channel. A channel
   * id that was already visited is reported as a duplicate and not rendered again.
   */
  private void appendChannel(ChannelRef channelRef, PrintWriter writer, VisitedSets visitedSets) {
    // Set#add returns false when the id is already present, i.e. the channel was visited before.
    if (!visitedSets.channels.add(channelRef.getChannelId())) {
      writer.println("Duplicate Channel Id: " + channelRef);
      return;
    }
    SettableFuture<GetChannelResponse> future = SettableFuture.create();
    channelzService.getChannel(
        GetChannelRequest.newBuilder().setChannelId(channelRef.getChannelId()).build(),
        getStreamObserver(future));
    Channel channel;
    try {
      channel = awaitResponse(future).getChannel();
    } catch (Exception e) {
      writer.println("Failed to get Channel: " + channelRef + " Exception: " + e.getMessage());
      return;
    }
    appendChannel(channel, writer, visitedSets);
  }

  /**
   * Renders {@code channel} as a table: its own data first, followed by rows for its child
   * channels, subchannels and sockets.
   */
  private void appendChannel(Channel channel, PrintWriter writer, VisitedSets visitedSets) {
    writer.println("<table border='1'>");
    writer.println("<tr>");
    writer.println("<td>");
    writer.println("ChannelId: " + channel.getRef().getChannelId());
    writer.println("</td>");
    writer.println("<td><pre>" + channel);
    writer.println("</pre></td>");
    writer.println("</tr>");
    appendChannels(channel.getChannelRefList(), writer, visitedSets);
    appendSubChannels(channel.getSubchannelRefList(), writer, visitedSets);
    appendSockets(channel.getSocketRefList(), writer);
    writer.println("</table>");
  }

  /** Renders one table row per referenced subchannel. */
  private void appendSubChannels(
      List<SubchannelRef> subchannelRefList, PrintWriter writer, VisitedSets visitedSets) {
    for (SubchannelRef subchannelRef : subchannelRefList) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Sub Channel: " + subchannelRef.getSubchannelId());
      writer.println("</td>");
      writer.println("<td>");
      appendSubchannel(subchannelRef, writer, visitedSets);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /**
   * Resolves {@code subchannelRef} through the channelz service and renders the subchannel as a
   * table: its own data first, followed by rows for its child channels, subchannels and sockets.
   * A subchannel id that was already visited is reported as a duplicate and not rendered again.
   */
  private void appendSubchannel(
      SubchannelRef subchannelRef, PrintWriter writer, VisitedSets visitedSets) {
    // Set#add returns false when the id is already present, i.e. it was visited before.
    if (!visitedSets.subchannels.add(subchannelRef.getSubchannelId())) {
      writer.println("Duplicate Subchannel Id: " + subchannelRef);
      return;
    }
    SettableFuture<GetSubchannelResponse> future = SettableFuture.create();
    channelzService.getSubchannel(
        GetSubchannelRequest.newBuilder().setSubchannelId(subchannelRef.getSubchannelId()).build(),
        getStreamObserver(future));
    Subchannel subchannel;
    try {
      subchannel = awaitResponse(future).getSubchannel();
    } catch (Exception e) {
      writer.println(
          "Failed to get Subchannel: " + subchannelRef + " Exception: " + e.getMessage());
      return;
    }

    writer.println("<table border='1'>");
    writer.println("<tr>");
    writer.println("<td>SubchannelId: " + subchannelRef.getSubchannelId());
    writer.println("</td>");
    writer.println("<td><pre>" + subchannel.toString());
    writer.println("</pre></td>");
    writer.println("</tr>");
    appendChannels(subchannel.getChannelRefList(), writer, visitedSets);
    appendSubChannels(subchannel.getSubchannelRefList(), writer, visitedSets);
    appendSockets(subchannel.getSocketRefList(), writer);
    writer.println("</table>");
  }

  /** Renders one table row per referenced socket. */
  private void appendSockets(List<SocketRef> socketRefList, PrintWriter writer) {
    for (SocketRef socketRef : socketRefList) {
      writer.println("<tr>");
      writer.println("<td>");
      writer.println("Socket: " + socketRef.getSocketId());
      writer.println("</td>");
      writer.println("<td>");
      appendSocket(socketRef, writer);
      writer.println("</td>");
      writer.println("</tr>");
    }
  }

  /** Resolves {@code socketRef} through the channelz service and renders the socket data. */
  private void appendSocket(SocketRef socketRef, PrintWriter writer) {
    SettableFuture<GetSocketResponse> future = SettableFuture.create();
    channelzService.getSocket(
        GetSocketRequest.newBuilder().setSocketId(socketRef.getSocketId()).build(),
        getStreamObserver(future));
    Socket socket;
    try {
      socket = awaitResponse(future).getSocket();
    } catch (Exception e) {
      writer.println("Failed to get Socket: " + socketRef + " Exception: " + e.getMessage());
      return;
    }
    writer.println("<pre>" + socket + "</pre>");
  }

  /**
   * Waits for {@code future} and returns its value.
   *
   * <p>If the wait is interrupted, the thread's interrupt flag is restored before the exception
   * is propagated, so that callers which report the failure and carry on do not hide the
   * interruption from the code that owns the thread.
   *
   * @throws InterruptedException if the calling thread was interrupted while waiting
   * @throws java.util.concurrent.ExecutionException if the future completed exceptionally
   */
  private static <T> T awaitResponse(SettableFuture<T> future)
      throws InterruptedException, java.util.concurrent.ExecutionException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw e;
    }
  }

  /**
   * Returns an observer that completes {@code future} with the last message received once the
   * call completes, or fails it with the error reported by the call.
   *
   * <p>A call that completes without delivering any message fails the future instead of
   * completing it with {@code null}, so callers never have to handle a null response.
   */
  private <T> StreamObserver<T> getStreamObserver(SettableFuture<T> future) {
    return new StreamObserver<T>() {
      @Nullable T response = null;

      @Override
      public void onNext(T message) {
        response = message;
      }

      @Override
      public void onError(Throwable throwable) {
        future.setException(throwable);
      }

      @Override
      public void onCompleted() {
        T result = response;
        if (result == null) {
          future.setException(
              new IllegalStateException("channelz call completed without a response"));
        } else {
          future.set(result);
        }
      }
    };
  }
}
Loading

0 comments on commit 3693174

Please sign in to comment.