Skip to content

Commit

Permalink
add zookeeper registry support (#2284)
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanxin authored Nov 18, 2022
1 parent ed43efe commit 62af4f1
Show file tree
Hide file tree
Showing 10 changed files with 887 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/gf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ jobs:
ports:
- 5236:5236

zookeeper:
image: loads/zookeeper:3.8
ports:
- 2181:2181

strategy:
matrix:
go-version: [ "1.15", "1.16", "1.17", "1.18" ]
Expand Down
71 changes: 71 additions & 0 deletions contrib/registry/zookeeper/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# GoFrame Etcd Registry

Use `zookeeper` as service registration and discovery management.

## Installation
```
go get -u -v github.com/gogf/gf/contrib/registry/zookeeper/v2
```
suggested using `go.mod`:
```
require github.com/gogf/gf/contrib/registry/zookeeper/v2 latest
```

```go
package main

import (
"github.com/gogf/gf/contrib/registry/zookeeper/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gsvc"
)

func main() {
r := zookeeper.New([]string{"127.0.0.1:2181"}, zookeeper.WithRootPath("/gogf"))
gsvc.SetRegistry(r)

s := g.Server(`hello.svc`)
s.BindHandler("/", func(r *ghttp.Request) {
g.Log().Info(r.Context(), `request received`)
r.Response.Write(`Hello world`)
})
s.Run()
}
```

```go
package main

import (
"fmt"
"time"

"github.com/gogf/gf/contrib/registry/zookeeper/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gsel"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/gctx"
)

func main() {
gsvc.SetRegistry(zookeeper.New([]string{"127.0.0.1:2181"},zookeeper.WithRootPath("/gogf")))
gsel.SetBuilder(gsel.NewBuilderRoundRobin())

client := g.Client()
for i := 0; i < 100; i++ {
res, err := client.Get(gctx.New(), `http://hello.svc/`)
if err != nil {
panic(err)
}
fmt.Println(res.ReadAllString())
res.Close()
time.Sleep(time.Second)
}
}
```

## License

`GoFrame zookeeper` is licensed under the [MIT License](../../../LICENSE), 100% free and open-source, forever.

11 changes: 11 additions & 0 deletions contrib/registry/zookeeper/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/gogf/gf/contrib/registry/zookeeper/v2

go 1.15

require (
github.com/go-zookeeper/zk v1.0.3
github.com/gogf/gf/v2 v2.2.2
golang.org/x/sync v0.1.0
)

replace github.com/gogf/gf/v2 => ../../../
172 changes: 172 additions & 0 deletions contrib/registry/zookeeper/go.sum

Large diffs are not rendered by default.

71 changes: 71 additions & 0 deletions contrib/registry/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Package zookeeper implements service Registry and Discovery using zookeeper.
package zookeeper

import (
"github.com/go-zookeeper/zk"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"golang.org/x/sync/singleflight"
"time"
)

var _ gsvc.Registry = &Registry{}

type Content struct {
Key string
Value string
}

// Option is etcd registry option.
type Option func(o *options)

type options struct {
namespace string
user string
password string
}

// WithRootPath with registry root path.
func WithRootPath(path string) Option {
return func(o *options) { o.namespace = path }
}

// WithDigestACL with registry password.
func WithDigestACL(user string, password string) Option {
return func(o *options) {
o.user = user
o.password = password
}
}

// Registry is consul registry
type Registry struct {
opts *options
conn *zk.Conn
group singleflight.Group
}

func New(address []string, opts ...Option) *Registry {
conn, _, err := zk.Connect(address, time.Second*120)
if err != nil {
panic(gerror.Wrapf(err,
"Error with connect to zookeeper"),
)
}
options := &options{
namespace: "/microservices",
}
for _, o := range opts {
o(options)
}
return &Registry{
opts: options,
conn: conn,
}
}
71 changes: 71 additions & 0 deletions contrib/registry/zookeeper/zookeeper_discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package zookeeper

import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"path"
"strings"
)

// Search is the etcd discovery search function.
func (r *Registry) Search(_ context.Context, in gsvc.SearchInput) ([]gsvc.Service, error) {
prefix := strings.TrimPrefix(strings.ReplaceAll(in.Prefix, "/", "-"), "-")
instances, err, _ := r.group.Do(prefix, func() (interface{}, error) {
serviceNamePath := path.Join(r.opts.namespace, prefix)
servicesID, _, err := r.conn.Children(serviceNamePath)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with search the children node under %s",
serviceNamePath,
)
}
items := make([]gsvc.Service, 0, len(servicesID))
for _, service := range servicesID {
servicePath := path.Join(serviceNamePath, service)
byteData, _, err := r.conn.Get(servicePath)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with node data which name is %s",
servicePath,
)
}
item, err := unmarshal(byteData)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with unmarshal node data to Content",
)
}
svc, err := gsvc.NewServiceWithKV(item.Key, item.Value)
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with new service with KV in Content",
)
}
items = append(items, svc)
}
return items, nil
})
if err != nil {
return nil, gerror.Wrapf(
err,
"Error with group do",
)
}
return instances.([]gsvc.Service), nil
}

// Watch is the etcd discovery watch function.
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
return newWatcher(ctx, r.opts.namespace, key, r.conn)
}
20 changes: 20 additions & 0 deletions contrib/registry/zookeeper/zookeeper_func.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package zookeeper

import (
"encoding/json"
)

func unmarshal(data []byte) (c *Content, err error) {
err = json.Unmarshal(data, &c)
return
}

func marshal(c *Content) ([]byte, error) {
return json.Marshal(c)
}
Loading

0 comments on commit 62af4f1

Please sign in to comment.