Skip to content

Commit

Permalink
Merge branch 'develop' into fix/fix_ut_tmp_dir
Browse files Browse the repository at this point in the history
  • Loading branch information
HeyJavaBean authored Jan 24, 2025
2 parents 9fb99a3 + d840d1a commit e074eaf
Show file tree
Hide file tree
Showing 169 changed files with 10,247 additions and 921 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
uses: crate-ci/typos@v1.13.14

golangci-lint:
runs-on: [ self-hosted, X64 ]
runs-on: [ Linux, X64 ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down
28 changes: 20 additions & 8 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: [ push, pull_request ]

jobs:
unit-scenario-test:
runs-on: [ self-hosted, X64 ]
runs-on: [ Linux, X64 ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down Expand Up @@ -34,12 +34,26 @@ jobs:
# setting benchtime=100ms is saving our time...
run: go test -bench=. -benchmem -run=none ./... -benchtime=100ms

compatibility-test:
compatibility-test-x64:
strategy:
matrix:
go: [ "1.18", "1.19", "1.20", "1.21", "1.22", "1.23" ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
runs-on: [ Linux, X64 ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go }}
cache: false # don't use cache for self-hosted runners
- name: Unit Test
run: go test -race ./...

compatibility-test-arm:
strategy:
matrix:
go: [ "1.18", "1.19", "1.20", "1.21", "1.22", "1.23" ]
runs-on: [ ARM64 ] # It's OK under Linux or macOS
steps:
- uses: actions/checkout@v4
- name: Set up Go
Expand Down Expand Up @@ -80,15 +94,13 @@ jobs:
bash ./codegen_run.sh
windows-test:
runs-on: windows-latest
env: # Fixes https://github.com/actions/setup-go/issues/240
GOMODCACHE: 'D:\go\pkg\mod'
GOCACHE: 'D:\go\go-build'
runs-on: [ Windows ]
steps:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: stable
cache: false # don't use cache for self-hosted runners
- name: Windows compatibility test
run: go test -run=^$ ./...
38 changes: 33 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strconv"
"sync/atomic"

"github.com/cloudwego/kitex/pkg/streamx"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/localsession/backup"

Expand Down Expand Up @@ -70,8 +72,14 @@ type kClient struct {
mws []endpoint.Middleware
eps endpoint.Endpoint
sEps endpoint.Endpoint
opt *client.Options
lbf *lbcache.BalancerFactory

// streamx
sxStreamMW streamx.StreamMiddleware
sxStreamRecvMW streamx.StreamRecvMiddleware
sxStreamSendMW streamx.StreamSendMiddleware

opt *client.Options
lbf *lbcache.BalancerFactory

inited bool
closed bool
Expand Down Expand Up @@ -302,7 +310,9 @@ func (kc *kClient) initStreamMiddlewares(ctx context.Context) {

func richMWsWithBuilder(ctx context.Context, mwBs []endpoint.MiddlewareBuilder) (mws []endpoint.Middleware) {
for i := range mwBs {
mws = append(mws, mwBs[i](ctx))
if mw := mwBs[i](ctx); mw != nil {
mws = append(mws, mw)
}
}
return
}
Expand Down Expand Up @@ -424,21 +434,30 @@ func (kc *kClient) richRemoteOption() {
// (newClientStreamer: call WriteMeta before remotecli.NewClient)
transInfoHdlr := bound.NewTransMetaHandler(kc.opt.MetaHandlers)
kc.opt.RemoteOpt.PrependBoundHandler(transInfoHdlr)

// add meta handlers into streaming meta handlers
for _, h := range kc.opt.MetaHandlers {
if shdlr, ok := h.(remote.StreamingMetaHandler); ok {
kc.opt.RemoteOpt.StreamingMetaHandlers = append(kc.opt.RemoteOpt.StreamingMetaHandlers, shdlr)
}
}
}
}

func (kc *kClient) buildInvokeChain() error {
mwchain := endpoint.Chain(kc.mws...)

innerHandlerEp, err := kc.invokeHandleEndpoint()
if err != nil {
return err
}
kc.eps = endpoint.Chain(kc.mws...)(innerHandlerEp)
kc.eps = mwchain(innerHandlerEp)

innerStreamingEp, err := kc.invokeStreamingEndpoint()
if err != nil {
return err
}
kc.sEps = endpoint.Chain(kc.mws...)(innerStreamingEp)
kc.sEps = mwchain(innerStreamingEp)
return nil
}

Expand Down Expand Up @@ -720,6 +739,9 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
rpcStats.ImmutableView(),
)

if mi != nil {
ri.Invocation().(rpcinfo.InvocationSetter).SetStreamingMode(mi.StreamingMode())
}
if fromMethod := ctx.Value(consts.CtxKeyMethod); fromMethod != nil {
rpcinfo.AsMutableEndpointInfo(ri.From()).SetMethod(fromMethod.(string))
}
Expand All @@ -732,6 +754,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
}
}

