Skip to content

Commit

Permalink
Fix some linting issues and rename template resources
Browse files Browse the repository at this point in the history
  • Loading branch information
nachomdo committed May 19, 2022
1 parent 0132af1 commit fe530dd
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ linters-settings:
goimports:
# put imports beginning with prefix after 3rd-party packages;
# it's a comma-separated list of prefixes
local-prefixes: github.com/crossplane/provider-template
local-prefixes: github.com/nachomdo/tarasque

gocyclo:
# minimal code complexity to report, 30 by default (but we recommend 10-20)
Expand Down
24 changes: 18 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# ====================================================================================
# Setup Project
PROJECT_NAME := provider-template
PROJECT_REPO := github.com/crossplane/$(PROJECT_NAME)
PROJECT_NAME := tarasque
PROJECT_REPO := github.com/nachomdo/$(PROJECT_NAME)

PLATFORMS ?= linux_amd64 linux_arm64
PLATFORMS ?= linux_amd64
-include build/makelib/common.mk

# Setup Output
Expand All @@ -21,11 +21,23 @@ GO111MODULE = on
# Setup Kubernetes tools
-include build/makelib/k8s_tools.mk

# Setup Helm
USE_HELM3 = true
HELM_BASE_URL = https://charts.upbound.io
HELM_CHART_LINT_STRICT = false
HELM_S3_BUCKET = nacho-ccloud-test
HELM_CHARTS = tarasque
HELM_CHART_LINT_ARGS_tarasque = --set nameOverride='',imagePullSecrets=''
-include build/makelib/helm.mk

# Setup Images
DOCKER_REGISTRY ?= crossplane
DOCKER_REGISTRY ?= nachomdo
IMAGES = $(PROJECT_NAME) $(PROJECT_NAME)-controller
-include build/makelib/image.mk

#Local deployment
-include build/makelib/deploy.mk
-include build/makelib/local.mk
fallthrough: submodules
@echo Initial setup complete. Running make again . . .
@make
Expand Down Expand Up @@ -75,9 +87,9 @@ dev: $(KIND) $(KUBECTL)
# @$(KUBECTL) cluster-info --context kind-$(PROJECT_NAME)-dev
@$(INFO) Installing Crossplane CRDs
@$(KUBECTL) apply -k https://github.com/crossplane/crossplane//cluster?ref=master
@$(INFO) Installing Provider SQL CRDs
@$(INFO) Installing Provider Tarasque CRDs
@$(KUBECTL) apply -R -f package/crds
@$(INFO) Starting Provider SQL controllers
@$(INFO) Starting Provider Tarasque controllers
@$(GO) run cmd/provider/main.go --debug

