forked from couchbase/gocb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bucket_internal.go
69 lines (60 loc) · 2.17 KB
/
bucket_internal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package gocb
import (
"github.com/couchbase/gocb/gocbcore"
)
type bucketInternal struct {
b *Bucket;
}
// Retrieves a document from the bucket
func (bi *bucketInternal) GetRandom(valuePtr interface{}) (string, Cas, error) {
return bi.b.getRandom(valuePtr)
}
// Inserts or replaces (with meta) a document in a bucket.
func (bi *bucketInternal) UpsertMeta(key string, value, extra []byte, flags, expiry uint32, cas, revseqno uint64) (Cas, error) {
outcas, _, err := bi.b.upsertMeta(key, value, extra, flags, expiry, cas, revseqno)
return outcas, err
}
// Removes a document (with meta) from the bucket.
func (bi *bucketInternal) RemoveMeta(key string, extra []byte, flags, expiry uint32, cas, revseqno uint64) (Cas, error) {
outcas, _, err := bi.b.removeMeta(key, extra, flags, expiry, cas, revseqno)
return outcas, err
}
func (b *Bucket) getRandom(valuePtr interface{}) (keyOut string, casOut Cas, errOut error) {
signal := make(chan bool, 1)
op, err := b.client.GetRandom(func(keyBytes, bytes []byte, flags uint32, cas gocbcore.Cas, err error) {
errOut = err
if errOut == nil {
errOut = b.transcoder.Decode(bytes, flags, valuePtr)
if errOut == nil {
casOut = Cas(cas)
keyOut = string(keyBytes)
}
}
signal <- true
})
if err != nil {
return "", 0, err
}
timeoutTmr := gocbcore.AcquireTimer(b.opTimeout)
select {
case <-signal:
gocbcore.ReleaseTimer(timeoutTmr, false)
return
case <-timeoutTmr.C:
gocbcore.ReleaseTimer(timeoutTmr, true)
op.Cancel()
return "", 0, timeoutError{}
}
}
func (b *Bucket) upsertMeta(key string, value, extra []byte, flags uint32, expiry uint32, cas, revseqno uint64) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.SetMeta([]byte(key), value, extra, flags, expiry, cas, revseqno, gocbcore.StoreCallback(cb))
return op, err
})
}
func (b *Bucket) removeMeta(key string, extra []byte, flags uint32, expiry uint32, cas, revseqno uint64) (Cas, MutationToken, error) {
return b.hlpCasExec(func(cb ioCasCallback) (pendingOp, error) {
op, err := b.client.DeleteMeta([]byte(key), extra, flags, expiry, cas, revseqno, gocbcore.RemoveCallback(cb))
return op, err
})
}