Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions tests/certification/embedded/components.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package embedded

import (
"github.com/dapr/dapr/pkg/runtime"
"github.com/dapr/kit/logger"

// Name resolutions.
nr "github.com/dapr/components-contrib/nameresolution"
nr_consul "github.com/dapr/components-contrib/nameresolution/consul"
nr_kubernetes "github.com/dapr/components-contrib/nameresolution/kubernetes"
nr_mdns "github.com/dapr/components-contrib/nameresolution/mdns"

nr_loader "github.com/dapr/dapr/pkg/components/nameresolution"
)

func CommonComponents(log logger.Logger) []runtime.Option {
return []runtime.Option{
runtime.WithNameResolutions(
nr_loader.New("mdns", func() nr.Resolver {
return nr_mdns.NewResolver(log)
}),
nr_loader.New("kubernetes", func() nr.Resolver {
return nr_kubernetes.NewResolver(log)
}),
nr_loader.New("consul", func() nr.Resolver {
return nr_consul.NewResolver(log)
}),
),
}
}
155 changes: 155 additions & 0 deletions tests/certification/embedded/embedded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package embedded

import (
"fmt"
"os"

"github.com/dapr/dapr/pkg/acl"
global_config "github.com/dapr/dapr/pkg/config"
env "github.com/dapr/dapr/pkg/config/env"
"github.com/dapr/dapr/pkg/cors"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/operator/client"
"github.com/dapr/dapr/pkg/runtime"
"github.com/dapr/dapr/pkg/runtime/security"
"github.com/dapr/kit/logger"
)

const (
placementAddresses = "127.0.0.1"
controlPlaneAddress = ""
allowedOrigins = cors.DefaultAllowedOrigins
mode = modes.StandaloneMode
config = "config.yaml"
componentsPath = "./components"
profilePort = runtime.DefaultProfilePort
enableProfiling = true
maxConcurrency = -1
enableMTLS = false
sentryAddress = ""
appSSL = false
maxRequestBodySize = 4

daprHTTPPort = runtime.DefaultDaprHTTPPort
daprAPIGRPCPort = runtime.DefaultDaprAPIGRPCPort
daprInternalGRPC = runtime.DefaultDaprAPIGRPCPort + 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we do not have a constant value for internal grpc port?

/cc @artursouza @yaron2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value is constant. I don't understand the question.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he is asking why there is not a constant for the internal port in dapr/dapr's runtime package. That struck me as strange also. Is the port dynamically selected (which would since)?

appPort = 8000
)

var log = logger.NewLogger("dapr.runtime")

type Option func(config *runtime.Config)

func WithAppProtocol(protocol runtime.Protocol, port int) Option {
return func(config *runtime.Config) {
config.ApplicationProtocol = protocol
config.ApplicationPort = port
}
}

func WithDaprHTTPPort(port int) Option {
return func(config *runtime.Config) {
config.HTTPPort = port
}
}

func WithDaprGRPCPort(port int) Option {
return func(config *runtime.Config) {
config.APIGRPCPort = port
}
}

func WithDaprInternalGRPCPort(port int) Option {
return func(config *runtime.Config) {
config.InternalGRPCPort = port
}
}

func WithListenAddresses(addresses []string) Option {
return func(config *runtime.Config) {
config.APIListenAddresses = addresses
}
}

func NewRuntime(appID string, opts ...Option) (*runtime.DaprRuntime, error) {
var err error

runtimeConfig := runtime.NewRuntimeConfig(
appID, []string{}, controlPlaneAddress,
allowedOrigins, config, componentsPath, string(runtime.HTTPProtocol), string(mode),
daprHTTPPort, daprInternalGRPC, daprAPIGRPCPort, []string{"127.0.0.1"}, nil, appPort, profilePort,
enableProfiling, maxConcurrency, enableMTLS, sentryAddress, appSSL, maxRequestBodySize, "",
runtime.DefaultReadBufferSize, false)

for _, opt := range opts {
opt(runtimeConfig)
}

variables := map[string]string{
env.AppID: runtimeConfig.ID,
env.AppPort: fmt.Sprintf("%d", runtimeConfig.ApplicationPort),
env.HostAddress: "127.0.0.1",
env.DaprPort: fmt.Sprintf("%d", runtimeConfig.InternalGRPCPort),
env.DaprGRPCPort: fmt.Sprintf("%d", runtimeConfig.APIGRPCPort),
env.DaprHTTPPort: fmt.Sprintf("%d", runtimeConfig.HTTPPort),
env.DaprProfilePort: fmt.Sprintf("%d", runtimeConfig.ProfilePort),
}

for key, value := range variables {
err := os.Setenv(key, value)
if err != nil {
return nil, err
}
}

var globalConfig *global_config.Configuration
var configErr error

if enableMTLS {
if runtimeConfig.CertChain, err = security.GetCertChain(); err != nil {
return nil, err
}
}

var accessControlList *global_config.AccessControlList
var namespace string

if config != "" {
switch modes.DaprMode(mode) {
case modes.KubernetesMode:
client, conn, clientErr := client.GetOperatorClient(controlPlaneAddress, security.TLSServerName, runtimeConfig.CertChain)
if clientErr != nil {
return nil, err
}
defer conn.Close()
namespace = os.Getenv("NAMESPACE")
globalConfig, configErr = global_config.LoadKubernetesConfiguration(config, namespace, client)
case modes.StandaloneMode:
globalConfig, _, configErr = global_config.LoadStandaloneConfiguration(config)
}

if configErr != nil {
log.Debugf("Config error: %v", configErr)
}
}

if configErr != nil {
return nil, fmt.Errorf("error loading configuration: %w", configErr)
}
if globalConfig == nil {
log.Info("loading default configuration")
globalConfig = global_config.LoadDefaultConfiguration()
}

accessControlList, err = acl.ParseAccessControlSpec(globalConfig.Spec.AccessControlSpec, string(runtimeConfig.ApplicationProtocol))
if err != nil {
return nil, err
}

return runtime.NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil
}
80 changes: 80 additions & 0 deletions tests/certification/flow/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package app

