Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fragment watch response by server request limit #9291

Merged
merged 12 commits into from
May 14, 2018
26 changes: 21 additions & 5 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- Previously, `Repair(dirpath string) bool`, now `Repair(lg *zap.Logger, dirpath string) bool`.
- Previously, `Create(dirpath string, metadata []byte) (*WAL, error)`, now `Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error)`.
- Remove [`pkg/cors` package](https://github.com/coreos/etcd/pull/9490).
- Change [`--experimental-enable-v2v3`](TODO) flag to `--enable-v2v3`; v2 storage emulation is now stable.
- Move internal package `"github.com/coreos/etcd/snap"` to [`"github.com/coreos/etcd/raftsnap"`](https://github.com/coreos/etcd/pull/9211).
- Move internal package `"github.com/coreos/etcd/etcdserver/auth"` to [`"github.com/coreos/etcd/etcdserver/v2auth"`](https://github.com/coreos/etcd/pull/9275).
- Move internal package `"github.com/coreos/etcd/error"` to [`"github.com/coreos/etcd/etcdserver/v2error"`](https://github.com/coreos/etcd/pull/9274).
- Move internal package `"github.com/coreos/etcd/store"` to [`"github.com/coreos/etcd/etcdserver/v2store"`](https://github.com/coreos/etcd/pull/9274).
- [`--experimental-enable-v2v3`](TODO) has been deprecated, `--enable-v2v3` flag is now stable.
- Move internal package `"github.com/coreos/etcd/etcdserver/auth"` to `"github.com/coreos/etcd/etcdserver/api/v2auth"`.
- Move internal package `"github.com/coreos/etcd/etcdserver/stats"` to `"github.com/coreos/etcd/etcdserver/api/v2stats"`.
- Move internal package `"github.com/coreos/etcd/error"` to `"github.com/coreos/etcd/etcdserver/api/v2error"`.
- Move internal package `"github.com/coreos/etcd/store"` to `"github.com/coreos/etcd/etcdserver/api/v2store"`.

### Dependency

Expand Down Expand Up @@ -198,8 +199,14 @@ See [security doc](https://github.com/coreos/etcd/blob/master/Documentation/op-g
### API

- Add [`snapshot`](https://github.com/coreos/etcd/pull/9118) package for snapshot restore/save operations (see [`godoc.org/github.com/etcd/snapshot`](https://godoc.org/github.com/coreos/etcd/snapshot) for more).
- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065), allow user-provided watch ID to `mvcc`.
- Add [`watch_id` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9065) to allow user-provided watch ID to `mvcc`.
- Corresponding `watch_id` is returned via `etcdserverpb.WatchResponse`, if any.
- Add [`fragment` field to `etcdserverpb.WatchCreateRequest`](https://github.com/coreos/etcd/pull/9291) to request etcd server to [split watch events](https://github.com/coreos/etcd/issues/9294) when the total size of events exceeds `--max-request-bytes` flag value plus gRPC-overhead 512 bytes.
- The default server-side request bytes limit is `embed.DefaultMaxRequestBytes` which is 1.5 MiB plus gRPC-overhead 512 bytes.
- If watch response events exceed this server-side request limit and watch request is created with `fragment` field `true`, the server will split watch events into a set of chunks, each of which is a subset of watch events below server-side request limit.
- For example, watch response contains 10 events, where each event is 1 MiB. And server `--max-request-bytes` flag value is 1 MiB. Then, server will send 10 separate fragmented events to the client.
- For example, watch response contains 5 events, where each event is 2 MiB. And server `--max-request-bytes` flag value is 1 MiB and `clientv3.Config.MaxCallRecvMsgSize` is 1 MiB. Then, server will try to send 5 separate fragmented events to the client, and the client will error with `"code = ResourceExhausted desc = grpc: received message larger than max (...)"`.
- Client must implement fragmented watch event merge (which `clientv3` does in etcd v3.4).
- Add [`raftAppliedIndex` field to `etcdserverpb.StatusResponse`](https://github.com/coreos/etcd/pull/9176) for current Raft applied index.
- Add [`errors` field to `etcdserverpb.StatusResponse`](https://github.com/coreos/etcd/pull/9206) for server-side error.
- e.g. `"etcdserver: no leader", "NOSPACE", "CORRUPT"`
Expand All @@ -223,6 +230,15 @@ See [security doc](https://github.com/coreos/etcd/blob/master/Documentation/op-g
- Add [`CLUSTER_DEBUG` to enable test cluster logging](https://github.com/coreos/etcd/pull/9678).
- Deprecated `capnslog` in integration tests.

### client v3

- Add [`WithFragment` `OpOption`](https://github.com/coreos/etcd/pull/9291) to support [watch events fragmentation](https://github.com/coreos/etcd/issues/9294) when the total size of events exceeds `--max-request-bytes` flag value plus gRPC-overhead 512 bytes.
- Watch fragmentation is disabled by default.
- The default server-side request bytes limit is `embed.DefaultMaxRequestBytes` which is 1.5 MiB plus gRPC-overhead 512 bytes.
- If watch response events exceed this server-side request limit and watch request is created with `fragment` field `true`, the server will split watch events into a set of chunks, each of which is a subset of watch events below server-side request limit.
- For example, watch response contains 10 events, where each event is 1 MiB. And server `--max-request-bytes` flag value is 1 MiB. Then, server will send 10 separate fragmented events to the client.
- For example, watch response contains 5 events, where each event is 2 MiB. And server `--max-request-bytes` flag value is 1 MiB and `clientv3.Config.MaxCallRecvMsgSize` is 1 MiB. Then, server will try to send 5 separate fragmented events to the client, and the client will error with `"code = ResourceExhausted desc = grpc: received message larger than max (...)"`.

### etcdctl v3

- Add [`check datascale`](https://github.com/coreos/etcd/pull/9185) command.
Expand Down
2 changes: 2 additions & 0 deletions Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| filters | filters filter the events at server side before it sends back to the watcher. | (slice of) FilterType |
| prev_kv | If prev_kv is set, created watcher gets the previous KV before the event happens. If the previous KV is already compacted, nothing will be returned. | bool |
| watch_id | If watch_id is provided and non-zero, it will be assigned to this watcher. Since creating a watcher in etcd is not a synchronous operation, this can be used ensure that ordering is correct when creating multiple watchers on the same stream. Creating a watcher with an ID already in use on the stream will cause an error to be returned. | int64 |
| fragment | fragment enables splitting large revisions into multiple watch responses. | bool |



Expand All @@ -859,6 +860,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| canceled | canceled is set to true if the response is for a cancel watch request. No further events will be sent to the canceled watcher. | bool |
| compact_revision | compact_revision is set to the minimum index if a watcher tries to watch at a compacted index. This happens when creating a watcher at a compacted revision or the watcher cannot catch up with the progress of the key-value store. The client should treat the watcher as canceled and should not try to create any watcher with the same start_revision again. | int64 |
| cancel_reason | cancel_reason indicates the reason for canceling the watcher. | string |
| fragment | framgment is true if large watch response was split over multiple responses. | bool |
| events | | (slice of) mvccpb.Event |


Expand Down
10 changes: 10 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2359,6 +2359,11 @@
"$ref": "#/definitions/WatchCreateRequestFilterType"
}
},
"fragment": {
"description": "fragment enables splitting large revisions into multiple watch responses.",
"type": "boolean",
"format": "boolean"
},
"key": {
"description": "key is the key to register for watching.",
"type": "string",
Expand Down Expand Up @@ -2430,6 +2435,11 @@
"$ref": "#/definitions/mvccpbEvent"
}
},
"fragment": {
"description": "framgment is true if large watch response was split over multiple responses.",
"type": "boolean",
"format": "boolean"
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
},
Expand Down
123 changes: 123 additions & 0 deletions clientv3/integration/watch_fragment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

// TestWatchFragmentDisable ensures that large watch
// response exceeding server-side request limit can
// arrive even without watch response fragmentation.
func TestWatchFragmentDisable(t *testing.T) {
testWatchFragment(t, false, false)
}

// TestWatchFragmentDisableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// cannot arrive without watch events fragmentation,
// because multiple events exceed client-side gRPC
// response receive limit.
func TestWatchFragmentDisableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, false, true)
}

// TestWatchFragmentEnable ensures that large watch
// response exceeding server-side request limit arrive
// with watch response fragmentation.
func TestWatchFragmentEnable(t *testing.T) {
testWatchFragment(t, true, false)
}

// TestWatchFragmentEnableWithGRPCLimit verifies
// large watch response exceeding server-side request
// limit and client-side gRPC response receive limit
// can arrive only when watch events are fragmented.
func TestWatchFragmentEnableWithGRPCLimit(t *testing.T) {
testWatchFragment(t, true, true)
}

// testWatchFragment triggers watch response that spans over multiple
// revisions exceeding server request limits when combined.
func testWatchFragment(t *testing.T, fragment, exceedRecvLimit bool) {
cfg := &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: 1.5 * 1024 * 1024,
}
if exceedRecvLimit {
cfg.ClientMaxCallRecvMsgSize = 1.5 * 1024 * 1024
}
clus := integration.NewClusterV3(t, cfg)
defer clus.Terminate(t)

cli := clus.Client(0)
errc := make(chan error)
for i := 0; i < 10; i++ {
go func(i int) {
_, err := cli.Put(context.TODO(),
fmt.Sprint("foo", i),
strings.Repeat("a", 1024*1024),
)
errc <- err
}(i)
}
for i := 0; i < 10; i++ {
if err := <-errc; err != nil {
t.Fatalf("failed to put: %v", err)
}
}

opts := []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(1)}
if fragment {
opts = append(opts, clientv3.WithFragment())
}
wch := cli.Watch(context.TODO(), "foo", opts...)

// expect 10 MiB watch response
select {
case ws := <-wch:
// without fragment, should exceed gRPC client receive limit
if !fragment && exceedRecvLimit {
if len(ws.Events) != 0 {
t.Fatalf("expected 0 events with watch fragmentation, got %d", len(ws.Events))
}
exp := "code = ResourceExhausted desc = grpc: received message larger than max ("
if !strings.Contains(ws.Err().Error(), exp) {
t.Fatalf("expected 'ResourceExhausted' error, got %v", ws.Err())
}
return
}

// still expect merged watch events
if len(ws.Events) != 10 {
t.Fatalf("expected 10 events with watch fragmentation, got %d", len(ws.Events))
}
if ws.Err() != nil {
t.Fatalf("unexpected error %v", ws.Err())
}

case <-time.After(testutil.RequestTimeout):
t.Fatalf("took too long to receive events")
}
}
36 changes: 31 additions & 5 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ const (
tTxn
)

var (
noPrefixEnd = []byte{0}
)
var noPrefixEnd = []byte{0}

// Op represents an Operation that kv can execute.
type Op struct {
Expand All @@ -53,6 +51,12 @@ type Op struct {
// for watch, put, delete
prevKV bool

// for watch
// fragmentation should be disabled by default
// if true, split watch events when total exceeds
// "--max-request-bytes" flag value + 512-byte
fragment bool

// for put
ignoreValue bool
ignoreLease bool
Expand All @@ -77,8 +81,15 @@ type Op struct {

// accessors / mutators

func (op Op) IsTxn() bool { return op.t == tTxn }
func (op Op) Txn() ([]Cmp, []Op, []Op) { return op.cmps, op.thenOps, op.elseOps }
// IsTxn returns true if the "Op" type is transaction.
func (op Op) IsTxn() bool {
return op.t == tTxn
}

// Txn returns the comparison(if) operations, "then" operations, and "else" operations.
func (op Op) Txn() ([]Cmp, []Op, []Op) {
return op.cmps, op.thenOps, op.elseOps
}

// KeyBytes returns the byte slice holding the Op's key.
func (op Op) KeyBytes() []byte { return op.key }
Expand Down Expand Up @@ -205,12 +216,14 @@ func (op Op) isWrite() bool {
return op.t != tRange
}

// OpGet returns "get" operation based on given key and operation options.
func OpGet(key string, opts ...OpOption) Op {
ret := Op{t: tRange, key: []byte(key)}
ret.applyOpts(opts)
return ret
}

// OpDelete returns "delete" operation based on given key and operation options.
func OpDelete(key string, opts ...OpOption) Op {
ret := Op{t: tDeleteRange, key: []byte(key)}
ret.applyOpts(opts)
Expand Down Expand Up @@ -239,6 +252,7 @@ func OpDelete(key string, opts ...OpOption) Op {
return ret
}

// OpPut returns "put" operation based on given key-value and operation options.
func OpPut(key, val string, opts ...OpOption) Op {
ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
ret.applyOpts(opts)
Expand Down Expand Up @@ -267,6 +281,7 @@ func OpPut(key, val string, opts ...OpOption) Op {
return ret
}

// OpTxn returns "txn" operation based on given transaction conditions.
func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
}
Expand Down Expand Up @@ -466,6 +481,17 @@ func WithPrevKV() OpOption {
}
}

// WithFragment to receive raw watch response with fragmentation.
// Fragmentation is disabled by default. If fragmentation is enabled,
// etcd watch server will split watch response before sending to clients
// when the total size of watch events exceed server-side request limit.
// The default server-side request limit is 1.5 MiB, which can be configured
// as "--max-request-bytes" flag value + gRPC-overhead 512 bytes.
// See "etcdserver/api/v3rpc/watch.go" for more details.
func WithFragment() OpOption {
return func(op *Op) { op.fragment = true }
}

// WithIgnoreValue updates the key using its current value.
// This option can not be combined with non-empty values.
// Returns an error if the key does not exist.
Expand Down
Loading