Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deploy pixiu as dubbo service egress gateway in k8s istio #446

Merged
merged 18 commits into from
Sep 20, 2022
Merged
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ coverage.txt
target/
samples/**/dist

out/
out/
/test
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/creasty/defaults v1.5.2
github.com/davecgh/go-spew v1.1.1
github.com/docker/cli v20.10.17+incompatible
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220427143451-c0a68bf5b29a
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220612115254-d9a176b25b99
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5
github.com/dubbogo/gost v1.11.25
github.com/dubbogo/grpc-go v1.42.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ github.com/docker/libnetwork v0.8.0-dev.2.0.20200917202933-d0951081b35f/go.mod h
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220427143451-c0a68bf5b29a h1:M4EqGQRk3jO+k50Wa9EtrOoCYlU/kXzbpYxVo4s4Wi0=
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220427143451-c0a68bf5b29a/go.mod h1:1l+6pDTdEHwCyyyJmfckOAdGp6f5PZ33ZVMgxso9q/U=
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220612115254-d9a176b25b99 h1:UjDxgIEu6DbJVJTrxm5mwC0j54jNao1pkYVlT8X+KgY=
github.com/dubbo-go-pixiu/pixiu-api v0.1.6-0.20220612115254-d9a176b25b99/go.mod h1:1l+6pDTdEHwCyyyJmfckOAdGp6f5PZ33ZVMgxso9q/U=
github.com/dubbogo/go-zookeeper v1.0.3/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5 h1:XoR8SSVziXe698dt4uZYDfsmHpKLemqAgFyndQsq5Kw=
github.com/dubbogo/go-zookeeper v1.0.4-0.20211212162352-f9d2183d89d5/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
Expand Down
1 change: 0 additions & 1 deletion pixiu/pkg/adapter/dubboregistry/registry/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package registry

import (
"dubbo.apache.org/dubbo-go/v3/common"

"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/definition"
dr "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"

"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"

"github.com/dubbogo/go-zookeeper/zk"
)