import (
"log"
"net/http"

"github.com/dapr/go-sdk/service/common"

daprd "github.com/dapr/go-sdk/service/http"

"github.com/dapr/components-contrib/tests/certification/flow"
)

type App struct {
appName string
address string
setup SetupFn
}

type SetupFn func(flow.Context, common.Service) error

func Run(appName, address string, setup SetupFn) (string, flow.Runnable, flow.Runnable) {
return New(appName, address, setup).ToStep()
}

func New(appName, address string, setup SetupFn) App {
return App{
appName: appName,
address: address,
setup: setup,
}
}

func (a App) AppName() string {
return a.appName
}

func (a App) ToStep() (string, flow.Runnable, flow.Runnable) {
return a.appName, a.Start, a.Stop
}

func Start(appName, address string, setup SetupFn) flow.Runnable {
return New(appName, address, setup).Start
}

func (a App) Start(ctx flow.Context) error {
s := daprd.NewService(a.address)

if err := a.setup(ctx, s); err != nil {
return err
}

go func() {
if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Printf("error listenning: %v", err)
}
}()

ctx.Set(a.appName, s)

return nil
}

func Stop(appName string) flow.Runnable {
return App{appName: appName}.Stop
}

func (a App) Stop(ctx flow.Context) error {
var s common.Service
if ctx.Get(a.appName, &s) {
return s.Stop()
}

return nil
}
89 changes: 89 additions & 0 deletions tests/certification/flow/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation and Dapr Contributors.
// Licensed under the MIT License.
// ------------------------------------------------------------

package flow

import (
"context"
"testing"
"time"
)

type Context struct {
name string
context.Context
*testing.T
*Flow
}

func (c Context) Name() string {
return c.name
}
func (c Context) Deadline() (deadline time.Time, ok bool) {
return c.Context.Deadline()
}
func (c Context) Done() <-chan struct{} {
return c.Context.Done()
}
func (c Context) Err() error {
return c.Context.Err()
}
func (c Context) Value(key interface{}) interface{} {
return c.Context.Value(key)
}
func (c Context) MustGet(args ...interface{}) {
if len(args)%2 != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to comment magic number here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be explained in the forthcoming Go docs.

c.Fatal("invalid number of arguments passed to Get")
}

for i := 0; i < len(args); i += 2 {
varName, ok := args[i].(string)
if !ok {
c.Fatalf("argument %d is not a string", i)
}

c.varsMu.RLock()
variable, ok := c.variables[varName]
c.varsMu.RUnlock()
if !ok {
c.Fatalf("could not find variable %q", varName)
}
if !as(variable, args[i+1]) {
c.Fatalf("could not resolve variable %q", varName)
}
}
}

func (c Context) Get(args ...interface{}) bool {
if len(args)%2 != 0 {
c.Fatal("invalid number of arguments passed to Get")
}

for i := 0; i < len(args); i += 2 {
varName, ok := args[i].(string)
if !ok {
c.Fatalf("argument %d is not a string", i)
}

c.varsMu.RLock()
variable, ok := c.variables[varName]
c.varsMu.RUnlock()
if !ok {
return false
}
if !as(variable, args[i+1]) {
c.Fatalf("could not resolve variable %q", varName)
}
}

return true
}

func (c Context) Set(varName string, value interface{}) {
c.varsMu.Lock()
defer c.varsMu.Unlock()

c.variables[varName] = value
}
Loading