Skip to content

Commit

Permalink
feat: add test framework & refactor RPC server (#2579)
Browse files Browse the repository at this point in the history
Co-authored-by: Rene Jochum <rene@jochum.dev>
  • Loading branch information
Davincible and jochumdev authored Oct 20, 2022
1 parent c25dee7 commit a3980c2
Show file tree
Hide file tree
Showing 54 changed files with 3,468 additions and 2,262 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
go get -v -t -d ./...
- name: Run tests
id: tests
run: richgo test -v -race -cover ./...
run: richgo test -v -race -cover -bench=. ./...
env:
IN_TRAVIS_CI: yes
RICHGO_FORCE_COLOR: 1
Expand All @@ -60,6 +60,6 @@ jobs:
go get -v -t -d ./...
- name: Run tests
id: tests
run: go test -v -race -cover -json ./... | tparse -notests -format=markdown >> $GITHUB_STEP_SUMMARY
run: go test -v -race -cover -json -bench=. ./... | tparse -notests -format=markdown >> $GITHUB_STEP_SUMMARY
env:
IN_TRAVIS_CI: yes
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ _cgo_export.*
*~
*.swp
*.swo

# go work files
go.work
go.work.sum
9 changes: 9 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ output:

# all available settings of specific linters
linters-settings:
wsl:
allow-cuddle-with-calls: ["Lock", "RLock", "defer"]
funlen:
lines: 80
statements: 60
varnamelen:
# The longest distance, in source lines, that is being considered a "small scope".
# Variables used in at most this many lines will be ignored.
Expand Down Expand Up @@ -184,6 +189,7 @@ linters:
- makezero
- gofumpt
- nlreturn
- thelper

# Can be considered to be enabled
- gochecknoinits
Expand All @@ -197,6 +203,9 @@ linters:
- exhaustruct
- containedctx
- godox
- forcetypeassert
- gci
- lll

issues:
# List of regexps of issue texts to exclude, empty list by default.
Expand Down
2 changes: 1 addition & 1 deletion .richstyle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ skipStyle:
foreground: lightBlack
passPackageStyle:
foreground: green
hide: true
hide: false
failPackageStyle:
bold: true
foreground: "#821515"
Expand Down
5 changes: 3 additions & 2 deletions broker/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
merr "go-micro.dev/v4/errors"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/registry/cache"
"go-micro.dev/v4/transport/headers"
maddr "go-micro.dev/v4/util/addr"
mnet "go-micro.dev/v4/util/net"
mls "go-micro.dev/v4/util/tls"
Expand Down Expand Up @@ -313,7 +314,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

topic := m.Header["Micro-Topic"]
topic := m.Header[headers.Message]
// delete(m.Header, ":topic")

if len(topic) == 0 {
Expand Down Expand Up @@ -517,7 +518,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
m.Header[k] = v
}

m.Header["Micro-Topic"] = topic
m.Header[headers.Message] = topic

// encode the message
b, err := h.opts.Codec.Marshal(m)
Expand Down
6 changes: 5 additions & 1 deletion client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"time"

cache "github.com/patrickmn/go-cache"

"go-micro.dev/v4/metadata"
"go-micro.dev/v4/transport/headers"
)