Expand Down
7 changes: 7 additions & 0 deletions pixiu/pkg/config/xds/apiclient/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
import (
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

anypb "github.com/golang/protobuf/ptypes/any"

"github.com/pkg/errors"

"google.golang.org/protobuf/proto"
Expand All @@ -35,6 +37,7 @@ type (

ProtoAny struct {
typeConfig *v3.TypedExtensionConfig
any *anypb.Any
}

DeltaResources struct {
Expand All @@ -48,6 +51,10 @@ func (p *ProtoAny) GetName() string {
}

func (p *ProtoAny) To(configModel PixiuDynamicConfigModel) error {
if p.any != nil {
return p.any.UnmarshalTo(configModel)
}

err := p.typeConfig.TypedConfig.UnmarshalTo(configModel)
if err != nil {
panic(err)
Expand Down
78 changes: 39 additions & 39 deletions pixiu/pkg/config/xds/apiclient/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
)

import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/server/controls"

envoyconfigcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
extensionpb "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3"
Expand All @@ -39,12 +43,6 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)

import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/server/controls"
)

// agent name to talk with xDS server
const xdsAgentName = "dubbo-go-pixiu"

Expand All @@ -54,18 +52,19 @@ var (
)

type (
GrpcApiClient struct {
GrpcExtensionApiClient struct {
config model.ApiConfigSource
grpcMg *GRPCClusterManager
node *model.Node
xDSExtensionClient extensionpb.ExtensionConfigDiscoveryServiceClient
resourceNames []ResourceTypeName
typeUrl string
exitCh chan struct{}
xdsState xdsState
}
xdsState struct {
nonce string
deltaVersion map[string]string
versionInfo string
}
)

Expand All @@ -82,23 +81,23 @@ func Stop() {
}
}

// CreateGrpcApiClient create Grpc type ApiClient
func CreateGrpcApiClient(config *model.ApiConfigSource, node *model.Node,
// CreateGrpExtensionApiClient create Grpc type ApiClient
func CreateGrpExtensionApiClient(config *model.ApiConfigSource, node *model.Node,
exitCh chan struct{},
typeNames ...ResourceTypeName) *GrpcApiClient {
v := &GrpcApiClient{
config: *config,
node: node,
resourceNames: typeNames,
grpcMg: grpcMg,
exitCh: exitCh,
typeName ResourceTypeName) *GrpcExtensionApiClient {
v := &GrpcExtensionApiClient{
config: *config,
node: node,
typeUrl: typeName,
grpcMg: grpcMg,
exitCh: exitCh,
}
v.init()
return v
}

// Fetch get config data from discovery service and return Any type.
func (g *GrpcApiClient) Fetch(localVersion string) ([]*ProtoAny, error) {
func (g *GrpcExtensionApiClient) Fetch(localVersion string) ([]*ProtoAny, error) {
clsRsp, err := g.xDSExtensionClient.FetchExtensionConfigs(context.Background(), &discoverypb.DiscoveryRequest{
VersionInfo: localVersion,
Node: g.makeNode(),
Expand All @@ -120,39 +119,40 @@ func (g *GrpcApiClient) Fetch(localVersion string) ([]*ProtoAny, error) {
return extensions, nil
}

func (g *GrpcApiClient) decodeSource(resource *anypb.Any) (*ProtoAny, error) {
func (g *GrpcExtensionApiClient) decodeSource(resource *anypb.Any) (*ProtoAny, error) {
extension := envoyconfigcorev3.TypedExtensionConfig{}
err := resource.UnmarshalTo(&extension)
if err != nil {
return nil, errors.Wrapf(err, "typed extension as expected.(%s)", g.resourceNames)
}
elems := &ProtoAny{&extension}
elems := &ProtoAny{typeConfig: &extension}
return elems, nil
}

func (g *GrpcApiClient) makeNode() *envoyconfigcorev3.Node {
func (g *GrpcExtensionApiClient) makeNode() *envoyconfigcorev3.Node {
return &envoyconfigcorev3.Node{
Id: g.node.Id,
Cluster: g.node.Cluster,
UserAgentName: xdsAgentName,
}
}

func (g *GrpcApiClient) Delta() (chan *DeltaResources, error) {
func (g *GrpcExtensionApiClient) Delta() (chan *DeltaResources, error) {
outputCh := make(chan *DeltaResources)
return outputCh, g.runDelta(outputCh)
}

func (g *GrpcApiClient) runDelta(output chan<- *DeltaResources) error {
func (g *GrpcExtensionApiClient) runDelta(output chan<- *DeltaResources) error {
var delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient
var cancel context.CancelFunc
var xState xdsState
backoff := func() {
for {
//back off
var err error
var ctx context.Context // context to sync exitCh
ctx, cancel = context.WithCancel(context.TODO())
delta, err = g.sendInitDeltaRequest(ctx)
delta, err = g.sendInitDeltaRequest(ctx, &xState)
if err != nil {
logger.Error("can not receive delta discovery request, will back off 1 sec later", err)
select {
Expand Down Expand Up @@ -187,9 +187,9 @@ func (g *GrpcApiClient) runDelta(output chan<- *DeltaResources) error {
logger.Error("can not receive delta discovery request", err)
break
}
g.handleDeltaResponse(resp, output)
g.handleDeltaResponse(resp, &xState, output)

err = g.subscribeOnGoingChang(delta)
err = g.subscribeOnGoingChang(delta, &xState)
if err != nil {
logger.Error("can not recv delta discovery request", err)
break
Expand All @@ -202,10 +202,10 @@ func (g *GrpcApiClient) runDelta(output chan<- *DeltaResources) error {
return nil
}

func (g *GrpcApiClient) handleDeltaResponse(resp *discoverypb.DeltaDiscoveryResponse, output chan<- *DeltaResources) {
func (g *GrpcExtensionApiClient) handleDeltaResponse(resp *discoverypb.DeltaDiscoveryResponse, xState *xdsState, output chan<- *DeltaResources) {
// save the xds state
g.xdsState.deltaVersion = make(map[string]string, 1)
g.xdsState.nonce = resp.Nonce
xState.deltaVersion = make(map[string]string, 1)
xState.nonce = resp.Nonce

resources := &DeltaResources{
NewResources: make([]*ProtoAny, 0, 1),
Expand All @@ -219,7 +219,7 @@ func (g *GrpcApiClient) handleDeltaResponse(resp *discoverypb.DeltaDiscoveryResp

for _, res := range resp.Resources {
logger.Infof("new resource found %s version=%s", res.Name, res.Version)
g.xdsState.deltaVersion[res.Name] = res.Version
xState.deltaVersion[res.Name] = res.Version
elems, err := g.decodeSource(res.Resource)
if err != nil {
logger.Infof("can not decode source %s version=%s", res.Name, res.Version, err)
Expand All @@ -230,28 +230,28 @@ func (g *GrpcApiClient) handleDeltaResponse(resp *discoverypb.DeltaDiscoveryResp
output <- resources
}

func (g *GrpcApiClient) subscribeOnGoingChang(delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient) error {
func (g *GrpcExtensionApiClient) subscribeOnGoingChang(delta extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient, xState *xdsState) error {
err := delta.Send(&discoverypb.DeltaDiscoveryRequest{
Node: g.makeNode(),
TypeUrl: resource.ExtensionConfigType,
InitialResourceVersions: g.xdsState.deltaVersion,
ResponseNonce: g.xdsState.nonce,
InitialResourceVersions: xState.deltaVersion,
ResponseNonce: xState.nonce,
})
return err
}

func (g *GrpcApiClient) sendInitDeltaRequest(ctx context.Context) (extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient, error) {
func (g *GrpcExtensionApiClient) sendInitDeltaRequest(ctx context.Context, xState *xdsState) (extensionpb.ExtensionConfigDiscoveryService_DeltaExtensionConfigsClient, error) {
delta, err := g.xDSExtensionClient.DeltaExtensionConfigs(ctx)
if err != nil {
return nil, errors.Wrapf(err, "can not start delta stream with xds server ")
}
err = delta.Send(&discoverypb.DeltaDiscoveryRequest{
Node: g.makeNode(),
TypeUrl: resource.ExtensionConfigType,
ResourceNamesSubscribe: g.resourceNames,
ResourceNamesSubscribe: []string{g.typeUrl},
ResourceNamesUnsubscribe: nil,
InitialResourceVersions: g.xdsState.deltaVersion,
ResponseNonce: g.xdsState.nonce,
InitialResourceVersions: xState.deltaVersion,
ResponseNonce: xState.nonce,
ErrorDetail: nil,
})
if err != nil {
Expand All @@ -260,7 +260,7 @@ func (g *GrpcApiClient) sendInitDeltaRequest(ctx context.Context) (extensionpb.E
return delta, nil
}

func (g *GrpcApiClient) init() {
func (g *GrpcExtensionApiClient) init() {
if len(g.config.ClusterName) == 0 {
panic("should config one cluster at least")
}
Expand Down Expand Up @@ -362,11 +362,11 @@ func (g *GRPCCluster) GetConnection() (conn *grpc.ClientConn, err error) {
grpc.WithTransportCredentials(creds),
grpc.WithBlock(),
)
logger.Infof("connected xds server (%s)", endpoint)
if err != nil {
err = errors.Errorf("grpc.Dial(%s) failed: %v", endpoint, err)
return
}
logger.Infof("connected xds server (%s)", endpoint)
g.conn = conn
})
return g.conn, nil
Expand Down
Loading