Skip to content

Commit

Permalink
initial prowjob informer framework
Browse files Browse the repository at this point in the history
  • Loading branch information
krzyzacy committed Aug 21, 2018
1 parent b79279f commit 8e20118
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 0 deletions.
2 changes: 2 additions & 0 deletions prow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ filegroup(
"//prow/cmd/build:all-srcs",
"//prow/cmd/checkconfig:all-srcs",
"//prow/cmd/clonerefs:all-srcs",
"//prow/cmd/crier:all-srcs",
"//prow/cmd/deck:all-srcs",
"//prow/cmd/entrypoint:all-srcs",
"//prow/cmd/gcsupload:all-srcs",
Expand All @@ -86,6 +87,7 @@ filegroup(
"//prow/cmd/tot:all-srcs",
"//prow/commentpruner:all-srcs",
"//prow/config:all-srcs",
"//prow/crier:all-srcs",
"//prow/cron:all-srcs",
"//prow/deck/jobs:all-srcs",
"//prow/entrypoint:all-srcs",
Expand Down
9 changes: 9 additions & 0 deletions prow/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ARTIFACT_UPLOADER_VERSION ?= $(TAG)
NEEDS_REBASE_VERSION ?= $(TAG)
# CHECKCONFIG_VERSION is the version of the checkconfig image
CHECKCONFIG_VERSION ?= $(TAG)
# CRIER_VERSION is the version of the crier image
CRIER_VERSION ?= $(TAG)

# These are the usual GKE variables.
PROJECT ?= k8s-prow
Expand Down Expand Up @@ -269,3 +271,10 @@ checkconfig-image: alpine-image
$(PUSH) "$(REGISTRY)/$(PROJECT)/checkconfig:$(CHECKCONFIG_VERSION)"

.PHONY: checkconfig-image

crier-image: alpine-image
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o cmd/crier/crier k8s.io/test-infra/prow/cmd/crier
docker build -t "$(REGISTRY)/$(PROJECT)/crier:$(CRIER_VERSION)" $(DOCKER_LABELS) cmd/crier
$(PUSH) "$(REGISTRY)/$(PROJECT)/crier:$(CRIER_VERSION)"

.PHONY: crier-image
4 changes: 4 additions & 0 deletions prow/apis/prowjobs/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ type ProwJobStatus struct {
// identifier that Jenkins gave to the build for this
// ProwJob.
JenkinsBuildID string `json:"jenkins_build_id,omitempty"`

// PrevReportState stores the previous reported prowjob state
// So crier won't make duplicated report attempt
PrevReportState ProwJobState `json:"prev_report_state, omitempty"`
}

// Complete returns true if the prow job has finished
Expand Down
40 changes: 40 additions & 0 deletions prow/cmd/crier/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")

go_library(
name = "go_default_library",
srcs = ["main.go"],
importpath = "k8s.io/test-infra/prow/cmd/crier",
visibility = ["//visibility:private"],
deps = [
"//prow/client/clientset/versioned:go_default_library",
"//prow/client/informers/externalversions:go_default_library",
"//prow/crier:go_default_library",
"//prow/logrusutil:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/golang.org/x/time/rate:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

go_binary(
name = "crier",
embed = [":go_default_library"],
visibility = ["//visibility:public"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
19 changes: 19 additions & 0 deletions prow/cmd/crier/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2018 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM gcr.io/k8s-prow/git:0.1
LABEL maintainer="senlu@google.com"

COPY crier /crier
ENTRYPOINT ["/crier"]
132 changes: 132 additions & 0 deletions prow/cmd/crier/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

prowjobclientset "k8s.io/test-infra/prow/client/clientset/versioned"
prowjobinformer "k8s.io/test-infra/prow/client/informers/externalversions"

"k8s.io/test-infra/prow/crier"
"k8s.io/test-infra/prow/logrusutil"
)

const (
resync = 0 * time.Second
)

// TODO(krzyzacy): copy & paste, refactor this
// loadClusterConfig loads connection configuration
// for the cluster we're deploying to. We prefer to
// use in-cluster configuration if possible, but will
// fall back to using default rules otherwise.
func loadClusterConfig() (*rest.Config, error) {
clusterConfig, err := rest.InClusterConfig()
if err == nil {
return clusterConfig, nil
}

credentials, err := clientcmd.NewDefaultClientConfigLoadingRules().Load()
if err != nil {
return nil, fmt.Errorf("could not load credentials from config: %v", err)
}

clusterConfig, err = clientcmd.NewDefaultClientConfig(*credentials, &clientcmd.ConfigOverrides{}).ClientConfig()
if err != nil {
return nil, fmt.Errorf("could not load client configuration: %v", err)
}
return clusterConfig, nil
}

// TODO(krzyzacy): move this to kube???
// getKubernetesClient retrieves the Kubernetes cluster
// client from within the cluster
func getKubernetesClient() (kubernetes.Interface, prowjobclientset.Interface) {
config, err := loadClusterConfig()
if err != nil {
logrus.Fatalf("failed to load cluster config: %v", err)
}

// generate the client based off of the config
client, err := kubernetes.NewForConfig(config)
if err != nil {
logrus.Fatalf("getClusterConfig: %v", err)
}

prowjobClient, err := prowjobclientset.NewForConfig(config)
if err != nil {
logrus.Fatalf("getClusterConfig: %v", err)
}

logrus.Info("Successfully constructed k8s client")
return client, prowjobClient
}

func main() {
logrus.SetFormatter(
logrusutil.NewDefaultFieldsFormatter(nil, logrus.Fields{"component": "crier"}),
)

// get the Kubernetes client for connectivity
client, prowjobClient := getKubernetesClient()

prowjobInformerFactory := prowjobinformer.NewSharedInformerFactory(prowjobClient, resync)

// create a new queue so that when the informer gets a resource that is either
// a result of listing or watching, we can add an idenfitying key to the queue
// so that it can be handled in the handler
queue := workqueue.NewRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
))

// construct the Controller object which has all of the necessary components to
// handle logging, connections, informing (listing and watching), the queue,
// and the handler
controller := crier.NewController(client, queue, prowjobInformerFactory.Prow().V1().ProwJobs())

// use a channel to synchronize the finalization for a graceful shutdown
stopCh := make(chan struct{})
defer close(stopCh)

// run the controller loop to process items
prowjobInformerFactory.Start(stopCh)
go controller.Run(stopCh)

// use a channel to handle OS signals to terminate and gracefully shut
// down processing
sigTerm := make(chan os.Signal, 1)
signal.Notify(sigTerm, syscall.SIGTERM)
signal.Notify(sigTerm, syscall.SIGINT)

// TODO(krzyzacy) : wait till all worker to finish first
<-sigTerm
}
32 changes: 32 additions & 0 deletions prow/crier/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["controller.go"],
importpath = "k8s.io/test-infra/prow/crier",
visibility = ["//visibility:public"],
deps = [
"//prow/client/informers/externalversions/prowjobs/v1:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
Loading

0 comments on commit 8e20118

Please sign in to comment.