Skip to content

Commit

Permalink
create worker task working
Browse files Browse the repository at this point in the history
  • Loading branch information
nachomdo committed Apr 29, 2022
1 parent cdecf3c commit 2336278
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 20 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ run: go.build
$(GO_OUT_DIR)/$(PROJECT_NAME) --debug

dev: $(KIND) $(KUBECTL)
@$(INFO) Creating kind cluster
@$(KIND) create cluster --name=$(PROJECT_NAME)-dev
@$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev
# @$(INFO) Creating kind cluster
# @$(KIND) create cluster --name=$(PROJECT_NAME)-dev
# @$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev
@$(INFO) Installing Crossplane CRDs
@$(KUBECTL) apply -k https://github.com/crossplane/crossplane//cluster?ref=master
@$(INFO) Installing Provider SQL CRDs
Expand Down
26 changes: 24 additions & 2 deletions apis/sample/v1alpha1/mytype_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,38 @@ type MyTypeObservation struct {
ObservableField string `json:"observableField,omitempty"`
}

// KafkaTopics are part of the desired state fields
type KafkaTopics struct {
NumPartitions int16 `json:"numPartitions,omitempty"`
ReplicationFactor int8 `json:"replicationFactor,omitempty"`
}

// A MyTypeSpec defines the desired state of a MyType.
type MyTypeSpec struct {
xpv1.ResourceSpec `json:",inline"`
ForProvider MyTypeParameters `json:"forProvider"`
xpv1.ResourceSpec `json:",inline"`
Class string `json:"class,omitempty"`
DurationMs int64 `json:"durationMs,omitempty"`
ProducerNode string `json:"producerNode,omitempty"`
ClientNode string `json:"clientNode,omitempty"`
BootstrapServers string `json:"bootstrapServers,omitempty"`
TargetMessagesPerSec int32 `json:"targetMessagesPerSec,omitempty"`
MaxMessages int64 `json:"maxMessages,omitempty"`
ActiveTopics map[string]KafkaTopics `json:"activeTopics,omitempty"`
InactiveTopics map[string]KafkaTopics `json:"inactiveTopics,omitempty"`
ProducerConf map[string]string `json:"producerConf,omitempty"`
ConsumerConf map[string]string `json:"consumerConf,omitempty"`
CommonClientConf map[string]string `json:"commonClientConf,omitempty"`
AdminClientConf map[string]string `json:"adminClientConf,omitempty"`
ForProvider MyTypeParameters `json:"forProvider"`
}

