Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport/grpchttp2: add http2.Framer bridge #7453

Merged
merged 22 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/transport/grpchttp2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@
}
return fmt.Sprintf("unknown error code %#x", uint32(err))
}

type connError ErrCode

func (err connError) Error() string {
return fmt.Sprintf("connection error: %s", ErrCode(err))

Check warning on line 77 in internal/transport/grpchttp2/errors.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/errors.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
6 changes: 6 additions & 0 deletions internal/transport/grpchttp2/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
FlagContinuationEndHeaders Flag = 0x4
)

func (f Flag) Has(flag Flag) bool {
return f&flag != 0

Check warning on line 57 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L56-L57

Added lines #L56 - L57 were not covered by tests
}

// Setting represents the id and value pair of an HTTP/2 setting.
// See [Setting Format].
//
Expand Down Expand Up @@ -242,6 +246,8 @@
return f.hdr
}

func (f *WindowUpdateFrame) Free() {}

Check warning on line 249 in internal/transport/grpchttp2/framer.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/framer.go#L249

Added line #L249 was not covered by tests

// ContinuationFrame is the representation of a [CONTINUATION Frame]. The
// CONTINUATION frame is used to continue a sequence of header block fragments.
//
Expand Down
214 changes: 214 additions & 0 deletions internal/transport/grpchttp2/http2bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpchttp2

import (
"io"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
)

const (
initHeaderTableSize = 4096
maxFrameLen = 16384
)

type HTTP2FramerBridge struct {
framer *http2.Framer
buf [maxFrameLen]byte
pool grpc.SharedBufferPool
}

func NewHTTP2FramerBridge(w io.Writer, r io.Reader, maxHeaderListSize uint32) *HTTP2FramerBridge {
fr := &HTTP2FramerBridge{
framer: http2.NewFramer(w, r),
pool: grpc.NewSharedBufferPool(),
}

fr.framer.SetReuseFrames()
fr.framer.MaxHeaderListSize = maxHeaderListSize
fr.framer.ReadMetaHeaders = (hpack.NewDecoder(initHeaderTableSize, nil))

return fr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But once I typed the above comment, I had a thought as to whether it makes sense to accept the pool as a parameter instead of defaulting to the default pool.

Totally totally optional is the following: go/go-style/best-practices#option-structure. Even with the pool as a parameter, you will only have 4 arguments. So, not a big deal at this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind 4 input parameters, I am happy leaving it as is. I will add the pool as a parameter as well.

}

func (fr *HTTP2FramerBridge) ReadFrame() (Frame, error) {
f, err := fr.framer.ReadFrame()
if err != nil {
return nil, err

Check warning on line 56 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L56

Added line #L56 was not covered by tests
}

h := f.Header()
hdr := &FrameHeader{
Size: h.Length,
Type: FrameType(h.Type),
Flags: Flag(h.Flags),
StreamID: h.StreamID,
}

switch f := f.(type) {
case *http2.DataFrame:
buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.Data())
df := &DataFrame{
hdr: hdr,
Data: buf,
}
df.free = func() {
fr.pool.Put(&buf)
df.Data = nil

Check warning on line 77 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
return df, nil
case *http2.HeadersFrame:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle this type if the underlying framer is setup for meta frames, which is the default I guess?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're totally right, we don't need to handle them for the adapter. Thanks.

buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.HeaderBlockFragment())
hf := &HeadersFrame{
hdr: hdr,
HdrBlock: buf,
}
hf.free = func() {
fr.pool.Put(&buf)
hf.HdrBlock = nil

Check warning on line 89 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}
return hf, nil
case *http2.RSTStreamFrame:
return &RSTStreamFrame{
hdr: hdr,
Code: ErrCode(f.ErrCode),
}, nil
case *http2.SettingsFrame:
buf := make([]Setting, 0, f.NumSettings())
f.ForeachSetting(func(s http2.Setting) error {
buf = append(buf, Setting{
ID: SettingID(s.ID),
Value: s.Val,
})
return nil
})
sf := &SettingsFrame{
hdr: hdr,
Settings: buf,
}
return sf, nil
case *http2.PingFrame:
buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.Data[:])
pf := &PingFrame{
hdr: hdr,
Data: buf,
}
pf.free = func() {
fr.pool.Put(&buf)
pf.Data = nil

Check warning on line 120 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}
return pf, nil
case *http2.GoAwayFrame:
buf := fr.pool.Get(int(hdr.Size - 8))
copy(buf, f.DebugData())
gf := &GoAwayFrame{
hdr: hdr,
DebugData: buf,
Code: ErrCode(f.ErrCode),
LastStreamID: f.LastStreamID,
}
gf.free = func() {
fr.pool.Put(&buf)
gf.DebugData = nil

Check warning on line 134 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L133-L134

Added lines #L133 - L134 were not covered by tests
}
return gf, nil
case *http2.WindowUpdateFrame:
return &WindowUpdateFrame{
hdr: hdr,
Inc: f.Increment,
}, nil
case *http2.ContinuationFrame:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ... I don't see any handling of this frame type in our current code. Do you happen to know how they are handled currently? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right again, we don't need to handle Continuation frames as they get automatically merged into Meta Headers frame.

