Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add zookeeper registry support #2284

Merged
merged 17 commits into from
Nov 18, 2022
5 changes: 5 additions & 0 deletions .github/workflows/gf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ jobs:
ports:
- 5236:5236

zookeeper:
image: zookeeper
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved
ports:
- 2181:2181

strategy:
matrix:
go-version: [ "1.15", "1.16", "1.17", "1.18" ]
Expand Down
17 changes: 17 additions & 0 deletions contrib/registry/zookeeper/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# GoFrame Etcd Registry

Use `zookeeper` as service registration and discovery management.

## Installation
```
go get -u -v github.com/gogf/gf/contrib/registry/zookeeper/v2
```
suggested using `go.mod`:
```
require github.com/gogf/gf/contrib/registry/zookeeper/v2 latest
```

huyuanxin marked this conversation as resolved.
Show resolved Hide resolved
## License

`GoFrame zookeeper` is licensed under the [MIT License](../../../LICENSE), 100% free and open-source, forever.

11 changes: 11 additions & 0 deletions contrib/registry/zookeeper/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/gogf/gf/contrib/registry/zookeeper/v2

go 1.15

require (
github.com/go-zookeeper/zk v1.0.3
github.com/gogf/gf/v2 v2.2.2
golang.org/x/sync v0.1.0
)
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved

replace github.com/gogf/gf/v2 => ../../../
172 changes: 172 additions & 0 deletions contrib/registry/zookeeper/go.sum

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions contrib/registry/zookeeper/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Package etcd implements service Registry and Discovery using zookeeper.
package zookeeper
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved

import (
"encoding/json"
)

func unmarshal(data []byte) (c *Content, err error) {
err = json.Unmarshal(data, &c)
return
}

func marshal(c *Content) ([]byte, error) {
return json.Marshal(c)
}
62 changes: 62 additions & 0 deletions contrib/registry/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package zookeeper
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/go-zookeeper/zk"
"github.com/gogf/gf/v2/net/gsvc"
"golang.org/x/sync/singleflight"
)

var _ gsvc.Registry = &Registry{}

type Content struct {
Key string
Value string
}

// Option is etcd registry option.
type Option func(o *options)

type options struct {
namespace string
user string
password string
}

// WithRootPath with registry root path.
func WithRootPath(path string) Option {
return func(o *options) { o.namespace = path }
}

// WithDigestACL with registry password.
func WithDigestACL(user string, password string) Option {
return func(o *options) {
o.user = user
o.password = password
}
}

// Registry is consul registry
type Registry struct {
opts *options
conn *zk.Conn
group singleflight.Group
}

func New(conn *zk.Conn, opts ...Option) *Registry {
options := &options{
namespace: "/microservices",
}
for _, o := range opts {
o(options)
}
return &Registry{
opts: options,
conn: conn,
}
}
71 changes: 71 additions & 0 deletions contrib/registry/zookeeper/zookeeper_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package zookeeper
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"path"
"strings"
)

// Search is the etcd discovery search function.
func (r *Registry) Search(_ context.Context, in gsvc.SearchInput) ([]gsvc.Service, error) {
prefix := strings.TrimPrefix(strings.ReplaceAll(in.Prefix, "/", "-"), "-")
instances, err, _ := r.group.Do(prefix, func() (interface{}, error) {
serviceNamePath := path.Join(r.opts.namespace, prefix)
servicesID, _, err := r.conn.Children(serviceNamePath)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with search the children node under %s",
serviceNamePath,
)
}
items := make([]gsvc.Service, 0, len(servicesID))
for _, service := range servicesID {
servicePath := path.Join(serviceNamePath, service)
byteData, _, err := r.conn.Get(servicePath)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with node data which name is %s",
servicePath,
)
}
item, err := unmarshal(byteData)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with unmarshal node data to Content",
)
}
svc, err := gsvc.NewServiceWithKV(item.Key, item.Value)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with new service with KV in Content",
)
}
items = append(items, svc)
}
return items, nil
})
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with group do",
)
}
return instances.([]gsvc.Service), nil
}

// Watch is the etcd discovery watch function.
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
return newWatcher(ctx, r.opts.namespace, key, r.conn)
}
127 changes: 127 additions & 0 deletions contrib/registry/zookeeper/zookeeper_registrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package zookeeper
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"github.com/go-zookeeper/zk"
"github.com/gogf/gf/v2/errors/gerror"
"path"
"strings"
"time"

"github.com/gogf/gf/v2/net/gsvc"
)

// Register registers `service` to Registry.
// Note that it returns a new Service if it changes the input Service with custom one.
func (r *Registry) Register(_ context.Context, service gsvc.Service) (gsvc.Service, error) {
var (
data []byte
err error
)
if err = r.ensureName(r.opts.namespace, []byte(""), 0); err != nil {
return service, gerror.Wrapf(
err,
"Error Creat node which name is %s",
r.opts.namespace,
)
}
prefix := strings.TrimPrefix(strings.ReplaceAll(service.GetPrefix(), "/", "-"), "-")
servicePrefixPath := path.Join(r.opts.namespace, prefix)
if err = r.ensureName(servicePrefixPath, []byte(""), 0); err != nil {
return service, gerror.Wrapf(
err,
"Error Creat node which name is %s",
servicePrefixPath,
)
}

if data, err = marshal(&Content{
Key: service.GetKey(),
Value: service.GetValue(),
}); err != nil {
return service, gerror.Wrapf(
err,
"Error with marshal Content to Json string",
)
}
servicePath := path.Join(servicePrefixPath, service.GetName())
if err = r.ensureName(servicePath, data, zk.FlagEphemeral); err != nil {
return service, gerror.Wrapf(
err,
"Error Creat node which name is %s",
servicePath,
)
}
go r.reRegister(servicePath, data)
return service, nil
}

// Deregister off-lines and removes `service` from the Registry.
func (r *Registry) Deregister(ctx context.Context, service gsvc.Service) error {
ch := make(chan error, 1)
prefix := strings.TrimPrefix(strings.ReplaceAll(service.GetPrefix(), "/", "-"), "-")
servicePath := path.Join(r.opts.namespace, prefix, service.GetName())
go func() {
err := r.conn.Delete(servicePath, -1)
ch <- err
}()
var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-ch:
}
return err
huyuanxin marked this conversation as resolved.
Show resolved Hide resolved
}

// ensureName ensure node exists, if not exist, create and set data
func (r *Registry) ensureName(path string, data []byte, flags int32) error {
exists, stat, err := r.conn.Exists(path)
if err != nil {
return err
}
// ephemeral nodes handling after restart
// fixes a race condition if the server crashes without using CreateProtectedEphemeralSequential()
if flags&zk.FlagEphemeral == zk.FlagEphemeral {
err = r.conn.Delete(path, stat.Version)
if err != nil && err != zk.ErrNoNode {
return err
}
exists = false
}
if !exists {
if len(r.opts.user) > 0 && len(r.opts.password) > 0 {
_, err = r.conn.Create(path, data, flags, zk.DigestACL(zk.PermAll, r.opts.user, r.opts.password))
} else {
_, err = r.conn.Create(path, data, flags, zk.WorldACL(zk.PermAll))
}
if err != nil {
return err
}
}
return nil
}

// reRegister re-register data node info when bad connection recovered
func (r *Registry) reRegister(path string, data []byte) {
sessionID := r.conn.SessionID()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
cur := r.conn.SessionID()
// sessionID changed
if cur > 0 && sessionID != cur {
// re-ensureName
if err := r.ensureName(path, data, zk.FlagEphemeral); err != nil {
return
}
sessionID = cur
}
}
}
Loading