From fb8a0be4bbe5102d0dafe4988d74d6cb7b6d3680 Mon Sep 17 00:00:00 2001 From: Haotian Zhang <928016560@qq.com> Date: Fri, 1 Nov 2024 15:34:27 +0800 Subject: [PATCH] feat:upgrade lane. --- .../polaris-assembly-factory/pom.xml | 5 + .../polaris-circuitbreaker-factory/pom.xml | 5 + .../main/resources/conf/default-config.yml | 4 +- .../polaris/api/utils/CompareUtils.java | 12 +- .../tencent/polaris/api/utils/RuleUtils.java | 7 +- .../polaris-configuration-factory/pom.xml | 5 + .../composite/CompositeServiceUpdateTask.java | 3 +- .../connector/consul/ConsulAPIConnector.java | 2 + .../consul/service/lane/LaneService.java | 578 ++++++++++++++++++ .../consul/service/lane/entity/LaneGroup.java | 152 +++++ .../consul/service/lane/entity/LaneInfo.java | 90 +++ .../consul/service/lane/entity/LaneRule.java | 120 ++++ .../service/lane/entity/LaneRuleTag.java | 88 +++ .../lane/entity/RuleTagRelationship.java | 17 + .../memory/MessagePersistHandler.java | 2 + .../router-lane/pom.xml | 7 +- .../plugins/router/lane/LaneRouter.java | 264 ++++---- .../polaris-ratelimit-factory/pom.xml | 5 + pom.xml | 2 +- 19 files changed, 1231 insertions(+), 137 deletions(-) create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneGroup.java create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneInfo.java create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRule.java create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRuleTag.java create mode 100644 polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/RuleTagRelationship.java diff --git a/polaris-assembly/polaris-assembly-factory/pom.xml b/polaris-assembly/polaris-assembly-factory/pom.xml index e31c610b8..d0daa3357 100644 --- a/polaris-assembly/polaris-assembly-factory/pom.xml +++ b/polaris-assembly/polaris-assembly-factory/pom.xml @@ -45,6 +45,11 @@ router-isolated ${project.version} + + com.tencent.polaris + router-lane + ${project.version} + com.tencent.polaris router-healthy diff --git a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml index d4b335cac..6ee389974 100644 --- a/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml +++ b/polaris-circuitbreaker/polaris-circuitbreaker-factory/pom.xml @@ -50,6 +50,11 @@ router-isolated ${project.version} + + com.tencent.polaris + router-lane + ${project.version} + com.tencent.polaris router-healthy diff --git a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml index de94bd46d..87e13b1de 100644 --- a/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml +++ b/polaris-common/polaris-config-default/src/main/resources/conf/default-config.yml @@ -169,12 +169,12 @@ consumer: beforeChain: # 隔离路由 - isolatedRouter + # 泳道路由 + - laneRouter #描述: 服务路由链 chain: # 命名空间就近路由 - namespaceRouter - # 泳道路由 - - laneRouter # 元数据路由 - metadataRouter # 规则路由 diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/CompareUtils.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/CompareUtils.java index 1f4ad160d..da7135790 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/CompareUtils.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/CompareUtils.java @@ -35,7 +35,7 @@ public static int compareSingleValue(String value1, String value2) { return 0; } if (serviceWildcard1) { - // 1 before 2 + // 2 before 1 return 1; } if (serviceWildcard2) { @@ -52,4 +52,14 @@ public static int compareService(String namespace1, String service1, String name } return CompareUtils.compareSingleValue(service1, service2); } + + /** + * compare two boolean. + */ + public static int compareBoolean(boolean b1, boolean b2) { + if (b1 == b2) { + return 0; + } + return b1 ? -1 : 1; + } } diff --git a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/RuleUtils.java b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/RuleUtils.java index ad014327c..68ac7ee9a 100644 --- a/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/RuleUtils.java +++ b/polaris-common/polaris-model/src/main/java/com/tencent/polaris/api/utils/RuleUtils.java @@ -172,6 +172,11 @@ public static boolean matchMetadata(Map ruleMeta, Map ruleMeta, Map destMeta, MetadataContainerGroup metadataContainerGroup) { + return matchMetadata(ruleMeta, destMeta, metadataContainerGroup, true, Collections.emptyMap(), Collections.emptyMap(), null); + } + // 匹配metadata public static boolean matchMetadata(Map ruleMeta, Map destMeta, MetadataContainerGroup metadataContainerGroup, boolean isMatchSource, @@ -223,7 +228,7 @@ public static boolean matchMetadata(Map ruleMeta, Maprouter-isolated ${project.version} + + com.tencent.polaris + router-lane + ${project.version} + com.tencent.polaris router-healthy diff --git a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java index 8efe4f92e..7c90874a8 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java +++ b/polaris-plugins/polaris-plugins-connector/connector-composite/src/main/java/com/tencent/polaris/plugins/connector/composite/CompositeServiceUpdateTask.java @@ -122,7 +122,8 @@ public void execute() { || serviceEventKey.getEventType().equals(EventType.NEARBY_ROUTE_RULE) || serviceEventKey.getEventType().equals(EventType.LOSSLESS) || serviceEventKey.getEventType().equals(EventType.CIRCUIT_BREAKING) - || serviceEventKey.getEventType().equals(EventType.RATE_LIMITING)))) { + || serviceEventKey.getEventType().equals(EventType.RATE_LIMITING) + || serviceEventKey.getEventType().equals(EventType.LANE_RULE)))) { return; } boolean svcDeleted = this.notifyServerEvent( diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java index fb177ed0c..6f713731d 100644 --- a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/ConsulAPIConnector.java @@ -51,6 +51,7 @@ import com.tencent.polaris.plugins.connector.consul.service.ServiceService; import com.tencent.polaris.plugins.connector.consul.service.circuitbreaker.CircuitBreakingService; import com.tencent.polaris.plugins.connector.consul.service.lossless.LosslessService; +import com.tencent.polaris.plugins.connector.consul.service.lane.LaneService; import com.tencent.polaris.plugins.connector.consul.service.ratelimiting.RateLimitingService; import com.tencent.polaris.plugins.connector.consul.service.router.NearByRouteRuleService; import com.tencent.polaris.plugins.connector.consul.service.router.RoutingService; @@ -234,6 +235,7 @@ private void initActually(InitContext ctx, ServerConnectorConfig connectorConfig consulServiceMap.put(ServiceEventKey.EventType.LOSSLESS, new LosslessService(consulClient, consulRawClient, consulContext, "consul-lossless", mapper)); consulServiceMap.put(ServiceEventKey.EventType.CIRCUIT_BREAKING, new CircuitBreakingService(consulClient, consulRawClient, consulContext, "consul-circuit-breaking", mapper)); consulServiceMap.put(ServiceEventKey.EventType.RATE_LIMITING, new RateLimitingService(consulClient, consulRawClient, consulContext, "consul-rate-limiting", mapper)); + consulServiceMap.put(ServiceEventKey.EventType.LANE_RULE, new LaneService(consulClient, consulRawClient, consulContext, "consul-lane", mapper)); initialized = true; } diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java new file mode 100644 index 000000000..f0ef03dc3 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/LaneService.java @@ -0,0 +1,578 @@ +/* + * Tencent is pleased to support the open source community by making Polaris available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package com.tencent.polaris.plugins.connector.consul.service.lane; + +import com.ecwid.consul.SingleUrlParameters; +import com.ecwid.consul.UrlParameters; +import com.ecwid.consul.json.GsonFactory; +import com.ecwid.consul.transport.HttpResponse; +import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.ConsulRawClient; +import com.ecwid.consul.v1.QueryParams; +import com.ecwid.consul.v1.kv.model.GetValue; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.google.gson.reflect.TypeToken; +import com.google.protobuf.Any; +import com.google.protobuf.StringValue; +import com.google.protobuf.UInt32Value; +import com.tencent.polaris.api.exception.ErrorCode; +import com.tencent.polaris.api.exception.PolarisException; +import com.tencent.polaris.api.exception.ServerCodes; +import com.tencent.polaris.api.exception.ServerErrorResponseException; +import com.tencent.polaris.api.plugin.server.ServerEvent; +import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.logging.LoggerFactory; +import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask; +import com.tencent.polaris.plugins.connector.consul.ConsulContext; +import com.tencent.polaris.plugins.connector.consul.service.ConsulService; +import com.tencent.polaris.plugins.connector.consul.service.lane.entity.*; +import com.tencent.polaris.specification.api.v1.model.ModelProto; +import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto; +import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto; +import com.tencent.polaris.specification.api.v1.traffic.manage.LaneProto; +import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto; +import org.slf4j.Logger; +import org.yaml.snakeyaml.Yaml; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static com.tencent.polaris.api.config.plugin.DefaultPlugins.SERVER_CONNECTOR_CONSUL; +import static com.tencent.polaris.metadata.core.constant.TsfMetadataConstants.TSF_GROUP_ID; +import static com.tencent.polaris.plugins.connector.consul.service.common.TagConditionUtil.parseMatchStringType; + +/** + * @author Haotian Zhang + */ +public class LaneService extends ConsulService { + + private static final Logger LOG = LoggerFactory.getLogger(LaneService.class); + + private final Map laneInfoConsulIndexMap = new ConcurrentHashMap<>(); + + private final Map laneRuleConsulIndexMap = new ConcurrentHashMap<>(); + + public LaneService(ConsulClient consulClient, ConsulRawClient consulRawClient, ConsulContext consulContext, + String threadName, ObjectMapper mapper) { + super(consulClient, consulRawClient, consulContext, threadName, mapper); + } + + @Override + protected void sendRequest(ServiceUpdateTask serviceUpdateTask) { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference lock = new AtomicReference<>(); + AtomicReference throwable = new AtomicReference<>(); + AtomicBoolean isLaneInfoReturn = new AtomicBoolean(true); + String namespace = serviceUpdateTask.getServiceEventKey().getNamespace(); + String service = serviceUpdateTask.getServiceEventKey().getService(); + LaneRuleKey laneRuleKey = new LaneRuleKey(); + laneRuleKey.setNamespace(namespace); + laneRuleKey.setService(service); + Long currentLaneInfoConsulIndex = getLaneInfoConsulIndex(laneRuleKey); + Long currentLaneRuleConsulIndex = getLaneRuleConsulIndex(laneRuleKey); + + // Task for /lane/info + AtomicReference laneInfoResponseAtomicReference = new AtomicReference<>(); + Runnable laneInfoMainTask = () -> { + try { + laneInfoResponseAtomicReference.set(syncLaneInfo(serviceUpdateTask, currentLaneInfoConsulIndex, true)); + if (lock.compareAndSet(null, new Object())) { + latch.countDown(); + } + } catch (Throwable t) { + if (throwable.compareAndSet(null, t)) { + latch.countDown(); + } + } + }; + + // Task for /lane/rule + AtomicReference laneRuleResponseAtomicReference = new AtomicReference<>(); + Runnable laneRuleMainTask = () -> { + try { + laneRuleResponseAtomicReference.set(syncLaneRule(serviceUpdateTask, currentLaneRuleConsulIndex, true)); + if (lock.compareAndSet(null, new Object())) { + isLaneInfoReturn.set(false); + latch.countDown(); + } + } catch (Throwable t) { + if (throwable.compareAndSet(null, t)) { + latch.countDown(); + } + } + }; + + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + executorService.execute(laneInfoMainTask); + executorService.execute(laneRuleMainTask); + latch.await(); + executorService.shutdownNow(); + if (throwable.get() != null) { + throw throwable.get(); + } + LaneInfoResponse laneInfoResponse = null; + LaneRuleResponse laneRuleResponse = null; + if (isLaneInfoReturn.get()) { + laneInfoResponse = laneInfoResponseAtomicReference.get(); + if (laneInfoResponse != null) { + laneRuleResponse = syncLaneRule(serviceUpdateTask, currentLaneRuleConsulIndex, false); + } + } else { + laneRuleResponse = laneRuleResponseAtomicReference.get(); + if (laneRuleResponse != null) { + laneInfoResponse = syncLaneInfo(serviceUpdateTask, currentLaneInfoConsulIndex, false); + } + } + + int code = ServerCodes.DATA_NO_CHANGE; + // create service. + ServiceProto.Service.Builder newServiceBuilder = ServiceProto.Service.newBuilder(); + newServiceBuilder.setNamespace(StringValue.of(namespace)); + newServiceBuilder.setName(StringValue.of(service)); + // create discover response. + ResponseProto.DiscoverResponse.Builder newDiscoverResponseBuilder = ResponseProto.DiscoverResponse.newBuilder(); + if (laneInfoResponse != null && laneRuleResponse != null) { + code = ServerCodes.EXECUTE_SUCCESS; + newServiceBuilder.setRevision(StringValue.of(laneInfoResponse.getIndex() + "-" + laneRuleResponse.getIndex())); + List laneGroupList = parseResponse(laneInfoResponse, laneRuleResponse, namespace, service); + if (CollectionUtils.isNotEmpty(laneGroupList)) { + newDiscoverResponseBuilder.addAllLanes(laneGroupList); + } + } + newDiscoverResponseBuilder.setCode(UInt32Value.of(code)); + newDiscoverResponseBuilder.setService(newServiceBuilder); + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), newDiscoverResponseBuilder.build(), null, SERVER_CONNECTOR_CONSUL); + boolean svcDeleted = serviceUpdateTask.notifyServerEvent(serverEvent); + // 重写index + if (laneInfoResponse != null && laneRuleResponse != null) { + setLaneInfoConsulIndex(laneRuleKey, currentLaneInfoConsulIndex, laneInfoResponse.getIndex()); + setLaneRuleConsulIndex(laneRuleKey, currentLaneRuleConsulIndex, laneRuleResponse.getIndex()); + } + if (!svcDeleted) { + serviceUpdateTask.addUpdateTaskSet(); + } + } catch (Throwable e) { + LOG.error("[TSF Lane] tsf lane load error. Will sleep for {} ms.", consulContext.getConsulErrorSleep(), e); + try { + Thread.sleep(consulContext.getConsulErrorSleep()); + } catch (Exception e1) { + LOG.error("error in sleep, msg: {}", e1.getMessage()); + } + PolarisException error = ServerErrorResponseException.build(ErrorCode.NETWORK_ERROR.getCode(), + "Get lane sync failed."); + ServerEvent serverEvent = new ServerEvent(serviceUpdateTask.getServiceEventKey(), null, error, SERVER_CONNECTOR_CONSUL); + serviceUpdateTask.notifyServerEvent(serverEvent); + } finally { + if (!executorService.isShutdown() || !executorService.isTerminated()) { + executorService.shutdownNow(); + } + } + } + + private List parseResponse(LaneInfoResponse laneInfoResponse, + LaneRuleResponse laneRuleResponse, String namespace, String service) { + List laneGroupList = new ArrayList<>(); + + LaneProto.LaneGroup.Builder laneGroupBuilder = LaneProto.LaneGroup.newBuilder(); + laneGroupBuilder.setName("tsf"); + // set destination group list + RoutingProto.DestinationGroup.Builder destinationGroupBuilder = RoutingProto.DestinationGroup.newBuilder(); + destinationGroupBuilder.setNamespace("*"); + destinationGroupBuilder.setService("*"); + laneGroupBuilder.addDestinations(destinationGroupBuilder.build()); + // set entry list + List laneInfoList = laneInfoResponse.getLaneInfoList(); + List entranceList = new ArrayList<>(); + for (LaneInfo laneInfo : laneInfoList) { + for (LaneGroup laneGroup : laneInfo.getLaneGroupList()) { + if (laneGroup.isEntrance()) { + entranceList.add(laneGroup.getGroupId()); + } + } + } + for (String entrance : entranceList) { + LaneProto.TrafficEntry.Builder trafficEntryBuilder = LaneProto.TrafficEntry.newBuilder(); + trafficEntryBuilder.setType("polarismesh.cn/service"); + LaneProto.ServiceSelector.Builder serviceSelectorBuilder = LaneProto.ServiceSelector.newBuilder(); + serviceSelectorBuilder.setNamespace("*"); + serviceSelectorBuilder.setService("*"); + ModelProto.MatchString.Builder label = ModelProto.MatchString.newBuilder(); + label.setType(ModelProto.MatchString.MatchStringType.EXACT); + label.setValue(StringValue.of(entrance)); + label.setValueType(ModelProto.MatchString.ValueType.TEXT); + serviceSelectorBuilder.putLabels(TSF_GROUP_ID, label.build()); + trafficEntryBuilder.setSelector(Any.pack(serviceSelectorBuilder.build())); + laneGroupBuilder.addEntries(trafficEntryBuilder.build()); + } + // set rule list + List tsfLaneRuleList = laneRuleResponse.getLaneRuleList(); + List laneRuleList = new ArrayList<>(); + for (LaneRule laneRule : tsfLaneRuleList) { + LaneProto.LaneRule.Builder laneRuleBuilder = LaneProto.LaneRule.newBuilder(); + laneRuleBuilder.setId(laneRule.getLaneId()); + laneRuleBuilder.setName(laneRule.getLaneId()); + laneRuleBuilder.setGroupName("tsf"); + laneRuleBuilder.setEnable(true); + laneRuleBuilder.setMatchMode(LaneProto.LaneRule.LaneMatchMode.STRICT); + laneRuleBuilder.setPriority(laneRule.getPriority()); + laneRuleBuilder.setLabelKey(TSF_GROUP_ID); + // set TrafficMatchRule + LaneProto.TrafficMatchRule.Builder trafficMatchRuleBuilder = LaneProto.TrafficMatchRule.newBuilder(); + trafficMatchRuleBuilder.setMatchMode(parseTrafficMatchMode(laneRule.getRuleTagRelationship())); + List sourceMatchList = new ArrayList<>(); + for (LaneRuleTag laneRuleTag : laneRule.getRuleTagList()) { + RoutingProto.SourceMatch.Builder sourceMatchBuilder = RoutingProto.SourceMatch.newBuilder(); + sourceMatchBuilder.setType(RoutingProto.SourceMatch.Type.CUSTOM); + sourceMatchBuilder.setKey(laneRuleTag.getTagName()); + ModelProto.MatchString.Builder matchStringBuilder = ModelProto.MatchString.newBuilder(); + matchStringBuilder.setType(parseMatchStringType(laneRuleTag.getTagOperator())); + matchStringBuilder.setValue(StringValue.of(laneRuleTag.getTagValue())); + matchStringBuilder.setValueType(ModelProto.MatchString.ValueType.TEXT); + sourceMatchBuilder.setValue(matchStringBuilder.build()); + sourceMatchList.add(sourceMatchBuilder.build()); + } + if (CollectionUtils.isNotEmpty(sourceMatchList)) { + trafficMatchRuleBuilder.addAllArguments(sourceMatchList); + } + laneRuleBuilder.setTrafficMatchRule(trafficMatchRuleBuilder.build()); + // set DefaultLabelValue + List labelValueList = new ArrayList<>(); + for (LaneInfo laneInfo : laneInfoList) { + if (StringUtils.equals(laneInfo.getLaneId(), laneRule.getLaneId())) { + for (LaneGroup laneGroup : laneInfo.getLaneGroupList()) { + labelValueList.add(laneGroup.getGroupId()); + } + } + } + laneRuleBuilder.setDefaultLabelValue(String.join(",", labelValueList)); + laneRuleList.add(laneRuleBuilder.build()); + } + if (CollectionUtils.isNotEmpty(laneRuleList)) { + laneGroupBuilder.addAllRules(laneRuleList); + } + + laneGroupList.add(laneGroupBuilder.build()); + return laneGroupList; + } + + private LaneProto.TrafficMatchRule.TrafficMatchMode parseTrafficMatchMode(RuleTagRelationship ruleTagRelationship) { + switch (ruleTagRelationship) { + case RELEATION_OR: + return LaneProto.TrafficMatchRule.TrafficMatchMode.OR; + case RELEATION_AND: + default: + return LaneProto.TrafficMatchRule.TrafficMatchMode.AND; + } + } + + private LaneInfoResponse syncLaneInfo(ServiceUpdateTask serviceUpdateTask, Long currentIndex, boolean wait) { + String namespace = serviceUpdateTask.getServiceEventKey().getNamespace(); + String service = serviceUpdateTask.getServiceEventKey().getService(); + String laneInfoPrefixKey = "/v1/kv/lane/info/"; + // 带等待时间发起对Consul的KV请求 + LOG.trace("tsf lane info, consul kv key: {}", laneInfoPrefixKey); + UrlParameters tokenParam = new SingleUrlParameters("token", consulContext.getAclToken()); + UrlParameters recurseParam = new SingleUrlParameters("recurse"); + QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), currentIndex); + LOG.debug("Begin get lane info of {}:{} sync", namespace, service); + HttpResponse rawResponse; + if (wait) { + rawResponse = consulRawClient.makeGetRequest(laneInfoPrefixKey, recurseParam, tokenParam, queryParams); + } else { + rawResponse = consulRawClient.makeGetRequest(laneInfoPrefixKey, recurseParam, tokenParam); + } + if (rawResponse != null) { + if (LOG.isDebugEnabled()) { + String responseStr = "RawResponse{" + + "statusCode=" + rawResponse.getStatusCode() + + ", statusMessage='" + rawResponse.getStatusMessage() + '\'' + + ", content='" + rawResponse.getContent() + '\'' + + ", consulIndex=" + rawResponse.getConsulIndex() + '\'' + + ", consulKnownLeader=" + rawResponse.isConsulKnownLeader() + '\'' + + ", consulLastContact=" + rawResponse.getConsulLastContact() + + '}'; + LOG.debug("tsf lane info, consul kv namespace, response: {}", responseStr); + } + + Long newIndex = rawResponse.getConsulIndex(); + if (Objects.nonNull(newIndex)) { + if (!Objects.equals(currentIndex, newIndex) || !wait) { + LaneInfoResponse laneInfoResponse = new LaneInfoResponse(); + laneInfoResponse.setIndex(newIndex); + laneInfoResponse.setLaneInfoList(new ArrayList<>()); + if (rawResponse.getStatusCode() == 200) { + if (rawResponse.getContent() != null) { + LOG.info("new lane info: {}", rawResponse.getContent()); + laneInfoResponse.setLaneInfoList(parseLaneInfoResponse(rawResponse, mapper)); + } + } else if (rawResponse.getStatusCode() == 404) { + LOG.info("empty lane info: {}", rawResponse.getContent()); + } + return laneInfoResponse; + } else { + LOG.debug("[TSF Lane Info] Consul data is not changed"); + } + } else { + LOG.warn("[TSF Lane Info] Consul data is abnormal. {}", rawResponse); + } + } + return null; + } + + private List parseLaneInfoResponse(final HttpResponse response, ObjectMapper mapper) { + List valueList = GsonFactory.getGson().fromJson(response.getContent(), + new TypeToken>() { + }.getType()); + Yaml yaml = new Yaml(); + List laneInfoList = Lists.newArrayList(); + valueList.forEach(value -> { + try { + String routeJsonString = mapper + .writeValueAsString(yaml.load(value.getDecodedValue())); + laneInfoList.add(mapper.readValue(routeJsonString, + new TypeReference() { + })); + } catch (Exception ex) { + LOG.error("tsf lane info load error, ex", ex); + throw new PolarisException(ErrorCode.INVALID_RESPONSE, "tsf lane info load error.", ex); + } + }); + return laneInfoList; + } + + private LaneRuleResponse syncLaneRule(ServiceUpdateTask serviceUpdateTask, Long currentIndex, boolean wait) { + String namespace = serviceUpdateTask.getServiceEventKey().getNamespace(); + String service = serviceUpdateTask.getServiceEventKey().getService(); + String laneRulePrefixKey = "/v1/kv/lane/rule/"; + // 带等待时间发起对Consul的KV请求 + LOG.trace("tsf lane rule, consul kv key: {}", laneRulePrefixKey); + UrlParameters tokenParam = new SingleUrlParameters("token", consulContext.getAclToken()); + UrlParameters recurseParam = new SingleUrlParameters("recurse"); + QueryParams queryParams = new QueryParams(consulContext.getWaitTime(), currentIndex); + LOG.debug("Begin get lane rule of {}:{} sync", namespace, service); + HttpResponse rawResponse; + if (wait) { + rawResponse = consulRawClient.makeGetRequest(laneRulePrefixKey, recurseParam, tokenParam, queryParams); + } else { + rawResponse = consulRawClient.makeGetRequest(laneRulePrefixKey, recurseParam, tokenParam); + } + if (rawResponse != null) { + if (LOG.isDebugEnabled()) { + String responseStr = "RawResponse{" + + "statusCode=" + rawResponse.getStatusCode() + + ", statusMessage='" + rawResponse.getStatusMessage() + '\'' + + ", content='" + rawResponse.getContent() + '\'' + + ", consulIndex=" + rawResponse.getConsulIndex() + '\'' + + ", consulKnownLeader=" + rawResponse.isConsulKnownLeader() + '\'' + + ", consulLastContact=" + rawResponse.getConsulLastContact() + + '}'; + LOG.debug("tsf lane rule, consul kv namespace, response: {}", responseStr); + } + + Long newIndex = rawResponse.getConsulIndex(); + if (Objects.nonNull(newIndex)) { + if (!Objects.equals(currentIndex, newIndex) || !wait) { + LaneRuleResponse laneRuleResponse = new LaneRuleResponse(); + laneRuleResponse.setIndex(newIndex); + laneRuleResponse.setLaneRuleList(new ArrayList<>()); + if (rawResponse.getStatusCode() == 200) { + if (rawResponse.getContent() != null) { + LOG.info("new lane rule: {}", rawResponse.getContent()); + laneRuleResponse.setLaneRuleList(parseLaneRuleResponse(rawResponse, mapper)); + } + } else if (rawResponse.getStatusCode() == 404) { + LOG.info("empty lane rule: {}", rawResponse.getContent()); + } + return laneRuleResponse; + } else { + LOG.debug("[TSF Lane rule] Consul data is not changed"); + } + } else { + LOG.warn("[TSF Lane rule] Consul data is abnormal. {}", rawResponse); + } + } + return null; + } + + private List parseLaneRuleResponse(final HttpResponse response, ObjectMapper mapper) { + List valueList = GsonFactory.getGson().fromJson(response.getContent(), + new TypeToken>() { + }.getType()); + Yaml yaml = new Yaml(); + List laneRuleList = Lists.newArrayList(); + valueList.forEach(value -> { + try { + String routeJsonString = mapper + .writeValueAsString(yaml.load(value.getDecodedValue())); + laneRuleList.add(mapper.readValue(routeJsonString, + new TypeReference() { + })); + } catch (Exception ex) { + LOG.error("tsf lane rule load error, ex", ex); + throw new PolarisException(ErrorCode.INVALID_RESPONSE, "tsf lane rule load error.", ex); + } + }); + return laneRuleList; + } + + private Long getLaneInfoConsulIndex(LaneRuleKey routerRuleKey) { + Long index = laneInfoConsulIndexMap.get(routerRuleKey); + if (index != null) { + return index; + } + setLaneInfoConsulIndex(routerRuleKey, null, -1L); + return -1L; + } + + private void setLaneInfoConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) { + LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); + laneInfoConsulIndexMap.put(routerRuleKey, newIndex); + } + + private Long getLaneRuleConsulIndex(LaneRuleKey routerRuleKey) { + Long index = laneRuleConsulIndexMap.get(routerRuleKey); + if (index != null) { + return index; + } + setLaneRuleConsulIndex(routerRuleKey, null, -1L); + return -1L; + } + + private void setLaneRuleConsulIndex(LaneRuleKey routerRuleKey, Long lastIndex, Long newIndex) { + LOG.debug("LaneRuleKey: {}; lastIndex: {}; newIndex: {}", routerRuleKey, lastIndex, newIndex); + laneRuleConsulIndexMap.put(routerRuleKey, newIndex); + } + + static class LaneRuleKey { + private String namespace = ""; + private String service = ""; + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getService() { + return service; + } + + public void setService(String service) { + this.service = service; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LaneRuleKey that = (LaneRuleKey) o; + return Objects.equals(getNamespace(), that.getNamespace()) && Objects.equals(getService(), that.getService()); + } + + @Override + public int hashCode() { + return Objects.hash(getNamespace(), getService()); + } + + @Override + public String toString() { + return "LaneRuleKey{" + + "namespace='" + namespace + '\'' + + ", service='" + service + '\'' + + '}'; + } + } + + static class LaneInfoResponse { + private Long index = -1L; + + private int code = ServerCodes.EXECUTE_SUCCESS; + + private List laneInfoList = null; + + public Long getIndex() { + return index; + } + + public void setIndex(Long index) { + this.index = index; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public List getLaneInfoList() { + return laneInfoList; + } + + public void setLaneInfoList(List laneInfoList) { + this.laneInfoList = laneInfoList; + } + } + + static class LaneRuleResponse { + private Long index = -1L; + + private int code = ServerCodes.EXECUTE_SUCCESS; + + private List laneRuleList = null; + + public Long getIndex() { + return index; + } + + public void setIndex(Long index) { + this.index = index; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public List getLaneRuleList() { + return laneRuleList; + } + + public void setLaneRuleList(List laneRuleList) { + this.laneRuleList = laneRuleList; + } + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneGroup.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneGroup.java new file mode 100644 index 000000000..24dcb7963 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneGroup.java @@ -0,0 +1,152 @@ +package com.tencent.polaris.plugins.connector.consul.service.lane.entity; + +import java.sql.Timestamp; + +/** + * 泳道 + * User: MackZhang + * Date: 2020/1/15 + */ +public class LaneGroup { + + private String laneGroupId; + + /** + * 泳道ID + */ + private String laneId; + + /** + * 部署组ID + */ + private String groupId; + + private String groupName; + + /** + * 应用ID + */ + private String applicationId; + + private String applicationName; + + private String namespaceId; + + private String namespaceName; + + /** + * 是否入口应用 + */ + private boolean entrance; + + /** + * 1 已删除/ 2 运行中 / 3 停止 + */ + private Integer status; + + /** + * 规则创建时间 + */ + private Timestamp createTime; + + /** + * 规则更新时间 + */ + private Timestamp updateTime; + + public String getLaneGroupId() { + return laneGroupId; + } + + public void setLaneGroupId(final String laneGroupId) { + this.laneGroupId = laneGroupId; + } + + public String getLaneId() { + return laneId; + } + + public void setLaneId(final String laneId) { + this.laneId = laneId; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(final String groupId) { + this.groupId = groupId; + } + + public String getGroupName() { + return groupName; + } + + public void setGroupName(final String groupName) { + this.groupName = groupName; + } + + public String getApplicationId() { + return applicationId; + } + + public void setApplicationId(final String applicationId) { + this.applicationId = applicationId; + } + + public String getApplicationName() { + return applicationName; + } + + public void setApplicationName(final String applicationName) { + this.applicationName = applicationName; + } + + public String getNamespaceId() { + return namespaceId; + } + + public void setNamespaceId(final String namespaceId) { + this.namespaceId = namespaceId; + } + + public String getNamespaceName() { + return namespaceName; + } + + public void setNamespaceName(final String namespaceName) { + this.namespaceName = namespaceName; + } + + public boolean isEntrance() { + return entrance; + } + + public void setEntrance(final boolean entrance) { + this.entrance = entrance; + } + + public Integer getStatus() { + return status; + } + + public void setStatus(final Integer status) { + this.status = status; + } + + public Timestamp getCreateTime() { + return createTime; + } + + public void setCreateTime(final Timestamp createTime) { + this.createTime = createTime; + } + + public Timestamp getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(final Timestamp updateTime) { + this.updateTime = updateTime; + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneInfo.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneInfo.java new file mode 100644 index 000000000..ef919ecf5 --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneInfo.java @@ -0,0 +1,90 @@ +package com.tencent.polaris.plugins.connector.consul.service.lane.entity; + +import java.sql.Timestamp; +import java.util.List; + +/** + * 泳道 + * User: MackZhang + * Date: 2020/1/14 + */ +public class LaneInfo { + + /** + * 泳道ID + */ + private String laneId; + + /** + * 泳道名称 + */ + private String laneName; + + /** + * 备注 + */ + private String remark; + + /** + * 规则创建时间 + */ + private Timestamp createTime; + + /** + * 规则更新时间 + */ + private Timestamp updateTime; + + /** + * 泳道部署组信息 + */ + private List laneGroupList; + + public String getLaneId() { + return laneId; + } + + public void setLaneId(final String laneId) { + this.laneId = laneId; + } + + public String getLaneName() { + return laneName; + } + + public void setLaneName(final String laneName) { + this.laneName = laneName; + } + + public String getRemark() { + return remark; + } + + public void setRemark(final String remark) { + this.remark = remark; + } + + public List getLaneGroupList() { + return laneGroupList; + } + + public void setLaneGroupList(final List laneGroupList) { + this.laneGroupList = laneGroupList; + } + + public Timestamp getCreateTime() { + return createTime; + } + + public void setCreateTime(final Timestamp createTime) { + this.createTime = createTime; + } + + public Timestamp getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(final Timestamp updateTime) { + this.updateTime = updateTime; + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRule.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRule.java new file mode 100644 index 000000000..1d8751a3b --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRule.java @@ -0,0 +1,120 @@ +package com.tencent.polaris.plugins.connector.consul.service.lane.entity; + +import java.sql.Timestamp; +import java.util.List; + +/** + * User: MackZhang + * Date: 2020/1/14 + */ +public class LaneRule { + + private String ruleId; + + private String ruleName; + + /** + * 越小优先级越高 + */ + private Integer priority; + + private String remark; + + private List ruleTagList; + + private RuleTagRelationship ruleTagRelationship; + + private String laneId; + + private boolean enable; + + /** + * 规则创建时间 + */ + private Timestamp createTime; + + /** + * 规则更新时间 + */ + private Timestamp updateTime; + + public String getRuleId() { + return ruleId; + } + + public void setRuleId(final String ruleId) { + this.ruleId = ruleId; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(final Integer priority) { + this.priority = priority; + } + + public String getRuleName() { + return ruleName; + } + + public void setRuleName(final String ruleName) { + this.ruleName = ruleName; + } + + public String getRemark() { + return remark; + } + + public void setRemark(final String remark) { + this.remark = remark; + } + + public List getRuleTagList() { + return ruleTagList; + } + + public void setRuleTagList(final List ruleTagList) { + this.ruleTagList = ruleTagList; + } + + public RuleTagRelationship getRuleTagRelationship() { + return ruleTagRelationship; + } + + public void setRuleTagRelationship(final RuleTagRelationship ruleTagRelationship) { + this.ruleTagRelationship = ruleTagRelationship; + } + + public String getLaneId() { + return laneId; + } + + public void setLaneId(final String laneId) { + this.laneId = laneId; + } + + public Timestamp getCreateTime() { + return createTime; + } + + public void setCreateTime(final Timestamp createTime) { + this.createTime = createTime; + } + + public Timestamp getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(final Timestamp updateTime) { + this.updateTime = updateTime; + } + + public boolean isEnable() { + return enable; + } + + public void setEnable(final boolean enable) { + this.enable = enable; + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRuleTag.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRuleTag.java new file mode 100644 index 000000000..ef9877a7d --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/LaneRuleTag.java @@ -0,0 +1,88 @@ +package com.tencent.polaris.plugins.connector.consul.service.lane.entity; + +import java.sql.Timestamp; + +/** + * 泳道规则标签 + *