// A MyTypeStatus represents the observed state of a MyType.
type MyTypeStatus struct {
xpv1.ResourceStatus `json:",inline"`
AtProvider MyTypeObservation `json:"atProvider,omitempty"`
TaskStatus string `json:"taskStatus,omitempty"`
TaskId string `json:"taskId,omitempty"`
WorkerId uint64 `json:"workerId,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
30 changes: 30 additions & 0 deletions apis/sample/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions examples/provider/config.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
apiVersion: v1
kind: Secret
metadata:
namespace: crossplane-system
name: example-provider-secret
type: Opaque
data:
# credentials: BASE64ENCODED_PROVIDER_CREDS
credentials: dGVzdAo=
---
apiVersion: template.crossplane.io/v1alpha1
kind: ProviderConfig
Expand All @@ -15,6 +14,7 @@ spec:
credentials:
source: Secret
secretRef:
namespace: crossplane-system
namespace: tarasque
name: example-provider-secret
key: credentials
---
2 changes: 1 addition & 1 deletion examples/sample/mytype.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ spec:
forProvider:
configurableField: test
providerConfigRef:
name: example
name: example
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ go 1.16
require (
github.com/crossplane/crossplane-runtime v0.15.0
github.com/crossplane/crossplane-tools v0.0.0-20210320162312-1baca298c527
github.com/go-resty/resty/v2 v2.7.0
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.1.2
github.com/jarcoal/httpmock v1.1.0
github.com/pkg/errors v0.9.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/apimachinery v0.21.3
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+MYsct2VUrAJ4=
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gobuffalo/flect v0.1.5/go.mod h1:W3K3X9ksuZfir8f/LrfVtWmCDQFfayuylOJ7sz/Fj80=
Expand Down Expand Up @@ -360,6 +362,8 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jarcoal/httpmock v1.1.0 h1:F47ChZj1Y2zFsCXxNkBPwNNKnAyOATcdQibk0qEdVCE=
github.com/jarcoal/httpmock v1.1.0/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
Expand Down Expand Up @@ -700,8 +704,9 @@ golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb h1:pirldcYWx7rx7kE5r+9WsOXPXK0+WH5+uZ7uPmJ44uM=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1106,7 +1111,6 @@ sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:w
sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca h1:6dsH6AYQWbyZmtttJNe8Gq1cXOeS1BdV3eW37zHilAQ=
sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.1.0/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw=
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 h1:Hr/htKFmJEbtMgS/UD0N+gtgctAqz81t3nu+sPzynno=
sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
Expand Down
67 changes: 60 additions & 7 deletions internal/controller/mytype/mytype.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@ package mytype

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"os"
"reflect"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
Expand All @@ -35,6 +42,7 @@ import (

"github.com/crossplane/provider-template/apis/sample/v1alpha1"
apisv1alpha1 "github.com/crossplane/provider-template/apis/v1alpha1"
resty "github.com/go-resty/resty/v2"
)

const (
Expand All @@ -43,14 +51,24 @@ const (
errGetPC = "cannot get ProviderConfig"
errGetCreds = "cannot get credentials"

errNewClient = "cannot create new Service"
errNewClient = "cannot create new Service"
errNewTask = "cannot create new Task"
defaultAgentServiceUrl = "https://tarasque-agent.tarasque.svc.cluster.local"
)

func getEnvOrDefault(key, fallback string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return fallback
}

// A NoOpService does nothing.
type NoOpService struct{}

var (
newNoOpService = func(_ []byte) (interface{}, error) { return &NoOpService{}, nil }
newNoOpService = func(_ []byte) (*resty.Client, error) { return resty.New(), nil }
agentServiceUrl = getEnvOrDefault("SERVICE_URL", defaultAgentServiceUrl)
)

// Setup adds a controller that reconciles MyType managed resources.
Expand All @@ -77,12 +95,24 @@ func Setup(mgr ctrl.Manager, l logging.Logger, rl workqueue.RateLimiter) error {
Complete(r)
}

type WorkerTaskSpec struct {
v1alpha1.MyTypeSpec
StartMs int64 `json:"startMs,omitempty"`
}

// KafkaTopics are part of the desired state fields
type WorkerTask struct {
TaskId string `json:"taskId,omitempty"`
WorkerId int64 `json:"workerId,omitempty"`
Spec WorkerTaskSpec `json:"spec,omitempty"`
}

// A connector is expected to produce an ExternalClient when its Connect method
// is called.
type connector struct {
kube client.Client
usage resource.Tracker
newServiceFn func(creds []byte) (interface{}, error)
newServiceFn func(creds []byte) (*resty.Client, error)
}

// Connect typically produces an ExternalClient by:
Expand Down Expand Up @@ -124,7 +154,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
type external struct {
// A 'client' used to connect to the external resource API. In practice this
// would be something like an AWS SDK client.
service interface{}
service *resty.Client
}

func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) {
Expand All @@ -134,13 +164,13 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}

// These fmt statements should be removed in the real implementation.
fmt.Printf("Observing: %+v", cr)
fmt.Printf("Observing: %+v \n", cr)

return managed.ExternalObservation{
// Return false when the external resource does not exist. This lets
// the managed resource reconciler know that it needs to call Create to
// (re)create the resource, or that it has successfully been deleted.
ResourceExists: true,
ResourceExists: cr.Status.TaskId != "",

// Return false when the external resource exists, but it not up to date
// with the desired managed resource state. This lets the managed
Expand All @@ -159,7 +189,30 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
return managed.ExternalCreation{}, errors.New(errNotMyType)
}

fmt.Printf("Creating: %+v", cr)
payload := WorkerTask{Spec: WorkerTaskSpec{cr.Spec, time.Now().UnixMilli()}, WorkerId: rand.Int63(), TaskId: uuid.New().String()}
body, err := json.Marshal(payload)
if err != nil {
return managed.ExternalCreation{}, err
}

mBody := make(map[string]interface{}, reflect.ValueOf(payload).NumField())
if err := json.Unmarshal(body, &mBody); err != nil {
return managed.ExternalCreation{}, err
}
spec := mBody["spec"].(map[string]interface{})
delete(spec, "providerConfigRef")
delete(spec, "forProvider")
delete(spec, "deletionPolicy")
fmt.Printf("Creating: %+v \n", string(body))
resp, err := c.service.NewRequest().
SetHeader("Accept", "application/json").
SetHeader("Content-Type", "application/json").
SetBody(mBody).Post(agentServiceUrl + "/agent/worker/create")

fmt.Printf("Response: %v \n", string(resp.Body()))
if resp.StatusCode() != http.StatusOK || err != nil {
return managed.ExternalCreation{}, err
}

return managed.ExternalCreation{
// Optionally return any details that may be required to connect to the
Expand Down
Loading

0 comments on commit 2336278

Please sign in to comment.