diff --git a/contrib/cover.sh b/contrib/cover.sh index 5e2c179a8..d94b162b6 100755 --- a/contrib/cover.sh +++ b/contrib/cover.sh @@ -6,14 +6,14 @@ TMP=$(mktemp /tmp/badger-coverage-XXXXX.txt) BUILD=$1 OUT=$2 -set -e +set -ex pushd $SRC &> /dev/null # create coverage output echo 'mode: atomic' > $OUT for PKG in $(go list ./...|grep -v -E 'vendor'); do - go test -covermode=atomic -coverprofile=$TMP $PKG + go test -v -covermode=atomic -coverprofile=$TMP $PKG tail -n +2 $TMP >> $OUT done diff --git a/pb/pb.pb.go b/pb/pb.pb.go index bfff39b7b..5c9e6cdc5 100644 --- a/pb/pb.pb.go +++ b/pb/pb.pb.go @@ -8,6 +8,7 @@ import ( proto "github.com/golang/protobuf/proto" io "io" math "math" + math_bits "math/bits" ) // Reference imports to suppress errors if they are not otherwise used. @@ -101,7 +102,9 @@ type KV struct { ExpiresAt uint64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` Meta []byte `protobuf:"bytes,6,opt,name=meta,proto3" json:"meta,omitempty"` // Stream id is used to identify which stream the KV came from. - StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + // Stream done is to indicate end of stream. + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -189,6 +192,13 @@ func (m *KV) GetStreamId() uint32 { return 0 } +func (m *KV) GetStreamDone() bool { + if m != nil { + return m.StreamDone + } + return false +} + type KVList struct { Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -632,46 +642,47 @@ func init() { func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } var fileDescriptor_f80abaa17e25ccc8 = []byte{ - // 611 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xdd, 0x6e, 0x12, 0x41, - 0x14, 0x66, 0x16, 0xba, 0xc0, 0xa1, 0xa5, 0xeb, 0x44, 0x9b, 0x35, 0x2a, 0xc1, 0x35, 0x26, 0xd8, - 0x34, 0x5c, 0xb4, 0xc6, 0x1b, 0xaf, 0x28, 0xc5, 0x48, 0x68, 0x43, 0x32, 0x36, 0x4d, 0xe3, 0x0d, - 0x19, 0x76, 0x0f, 0x65, 0xb3, 0xbf, 0xd9, 0x19, 0x36, 0xe5, 0x4d, 0x7c, 0x0f, 0x5f, 0xc2, 0x4b, - 0x1f, 0xc1, 0xd4, 0x07, 0xd1, 0xcc, 0xec, 0xd2, 0x40, 0xf4, 0xee, 0x9c, 0xef, 0x3b, 0x73, 0xe6, - 0xcc, 0xf7, 0xcd, 0x81, 0x46, 0x3a, 0xef, 0xa7, 0x59, 0x22, 0x13, 0x6a, 0xa4, 0x73, 0xe7, 0x3b, - 0x01, 0x63, 0x72, 0x43, 0x2d, 0xa8, 0x06, 0xb8, 0xb6, 0x49, 0x97, 0xf4, 0xf6, 0x99, 0x0a, 0xe9, - 0x53, 0xd8, 0xcb, 0x79, 0xb8, 0x42, 0xdb, 0xd0, 0x58, 0x91, 0xd0, 0x17, 0xd0, 0x5c, 0x09, 0xcc, - 0x66, 0x11, 0x4a, 0x6e, 0x57, 0x35, 0xd3, 0x50, 0xc0, 0x15, 0x4a, 0x4e, 0x6d, 0xa8, 0xe7, 0x98, - 0x09, 0x3f, 0x89, 0xed, 0x5a, 0x97, 0xf4, 0x6a, 0x6c, 0x93, 0xd2, 0x57, 0x00, 0x78, 0x9f, 0xfa, - 0x19, 0x8a, 0x19, 0x97, 0xf6, 0x9e, 0x26, 0x9b, 0x25, 0x32, 0x90, 0x94, 0x42, 0x4d, 0x37, 0x34, - 0x75, 0x43, 0x1d, 0xab, 0x9b, 0x84, 0xcc, 0x90, 0x47, 0x33, 0xdf, 0xb3, 0xa1, 0x4b, 0x7a, 0x07, - 0xac, 0x51, 0x00, 0x63, 0xcf, 0xe9, 0x82, 0x39, 0xb9, 0xb9, 0xf4, 0x85, 0xa4, 0x47, 0x60, 0x04, - 0xb9, 0x4d, 0xba, 0xd5, 0x5e, 0xeb, 0xd4, 0xec, 0xa7, 0xf3, 0xfe, 0xe4, 0x86, 0x19, 0x41, 0xee, - 0x0c, 0xe0, 0xc9, 0x15, 0x8f, 0xfd, 0x05, 0x0a, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, 0xbf, 0xa0, 0xa4, - 0x27, 0x50, 0x77, 0x75, 0x22, 0xca, 0x13, 0x54, 0x9d, 0xd8, 0xad, 0x63, 0x9b, 0x12, 0xe7, 0x0f, - 0x81, 0xf6, 0x2e, 0x47, 0xdb, 0x60, 0x8c, 0x3d, 0xad, 0x52, 0x8d, 0x19, 0x63, 0x8f, 0x9e, 0x80, - 0x31, 0x4d, 0xb5, 0x42, 0xed, 0xd3, 0x97, 0xff, 0xf6, 0xea, 0x4f, 0x53, 0xcc, 0xb8, 0xf4, 0x93, - 0x98, 0x19, 0xd3, 0x54, 0x49, 0x7a, 0x89, 0x39, 0x86, 0x5a, 0xb8, 0x03, 0x56, 0x24, 0xf4, 0x19, - 0x98, 0x01, 0xae, 0xd5, 0x2b, 0x0b, 0xd1, 0xf6, 0x02, 0x5c, 0x8f, 0x3d, 0xfa, 0x11, 0x0e, 0x31, - 0x76, 0xb3, 0x75, 0xaa, 0x8e, 0xcf, 0x78, 0x78, 0x97, 0x68, 0xdd, 0xda, 0xc5, 0xcc, 0xa3, 0x47, - 0x6a, 0x10, 0xde, 0x25, 0xac, 0x8d, 0x3b, 0x39, 0xed, 0x42, 0xcb, 0x4d, 0xa2, 0x34, 0x43, 0xa1, - 0xdd, 0x30, 0xf5, 0x7d, 0xdb, 0x90, 0xf3, 0x06, 0x9a, 0x8f, 0xc3, 0x51, 0x00, 0x73, 0xc8, 0x46, - 0x83, 0xeb, 0x91, 0x55, 0x51, 0xf1, 0xc5, 0xe8, 0x72, 0x74, 0x3d, 0xb2, 0x88, 0x33, 0x86, 0xd6, - 0x79, 0x98, 0xb8, 0xc1, 0x74, 0xb1, 0x10, 0x28, 0xff, 0xf3, 0x49, 0x8e, 0xc0, 0x4c, 0x34, 0xa7, - 0x35, 0x38, 0x60, 0x65, 0xa6, 0x2a, 0x43, 0x8c, 0xcb, 0x77, 0xaa, 0xd0, 0xf9, 0x0a, 0x70, 0xcd, - 0xe7, 0x21, 0x8e, 0x63, 0x0f, 0xef, 0xe9, 0x3b, 0xa8, 0x17, 0x95, 0x1b, 0x23, 0x0e, 0xd5, 0xa3, - 0xb6, 0xee, 0x62, 0x1b, 0x9e, 0xbe, 0x86, 0xfd, 0x79, 0x98, 0x24, 0xd1, 0x6c, 0xe1, 0x87, 0x12, - 0xb3, 0xf2, 0x3b, 0xb6, 0x34, 0xf6, 0x49, 0x43, 0x4e, 0x02, 0x8d, 0xe1, 0x12, 0xdd, 0x40, 0xac, - 0x22, 0x7a, 0x0c, 0x35, 0xad, 0x15, 0xd1, 0x5a, 0x1d, 0xa9, 0xb6, 0x1b, 0xae, 0xaf, 0xa4, 0xc9, - 0x7c, 0xb9, 0x8c, 0x98, 0xae, 0x51, 0x53, 0x8a, 0x55, 0xa4, 0x3b, 0xd6, 0x98, 0x0a, 0x9d, 0xb7, - 0xd0, 0x7c, 0x2c, 0x2a, 0x54, 0x19, 0x9e, 0x9d, 0x0e, 0xad, 0x0a, 0xdd, 0x87, 0xc6, 0xed, 0xed, - 0x67, 0x2e, 0x96, 0x1f, 0xde, 0x5b, 0xc4, 0x71, 0xa1, 0x7e, 0xc1, 0x25, 0x9f, 0xe0, 0x7a, 0xcb, - 0x3d, 0xb2, 0xed, 0x1e, 0x85, 0x9a, 0xc7, 0x25, 0x2f, 0xa7, 0xd5, 0xb1, 0xfa, 0x3c, 0x7e, 0x5e, - 0x2e, 0x8d, 0xe1, 0xe7, 0x6a, 0x29, 0xdc, 0x0c, 0xb9, 0x44, 0x4f, 0x2d, 0x85, 0x32, 0xbf, 0xca, - 0x9a, 0x25, 0x32, 0x90, 0xc7, 0xcf, 0xa1, 0xbd, 0xeb, 0x32, 0xad, 0x43, 0x95, 0xa3, 0xb0, 0x2a, - 0xe7, 0xd6, 0x8f, 0x87, 0x0e, 0xf9, 0xf9, 0xd0, 0x21, 0xbf, 0x1e, 0x3a, 0xe4, 0xdb, 0xef, 0x4e, - 0x65, 0x6e, 0xea, 0x8d, 0x3e, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x2f, 0x21, 0x79, 0xdd, - 0x03, 0x00, 0x00, + // 631 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcf, 0x6e, 0xd3, 0x4e, + 0x10, 0xce, 0x3a, 0xa9, 0x93, 0x4c, 0xda, 0x34, 0xbf, 0xd5, 0x8f, 0xca, 0x08, 0x08, 0xc6, 0x08, + 0x29, 0x54, 0x55, 0x0e, 0x2d, 0xe2, 0xc2, 0x29, 0x4d, 0x83, 0x88, 0xd2, 0x2a, 0xd2, 0x52, 0x55, + 0x15, 0x97, 0x68, 0x63, 0x4f, 0x1a, 0xcb, 0x7f, 0xd6, 0xb2, 0x37, 0x56, 0xf3, 0x26, 0x3c, 0x12, + 0x47, 0x0e, 0x3c, 0x00, 0x2a, 0x0f, 0x02, 0xda, 0xb5, 0x53, 0x25, 0x82, 0xdb, 0xcc, 0xf7, 0xcd, + 0xce, 0xce, 0x7e, 0xdf, 0x0e, 0x34, 0x92, 0x79, 0x3f, 0x49, 0x85, 0x14, 0xd4, 0x48, 0xe6, 0xce, + 0x0f, 0x02, 0xc6, 0xe4, 0x86, 0x76, 0xa0, 0x1a, 0xe0, 0xda, 0x22, 0x36, 0xe9, 0xed, 0x33, 0x15, + 0xd2, 0xff, 0x61, 0x2f, 0xe7, 0xe1, 0x0a, 0x2d, 0x43, 0x63, 0x45, 0x42, 0x9f, 0x41, 0x73, 0x95, + 0x61, 0x3a, 0x8b, 0x50, 0x72, 0xab, 0xaa, 0x99, 0x86, 0x02, 0xae, 0x50, 0x72, 0x6a, 0x41, 0x3d, + 0xc7, 0x34, 0xf3, 0x45, 0x6c, 0xd5, 0x6c, 0xd2, 0xab, 0xb1, 0x4d, 0x4a, 0x5f, 0x00, 0xe0, 0x7d, + 0xe2, 0xa7, 0x98, 0xcd, 0xb8, 0xb4, 0xf6, 0x34, 0xd9, 0x2c, 0x91, 0x81, 0xa4, 0x14, 0x6a, 0xba, + 0xa1, 0xa9, 0x1b, 0xea, 0x58, 0xdd, 0x94, 0xc9, 0x14, 0x79, 0x34, 0xf3, 0x3d, 0x0b, 0x6c, 0xd2, + 0x3b, 0x60, 0x8d, 0x02, 0x18, 0x7b, 0xf4, 0x25, 0xb4, 0x4a, 0xd2, 0x13, 0x31, 0x5a, 0x2d, 0x9b, + 0xf4, 0x1a, 0x0c, 0x0a, 0xe8, 0x42, 0xc4, 0xe8, 0xd8, 0x60, 0x4e, 0x6e, 0x2e, 0xfd, 0x4c, 0xd2, + 0x23, 0x30, 0x82, 0xdc, 0x22, 0x76, 0xb5, 0xd7, 0x3a, 0x35, 0xfb, 0xc9, 0xbc, 0x3f, 0xb9, 0x61, + 0x46, 0x90, 0x3b, 0x03, 0xf8, 0xef, 0x8a, 0xc7, 0xfe, 0x02, 0x33, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, + 0x3f, 0xa3, 0xa4, 0x27, 0x50, 0x77, 0x75, 0x92, 0x95, 0x27, 0xa8, 0x3a, 0xb1, 0x5b, 0xc7, 0x36, + 0x25, 0xce, 0x6f, 0x02, 0xed, 0x5d, 0x8e, 0xb6, 0xc1, 0x18, 0x7b, 0x5a, 0xc6, 0x1a, 0x33, 0xc6, + 0x1e, 0x3d, 0x01, 0x63, 0x9a, 0x68, 0x09, 0xdb, 0xa7, 0xcf, 0xff, 0xee, 0xd5, 0x9f, 0x26, 0x98, + 0x72, 0xe9, 0x8b, 0x98, 0x19, 0xd3, 0x44, 0x69, 0x7e, 0x89, 0x39, 0x86, 0x5a, 0xd9, 0x03, 0x56, + 0x24, 0xf4, 0x09, 0x98, 0x01, 0xae, 0x95, 0x0c, 0x85, 0xaa, 0x7b, 0x01, 0xae, 0xc7, 0x1e, 0xfd, + 0x00, 0x87, 0x18, 0xbb, 0xe9, 0x3a, 0x51, 0xc7, 0x67, 0x3c, 0xbc, 0x13, 0x5a, 0xd8, 0x76, 0x31, + 0xf3, 0xe8, 0x91, 0x1a, 0x84, 0x77, 0x82, 0xb5, 0x71, 0x27, 0xa7, 0x36, 0xb4, 0x5c, 0x11, 0x25, + 0x29, 0x66, 0xda, 0x2e, 0x53, 0xdf, 0xb7, 0x0d, 0x39, 0xaf, 0xa1, 0xf9, 0x38, 0x1c, 0x05, 0x30, + 0x87, 0x6c, 0x34, 0xb8, 0x1e, 0x75, 0x2a, 0x2a, 0xbe, 0x18, 0x5d, 0x8e, 0xae, 0x47, 0x1d, 0xe2, + 0x8c, 0xa1, 0x75, 0x1e, 0x0a, 0x37, 0x98, 0x2e, 0x16, 0x19, 0xca, 0x7f, 0xfc, 0xa2, 0x23, 0x30, + 0x85, 0xe6, 0xb4, 0x06, 0x07, 0xac, 0xcc, 0x54, 0x65, 0x88, 0x71, 0xf9, 0x4e, 0x15, 0x3a, 0x5f, + 0x00, 0xae, 0xf9, 0x3c, 0xc4, 0x71, 0xec, 0xe1, 0x3d, 0x7d, 0x0b, 0xf5, 0xa2, 0x72, 0x63, 0xc4, + 0xa1, 0x7a, 0xd4, 0xd6, 0x5d, 0x6c, 0xc3, 0xd3, 0x57, 0xb0, 0x3f, 0x0f, 0x85, 0x88, 0x66, 0x0b, + 0x3f, 0x94, 0x98, 0x96, 0xff, 0xb5, 0xa5, 0xb1, 0x8f, 0x1a, 0x72, 0x04, 0x34, 0x86, 0x4b, 0x74, + 0x83, 0x6c, 0x15, 0xd1, 0x63, 0xa8, 0x69, 0xad, 0x88, 0xd6, 0xea, 0x48, 0xb5, 0xdd, 0x70, 0x7d, + 0x25, 0x4d, 0xea, 0xcb, 0x65, 0xc4, 0x74, 0x8d, 0x9a, 0x32, 0x5b, 0x45, 0xba, 0x63, 0x8d, 0xa9, + 0xd0, 0x79, 0x03, 0xcd, 0xc7, 0xa2, 0x42, 0x95, 0xe1, 0xd9, 0xe9, 0xb0, 0x53, 0xa1, 0xfb, 0xd0, + 0xb8, 0xbd, 0xfd, 0xc4, 0xb3, 0xe5, 0xfb, 0x77, 0x1d, 0xe2, 0xb8, 0x50, 0xbf, 0xe0, 0x92, 0x4f, + 0x70, 0xbd, 0xe5, 0x1e, 0xd9, 0x76, 0x8f, 0x42, 0xcd, 0xe3, 0x92, 0x97, 0xd3, 0xea, 0x58, 0x7d, + 0x1e, 0x3f, 0x2f, 0xb7, 0xca, 0xf0, 0x73, 0xb5, 0x35, 0x6e, 0x8a, 0x5c, 0xa2, 0xa7, 0xb6, 0x46, + 0x99, 0x5f, 0x65, 0xcd, 0x12, 0x19, 0xc8, 0xe3, 0xa7, 0xd0, 0xde, 0x75, 0x99, 0xd6, 0xa1, 0xca, + 0x31, 0xeb, 0x54, 0xce, 0x3b, 0xdf, 0x1e, 0xba, 0xe4, 0xfb, 0x43, 0x97, 0xfc, 0x7c, 0xe8, 0x92, + 0xaf, 0xbf, 0xba, 0x95, 0xb9, 0xa9, 0x57, 0xfe, 0xec, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5d, + 0x5f, 0x6a, 0x47, 0xfe, 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -728,6 +739,16 @@ func (m *KV) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.StreamId)) } + if m.StreamDone { + dAtA[i] = 0x58 + i++ + if m.StreamDone { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1041,6 +1062,9 @@ func (m *KV) Size() (n int) { if m.StreamId != 0 { n += 1 + sovPb(uint64(m.StreamId)) } + if m.StreamDone { + n += 2 + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1202,14 +1226,7 @@ func (m *DataKey) Size() (n int) { } func sovPb(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n + return (math_bits.Len64(x|1) + 6) / 7 } func sozPb(x uint64) (n int) { return sovPb(uint64((x << 1) ^ uint64((int64(x) >> 63)))) @@ -1436,6 +1453,26 @@ func (m *KV) Unmarshal(dAtA []byte) error { break } } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamDone", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.StreamDone = bool(v != 0) default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/pb/pb.proto b/pb/pb.proto index 1edb0edc4..4d91eec04 100644 --- a/pb/pb.proto +++ b/pb/pb.proto @@ -29,6 +29,8 @@ message KV { // Stream id is used to identify which stream the KV came from. uint32 stream_id = 10; + // Stream done is used to indicate end of stream. + bool stream_done = 11; } message KVList { diff --git a/stream_writer.go b/stream_writer.go index b3ab8f141..b59eb9bb3 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -17,6 +17,7 @@ package badger import ( + "fmt" "math" "sync" @@ -47,7 +48,7 @@ type StreamWriter struct { throttle *y.Throttle maxVersion uint64 writers map[uint32]*sortedWriter - closer *y.Closer + maxHead valuePointer } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -61,7 +62,6 @@ func (db *DB) NewStreamWriter() *StreamWriter { // concurrent streams being processed. throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), - closer: y.NewCloser(0), } } @@ -85,8 +85,23 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { if len(kvs.GetKv()) == 0 { return nil } + + // closedStreams keeps track of all streams which are going to be marked as done. We are + // keeping track of all streams so that we can close them at the end, after inserting all + // the valid kvs. + closedStreams := make(map[uint32]struct{}) streamReqs := make(map[uint32]*request) for _, kv := range kvs.Kv { + if kv.StreamDone { + closedStreams[kv.StreamId] = struct{}{} + continue + } + + // Panic if some kv comes after stream has been marked as closed. + if _, ok := closedStreams[kv.StreamId]; ok { + panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId)) + } + var meta, userMeta byte if len(kv.Meta) > 0 { meta = kv.Meta[0] @@ -104,7 +119,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { ExpiresAt: kv.ExpiresAt, meta: meta, } - // If the value can be colocated with the key in LSM tree, we can skip + // If the value can be collocated with the key in LSM tree, we can skip // writing the value to value log. e.skipVlog = sw.db.shouldWriteValueToLSM(*e) req := streamReqs[kv.StreamId] @@ -121,6 +136,10 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { sw.writeLock.Lock() defer sw.writeLock.Unlock() + + // We are writing all requests to vlog even if some request belongs to already closed stream. + // It is safe to do because we are panicking while writing to sorted writer, which will be nil + // for closed stream. At restart, stream writer will drop all the data in Prepare function. if err := sw.db.vlog.write(all); err != nil { return err } @@ -135,8 +154,35 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } sw.writers[streamID] = writer } + + if writer == nil { + panic(fmt.Sprintf("write performed on closed stream: %d", streamID)) + } + writer.reqCh <- req } + + // Now we can close any streams if required. We will make writer for + // the closed streams as nil. + for streamId := range closedStreams { + writer, ok := sw.writers[streamId] + if !ok { + sw.db.opt.Logger.Warningf("Trying to close stream: %d, but no sorted "+ + "writer found for it", streamId) + continue + } + + writer.closer.SignalAndWait() + if err := writer.Done(); err != nil { + return err + } + + if sw.maxHead.Less(writer.head) { + sw.maxHead = writer.head + } + + sw.writers[streamId] = nil + } return nil } @@ -148,19 +194,26 @@ func (sw *StreamWriter) Flush() error { defer sw.done() - sw.closer.SignalAndWait() - var maxHead valuePointer for _, writer := range sw.writers { + if writer != nil { + writer.closer.SignalAndWait() + } + } + + for _, writer := range sw.writers { + if writer == nil { + continue + } if err := writer.Done(); err != nil { return err } - if maxHead.Less(writer.head) { - maxHead = writer.head + if sw.maxHead.Less(writer.head) { + sw.maxHead = writer.head } } // Encode and write the value log head into a new table. - data := maxHead.Encode() + data := sw.maxHead.Encode() headWriter, err := sw.newWriter(headStreamId) if err != nil { return errors.Wrap(err, "failed to create head writer") @@ -211,6 +264,8 @@ type sortedWriter struct { streamID uint32 reqCh chan *request head valuePointer + // Have separate closer for each writer, as it can be closed at any time. + closer *y.Closer } func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { @@ -227,17 +282,18 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { throttle: sw.throttle, builder: table.NewTableBuilder(bopts), reqCh: make(chan *request, 3), + closer: y.NewCloser(1), } - sw.closer.AddRunning(1) - go w.handleRequests(sw.closer) + + go w.handleRequests() return w, nil } // ErrUnsortedKey is returned when any out of order key arrives at sortedWriter during call to Add. var ErrUnsortedKey = errors.New("Keys not in sorted order") -func (w *sortedWriter) handleRequests(closer *y.Closer) { - defer closer.Done() +func (w *sortedWriter) handleRequests() { + defer w.closer.Done() process := func(req *request) { for i, e := range req.Entries { @@ -273,7 +329,7 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) { select { case req := <-w.reqCh: process(req) - case <-closer.HasBeenClosed(): + case <-w.closer.HasBeenClosed(): close(w.reqCh) for req := range w.reqCh { process(req) @@ -292,7 +348,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { sameKey := y.SameKey(key, w.lastKey) // Same keys should go into the same SSTable. if !sameKey && w.builder.ReachedCapacity(w.db.opt.MaxTableSize) { - if err := w.send(); err != nil { + if err := w.send(false); err != nil { return err } } @@ -302,7 +358,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { return nil } -func (w *sortedWriter) send() error { +func (w *sortedWriter) send(done bool) error { if err := w.throttle.Do(); err != nil { return err } @@ -310,6 +366,13 @@ func (w *sortedWriter) send() error { err := w.createTable(builder) w.throttle.Done(err) }(w.builder) + // If done is true, this indicates we can close the writer. + // No need to allocate underlying TableBuilder now. + if done { + w.builder = nil + return nil + } + dk, err := w.db.registry.latestDataKey() if err != nil { return y.Wrapf(err, "Error while retriving datakey in sortedWriter.send") @@ -324,9 +387,12 @@ func (w *sortedWriter) send() error { // to sortedWriter. It completes writing current SST to disk. func (w *sortedWriter) Done() error { if w.builder.Empty() { + // Assign builder as nil, so that underlying memory can be garbage collected. + w.builder = nil return nil } - return w.send() + + return w.send(true) } func (w *sortedWriter) createTable(builder *table.Builder) error { diff --git a/stream_writer_test.go b/stream_writer_test.go index f361f4b1b..332d42387 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -20,8 +20,10 @@ import ( "bytes" "encoding/binary" "fmt" + "io/ioutil" "math" "math/rand" + "os" "testing" "github.com/stretchr/testify/require" @@ -328,3 +330,126 @@ func TestStreamWriter6(t *testing.T) { require.NoError(t, db.Close()) }) } + +func TestStreamDone(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + for i := 0; i < 10; i++ { + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", i)), + Value: val[:], + Version: 1, + StreamId: uint32(i), + } + kv2 := &pb.KV{ + StreamId: uint32(i), + StreamDone: true, + } + list.Kv = append(list.Kv, kv1, kv2) + require.NoError(t, sw.Write(list), "sw.Write() failed") + } + require.NoError(t, sw.Flush(), "sw.Flush() failed") + require.NoError(t, db.Close()) + + var err error + db, err = Open(db.opt) + require.NoError(t, err) + require.NoError(t, db.Close()) + }) +} + +func TestSendOnClosedStream(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + opts := getTestOptions(dir) + db, err := Open(opts) + require.NoError(t, err) + + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 1)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + kv2 := &pb.KV{ + StreamId: uint32(1), + StreamDone: true, + } + list.Kv = append(list.Kv, kv1, kv2) + require.NoError(t, sw.Write(list), "sw.Write() failed") + + // Defer for panic. + defer func() { + require.NotNil(t, recover(), "should have paniced") + require.NoError(t, sw.Flush()) + require.NoError(t, db.Close()) + }() + // Send once stream is closed. + list = &pb.KVList{} + kv1 = &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 2)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + list.Kv = append(list.Kv, kv1) + sw.Write(list) +} + +func TestSendOnClosedStream2(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + opts := getTestOptions(dir) + db, err := Open(opts) + require.NoError(t, err) + + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + + var val [10]byte + rand.Read(val[:]) + list := &pb.KVList{} + kv1 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 1)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + kv2 := &pb.KV{ + StreamId: uint32(1), + StreamDone: true, + } + kv3 := &pb.KV{ + Key: []byte(fmt.Sprintf("%d", 2)), + Value: val[:], + Version: 1, + StreamId: uint32(1), + } + list.Kv = append(list.Kv, kv1, kv2, kv3) + + // Defer for panic. + defer func() { + require.NotNil(t, recover(), "should have paniced") + require.NoError(t, sw.Flush()) + require.NoError(t, db.Close()) + }() + + require.NoError(t, sw.Write(list), "sw.Write() failed") +}