Skip to content

Commit

Permalink
Merge pull request #2844 from murgatroid99/grpc-js-xds_config_tears
Browse files Browse the repository at this point in the history
grpc-js-xds: Implement relevant parts of A74 and A75
  • Loading branch information
murgatroid99 authored Nov 5, 2024
2 parents 912beea + 6e907b3 commit 8a31431
Show file tree
Hide file tree
Showing 12 changed files with 1,060 additions and 1,174 deletions.
1 change: 1 addition & 0 deletions packages/grpc-js-xds/src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ export const EXPERIMENTAL_CUSTOM_LB_CONFIG = (process.env.GRPC_EXPERIMENTAL_XDS_
export const EXPERIMENTAL_RING_HASH = (process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH ?? 'true') === 'true';
export const EXPERIMENTAL_PICK_FIRST = (process.env.GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG ?? 'false') === 'true';
export const EXPERIMENTAL_DUALSTACK_ENDPOINTS = (process.env.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS ?? 'true') === 'true';
export const AGGREGATE_CLUSTER_BACKWARDS_COMPAT = (process.env.GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT ?? 'false') === 'true';
2 changes: 0 additions & 2 deletions packages/grpc-js-xds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import * as resolver_xds from './resolver-xds';
import * as load_balancer_cds from './load-balancer-cds';
import * as xds_cluster_resolver from './load-balancer-xds-cluster-resolver';
import * as xds_cluster_impl from './load-balancer-xds-cluster-impl';
import * as load_balancer_priority from './load-balancer-priority';
import * as load_balancer_weighted_target from './load-balancer-weighted-target';
Expand All @@ -40,7 +39,6 @@ export { XdsServerCredentials } from './xds-credentials';
export function register() {
resolver_xds.setup();
load_balancer_cds.setup();
xds_cluster_resolver.setup();
xds_cluster_impl.setup();
load_balancer_priority.setup();
load_balancer_weighted_target.setup();
Expand Down
376 changes: 187 additions & 189 deletions packages/grpc-js-xds/src/load-balancer-cds.ts

Large diffs are not rendered by default.

95 changes: 38 additions & 57 deletions packages/grpc-js-xds/src/load-balancer-xds-cluster-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import TypedLoadBalancingConfig = experimental.TypedLoadBalancingConfig;
import selectLbConfigFromList = experimental.selectLbConfigFromList;
import SubchannelInterface = experimental.SubchannelInterface;
import BaseSubchannelWrapper = experimental.BaseSubchannelWrapper;
import UnavailablePicker = experimental.UnavailablePicker;
import { Locality__Output } from "./generated/envoy/config/core/v3/Locality";
import { ClusterConfig, XdsConfig } from "./xds-dependency-manager";
import { CdsUpdate } from "./xds-resource-type/cluster-resource-type";

const TRACER_NAME = 'xds_cluster_impl';

Expand All @@ -53,59 +56,26 @@ export interface DropCategory {
requests_per_million: number;
}

function validateDropCategory(obj: any): DropCategory {
if (!('category' in obj && typeof obj.category === 'string')) {
throw new Error('xds_cluster_impl config drop_categories entry must have a string field category');
}
if (!('requests_per_million' in obj && typeof obj.requests_per_million === 'number')) {
throw new Error('xds_cluster_impl config drop_categories entry must have a number field requests_per_million');
}
return obj;
}

class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
private maxConcurrentRequests: number;
getLoadBalancerName(): string {
return TYPE_NAME;
}
toJsonObject(): object {
const jsonObj: {[key: string]: any} = {
cluster: this.cluster,
drop_categories: this.dropCategories,
child_policy: [this.childPolicy.toJsonObject()],
max_concurrent_requests: this.maxConcurrentRequests,
eds_service_name: this.edsServiceName,
lrs_load_reporting_server: this.lrsLoadReportingServer,
};
return {
[TYPE_NAME]: jsonObj
};
}

constructor(private cluster: string, private dropCategories: DropCategory[], private childPolicy: TypedLoadBalancingConfig, private edsServiceName: string, private lrsLoadReportingServer?: XdsServerConfig, maxConcurrentRequests?: number) {
this.maxConcurrentRequests = maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS;
}
constructor(private cluster: string, private childPolicy: TypedLoadBalancingConfig) {}

getCluster() {
return this.cluster;
}

getEdsServiceName() {
return this.edsServiceName;
}

getLrsLoadReportingServer() {
return this.lrsLoadReportingServer;
}

getMaxConcurrentRequests() {
return this.maxConcurrentRequests;
}

getDropCategories() {
return this.dropCategories;
}

getChildPolicy() {
return this.childPolicy;
}
Expand All @@ -114,27 +84,14 @@ class XdsClusterImplLoadBalancingConfig implements TypedLoadBalancingConfig {
if (!('cluster' in obj && typeof obj.cluster === 'string')) {
throw new Error('xds_cluster_impl config must have a string field cluster');
}
if (!('eds_service_name' in obj && typeof obj.eds_service_name === 'string')) {
throw new Error('xds_cluster_impl config must have a string field eds_service_name');
}
if ('max_concurrent_requests' in obj && !(obj.max_concurrent_requests === undefined || typeof obj.max_concurrent_requests === 'number')) {
throw new Error('xds_cluster_impl config max_concurrent_requests must be a number if provided');
}
if (!('drop_categories' in obj && Array.isArray(obj.drop_categories))) {
throw new Error('xds_cluster_impl config must have an array field drop_categories');
}
if (!('child_policy' in obj && Array.isArray(obj.child_policy))) {
throw new Error('xds_cluster_impl config must have an array field child_policy');
}
const childConfig = selectLbConfigFromList(obj.child_policy);
if (!childConfig) {
throw new Error('xds_cluster_impl config child_policy parsing failed');
}
let lrsServer: XdsServerConfig | undefined = undefined;
if (obj.lrs_load_reporting_server) {
lrsServer = validateXdsServerConfig(obj.lrs_load_reporting_server)
}
return new XdsClusterImplLoadBalancingConfig(obj.cluster, obj.drop_categories.map(validateDropCategory), childConfig, obj.eds_service_name, lrsServer, obj.max_concurrent_requests);
return new XdsClusterImplLoadBalancingConfig(obj.cluster, childConfig);
}
}

Expand Down Expand Up @@ -252,11 +209,12 @@ class XdsClusterImplBalancer implements LoadBalancer {
private latestConfig: XdsClusterImplLoadBalancingConfig | null = null;
private clusterDropStats: XdsClusterDropStats | null = null;
private xdsClient: XdsClient | null = null;
private latestClusterConfig: ClusterConfig | null = null;

constructor(private readonly channelControlHelper: ChannelControlHelper, credentials: ChannelCredentials, options: ChannelOptions) {
this.childBalancer = new ChildLoadBalancerHandler(createChildChannelControlHelper(channelControlHelper, {
createSubchannel: (subchannelAddress, subchannelArgs, credentialsOverride) => {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList) {
if (!this.xdsClient || !this.latestConfig || !this.lastestEndpointList || !this.latestClusterConfig) {
throw new Error('xds_cluster_impl: invalid state: createSubchannel called with xdsClient or latestConfig not populated');
}
const wrapperChild = channelControlHelper.createSubchannel(subchannelAddress, subchannelArgs, credentialsOverride);
Expand All @@ -270,23 +228,23 @@ class XdsClusterImplBalancer implements LoadBalancer {
trace('Not reporting load for address ' + subchannelAddressToString(subchannelAddress) + ' because it has unknown locality.');
return wrapperChild;
}
const lrsServer = this.latestConfig.getLrsLoadReportingServer();
const lrsServer = this.latestClusterConfig.cluster.lrsLoadReportingServer;
let statsObj: XdsClusterLocalityStats | null = null;
if (lrsServer) {
statsObj = this.xdsClient.addClusterLocalityStats(
lrsServer,
this.latestConfig.getCluster(),
this.latestConfig.getEdsServiceName(),
this.latestClusterConfig.cluster.edsServiceName ?? '',
locality
);
}
return new LocalitySubchannelWrapper(wrapperChild, statsObj);
},
updateState: (connectivityState, originalPicker) => {
if (this.latestConfig === null) {
if (this.latestConfig === null || this.latestClusterConfig === null || this.latestClusterConfig.children.type === 'aggregate' || !this.latestClusterConfig.children.endpoints) {
channelControlHelper.updateState(connectivityState, originalPicker);
} else {
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestConfig.getEdsServiceName()), this.latestConfig.getMaxConcurrentRequests(), this.latestConfig.getDropCategories(), this.clusterDropStats);
const picker = new XdsClusterImplPicker(originalPicker, getCallCounterMapKey(this.latestConfig.getCluster(), this.latestClusterConfig.cluster.edsServiceName), this.latestClusterConfig.cluster.maxConcurrentRequests ?? DEFAULT_MAX_CONCURRENT_REQUESTS, this.latestClusterConfig.children.endpoints.dropCategories, this.clusterDropStats);
channelControlHelper.updateState(connectivityState, picker);
}
}
Expand All @@ -297,15 +255,38 @@ class XdsClusterImplBalancer implements LoadBalancer {
trace('Discarding address list update with unrecognized config ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
return;
}
trace('Received update with config: ' + JSON.stringify(lbConfig, undefined, 2));
trace('Received update with config: ' + JSON.stringify(lbConfig.toJsonObject(), undefined, 2));
const xdsConfig = attributes.xdsConfig as XdsConfig;
const maybeClusterConfig = xdsConfig.clusters.get(lbConfig.getCluster());
if (!maybeClusterConfig) {
trace('Received update with no config for cluster ' + lbConfig.getCluster());
return;
}
if (!maybeClusterConfig.success) {
this.latestClusterConfig = null;
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker(maybeClusterConfig.error));
return;
}
const clusterConfig = maybeClusterConfig.value;
if (clusterConfig.children.type === 'aggregate') {
trace('Received update for aggregate cluster ' + lbConfig.getCluster());
return;
}
if (!clusterConfig.children.endpoints) {
this.childBalancer.destroy();
this.channelControlHelper.updateState(connectivityState.TRANSIENT_FAILURE, new UnavailablePicker({details: clusterConfig.children.resolutionNote}));

}
this.lastestEndpointList = endpointList;
this.latestConfig = lbConfig;
this.latestClusterConfig = clusterConfig;
this.xdsClient = attributes.xdsClient as XdsClient;
if (lbConfig.getLrsLoadReportingServer()) {
if (clusterConfig.cluster.lrsLoadReportingServer) {
this.clusterDropStats = this.xdsClient.addClusterDropStats(
lbConfig.getLrsLoadReportingServer()!,
clusterConfig.cluster.lrsLoadReportingServer,
lbConfig.getCluster(),
lbConfig.getEdsServiceName() ?? ''
clusterConfig.cluster.edsServiceName ?? ''
);
}

Expand Down
Loading

0 comments on commit 8a31431

Please sign in to comment.