-
Notifications
You must be signed in to change notification settings - Fork 19
/
namespace.go
130 lines (99 loc) · 2.77 KB
/
namespace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package curator
import (
"errors"
"fmt"
"strings"
"sync"
)
type namespaceImpl struct {
client *curatorFramework
namespace string
ensurePath EnsurePath
}
func newNamespace(client *curatorFramework, namespace string) *namespaceImpl {
n := &namespaceImpl{
client: client,
namespace: namespace,
}
if len(namespace) > 0 {
if err := ValidatePath("/" + namespace); err != nil {
client.logError(fmt.Errorf("Invalid namespace: %s, %s", namespace, err))
return newNamespace(client, "")
}
n.ensurePath = NewEnsurePath(JoinPath("/", namespace))
}
return n
}
// Apply the namespace to the given path
func FixForNamespace(namespace, path string, isSequential bool) (string, error) {
if len(namespace) > 0 {
return JoinPath(namespace, path), nil
}
return path, nil
}
func (n *namespaceImpl) fixForNamespace(path string, isSequential bool) string {
if n.ensurePath != nil {
n.ensurePath.Ensure(n.client.ZookeeperClient())
}
s, _ := FixForNamespace(n.namespace, path, isSequential)
return s
}
func (n *namespaceImpl) unfixForNamespace(path string) string {
if len(n.namespace) > 0 && len(path) > 0 {
prefix := JoinPath(n.namespace)
if strings.HasPrefix(path, prefix) {
if len(prefix) < len(path) {
return path[len(prefix):]
} else {
return PATH_SEPARATOR
}
}
}
return path
}
type namespaceFacade struct {
curatorFramework
}
func newNamespaceFacade(client *curatorFramework, namespace string) *namespaceFacade {
facade := &namespaceFacade{
curatorFramework: *client,
}
facade.namespace = newNamespace(client, namespace)
facade.fixForNamespace = facade.namespace.fixForNamespace
facade.unfixForNamespace = facade.namespace.unfixForNamespace
return facade
}
func (f *namespaceFacade) Start() error {
return errors.New("the requested operation is not supported")
}
func (f *namespaceFacade) Close() error {
return errors.New("the requested operation is not supported")
}
func (f *namespaceFacade) CuratorListenable() CuratorListenable {
f.logError(errors.New("CuratorListenable() is only available from a non-namespaced CuratorFramework instance"))
return f.curatorFramework.listeners
}
func (f *namespaceFacade) Namespace() string {
return f.namespace.namespace
}
type namespaceFacadeCache struct {
client *curatorFramework
cache map[string]*namespaceFacade
lock sync.Mutex
}
func newNamespaceFacadeCache(client *curatorFramework) *namespaceFacadeCache {
return &namespaceFacadeCache{
client: client,
cache: make(map[string]*namespaceFacade),
}
}
func (c *namespaceFacadeCache) Get(namespace string) *namespaceFacade {
c.lock.Lock()
defer c.lock.Unlock()
if facade, exists := c.cache[namespace]; exists {
return facade
}
facade := newNamespaceFacade(c.client, namespace)
c.cache[namespace] = facade
return facade
}