buf := fr.pool.Get(int(hdr.Size))
copy(buf, f.HeaderBlockFragment())
return &ContinuationFrame{
hdr: hdr,
HdrBlock: buf,
}, nil
case *http2.MetaHeadersFrame:
return &MetaHeadersFrame{
hdr: hdr,
Fields: f.Fields,
}, nil
}

return nil, connError(ErrCodeProtocol)

Check warning on line 156 in internal/transport/grpchttp2/http2bridge.go

View check run for this annotation

Codecov / codecov/patch

internal/transport/grpchttp2/http2bridge.go#L156

Added line #L156 was not covered by tests
}

func (fr *HTTP2FramerBridge) WriteData(streamID uint32, endStream bool, data ...[]byte) error {
off := 0

for _, s := range data {
off += copy(fr.buf[off:], s)
}

return fr.framer.WriteData(streamID, endStream, fr.buf[:off])
}

func (fr *HTTP2FramerBridge) WriteHeaders(streamID uint32, endStream, endHeaders bool, headerBlock []byte) error {
p := http2.HeadersFrameParam{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: inline

	return fr.framer.WriteHeaders(http2.HeadersFrameParam{
		StreamID:      streamID,
		EndStream:     endStream,
		EndHeaders:    endHeaders,
		BlockFragment: headerBlock,
	})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

StreamID: streamID,
EndStream: endStream,
EndHeaders: endHeaders,
BlockFragment: headerBlock,
}

return fr.framer.WriteHeaders(p)
}

func (fr *HTTP2FramerBridge) WriteRSTStream(streamID uint32, code ErrCode) error {
return fr.framer.WriteRSTStream(streamID, http2.ErrCode(code))
}

func (fr *HTTP2FramerBridge) WriteSettings(settings ...Setting) error {
ss := make([]http2.Setting, 0, len(settings))
for _, s := range settings {
ss = append(ss, http2.Setting{
ID: http2.SettingID(s.ID),
Val: s.Value,
})
}

return fr.framer.WriteSettings(ss...)
}

func (fr *HTTP2FramerBridge) WriteSettingsAck() error {
return fr.framer.WriteSettingsAck()
}

func (fr *HTTP2FramerBridge) WritePing(ack bool, data [8]byte) error {
return fr.framer.WritePing(ack, data)
}

func (fr *HTTP2FramerBridge) WriteGoAway(maxStreamID uint32, code ErrCode, debugData []byte) error {
return fr.framer.WriteGoAway(maxStreamID, http2.ErrCode(code), debugData)
}

func (fr *HTTP2FramerBridge) WriteWindowUpdate(streamID, inc uint32) error {
return fr.framer.WriteWindowUpdate(streamID, inc)
}

func (fr *HTTP2FramerBridge) WriteContinuation(streamID uint32, endHeaders bool, headerBlock []byte) error {
return fr.framer.WriteContinuation(streamID, endHeaders, headerBlock)
}
Loading