diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java index 85dd2c97be6590..0eef8b25119b29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ColocationGroupProcDir.java @@ -14,20 +14,15 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. - package org.apache.doris.common.proc; - import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.ColocateTableIndex.GroupId; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.resource.Tag; - import com.google.common.collect.ImmutableList; - import java.util.List; import java.util.Map; - /* * show proc "/colocation_group"; */ @@ -36,19 +31,16 @@ public class ColocationGroupProcDir implements ProcDirInterface { .add("GroupId").add("GroupName").add("TableIds") .add("BucketsNum").add("ReplicaAllocation").add("DistCols").add("IsStable") .add("ErrorMsg").build(); - @Override public boolean register(String name, ProcNodeInterface node) { return false; } - @Override public ProcNodeInterface lookup(String groupIdStr) throws AnalysisException { String[] parts = groupIdStr.split("\\."); if (parts.length != 2) { throw new AnalysisException("Invalid group id: " + groupIdStr); } - long dbId = -1; long grpId = -1; try { @@ -61,17 +53,54 @@ public ProcNodeInterface lookup(String groupIdStr) throws AnalysisException { GroupId groupId = new GroupId(dbId, grpId); ColocateTableIndex index = Env.getCurrentColocateIndex(); Map>> beSeqs = index.getBackendsPerBucketSeq(groupId); + Map>> beSeqs; + + // ==========Core modification: Distinguish between cloud/non-cloud environments to obtain the BE sequence========== + if (CloudReplica.isCloudEnv()) { + // Cloud environment: Call CloudReplica to obtain the Colocated BE ID and construct the compatible beSeqs structure + beSeqs = buildCloudBeSeqs(dbId, grpId); + } else { + // Non-cloud environment: Maintain the original logic + beSeqs = index.getBackendsPerBucketSeq(groupId); + } + return new ColocationGroupBackendSeqsProcNode(beSeqs); } + /** + * Construct the beSeqs data structure in the cloud environment and match it with the format of the non-cloud environment. + * @param dbId dbId of the Colocation Group + * @param grpId grpId of Colocation Group + * @return Compatible Map structure of type >> + */ + private Map>> buildCloudBeSeqs(long dbId, long grpId) { + Map>> beSeqs = Maps.newHashMap(); + + // 1.Call the core function of the cloud environment to obtain the list of associated BE IDs + List colocatedBeIds = CloudReplica.getColocatedBeId(dbId, grpId); + + // 2. Construct a structure that is compatible with the original format (the Tag is set to DEFAULT, and the Bucket sequence is organized according to the BE ID) + // Original format description: + // - Key: Tag(Resource Label) + // - Value: List> → The outer List corresponds to the Bucket sequence, and the inner List corresponds to the BE ID list of each Bucket. + Tag defaultTag = Tag.DEFAULT_TAG; + List> bucketSeqs = Lists.newArrayList(); + + // Simplified processing in the cloud environment: A single Bucket sequence contains all the associated BE IDs + bucketSeqs.add(colocatedBeIds); + + beSeqs.put(defaultTag, bucketSeqs); + return beSeqs; + } + + @Override public ProcResult fetchResult() throws AnalysisException { BaseProcResult result = new BaseProcResult(); result.setNames(TITLE_NAMES); - ColocateTableIndex index = Env.getCurrentColocateIndex(); List> infos = index.getInfos(); result.setRows(infos); return result; } -} +} \ No newline at end of file diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java new file mode 100644 index 00000000000000..57f6513a4fe382 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/proc/ColocationGroupProcDirTest.java @@ -0,0 +1,98 @@ +package org.apache.doris.common.proc; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ColocationGroup; +import org.apache.doris.common.GroupId; +import org.apache.doris.datasource.CloudReplica; +import org.apache.doris.persist.ColocateTableIndex; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +// 使用PowerMockRunner是因为需要Mock静态方法 CloudReplica.isCloudEnv() +@RunWith(PowerMockRunner.class) +// 告诉PowerMock需要处理CloudReplica和Env这两个类 +@PrepareForTest({CloudReplica.class, Env.class}) +public class ColocationGroupProcDirTest { + + private long dbId = 10001L; + private long grpId = 20001L; + private ColocationGroupProcDir procDir; + + @Before + public void setUp() { + // 每个测试方法运行前都会执行这里 + // 实例化要测试的类 + procDir = new ColocationGroupProcDir(dbId, grpId); + } + + /** + * 测试场景:云环境下,测试是否正确构建了BE序列 + * 对应PR中新增的 buildCloudBeSeqs 逻辑 + */ + @Test + public void testLookupInCloudEnv() throws Exception { + // 1. 准备模拟数据 + // 假设云环境返回了3个BE ID: 101, 102, 103 + List mockCloudBeIds = new ArrayList<>(); + mockCloudBeIds.add(101L); + mockCloudBeIds.add(102L); + mockCloudBeIds.add(103L); + + // 2. Mock静态类 + PowerMockito.mockStatic(CloudReplica.class); + + // 3. 设定行为:当调用 CloudReplica.isCloudEnv() 时,返回 true (模拟是云环境) + when(CloudReplica.isCloudEnv()).thenReturn(true); + + // 4. 设定行为:当调用 CloudReplica.getColocatedBeId() 时,返回我们准备的数据 + when(CloudReplica.getColocatedBeId(dbId, grpId)).thenReturn(mockCloudBeIds); + + // 5. 执行测试:调用 lookup 方法(传入 "dbId.grpId" 格式的字符串) + String groupIdStr = dbId + "." + grpId; + ProcNodeInterface result = procDir.lookup(groupIdStr); + + // 6. 验证结果 + // 期望结果不为空 + Assert.assertNotNull(result); + // 期望结果是 ColocationGroupBackendSeqsProcNode 类型 + Assert.assertTrue(result instanceof ColocationGroupBackendSeqsProcNode); + + } + + /** + * 测试场景:非云环境下,是否保持原有的逻辑 + */ + @Test + public void testLookupInNonCloudEnv() throws Exception { + // 1. Mock ColocateTableIndex (原有逻辑依赖这个类) + ColocateTableIndex mockIndex = mock(ColocateTableIndex.class); + + // 2. Mock静态类 + PowerMockito.mockStatic(CloudReplica.class); + PowerMockito.mockStatic(Env.class); + + // 3. 设定行为:是非云环境 + when(CloudReplica.isCloudEnv()).thenReturn(false); + + // 4. 设定行为:Env.getCurrentColocateIndex() 返回我们的mock对象 + when(Env.getCurrentColocateIndex()).thenReturn(mockIndex); + + // 5. 执行测试 + String groupIdStr = dbId + "." + grpId; + procDir.lookup(groupIdStr); + + + } +}