Skip to content
This repository has been archived by the owner on Apr 14, 2024. It is now read-only.

Support multiple forwarding modes for egress gateway. #126

Merged
merged 4 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions pkg/catalog/egress_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strings"

policyv1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)
Expand All @@ -17,17 +18,25 @@ func (mc *MeshCatalog) GetEgressGatewayPolicy() (*trafficpolicy.EgressGatewayPol
for _, egressGateway := range egressGateways {
if egressGateway.Spec.GlobalEgressGateways != nil {
for _, globalGateway := range egressGateway.Spec.GlobalEgressGateways {
sourceMeshSvc := service.MeshService{
egressGatewayMeshSvc := service.MeshService{
Name: globalGateway.Service,
Namespace: globalGateway.Namespace,
}
gatewayConfig := trafficpolicy.EgressGatewayConfig{
Service: globalGateway.Service,
Namespace: globalGateway.Namespace,
Weight: globalGateway.Weight,
Endpoints: mc.listEndpointsForService(sourceMeshSvc),
egressGatewaySvc := mc.kubeController.GetService(egressGatewayMeshSvc)
if egressGatewaySvc != nil {
mode, exist := egressGatewaySvc.GetAnnotations()[constants.EgressGatewayModeAnnotation]
if !exist || (constants.EgressGatewayModeHTTP2Tunnel == mode || constants.EgressGatewayModeSock5 == mode) {
mode = constants.EgressGatewayModeHTTP2Tunnel
}
gatewayConfig := trafficpolicy.EgressGatewayConfig{
Service: globalGateway.Service,
Namespace: globalGateway.Namespace,
Mode: mode,
Weight: globalGateway.Weight,
Endpoints: mc.listEndpointsForService(egressGatewayMeshSvc),
}
egressGatewayPolicy.Global = append(egressGatewayPolicy.Global, &gatewayConfig)
}
egressGatewayPolicy.Global = append(egressGatewayPolicy.Global, &gatewayConfig)
}
}
if egressGateway.Spec.EgressPolicyGatewayRules != nil {
Expand All @@ -42,17 +51,25 @@ func (mc *MeshCatalog) GetEgressGatewayPolicy() (*trafficpolicy.EgressGatewayPol
})
}
for _, gateway := range rule.EgressGateways {
sourceMeshSvc := service.MeshService{
egressGatewayMeshSvc := service.MeshService{
Name: gateway.Service,
Namespace: gateway.Namespace,
}
gatewayConfig := trafficpolicy.EgressGatewayConfig{
Service: gateway.Service,
Namespace: gateway.Namespace,
Weight: gateway.Weight,
Endpoints: mc.listEndpointsForService(sourceMeshSvc),
egressGatewaySvc := mc.kubeController.GetService(egressGatewayMeshSvc)
if egressGatewaySvc != nil {
mode, exist := egressGatewaySvc.GetAnnotations()[constants.EgressGatewayModeAnnotation]
if !exist || (constants.EgressGatewayModeHTTP2Tunnel == mode || constants.EgressGatewayModeSock5 == mode) {
mode = constants.EgressGatewayModeHTTP2Tunnel
}
gatewayConfig := trafficpolicy.EgressGatewayConfig{
Service: gateway.Service,
Namespace: gateway.Namespace,
Mode: mode,
Weight: gateway.Weight,
Endpoints: mc.listEndpointsForService(egressGatewayMeshSvc),
}
egressGatewayRule.EgressGateways = append(egressGatewayRule.EgressGateways, gatewayConfig)
}
egressGatewayRule.EgressGateways = append(egressGatewayRule.EgressGateways, gatewayConfig)
}
egressGatewayPolicy.Rules = append(egressGatewayPolicy.Rules, egressGatewayRule)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,21 @@ const (
PrometheusPathAnnotation = "prometheus.io/path"
)

// Annotations used for Egress Gateway
const (
// EgressGatewayModeAnnotation is the key of the annotation used to indicate the mode of egress gateway
EgressGatewayModeAnnotation = "openservicemesh.io/egress-gateway-mode"
)

// Egress Gateway Mode
const (
// http2tunnel
EgressGatewayModeHTTP2Tunnel = "http2tunnel"

// sock5
EgressGatewayModeSock5 = "sock5"
)

// App labels as defined in the "osm.labels" template in _helpers.tpl of the Helm chart.
const (
OSMAppNameLabelKey = "app.kubernetes.io/name"
Expand Down
37 changes: 33 additions & 4 deletions pkg/sidecar/providers/pipy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,27 @@ func (p *PipyRepoClient) Delete(codebaseName string) (success bool, err error) {
return
}

// deleteFile delete codebase file
func (p *PipyRepoClient) deleteFile(fileName string) (success bool, err error) {
var resp *resty.Response

p.httpClient.SetBaseURL(p.apiURI.baseRepoFilesURI)

resp, err = p.httpClient.R().Delete(fileName)

if err == nil {
if resp.IsSuccess() {
success = true
return
}
err = fmt.Errorf("error happened while deleting codebase[%s], reason: %s", fileName, resp.Status())
return
}

log.Err(err).Msgf("error happened while deleting codebase[%s]", fileName)
return
}

// Commit the codebase, version is the current vesion of the codebase, it will be increased by 1 when committing
func (p *PipyRepoClient) commit(codebaseName string, version string) (success bool, err error) {
var etag uint64
Expand Down Expand Up @@ -313,10 +334,18 @@ func (p *PipyRepoClient) Batch(version string, batches []Batch) (success bool, e
// 2. upload each json to repo
for _, item := range batch.Items {
fullPath := fmt.Sprintf("%s%s/%s", batch.Basepath, item.Path, item.Filename)
log.Info().Msgf("Creating/updating config %q", fullPath)
success, err = p.upsertFile(fullPath, item.Content)
if err != nil || !success {
return
if item.Obsolete {
log.Info().Msgf("Deleting config %q", fullPath)
_, err = p.deleteFile(fullPath)
if err != nil {
log.Err(err).Msgf("fail to delete %q", fullPath)
}
} else {
log.Info().Msgf("Creating/updating config %q", fullPath)
success, err = p.upsertFile(fullPath, item.Content)
if err != nil || !success {
return
}
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sidecar/providers/pipy/client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Batch struct {

// BatchItem is a resource wrapper
type BatchItem struct {
Obsolete bool
Path string
Filename string
Content interface{}
Expand Down
73 changes: 73 additions & 0 deletions pkg/sidecar/providers/pipy/repo/codebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package repo

import (
_ "embed"

"github.com/openservicemesh/osm/pkg/sidecar/providers/pipy/client"
)

//go:embed codebase/main.js
Expand Down Expand Up @@ -54,3 +56,74 @@ var codebaseOutboundRecvHTTPJS []byte

//go:embed codebase/dns-main.js
var codebaseDNSMainJS []byte

var osmCodebaseItems = []client.BatchItem{
{
Filename: "main.js",
Content: codebaseMainJS,
},
{
Filename: "config.js",
Content: codebaseConfigJS,
},
{
Filename: "metrics.js",
Content: codebaseMetricsJS,
},
{
Filename: osmCodebaseConfig,
Content: codebaseConfigJSON,
},
{
Filename: "codes.js",
Content: codebaseCodesJS,
},
{
Filename: "breaker.js",
Content: codebaseBreakerJS,
},
{
Filename: "gather.js",
Content: codebaseGatherJS,
},
{
Filename: "stats.js",
Content: codebaseStatsJS,
},
{
Filename: "inbound-proxy-tcp.js",
Content: codebaseInboundProxyTCPJS,
},
{
Filename: "inbound-recv-http.js",
Content: codebaseInboundRecvHTTPJS,
},
{
Filename: "inbound-recv-tcp.js",
Content: codebaseInboundRecvTCPJS,
},
{
Filename: "inbound-throttle.js",
Content: codebaseInboundThrottleJS,
},
{
Filename: "outbound-breaker.js",
Content: codebaseOutboundBreakerJS,
},
{
Filename: "outbound-mux-http.js",
Content: codebaseOutboundMuxHTTPJS,
},
{
Filename: "outbound-proxy-tcp.js",
Content: codebaseOutboundProxyTCPJS,
},
{
Filename: "outbound-recv-http.js",
Content: codebaseOutboundRecvHTTPJS,
},
{
Filename: "dns-main.js",
Content: codebaseDNSMainJS,
},
}
19 changes: 11 additions & 8 deletions pkg/sidecar/providers/pipy/repo/codebase/config.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// version: '2022.12.03'
// version: '2022.12.11'
(
(config = JSON.decode(pipy.load('config.json')),
metrics = pipy.solve('metrics.js'),
Expand Down Expand Up @@ -218,7 +218,7 @@
Object.fromEntries(Object.entries(json).map(
([name, rule]) => [
name,
rule.map(
rule?.RouteRules && rule.RouteRules.map(
(condition, obj) => (
obj = {
Path: condition.Path,
Expand Down Expand Up @@ -420,12 +420,15 @@
),

global.forwardEgressGateways = config?.Forward?.EgressGateways && Object.fromEntries(
Object.entries(
config.Forward.EgressGateways).map(
([k, v]) => [
k, new algo.RoundRobinLoadBalancer(v?.Endpoints || {})
]
)
Object.entries(config.Forward.EgressGateways).map(
([k, v]) => [
k, { balancer: new algo.RoundRobinLoadBalancer(
Object.fromEntries(Object.entries(v?.Endpoints || {}).map(
([k, v]) => [k, v?.Weight || 100]
))
), mode: v?.Mode }
]
)
),

// Initialize probeScheme, probeTarget, probePath
Expand Down
12 changes: 1 addition & 11 deletions pkg/sidecar/providers/pipy/repo/codebase/main.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// version: '2022.11.29'
// version: '2022.12.11'
((
{
config,
Expand All @@ -12,8 +12,6 @@
outTrafficMatches,
outClustersConfigs,
allowedEndpoints,
forwardMatches,
forwardEgressGateways,
prometheusTarget,
probeScheme,
probeTarget,
Expand Down Expand Up @@ -74,7 +72,6 @@
_outMatch: null,
_outTarget: null,
_egressMode: null,
_egressEndpoint: null,
_outSourceCert: null,
_outRequestTime: null,
_outBytesStruct: null,
Expand Down Expand Up @@ -205,13 +202,6 @@
match
)),

// Find egress nat gateway
forwardMatches && ((policy, egw) => (
policy = _outMatch?.EgressForwardGateway ? _outMatch?.EgressForwardGateway : '*',
egw = forwardMatches[policy]?.next?.()?.id,
egw && (_egressEndpoint = forwardEgressGateways?.[egw]?.next?.()?.id)
))(),

// Layer 4 load balance
_outTarget = (
(
Expand Down
53 changes: 41 additions & 12 deletions pkg/sidecar/providers/pipy/repo/codebase/outbound-proxy-tcp.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// version: '2022.10.09'
// version: '2022.12.11'
((
{
config,
Expand All @@ -7,14 +7,19 @@
tlsCertChain,
tlsPrivateKey,
tlsIssuingCA,
listIssuingCA
listIssuingCA,
forwardMatches,
forwardEgressGateways,
} = pipy.solve('config.js')) => (

pipy({})
pipy({
_egressType: '',
_egressEndpoint: null,
})

.import({
_outMatch: 'main',
_egressMode: 'main',
_egressEndpoint: 'main',
_outSourceCert: 'main',
_outTarget: 'main',
_upstreamClusterName: 'main'
Expand All @@ -26,7 +31,16 @@
.pipeline()
.onStart(
() => (
debugLogLevel && console.log('outbound connectTLS - TLS/_egressMode/_egressEndpoint/_outSourceCert', Boolean(tlsCertChain), Boolean(_egressMode), _egressEndpoint, Boolean(_outSourceCert)),
// Find egress nat gateway
forwardMatches && ((policy, egw) => (
policy = _outMatch?.EgressForwardGateway || '*',
(egw = forwardMatches[policy]?.next?.()?.id) && (
_egressType = forwardEgressGateways?.[egw]?.mode || 'http2tunnel',
_egressEndpoint = forwardEgressGateways?.[egw]?.balancer?.next?.()?.id
)
))(),
debugLogLevel && console.log('outbound connectTLS - TLS/_egressMode/_egressEndpoint/_egressType/_outSourceCert',
Boolean(tlsCertChain), Boolean(_egressMode), _egressEndpoint, _egressType, Boolean(_outSourceCert)),
metrics.activeConnectionGauge.withLabels(_upstreamClusterName).increase(),
config?.outClustersBreakers?.[_upstreamClusterName]?.incConnections?.(),
null
Expand Down Expand Up @@ -72,15 +86,30 @@
.connect(() => _outTarget?.id)
),
() => (Boolean(_egressMode) && Boolean(_egressEndpoint)), $ => $
.connectSOCKS(
() => _outTarget?.id,
).to($ => $
.connect(
() => _egressEndpoint
.branch(
() => _egressType === 'http2tunnel', (
$ => $
.connectHTTPTunnel(
() => new Message({
method: 'CONNECT',
path: _outTarget?.id,
})
).to(
$ => $.muxHTTP(() => _outTarget?.id, { version: 2 }).to(
$ => $.connect(() => _egressEndpoint)
)
)
), ($ => $
.connectSOCKS(
() => _outTarget?.id,
).to($ => $
.connect(
() => _egressEndpoint
)
)
)
),
$ => $
.connect(() => _outTarget?.id)
$ => $.connect(() => _outTarget?.id)
)
.handleData(
(data) => (
Expand Down
Loading