// NewCache returns an initialized cache.
Expand Down Expand Up @@ -38,6 +40,7 @@ func (c *Cache) List() map[string]string {
items := c.cache.Items()

rsp := make(map[string]string, len(items))

for k, v := range items {
bytes, _ := json.Marshal(v.Object)
rsp[k] = string(bytes)
Expand All @@ -48,7 +51,7 @@ func (c *Cache) List() map[string]string {

// key returns a hash for the context and request.
func key(ctx context.Context, req *Request) string {
ns, _ := metadata.Get(ctx, "Micro-Namespace")
ns, _ := metadata.Get(ctx, headers.Namespace)

bytes, _ := json.Marshal(map[string]interface{}{
"namespace": ns,
Expand All @@ -62,5 +65,6 @@ func key(ctx context.Context, req *Request) string {

h := fnv.New64()
h.Write(bytes)

return fmt.Sprintf("%x", h.Sum(nil))
}
3 changes: 2 additions & 1 deletion client/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"go-micro.dev/v4/metadata"
"go-micro.dev/v4/transport/headers"
)

func TestCache(t *testing.T) {
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestCacheKey(t *testing.T) {
})

t.Run("DifferentMetadata", func(t *testing.T) {
mdCtx := metadata.Set(context.TODO(), "Micro-Namespace", "bar")
mdCtx := metadata.Set(context.TODO(), headers.Namespace, "bar")
key1 := key(mdCtx, &req1)
key2 := key(ctx, &req1)

Expand Down
28 changes: 7 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ package client

import (
"context"
"time"

"go-micro.dev/v4/codec"
)

var (
// NewClient returns a new client.
NewClient func(...Option) Client = newRPCClient
// DefaultClient is a default client to use out of the box.
DefaultClient Client = newRPCClient()
)

// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
Expand Down Expand Up @@ -102,26 +108,6 @@ type MessageOption func(*MessageOptions)
// RequestOption used by NewRequest.
type RequestOption func(*RequestOptions)

var (
// DefaultClient is a default client to use out of the box.
DefaultClient Client = newRpcClient()
// DefaultBackoff is the default backoff function for retries.
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries.
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried.
DefaultRetries = 1
// DefaultRequestTimeout is the default request timeout.
DefaultRequestTimeout = time.Second * 5
// DefaultPoolSize sets the connection pool size.
DefaultPoolSize = 100
// DefaultPoolTTL sets the connection pool ttl.
DefaultPoolTTL = time.Minute

// NewClient returns a new client.
NewClient func(...Option) Client = newRpcClient
)

// Makes a synchronous call to a service using the default client.
func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.Call(ctx, request, response, opts...)
Expand Down
63 changes: 47 additions & 16 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,24 @@ import (
"go-micro.dev/v4/transport"
)

var (
// DefaultBackoff is the default backoff function for retries.
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries.
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried.
DefaultRetries = 5
// DefaultRequestTimeout is the default request timeout.
DefaultRequestTimeout = time.Second * 30
// DefaultConnectionTimeout is the default connection timeout.
DefaultConnectionTimeout = time.Second * 5
// DefaultPoolSize sets the connection pool size.
DefaultPoolSize = 100
// DefaultPoolTTL sets the connection pool ttl.
DefaultPoolTTL = time.Minute
)

// Options are the Client options.
type Options struct {
// Used to select codec
ContentType string
Expand Down Expand Up @@ -47,6 +65,7 @@ type Options struct {
Context context.Context
}

// CallOptions are options used to make calls to a server.
type CallOptions struct {
SelectOptions []selector.SelectOption

Expand All @@ -56,18 +75,23 @@ type CallOptions struct {
Backoff BackoffFunc
// Check if retriable func
Retry RetryFunc
// Transport Dial Timeout
DialTimeout time.Duration
// Number of Call attempts
Retries int
// Request/Response timeout
// Transport Dial Timeout. Used for initial dial to establish a connection.
DialTimeout time.Duration
// ConnectionTimeout of one request to the server.
// Set this lower than the RequestTimeout to enbale retries on connection timeout.
ConnectionTimeout time.Duration
// Request/Response timeout of entire srv.Call, for single request timeout set ConnectionTimeout.
RequestTimeout time.Duration
// Stream timeout for the stream
StreamTimeout time.Duration
// Use the services own auth token
ServiceToken bool
// Duration to cache the response for
CacheExpiry time.Duration
// ConnClose sets the Connection: close header.
ConnClose bool

// Middleware for low level call func
CallWrappers []CallWrapper
Expand Down Expand Up @@ -98,18 +122,20 @@ type RequestOptions struct {
Context context.Context
}

// NewOptions creates new Client options.
func NewOptions(options ...Option) Options {
opts := Options{
Cache: NewCache(),
Context: context.Background(),
ContentType: DefaultContentType,
Codecs: make(map[string]codec.NewCodec),
CallOptions: CallOptions{
Backoff: DefaultBackoff,
Retry: DefaultRetry,
Retries: DefaultRetries,
RequestTimeout: DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout,
Backoff: DefaultBackoff,
Retry: DefaultRetry,
Retries: DefaultRetries,
RequestTimeout: DefaultRequestTimeout,
ConnectionTimeout: DefaultConnectionTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
PoolSize: DefaultPoolSize,
PoolTTL: DefaultPoolTTL,
Expand Down Expand Up @@ -141,7 +167,7 @@ func Codec(contentType string, c codec.NewCodec) Option {
}
}

// Default content type of the client.
// ContentType sets the default content type of the client.
func ContentType(ct string) Option {
return func(o *Options) {
o.ContentType = ct
Expand Down Expand Up @@ -207,8 +233,7 @@ func Backoff(fn BackoffFunc) Option {
}
}

// Number of retries when making the request.
// Should this be a Call Option?
// Retries set the number of retries when making the request.
func Retries(i int) Option {
return func(o *Options) {
o.CallOptions.Retries = i
Expand All @@ -222,8 +247,7 @@ func Retry(fn RetryFunc) Option {
}
}

// The request timeout.
// Should this be a Call Option?
// RequestTimeout set the request timeout.
func RequestTimeout(d time.Duration) Option {
return func(o *Options) {
o.CallOptions.RequestTimeout = d
Expand All @@ -237,7 +261,7 @@ func StreamTimeout(d time.Duration) Option {
}
}

// Transport dial timeout.
// DialTimeout sets the transport dial timeout.
func DialTimeout(d time.Duration) Option {
return func(o *Options) {
o.CallOptions.DialTimeout = d
Expand Down Expand Up @@ -296,8 +320,8 @@ func WithRetry(fn RetryFunc) CallOption {
}
}

// WithRetries is a CallOption which overrides that which
// set in Options.CallOptions.
// WithRetries sets the number of tries for a call.
// This CallOption overrides Options.CallOptions.
func WithRetries(i int) CallOption {
return func(o *CallOptions) {
o.Retries = i
Expand All @@ -312,6 +336,13 @@ func WithRequestTimeout(d time.Duration) CallOption {
}
}

// WithConnClose sets the Connection header to close.
func WithConnClose() CallOption {
return func(o *CallOptions) {
o.ConnClose = true
}
}

// WithStreamTimeout sets the stream timeout.
func WithStreamTimeout(d time.Duration) CallOption {
return func(o *CallOptions) {
Expand Down
5 changes: 3 additions & 2 deletions client/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (
}

switch e.Code {
// retry on timeout or internal server error
case 408, 500:
// Retry on timeout, not on 500 internal server error, as that is a business
// logic error that should be handled by the user.
case 408:
return true, nil
default:
return false, nil
Expand Down
Loading

0 comments on commit a3980c2

Please sign in to comment.