dev-clean: $(KIND) $(KUBECTL)
Expand Down
4 changes: 2 additions & 2 deletions apis/tarasque/tarasque.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package sample contains group Sample API versions
package sample
// Package tarasque contains group Tarasque API versions
package tarasque
4 changes: 2 additions & 2 deletions apis/tarasque/v1alpha1/kafkabench_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
// KafkaBenchObservation are the observable fields of a KafkaBench.
type KafkaBenchObservation struct {
TaskStatus string `json:"taskStatus,omitempty"`
TaskId string `json:"taskId,omitempty"`
WorkerId int64 `json:"workerId,omitempty"`
TaskID string `json:"taskId,omitempty"`
WorkerID int64 `json:"workerId,omitempty"`
ProducerStats ProducerBenchResultStats `json:"producerStats,omitempty"`
ConsumerStats map[string]ConsumerBenchResultStats `json:"consumerStats,omitempty"`
RoundTripStats RoundTripBenchResultStats `json:"roundTripStats,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions apis/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package apis
import (
"k8s.io/apimachinery/pkg/runtime"

samplev1alpha1 "github.com/crossplane/provider-template/apis/tarasque/v1alpha1"
templatev1alpha1 "github.com/crossplane/provider-template/apis/v1alpha1"
samplev1alpha1 "github.com/nachomdo/tarasque/apis/tarasque/v1alpha1"
templatev1alpha1 "github.com/nachomdo/tarasque/apis/v1alpha1"
)

func init() {
Expand Down
4 changes: 2 additions & 2 deletions apis/v1alpha1/groupversion_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

// Package v1alpha1 contains the core resources of the Template provider.
// +kubebuilder:object:generate=true
// +groupName=template.crossplane.io
// +groupName=tarasque.crossplane.io
// +versionName=v1alpha1
package v1alpha1

Expand All @@ -27,7 +27,7 @@ import (

// Package type metadata.
const (
Group = "template.crossplane.io"
Group = "tarasque.crossplane.io"
Version = "v1alpha1"
)

Expand Down
14 changes: 7 additions & 7 deletions cmd/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/ratelimiter"

"github.com/crossplane/provider-template/apis"
"github.com/crossplane/provider-template/internal/controller"
"github.com/nachomdo/tarasque/apis"
"github.com/nachomdo/tarasque/internal/controller"
)

func main() {
var (
app = kingpin.New(filepath.Base(os.Args[0]), "Template support for Crossplane.").DefaultEnvars()
app = kingpin.New(filepath.Base(os.Args[0]), "Tarasque support for Crossplane.").DefaultEnvars()
debug = app.Flag("debug", "Run with debug logging.").Short('d').Bool()
syncPeriod = app.Flag("sync", "Controller manager sync period such as 300ms, 1.5h, or 2h45m").Short('s').Default("1h").Duration()
leaderElection = app.Flag("leader-election", "Use leader election for the controller manager.").Short('l').Default("false").OverrideDefaultFromEnvar("LEADER_ELECTION").Bool()
)
kingpin.MustParse(app.Parse(os.Args[1:]))

zl := zap.New(zap.UseDevMode(*debug))
log := logging.NewLogrLogger(zl.WithName("provider-template"))
log := logging.NewLogrLogger(zl.WithName("provider-tarasque"))
if *debug {
// The controller-runtime runs with a no-op logger by default. It is
// *very* verbose even at info level, so we only provide it a real
Expand All @@ -56,13 +56,13 @@ func main() {

mgr, err := ctrl.NewManager(cfg, ctrl.Options{
LeaderElection: *leaderElection,
LeaderElectionID: "crossplane-leader-election-provider-template",
LeaderElectionID: "crossplane-leader-election-provider-tarasque",
SyncPeriod: syncPeriod,
})
kingpin.FatalIfError(err, "Cannot create controller manager")

rl := ratelimiter.NewDefaultProviderRateLimiter(ratelimiter.DefaultProviderRPS)
kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add Template APIs to scheme")
kingpin.FatalIfError(controller.Setup(mgr, log, rl), "Cannot setup Template controllers")
kingpin.FatalIfError(apis.AddToScheme(mgr.GetScheme()), "Cannot add Tarasque APIs to scheme")
kingpin.FatalIfError(controller.Setup(mgr, log, rl), "Cannot setup Tarasque controllers")
kingpin.FatalIfError(mgr.Start(ctrl.SetupSignalHandler()), "Cannot start controller manager")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/crossplane/provider-template
module github.com/nachomdo/tarasque

go 1.16

Expand Down
2 changes: 1 addition & 1 deletion internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/reconciler/providerconfig"
"github.com/crossplane/crossplane-runtime/pkg/resource"

"github.com/crossplane/provider-template/apis/v1alpha1"
"github.com/nachomdo/tarasque/apis/v1alpha1"
)

// Setup adds a controller that reconciles ProviderConfigs by accounting for
Expand Down
36 changes: 21 additions & 15 deletions internal/controller/kafkabench/agent_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ import (
"reflect"
"time"

"github.com/crossplane/provider-template/apis/tarasque/v1alpha1"
"github.com/go-resty/resty/v2"
"github.com/google/uuid"

"github.com/nachomdo/tarasque/apis/tarasque/v1alpha1"
)

const (
defaultAgentServiceUrl = "https://tarasque-agent.tarasque.svc.cluster.local"
defaultAgentServiceURL = "https://tarasque-agent.tarasque.svc.cluster.local"
)

var (
agentServiceUrl = getEnvOrDefault("SERVICE_URL", defaultAgentServiceUrl)
agentServiceURL = getEnvOrDefault("SERVICE_URL", defaultAgentServiceURL)
sanitizeFields = []string{"providerConfigRef", "forProvider", "deletionPolicy"}
)

Expand Down Expand Up @@ -49,37 +50,39 @@ func sanitizeWorkerTask(wt *WorkerTask) (map[string]interface{}, error) {

if wt.Spec.Class == consumerWorkload {
keys := make([]string, 0, len(wt.Spec.ActiveTopics))
for k, _ := range wt.Spec.ActiveTopics {
for k := range wt.Spec.ActiveTopics {
if k != "" {
keys = append(keys, k)
}
}
wtSpec["activeTopics"] = keys
}
wtMap["workerId"] = wt.WorkerId
wtMap["workerId"] = wt.WorkerID
return wtMap, nil
}

//A service to communicate with the Trogdor Agent REST API
// TrogdorAgentService provides access to the Trogdor Agent REST API
type TrogdorAgentService struct {
client *resty.Client
}

// AgentStatusWorkers represents the worker status as returned by Trogdor Agent API
type AgentStatusWorkers struct {
State string `json:"state,omitempty"`
TaskId string `json:"taskId,omitempty"`
TaskID string `json:"taskId,omitempty"`
StartedMs int64 `json:"startedMs,omitempty"`
DoneMs int64 `json:"doneMs,omitempty"`
Status interface{} `json:"status,omitempty"`
Error string `json:"error,omitempty"`
}

// AgentStatusResponse encapsulates the response from the Trogdor Agent status endpoint
type AgentStatusResponse struct {
ServerStartMs int64 `json:"serverStartMs,omitempty"`
Workers map[string]AgentStatusWorkers `json:"workers,omitempty"`
}

//Create a new instance of Trogdor Service
// NewTrogdorService returns a new instance of Trogdor Service
func NewTrogdorService() *TrogdorAgentService {
return &TrogdorAgentService{
client: resty.New(),
Expand All @@ -92,8 +95,9 @@ func newTrogdorServiceWithRestClient(httpClient *resty.Client) *TrogdorAgentServ
}
}

// CreateWorkerTask initiates a new worker task on Trogdor agents
func (tas *TrogdorAgentService) CreateWorkerTask(spec v1alpha1.KafkaBenchSpec) (*WorkerTask, error) {
payload := WorkerTask{Spec: WorkerTaskSpec{spec, time.Now().UnixMilli()}, WorkerId: rand.Int63(), TaskId: uuid.New().String()}
payload := WorkerTask{Spec: WorkerTaskSpec{spec, time.Now().UnixMilli()}, WorkerID: rand.Int63(), TaskID: uuid.New().String()}

body, err := sanitizeWorkerTask(&payload)
if err != nil {
Expand All @@ -103,7 +107,7 @@ func (tas *TrogdorAgentService) CreateWorkerTask(spec v1alpha1.KafkaBenchSpec) (
resp, err := tas.client.NewRequest().
SetHeader("Accept", "application/json").
SetHeader("Content-Type", "application/json").
SetBody(body).Post(agentServiceUrl + "/agent/worker/create")
SetBody(body).Post(agentServiceURL + "/agent/worker/create")

fmt.Printf("Response: %v \n", string(resp.Body()))
if resp.StatusCode() != http.StatusOK || err != nil {
Expand All @@ -112,10 +116,11 @@ func (tas *TrogdorAgentService) CreateWorkerTask(spec v1alpha1.KafkaBenchSpec) (
return &payload, nil
}

func (tas *TrogdorAgentService) CollectWorkerTaskResult(workerId string) (*AgentStatusWorkers, error) {
// CollectWorkerTaskResult checks the status of a given workerID in Trogdor agents
func (tas *TrogdorAgentService) CollectWorkerTaskResult(workerID string) (*AgentStatusWorkers, error) {
resp, err := tas.client.NewRequest().
SetHeader("Accept", "application/json").
Get(agentServiceUrl + "/agent/status")
Get(agentServiceURL + "/agent/status")

if resp.StatusCode() != http.StatusOK || err != nil {
return nil, err
Expand All @@ -125,18 +130,19 @@ func (tas *TrogdorAgentService) CollectWorkerTaskResult(workerId string) (*Agent
return nil, err
}

workerStatus := agentStatusResponse.Workers[workerId]
workerStatus := agentStatusResponse.Workers[workerID]
if workerStatus.Error != "" {
return nil, errors.New(workerStatus.Error)
}

return &workerStatus, nil
}

func (tas *TrogdorAgentService) DeleteWorkerTask(workerId string) error {
// DeleteWorkerTask removes a given worker in Trogdor agents
func (tas *TrogdorAgentService) DeleteWorkerTask(workerID string) error {
resp, err := tas.client.NewRequest().
SetHeader("Accept", "application/json").
Delete(fmt.Sprintf("%s/agent/worker?workerId=%s", agentServiceUrl, workerId))
Delete(fmt.Sprintf("%s/agent/worker?workerId=%s", agentServiceURL, workerID))

if resp.StatusCode() != http.StatusOK || err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/kafkabench/agent_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@ func TestCollectWorkerTaskResult(t *testing.T) {
client := newTrogdorServiceWithRestClient(httpClient)
httpmock.ActivateNonDefault(httpClient.GetClient())
defer httpmock.DeactivateAndReset()
httpmock.RegisterResponder("GET", agentServiceUrl+"/agent/status",
httpmock.RegisterResponder("GET", agentServiceURL+"/agent/status",
func(req *http.Request) (*http.Response, error) {
statusResponse := AgentStatusResponse{
ServerStartMs: 1000,
Workers: map[string]AgentStatusWorkers{
"task-with-error-no-status": {
State: "DONE",
TaskId: "1",
TaskID: "1",
StartedMs: 1649460862398,
DoneMs: 1649460862431,
Status: nil,
Error: "worker expired",
},
"task-with-status-and-error": {
State: "DONE",
TaskId: "2",
TaskID: "2",
StartedMs: 1649460862398,
DoneMs: 1649460862431,
Status: "Creating 5 topic(s)",
Error: "Unable to create topic(s): mytopic1, mytopic2, mytopic3, mytopic4, mytopic5after 3 attempt(s)",
},
"task-with-results": {
State: "DONE",
TaskId: "3",
TaskID: "3",
StartedMs: 1649460862398,
DoneMs: 1649460862431,
Status: map[string]interface{}{
Expand All @@ -65,7 +65,7 @@ func TestCollectWorkerTaskResult(t *testing.T) {
"task-with-results": {
status: &AgentStatusWorkers{
State: "DONE",
TaskId: "3",
TaskID: "3",
StartedMs: 1649460862398,
DoneMs: 1649460862431,
Status: map[string]interface{}{
Expand Down
Loading

0 comments on commit fe530dd

Please sign in to comment.