// streamx config
sopt := opt.StreamX
if sopt.RecvTimeout > 0 {
cfg.SetStreamRecvTimeout(sopt.RecvTimeout)
}

ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

if callOpts != nil && callOpts.CompressorName != "" {
Expand Down
86 changes: 86 additions & 0 deletions client/client_streamx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
"context"

"github.com/cloudwego/kitex/client/streamxclient/streamxcallopt"
istreamxclient "github.com/cloudwego/kitex/internal/streamx/streamxclient"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/pkg/streamx"
)

// NewStream create stream for streamx mode
func (kc *kClient) NewStream(ctx context.Context, method string, streamArgs streamx.StreamArgs, callOptions ...streamxcallopt.CallOption) (context.Context, streamx.ClientStream, error) {
if !kc.inited {
panic("client not initialized")
}
if kc.closed {
panic("client is already closed")
}
if ctx == nil {
panic("ctx is nil")
}
var ri rpcinfo.RPCInfo
ctx, ri, _ = kc.initRPCInfo(ctx, method, 0, nil)

err := rpcinfo.AsMutableRPCConfig(ri.Config()).SetInteractionMode(rpcinfo.Streaming)
if err != nil {
return nil, nil, err
}
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)

// tracing
ctx = kc.opt.TracerCtl.DoStart(ctx, ri)
ctx, copts := istreamxclient.NewCtxWithCallOptions(ctx)
callOptions = append(callOptions, istreamxclient.WithStreamCloseCallback(func(nErr error) {
kc.opt.TracerCtl.DoFinish(ctx, ri, nErr)
}))
copts.Apply(callOptions)

if msargs := streamx.AsMutableStreamArgs(streamArgs); msargs != nil {
msargs.SetStreamMiddleware(kc.sxStreamMW)

eventHandler := kc.opt.TracerCtl.GetStreamEventHandler()
if eventHandler == nil {
msargs.SetStreamRecvMiddleware(kc.sxStreamRecvMW)
msargs.SetStreamSendMiddleware(kc.sxStreamSendMW)
} else {
traceRecvMW := streamx.NewStreamRecvStatMiddleware(ctx, eventHandler)
traceSendMW := streamx.NewStreamSendStatMiddleware(ctx, eventHandler)
if kc.sxStreamRecvMW == nil {
msargs.SetStreamRecvMiddleware(traceRecvMW)
} else {
msargs.SetStreamRecvMiddleware(streamx.StreamRecvMiddlewareChain(traceRecvMW, kc.sxStreamRecvMW))
}
if kc.sxStreamSendMW == nil {
msargs.SetStreamSendMiddleware(traceSendMW)
} else {
msargs.SetStreamSendMiddleware(streamx.StreamSendMiddlewareChain(traceSendMW, kc.sxStreamSendMW))
}
}
}
// with streamx mode, req is nil and resp is streamArgs
// it's an ugly trick but if we don't want to refactor too much,
// this is the only way to compatible with current endpoint API.
err = kc.sEps(ctx, nil, streamArgs)
if err != nil {
return nil, nil, err
}
return ctx, streamArgs.Stream().(streamx.ClientStream), nil
}
5 changes: 4 additions & 1 deletion client/genericclient/generic_stream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

func streamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
func StreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
return newClientStreamingServiceInfo(g)
}