+ * User: MackZhang + * Date: 2020/1/15 + */ +public class LaneRuleTag { + + private String tagId; + + private String tagName; + + private String tagOperator; + + private String tagValue; + + private String laneRuleId; + + /** + * 规则创建时间 + */ + private Timestamp createTime; + + /** + * 规则更新时间 + */ + private Timestamp updateTime; + + public String getTagId() { + return tagId; + } + + public void setTagId(final String tagId) { + this.tagId = tagId; + } + + public String getTagName() { + return tagName; + } + + public void setTagName(final String tagName) { + this.tagName = tagName; + } + + public String getTagOperator() { + return tagOperator; + } + + public void setTagOperator(final String tagOperator) { + this.tagOperator = tagOperator; + } + + public String getTagValue() { + return tagValue; + } + + public void setTagValue(final String tagValue) { + this.tagValue = tagValue; + } + + public String getLaneRuleId() { + return laneRuleId; + } + + public void setLaneRuleId(final String laneRuleId) { + this.laneRuleId = laneRuleId; + } + + public Timestamp getCreateTime() { + return createTime; + } + + public void setCreateTime(final Timestamp createTime) { + this.createTime = createTime; + } + + public Timestamp getUpdateTime() { + return updateTime; + } + + public void setUpdateTime(final Timestamp updateTime) { + this.updateTime = updateTime; + } +} diff --git a/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/RuleTagRelationship.java b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/RuleTagRelationship.java new file mode 100644 index 000000000..95c98fe2e --- /dev/null +++ b/polaris-plugins/polaris-plugins-connector/connector-consul/src/main/java/com/tencent/polaris/plugins/connector/consul/service/lane/entity/RuleTagRelationship.java @@ -0,0 +1,17 @@ +package com.tencent.polaris.plugins.connector.consul.service.lane.entity; + +/** + * User: MackZhang + * Date: 2020/2/12 + */ +public enum RuleTagRelationship { + /** + * 与 + */ + RELEATION_AND, + + /** + * 或 + */ + RELEATION_OR; +} diff --git a/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/MessagePersistHandler.java b/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/MessagePersistHandler.java index 11f7f3450..bc68b6b6d 100644 --- a/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/MessagePersistHandler.java +++ b/polaris-plugins/polaris-plugins-registry/registry-memory/src/main/java/com/tencent/polaris/plugins/registry/memory/MessagePersistHandler.java @@ -238,6 +238,8 @@ private Path doSaveService(ServiceEventKey svcEventKey, Message message) { File persistTmpFile = new File(persistDirPath + File.separator + tmpFileName); File persistLockFile = new File(persistDirPath + File.separator + lockFileName); try { + // Occasionally the directory cannot be found, so we need to init every time. + init(); if (!persistLockFile.exists()) { if (!persistLockFile.createNewFile()) { LOG.warn("lock file {} already exists", persistLockFile.getAbsolutePath()); diff --git a/polaris-plugins/polaris-plugins-router/router-lane/pom.xml b/polaris-plugins/polaris-plugins-router/router-lane/pom.xml index 70fd01a4b..b26a99e51 100644 --- a/polaris-plugins/polaris-plugins-router/router-lane/pom.xml +++ b/polaris-plugins/polaris-plugins-router/router-lane/pom.xml @@ -23,12 +23,7 @@ com.tencent.polaris - polaris-logging - ${project.version} - - - com.tencent.polaris - polaris-metadata + polaris-client ${project.version} diff --git a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java index 871ef7e0f..0eb07dbf7 100644 --- a/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java +++ b/polaris-plugins/polaris-plugins-router/router-lane/src/main/java/com/tencent/polaris/plugins/router/lane/LaneRouter.java @@ -18,26 +18,24 @@ package com.tencent.polaris.plugins.router.lane; import com.google.protobuf.InvalidProtocolBufferException; -import com.tencent.polaris.annonation.JustForTest; import com.tencent.polaris.api.config.consumer.ServiceRouterConfig; import com.tencent.polaris.api.exception.ErrorCode; import com.tencent.polaris.api.exception.PolarisException; import com.tencent.polaris.api.plugin.common.InitContext; -import com.tencent.polaris.api.plugin.registry.EventCompleteNotifier; -import com.tencent.polaris.api.plugin.registry.ResourceFilter; import com.tencent.polaris.api.plugin.route.RouteInfo; import com.tencent.polaris.api.plugin.route.RouteResult; -import com.tencent.polaris.api.pojo.Instance; -import com.tencent.polaris.api.pojo.RouteArgument; -import com.tencent.polaris.api.pojo.ServiceEventKey; -import com.tencent.polaris.api.pojo.ServiceInstances; -import com.tencent.polaris.api.pojo.ServiceKey; -import com.tencent.polaris.api.pojo.ServiceRule; +import com.tencent.polaris.api.pojo.*; +import com.tencent.polaris.api.rpc.RequestBaseEntity; import com.tencent.polaris.api.utils.CollectionUtils; +import com.tencent.polaris.api.utils.CompareUtils; import com.tencent.polaris.api.utils.RuleUtils; import com.tencent.polaris.api.utils.StringUtils; +import com.tencent.polaris.client.flow.BaseFlow; +import com.tencent.polaris.client.flow.DefaultFlowControlParam; +import com.tencent.polaris.client.flow.ResourcesResponse; import com.tencent.polaris.logging.LoggerFactory; import com.tencent.polaris.metadata.core.MessageMetadataContainer; +import com.tencent.polaris.metadata.core.MetadataContainer; import com.tencent.polaris.metadata.core.MetadataType; import com.tencent.polaris.metadata.core.TransitiveType; import com.tencent.polaris.metadata.core.manager.MetadataContext; @@ -48,18 +46,7 @@ import com.tencent.polaris.specification.api.v1.traffic.manage.RoutingProto; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -81,7 +68,7 @@ public class LaneRouter extends AbstractServiceRouter { private static final String SERVICE_SELECTOR = "polarismesh.cn/service"; - private Function> ruleGetter = serviceKey -> { + private final Function> ruleGetter = serviceKey -> { if (Objects.isNull(serviceKey)) { return Collections.emptyList(); } @@ -89,48 +76,16 @@ public class LaneRouter extends AbstractServiceRouter { return Collections.emptyList(); } - ServiceEventKey eventKey = ServiceEventKey.builder() - .serviceKey(serviceKey) - .eventType(ServiceEventKey.EventType.LANE_RULE) - .build(); - - AtomicBoolean finish = new AtomicBoolean(false); - CountDownLatch latch = new CountDownLatch(1); - - ServiceRule outbound = extensions.getLocalRegistry().getServiceRule(new ResourceFilter(eventKey, true, false)); - - // 之前已经 Load 成功了,就直接用 - if (outbound.isInitialized()) { - Object rule = outbound.getRule(); - if (Objects.nonNull(rule)) { - return ((ResponseProto.DiscoverResponse) rule).getLanesList(); - } - return Collections.emptyList(); - } - extensions.getLocalRegistry().loadServiceRule(eventKey, new EventCompleteNotifier() { - @Override - public void complete(ServiceEventKey svcEventKey) { - if (finish.compareAndSet(false, true)) { - latch.countDown(); - } - } - - @Override - public void completeExceptionally(ServiceEventKey svcEventKey, Throwable throwable) { - LOG.error("fetch lane group fail: {}", eventKey, throwable); - if (finish.compareAndSet(false, true)) { - latch.countDown(); - } - } - }); - - try { - boolean ok = latch.await(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - - outbound = extensions.getLocalRegistry().getServiceRule(new ResourceFilter(eventKey, true, true)); + DefaultFlowControlParam engineFlowControlParam = new DefaultFlowControlParam(); + BaseFlow.buildFlowControlParam(new RequestBaseEntity(), extensions.getConfiguration(), engineFlowControlParam); + Set routerKeys = new HashSet<>(); + ServiceEventKey dstSvcEventKey = ServiceEventKey.builder().serviceKey(serviceKey).eventType(ServiceEventKey.EventType.LANE_RULE).build(); + routerKeys.add(dstSvcEventKey); + DefaultServiceEventKeysProvider svcKeysProvider = new DefaultServiceEventKeysProvider(); + svcKeysProvider.setSvcEventKeys(routerKeys); + ResourcesResponse resourcesResponse = BaseFlow + .syncGetResources(extensions, false, svcKeysProvider, engineFlowControlParam); + ServiceRule outbound = resourcesResponse.getServiceRule(dstSvcEventKey); Object rule = outbound.getRule(); if (Objects.nonNull(rule)) { return ((ResponseProto.DiscoverResponse) rule).getLanesList(); @@ -159,8 +114,13 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw MetadataContext manager = MetadataContextHolder.getOrCreate(); MessageMetadataContainer callerMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); MessageMetadataContainer calleeMsgContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + ServiceKey caller = routeInfo.getSourceService() == null ? null : routeInfo.getSourceService().getServiceKey(); + ServiceKey callee = instances.getServiceKey() == null ? null : instances.getServiceKey(); - LaneRuleContainer container = fetchLaneRules(routeInfo, instances); + LaneRuleContainer container = fetchLaneRules(manager, caller, callee); + + // get callee lane group list + List laneGroupList = container.getGroupListByCalleeNamespaceAndService(callee); // 判断当前流量是否已存在染色 String stainLabel = callerMsgContainer.getHeader(TRAFFIC_STAIN_LABEL); @@ -176,9 +136,8 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw // 泳道规则不存在,转为基线路由 if (!targetRule.isPresent()) { - return new RouteResult(redirectToBase(instances), RouteResult.State.Next); + return new RouteResult(redirectToBase(laneGroupList, instances), RouteResult.State.Next); } - ServiceKey caller = routeInfo.getSourceService() == null ? null : routeInfo.getSourceService().getServiceKey(); LaneProto.LaneRule laneRule = targetRule.get(); // 尝试进行流量染色动作,该操作仅在当前 Caller 服务为泳道入口时操作 @@ -190,11 +149,11 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw } else { LOG.debug("current traffic not in lane, redirect to base, caller: {} callee: {}", caller, instances.getServiceKey()); // 如果当前自己不是泳道入口,并且没有发现已经染色的标签,不能走泳道路由, - return new RouteResult(redirectToBase(instances), RouteResult.State.Next); + return new RouteResult(redirectToBase(laneGroupList, instances), RouteResult.State.Next); } } - List resultInstances = tryRedirectToLane(container, laneRule, instances); + List resultInstances = tryRedirectToLane(container, laneRule, laneGroupList, instances); if (CollectionUtils.isNotEmpty(resultInstances)) { return new RouteResult(resultInstances, RouteResult.State.Next); } @@ -203,21 +162,22 @@ public RouteResult router(RouteInfo routeInfo, ServiceInstances instances) throw return new RouteResult(Collections.emptyList(), RouteResult.State.Next); } // 宽松模式,降级为返回基线实例 - return new RouteResult(redirectToBase(instances), RouteResult.State.Next); + return new RouteResult(redirectToBase(laneGroupList, instances), RouteResult.State.Next); } - private List tryRedirectToLane(LaneRuleContainer container, LaneProto.LaneRule rule, ServiceInstances instances) { + private List tryRedirectToLane(LaneRuleContainer container, LaneProto.LaneRule rule, + List laneGroupList, ServiceInstances instances) { LaneProto.LaneGroup group = container.groups.get(rule.getGroupName()); if (Objects.isNull(group)) { LOG.debug("not found lane_group, redirect to base, lane_rule: {}, lane_group: {}, callee: {}", rule.getName(), rule.getGroupName(), instances.getServiceKey()); // 泳道组不存在,直接认为不需要过滤实例, 默认转发至基线实例 - return redirectToBase(instances); + return redirectToBase(laneGroupList, instances); } // 判断目标服务是否属于泳道内服务 boolean inLane = false; ServiceKey callee = instances.getServiceKey(); for (RoutingProto.DestinationGroup destination : group.getDestinationsList()) { - if (Objects.equals(callee.getNamespace(), destination.getNamespace()) && Objects.equals(callee.getService(), destination.getService())) { + if (RuleUtils.matchService(callee, destination.getNamespace(), destination.getService())) { inLane = true; break; } @@ -226,7 +186,7 @@ private List tryRedirectToLane(LaneRuleContainer container, LaneProto. // 不在泳道内的服务,不需要进行实例过滤, 默认转发至基线实例 if (!inLane) { LOG.debug("current traffic not in lane, redirect to base, lane_rule: {}, lane_group: {}, callee: {}", rule.getName(), rule.getGroupName(), instances.getServiceKey()); - return redirectToBase(instances); + return redirectToBase(laneGroupList, instances); } return instances.getInstances().stream().filter(instance -> { @@ -234,19 +194,46 @@ private List tryRedirectToLane(LaneRuleContainer container, LaneProto. if (CollectionUtils.isEmpty(metadata)) { return false; } - String val = metadata.get(INTERNAL_INSTANCE_LANE_KEY); - return StringUtils.equals(val, rule.getDefaultLabelValue()); + String labelKey = StringUtils.isNotBlank(rule.getLabelKey()) ? rule.getLabelKey() : INTERNAL_INSTANCE_LANE_KEY; + String val = metadata.get(labelKey); + String defaultLabelValue = rule.getDefaultLabelValue(); + Set defaultLabelValues = new HashSet<>(Arrays.asList(defaultLabelValue.split(","))); + return defaultLabelValues.contains(val); }).collect(Collectors.toList()); } - private List redirectToBase(ServiceInstances instances) { + private List redirectToBase(List laneGroupList, ServiceInstances instances) { return instances.getInstances().stream().filter(instance -> { Map metadata = instance.getMetadata(); if (CollectionUtils.isEmpty(metadata)) { return true; } - // 元数据中没有携带 lane 标签的实例均认为是基线实例 - return !metadata.containsKey(INTERNAL_INSTANCE_LANE_KEY); + + boolean inBase = true; + for (LaneProto.LaneGroup laneGroup : laneGroupList) { + Map> laneKeyValueMap = new HashMap<>(); + for (LaneProto.LaneRule laneRule : laneGroup.getRulesList()) { + String labelKey = StringUtils.isNotBlank(laneRule.getLabelKey()) ? laneRule.getLabelKey() : INTERNAL_INSTANCE_LANE_KEY; + if (!laneKeyValueMap.containsKey(labelKey)) { + laneKeyValueMap.put(labelKey, new HashSet<>()); + } + String defaultLabelValue = laneRule.getDefaultLabelValue(); + String[] split = defaultLabelValue.split(","); + laneKeyValueMap.get(labelKey).addAll(Arrays.asList(split)); + } + if (CollectionUtils.isNotEmpty(laneKeyValueMap)) { + for (Map.Entry entry : metadata.entrySet()) { + if (laneKeyValueMap.containsKey(entry.getKey()) && laneKeyValueMap.get(entry.getKey()).contains(entry.getValue())) { + inBase = false; + break; + } + } + } + if (!inBase) { + return false; + } + } + return true; }).collect(Collectors.toList()); } @@ -263,45 +250,52 @@ private boolean tryStainCurrentTraffic(MetadataContext manager, ServiceKey calle throw new PolarisException(ErrorCode.INVALID_STATE, "lane_group where lane_rule located not found"); } - try { - boolean needStain = false; - for (LaneProto.TrafficEntry entry : group.getEntriesList()) { + boolean needStain = isTrafficEntry(group, manager, caller); + if (needStain) { + MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + metadataContainer.setHeader(TRAFFIC_STAIN_LABEL, buildStainLabel(rule), TransitiveType.PASS_THROUGH); + } + LOG.debug("stain current traffic: {}, lane_rule: {}, lane_group: {}, caller: {}", needStain, rule.getName(), rule.getGroupName(), caller); + return needStain; + } + + private LaneRuleContainer fetchLaneRules(MetadataContext manager, ServiceKey caller, ServiceKey callee) { + // 获取泳道规则 + List result = new ArrayList<>(); + if (Objects.nonNull(caller)) { + result.addAll(ruleGetter.apply(caller)); + } + if (Objects.nonNull(callee)) { + result.addAll(ruleGetter.apply(callee)); + } + return new LaneRuleContainer(manager, caller, result); + } + + private static boolean isTrafficEntry(LaneProto.LaneGroup group, MetadataContext manager, ServiceKey caller) { + boolean result = false; + for (LaneProto.TrafficEntry entry : group.getEntriesList()) { + try { switch (entry.getType()) { case GATEWAY_SELECTOR: LaneProto.ServiceGatewaySelector gatewaySelector = entry.getSelector().unpack(LaneProto.ServiceGatewaySelector.class); - if (Objects.equals(caller.getNamespace(), gatewaySelector.getNamespace()) && Objects.equals(caller.getService(), gatewaySelector.getService())) { - needStain = true; + if (RuleUtils.matchService(caller, gatewaySelector.getNamespace(), gatewaySelector.getService()) + && RuleUtils.matchMetadata(gatewaySelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { + result = true; } break; case SERVICE_SELECTOR: LaneProto.ServiceSelector serviceSelector = entry.getSelector().unpack(LaneProto.ServiceSelector.class); - if (Objects.equals(caller.getNamespace(), serviceSelector.getNamespace()) && Objects.equals(caller.getService(), serviceSelector.getService())) { - needStain = true; + if (RuleUtils.matchService(caller, serviceSelector.getNamespace(), serviceSelector.getService()) + && RuleUtils.matchMetadata(serviceSelector.getLabelsMap(), null, manager.getMetadataContainerGroup(false))) { + result = true; } break; } + } catch (InvalidProtocolBufferException invalidProtocolBufferException) { + LOG.warn("lane_group: {} unpack traffic entry selector fail", group.getName(), invalidProtocolBufferException); } - - if (needStain) { - MessageMetadataContainer metadataContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); - metadataContainer.setHeader(TRAFFIC_STAIN_LABEL, buildStainLabel(rule), TransitiveType.PASS_THROUGH); - } - LOG.debug("stain current traffic: {}, lane_rule: {}, lane_group: {}, caller: {}", needStain, rule.getName(), rule.getGroupName(), caller); - return needStain; - } catch (InvalidProtocolBufferException e) { - LOG.error("lane_rule: {}, lane_group: {} unpack traffic entry selector fail", rule.getName(), rule.getGroupName(), e); - throw new PolarisException(ErrorCode.INVALID_RULE); - } - } - - private LaneRuleContainer fetchLaneRules(RouteInfo routeInfo, ServiceInstances instances) { - List result = new ArrayList<>(); - // 获取泳道规则 - if (Objects.nonNull(routeInfo.getSourceService())) { - result.addAll(ruleGetter.apply(routeInfo.getSourceService().getServiceKey())); } - result.addAll(ruleGetter.apply(instances.getServiceKey())); - return new LaneRuleContainer(result); + return result; } private static class LaneRuleContainer { @@ -311,9 +305,10 @@ private static class LaneRuleContainer { private final Map ruleMapping = new HashMap<>(); - LaneRuleContainer(List list) { + LaneRuleContainer(MetadataContext manager, ServiceKey caller, List list) { list.forEach(laneGroup -> { if (groups.containsKey(laneGroup.getName())) { + LOG.warn("lane group: {} duplicate, ignore", laneGroup.getName()); return; } groups.put(laneGroup.getName(), laneGroup); @@ -327,8 +322,30 @@ private static class LaneRuleContainer { }); }); - // 数字越小,规则优先级越大 - rules.sort(Comparator.comparingInt(LaneProto.LaneRule::getPriority)); + rules.sort((o1, o2) -> { + // 主调泳道入口规则优先 + boolean b1 = isTrafficEntry(groups.get(o1.getGroupName()), manager, caller); + boolean b2 = isTrafficEntry(groups.get(o2.getGroupName()), manager, caller); + int entryResult = CompareUtils.compareBoolean(b1, b2); + if (entryResult != 0) { + return entryResult; + } + + // 比较优先级,数字越小,规则优先级越大 + return o1.getPriority() - o2.getPriority(); + }); + } + + public List getGroupListByCalleeNamespaceAndService(ServiceKey callee) { + List groupList = new ArrayList<>(); + for (LaneProto.LaneGroup group : groups.values()) { + for (RoutingProto.DestinationGroup destinationGroup : group.getDestinationsList()) { + if (RuleUtils.matchService(callee, destinationGroup.getNamespace(), destinationGroup.getService())) { + groupList.add(group); + } + } + } + return groupList; } public Optional matchRule(String labelValue) { @@ -352,8 +369,9 @@ public Optional matchRule(RouteInfo routeInfo, MetadataConte switch (sourceMatch.getValue().getValueType()) { case TEXT: // 直接匹配 - boolean a = RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, - sourceMatch.getValue().getValue().getValue()); + boolean a = StringUtils.isNotBlank(trafficValue) && + RuleUtils.matchStringValue(sourceMatch.getValue().getType(), trafficValue, + sourceMatch.getValue().getValue().getValue()); booleans.add(a); break; case VARIABLE: @@ -403,8 +421,9 @@ public Optional matchRule(RouteInfo routeInfo, MetadataConte private static String findTrafficValue(RouteInfo routeInfo, RoutingProto.SourceMatch sourceMatch, MetadataContext manager) { Map trafficLabels = routeInfo.getRouterMetadata(ServiceRouterConfig.DEFAULT_ROUTER_LANE); - MessageMetadataContainer calleeContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); - MessageMetadataContainer callerContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); + MessageMetadataContainer calleeMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, false); + MetadataContainer calleeCustomContainer = manager.getMetadataContainer(MetadataType.CUSTOM, false); + MessageMetadataContainer callerMessageContainer = manager.getMetadataContainer(MetadataType.MESSAGE, true); String trafficValue = ""; switch (sourceMatch.getType()) { @@ -413,49 +432,49 @@ private static String findTrafficValue(RouteInfo routeInfo, RoutingProto.SourceM if (trafficLabels.containsKey(headerKey)) { return trafficLabels.get(headerKey); } - trafficValue = Optional.ofNullable(calleeContainer.getHeader(sourceMatch.getKey())).orElse(callerContainer.getHeader(sourceMatch.getKey())); + trafficValue = Optional.ofNullable(calleeMessageContainer.getHeader(sourceMatch.getKey())).orElse(callerMessageContainer.getHeader(sourceMatch.getKey())); break; case CUSTOM: String customKey = RouteArgument.ArgumentType.CUSTOM.key(sourceMatch.getKey()); if (trafficLabels.containsKey(customKey)) { return trafficLabels.get(customKey); } - trafficValue = Optional.ofNullable(calleeContainer.getRawMetadataStringValue(sourceMatch.getKey())).orElse(callerContainer.getRawMetadataStringValue(sourceMatch.getKey())); + trafficValue = Optional.ofNullable(calleeCustomContainer.getRawMetadataStringValue(sourceMatch.getKey())).orElse(""); break; case METHOD: String methodKey = RouteArgument.ArgumentType.METHOD.key(sourceMatch.getKey()); if (trafficLabels.containsKey(methodKey)) { return trafficLabels.get(methodKey); } - trafficValue = Optional.ofNullable(calleeContainer.getMethod()).orElse(callerContainer.getMethod()); + trafficValue = Optional.ofNullable(calleeMessageContainer.getMethod()).orElse(callerMessageContainer.getMethod()); break; case CALLER_IP: String callerIpKey = RouteArgument.ArgumentType.CALLER_IP.key(sourceMatch.getKey()); if (trafficLabels.containsKey(callerIpKey)) { return trafficLabels.get(callerIpKey); } - trafficValue = Optional.ofNullable(calleeContainer.getCallerIP()).orElse(callerContainer.getCallerIP()); + trafficValue = Optional.ofNullable(calleeMessageContainer.getCallerIP()).orElse(callerMessageContainer.getCallerIP()); break; case COOKIE: String cookieKey = RouteArgument.ArgumentType.COOKIE.key(sourceMatch.getKey()); if (trafficLabels.containsKey(cookieKey)) { return trafficLabels.get(cookieKey); } - trafficValue = Optional.ofNullable(calleeContainer.getCookie(sourceMatch.getKey())).orElse(callerContainer.getCookie(sourceMatch.getKey())); + trafficValue = Optional.ofNullable(calleeMessageContainer.getCookie(sourceMatch.getKey())).orElse(callerMessageContainer.getCookie(sourceMatch.getKey())); break; case QUERY: String queryKey = RouteArgument.ArgumentType.QUERY.key(sourceMatch.getKey()); if (trafficLabels.containsKey(queryKey)) { return trafficLabels.get(queryKey); } - trafficValue = Optional.ofNullable(calleeContainer.getQuery(sourceMatch.getKey())).orElse(callerContainer.getQuery(sourceMatch.getKey())); + trafficValue = Optional.ofNullable(calleeMessageContainer.getQuery(sourceMatch.getKey())).orElse(callerMessageContainer.getQuery(sourceMatch.getKey())); break; case PATH: String pathKey = RouteArgument.ArgumentType.PATH.key(sourceMatch.getKey()); if (trafficLabels.containsKey(pathKey)) { return trafficLabels.get(pathKey); } - trafficValue = Optional.ofNullable(calleeContainer.getPath()).orElse(callerContainer.getPath()); + trafficValue = Optional.ofNullable(calleeMessageContainer.getPath()).orElse(callerMessageContainer.getPath()); break; } return trafficValue; @@ -464,9 +483,4 @@ private static String findTrafficValue(RouteInfo routeInfo, RoutingProto.SourceM private static String buildStainLabel(LaneProto.LaneRule rule) { return rule.getGroupName() + "/" + rule.getName(); } - - @JustForTest - public void setRuleGetter(Function> ruleGetter) { - this.ruleGetter = ruleGetter; - } } diff --git a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml index a851c9679..207c768ed 100644 --- a/polaris-ratelimit/polaris-ratelimit-factory/pom.xml +++ b/polaris-ratelimit/polaris-ratelimit-factory/pom.xml @@ -49,6 +49,11 @@ router-isolated ${project.version} + + com.tencent.polaris + router-lane + ${project.version} + com.tencent.polaris router-healthy diff --git a/pom.xml b/pom.xml index c95b68e35..e380e22d8 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 1.8 1.8 2.14.2 - 1.5.3 + 1.5.4-SNAPSHOT 1.4.5 2.9.1 2.1.1