Skip to content

Commit

Permalink
[feature](profile) profilev2 distinguish Sink and Operator in pipelin…
Browse files Browse the repository at this point in the history
…eX (apache#25491)

* update

* update
  • Loading branch information
Mryange authored and 胥剑旭 committed Dec 14, 2023
1 parent 791e9a6 commit 33b8225
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 20 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Status JoinProbeLocalState<DependencyType, Derived>::init(RuntimeState* state,
_probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
_join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
_build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
_probe_rows_counter = ADD_COUNTER(Base::profile(), "ProbeRows", TUnit::UNIT);
_probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1);

return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,67 @@
* Used for collecting information obtained from the profile.
*/
public class ProfileStatistics {
class PipelineXStatistics {
ArrayList<String> sinkInfos;
ArrayList<String> infos;

int sinkInstance = 0;
int operatorInstance = 0;

PipelineXStatistics() {
sinkInfos = new ArrayList<String>();
infos = new ArrayList<String>();
}

void add(boolean isSink, String str) {
if (isSink) {
sinkInfos.add(str);
} else {
infos.add(str);
}
}

void updateInstance(boolean isSink, int instance) {
if (isSink) {
sinkInstance = Math.max(sinkInstance, instance);
} else {
operatorInstance = Math.max(operatorInstance, instance);
}
}

String to_String() {
return null;
}

void getInfo(ArrayList<String> infos, String prefix, StringBuilder str) {
Collections.sort(infos);
for (String info : infos) {
str.append(prefix + info + "\n");
}
}

void getOperator(String prefix, StringBuilder str) {
str.append(prefix + "Instance num : " + operatorInstance + "\n");
getInfo(infos, prefix, str);
}

void getSink(String prefix, StringBuilder str) {
str.append(prefix + "Instance num : " + sinkInstance + "\n");
getInfo(sinkInfos, prefix, str);
}
}

// Record statistical information based on nodeid.
private HashMap<Integer, ArrayList<String>> statisticalInfo;
private HashMap<Integer, Integer> statisticalInfoInstance;

// Record statistical information based on nodeid(use in pipelineX).
private HashMap<Integer, PipelineXStatistics> pipelineXStatisticalInfo;

// Record statistical information based on fragment ID.
// "Currently used to record sink nodes.
private HashMap<Integer, ArrayList<String>> fragmentInfo;

private HashMap<Integer, Integer> fragmentInfoInstance;
private int fragmentId;

private boolean isDataSink;
Expand All @@ -40,62 +94,96 @@ public class ProfileStatistics {

public ProfileStatistics(boolean isPipelineX) {
statisticalInfo = new HashMap<Integer, ArrayList<String>>();
statisticalInfoInstance = new HashMap<Integer, Integer>();

fragmentInfo = new HashMap<Integer, ArrayList<String>>();
pipelineXStatisticalInfo = new HashMap<Integer, PipelineXStatistics>();
fragmentInfoInstance = new HashMap<Integer, Integer>();
fragmentId = 0;
isDataSink = false;
this.isPipelineX = isPipelineX;
}

private void addPlanNodeInfo(int id, String info) {
private void addPipelineXPlanNodeInfo(boolean isSink, int id, String info, int instance) {
if (!pipelineXStatisticalInfo.containsKey(id)) {
pipelineXStatisticalInfo.put(id, new PipelineXStatistics());
}
pipelineXStatisticalInfo.get(id).add(isSink, info);
pipelineXStatisticalInfo.get(id).updateInstance(isSink, instance);
}

private void addPlanNodeInfo(int id, String info, int instance) {
if (!statisticalInfo.containsKey(id)) {
statisticalInfo.put(id, new ArrayList<String>());
statisticalInfoInstance.put(id, new Integer(0));
}
statisticalInfo.get(id).add(info);
int ins = statisticalInfoInstance.get(id);
ins = Math.max(ins, instance);
statisticalInfoInstance.put(id, ins);
}

private void addDataSinkInfo(String info) {
private void addDataSinkInfo(String info, int instance) {
if (fragmentInfo.get(fragmentId) == null) {
fragmentInfo.put(fragmentId, new ArrayList<String>());
fragmentInfoInstance.put(fragmentId, new Integer(0));
}
fragmentInfo.get(fragmentId).add(info);
int ins = fragmentInfoInstance.get(fragmentId);
ins = Math.max(ins, instance);
fragmentInfoInstance.put(fragmentId, ins);
}

public void addInfoFromProfile(RuntimeProfile profile, String name, String info) {
public void addInfoFromProfile(RuntimeProfile profile, String name, String info, int instance) {
if (isPipelineX) {
if (profile.sinkOperator()) {
name = name + "(Sink)";
} else {
name = name + "(Operator)";
}
addPlanNodeInfo(profile.nodeId(), name + ": " + info);
addPipelineXPlanNodeInfo(profile.sinkOperator(), profile.nodeId(), name + ": " + info, instance);
} else {
if (isDataSink) {
addDataSinkInfo(name + ": " + info);
addDataSinkInfo(name + ": " + info, instance);
} else {
addPlanNodeInfo(profile.nodeId(), name + ": " + info);
addPlanNodeInfo(profile.nodeId(), name + ": " + info, instance);
}
}
}

public boolean hasInfo(int id) {
return statisticalInfo.containsKey(id);
if (isPipelineX) {
return pipelineXStatisticalInfo.containsKey(id);
} else {
return statisticalInfo.containsKey(id);
}
}

public void getInfoById(int id, String prefix, StringBuilder str) {
if (!hasInfo(id)) {
return;
}
ArrayList<String> infos = statisticalInfo.get(id);
Collections.sort(infos);
for (String info : infos) {
str.append(prefix + info + "\n");
if (isPipelineX) {
getPipelineXInfoById(id, prefix, str);
} else {
ArrayList<String> infos = statisticalInfo.get(id);
str.append(prefix + "Instance num :" + statisticalInfoInstance.get(id) + "\n");
Collections.sort(infos);
for (String info : infos) {
str.append(prefix + info + "\n");
}
}
}

private void getPipelineXInfoById(int id, String prefix, StringBuilder str) {
PipelineXStatistics statistics = pipelineXStatisticalInfo.get(id);
str.append(prefix + "Operator: \n");
statistics.getOperator(prefix + " ", str);

str.append(prefix + "Sink: \n");
statistics.getSink(prefix + " ", str);
}

public void getDataSinkInfo(int fragmentIdx, String prefix, StringBuilder str) {
if (!fragmentInfo.containsKey(fragmentIdx)) {
return;
}
str.append(prefix + "Instance num :" + fragmentInfoInstance.get(fragmentIdx) + "\n");
ArrayList<String> infos = fragmentInfo.get(fragmentIdx);
Collections.sort(infos);
for (String info : infos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private static void mergeCounter(RuntimeProfile src, String counterName, Counter
String infoString = AVG_TIME_PRE + printCounter(newCounter.getValue(), newCounter.getType()) + ", "
+ MAX_TIME_PRE + printCounter(maxCounter.getValue(), maxCounter.getType()) + ", "
+ MIN_TIME_PRE + printCounter(minCounter.getValue(), minCounter.getType());
statistics.addInfoFromProfile(src, counterName, infoString);
statistics.addInfoFromProfile(src, counterName, infoString, rhsCounter.size() + 1);
}
} else {
Counter newCounter = new Counter(counter.getType(), counter.getValue());
Expand All @@ -514,7 +514,7 @@ private static void mergeCounter(RuntimeProfile src, String counterName, Counter
}
}
String infoString = printCounter(newCounter.getValue(), newCounter.getType());
statistics.addInfoFromProfile(src, counterName, infoString);
statistics.addInfoFromProfile(src, counterName, infoString, rhsCounter.size() + 1);
}
}

Expand Down

0 comments on commit 33b8225

Please sign in to comment.