Expand Down Expand Up @@ -104,6 +104,9 @@ func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
if extra.GetExtra(generic.CombineServiceKey) == "true" {
svcInfo.Extra["combine_service"] = true
}
if pkg := extra.GetExtra("PackageName"); pkg != "" {
svcInfo.Extra["PackageName"] = pkg
}
}
return svcInfo
}
1 change: 1 addition & 0 deletions client/genericclient/generic_stream_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestGenericStreamService(t *testing.T) {
svcInfo := newClientStreamingServiceInfo(g)
test.Assert(t, svcInfo.Extra["generic"] == true)
test.Assert(t, svcInfo.Extra["combine_service"] == nil)
test.Assert(t, svcInfo.Extra["PackageName"] == "pbapi")
svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
return svcInfo.Methods[name]
}
Expand Down
30 changes: 9 additions & 21 deletions client/genericclient/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,40 +31,28 @@ import (
"github.com/cloudwego/kitex/pkg/streaming"
)

// NOTE: this is a temporary adjustment for ci check. remove it after fully completing the generic streaming support

var (
_ clientStreaming = nil
_ serverStreaming = nil
_ bidirectionalStreaming = nil
_ = newStreamingClient
_ = newClientStreaming
_ = newServerStreaming
_ = newBidirectionalStreaming
)

type clientStreaming interface {
type ClientStreaming interface {
streaming.Stream
Send(req interface{}) error
CloseAndRecv() (resp interface{}, err error)
}

type serverStreaming interface {
type ServerStreaming interface {
streaming.Stream
Recv() (resp interface{}, err error)
}

type bidirectionalStreaming interface {
type BidirectionalStreaming interface {
streaming.Stream
Send(req interface{}) error
Recv() (resp interface{}, err error)
}

func newStreamingClient(destService string, g generic.Generic, opts ...client.Option) (Client, error) {
return newStreamingClientWithServiceInfo(destService, g, streamingServiceInfo(g), opts...)
func NewStreamingClient(destService string, g generic.Generic, opts ...client.Option) (Client, error) {
return NewStreamingClientWithServiceInfo(destService, g, StreamingServiceInfo(g), opts...)
}

func newStreamingClientWithServiceInfo(destService string, g generic.Generic, svcInfo *serviceinfo.ServiceInfo, opts ...client.Option) (Client, error) {
func NewStreamingClientWithServiceInfo(destService string, g generic.Generic, svcInfo *serviceinfo.ServiceInfo, opts ...client.Option) (Client, error) {
var options []client.Option
options = append(options, client.WithGeneric(g))
options = append(options, client.WithDestService(destService))
Expand Down Expand Up @@ -119,7 +107,7 @@ type clientStreamingClient struct {
methodInfo serviceinfo.MethodInfo
}

func newClientStreaming(ctx context.Context, genericCli Client, method string, callOpts ...callopt.Option) (clientStreaming, error) {
func NewClientStreaming(ctx context.Context, genericCli Client, method string, callOpts ...callopt.Option) (ClientStreaming, error) {
gCli, ok := genericCli.(*genericServiceClient)
if !ok {
return nil, errors.New("invalid generic client")
Expand Down Expand Up @@ -154,7 +142,7 @@ type serverStreamingClient struct {
methodInfo serviceinfo.MethodInfo
}

func newServerStreaming(ctx context.Context, genericCli Client, method string, req interface{}, callOpts ...callopt.Option) (serverStreaming, error) {
func NewServerStreaming(ctx context.Context, genericCli Client, method string, req interface{}, callOpts ...callopt.Option) (ServerStreaming, error) {
gCli, ok := genericCli.(*genericServiceClient)
if !ok {
return nil, errors.New("invalid generic client")
Expand Down Expand Up @@ -191,7 +179,7 @@ type bidirectionalStreamingClient struct {
methodInfo serviceinfo.MethodInfo
}

func newBidirectionalStreaming(ctx context.Context, genericCli Client, method string, callOpts ...callopt.Option) (bidirectionalStreaming, error) {
func NewBidirectionalStreaming(ctx context.Context, genericCli Client, method string, callOpts ...callopt.Option) (BidirectionalStreaming, error) {
gCli, ok := genericCli.(*genericServiceClient)
if !ok {
return nil, errors.New("invalid generic client")
Expand Down
Loading

0 comments on commit e074eaf

Please sign in to comment.