Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement file metrics collector #783

Merged
merged 1 commit into from
Sep 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/metricscollector/v1alpha3/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,8 @@ import (
filemc "github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/file-metricscollector"
)

var experimentName = flag.String("e", "", "Experiment Name")
var metricsFileName = flag.String("f", "", "Metrics File Name")
var trialName = flag.String("t", "", "Trial Name")
var jobKind = flag.String("k", "", "Job Kind")
var namespace = flag.String("n", "", "NameSpace")
var managerService = flag.String("m", "", "Katib Manager service")
var metricNames = flag.String("mn", "", "Metric names")
var pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval to check if main process of worker container exit")
Expand All @@ -62,7 +60,7 @@ var waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other

func main() {
flag.Parse()
klog.Infof("Experiment Name: %s, Trial Name: %s, Job Kind: %s", *experimentName, *trialName, *jobKind)
klog.Infof("Trial Name: %s", *trialName)

wopts := common.WaitPidsOpts{
PollInterval: *pollInterval,
Expand All @@ -84,7 +82,7 @@ func main() {
klog.Fatalf("Failed to create MetricsCollector: %v", err)
}
ctx := context.Background()
olog, err := mc.CollectObservationLog(*trialName, *jobKind, strings.Split(*metricNames, ";"), *namespace)
olog, err := mc.CollectObservationLog(*metricsFileName, strings.Split(*metricNames, ";"))
if err != nil {
klog.Fatalf("Failed to collect logs: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion examples/v1alpha3/hyperband-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ spec:
spec:
template:
spec:
serviceAccountName: metrics-collector
containers:
- name: {{.Trial}}
image: katib/mxnet-mnist-example
Expand Down
1 change: 0 additions & 1 deletion examples/v1alpha3/hyperopt-random-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ spec:
spec:
template:
spec:
serviceAccountName: metrics-collector # will be dropped
containers:
- name: {{.Trial}}
image: docker.io/katib/mxnet-mnist-example
Expand Down
1 change: 0 additions & 1 deletion examples/v1alpha3/hyperopt-tpe-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ spec:
spec:
template:
spec:
serviceAccountName: metrics-collector # will be dropped
containers:
- name: {{.Trial}}
image: docker.io/katib/mxnet-mnist-example
Expand Down
2 changes: 0 additions & 2 deletions examples/v1alpha3/pytorchjob-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ spec:
restartPolicy: OnFailure
template:
spec:
serviceAccountName: metrics-collector
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0
Expand All @@ -47,7 +46,6 @@ spec:
restartPolicy: OnFailure
template:
spec:
serviceAccountName: metrics-collector
containers:
- name: pytorch
image: gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0
Expand Down
39 changes: 1 addition & 38 deletions examples/v1alpha3/tfjob-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,41 +59,4 @@ spec:
volumes:
- name: "train"
persistentVolumeClaim:
claimName: "tfevent-volume"
metricsCollectorSpec:
goTemplate:
rawTemplate: |-
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: {{.Trial}}
namespace: {{.NameSpace}}
spec:
schedule: "*/1 * * * *"
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 1
jobTemplate:
spec:
template:
spec:
containers:
- name: {{.Trial}}
image: gcr.io/kubeflow-images-public/katib/v1alpha3/tfevent-metrics-collector
args:
- "python"
- "main.py"
- "-t"
- "{{.Trial}}"
- "-d"
- "/train/{{.Trial}}"
- "-m"
- "accuracy_1"
volumeMounts:
- mountPath: "/train"
name: "train"
volumes:
- name: "train"
persistentVolumeClaim:
claimName: "tfevent-volume"
restartPolicy: Never
serviceAccountName: metrics-collector
claimName: "tfevent-volume"
2 changes: 2 additions & 0 deletions pkg/apis/controller/common/v1alpha3/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ const (
// When model training source code persists metrics into persistent layer
// directly, metricsCollector isn't in need, and its kind is "noneCollector"
NoneCollector CollectorKind = "None"

MetricsVolume = "metrics-volume"
)

// +k8s:deepcopy-gen=true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,98 +1,59 @@
package sidecarmetricscollector

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"time"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
"sigs.k8s.io/controller-runtime/pkg/client/config"

v1alpha3 "github.com/kubeflow/katib/pkg/apis/manager/v1alpha3"
commonv1alpha3 "github.com/kubeflow/katib/pkg/common/v1alpha3"
"github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/common"
"k8s.io/klog"
)

type FileMetricsCollector struct {
clientset *kubernetes.Clientset
}

func NewFileMetricsCollector() (*FileMetricsCollector, error) {
config, err := config.GetConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return &FileMetricsCollector{
clientset: clientset,
}, nil

}

// will be dropped, get logs from a file instead of k8s logs api
func getWorkerContainerName(pod apiv1.Pod) string {
for _, c := range pod.Spec.Containers {
if c.Name != common.MetricCollectorContainerName {
return c.Name
}
}
return ""
return &FileMetricsCollector{}, nil
}

func (d *FileMetricsCollector) CollectObservationLog(tId string, jobKind string, metrics []string, namespace string) (*v1alpha3.ObservationLog, error) {
labelMap := commonv1alpha3.GetJobLabelMap(jobKind, tId)
pl, err := d.clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{LabelSelector: labels.Set(labelMap).String(), IncludeUninitialized: true})
func (d *FileMetricsCollector) CollectObservationLog(fileName string, metrics []string) (*v1alpha3.ObservationLog, error) {
file, err := os.Open(fileName)
if err != nil {
return nil, err
}
if len(pl.Items) == 0 {
return nil, fmt.Errorf("No Pods are found in Trial %v", tId)
}
logopt := apiv1.PodLogOptions{Container: getWorkerContainerName(pl.Items[0]), Timestamps: true, Follow: true}
reader, err := d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream()
for err != nil {
klog.Errorf("Retry to get logs, Error: %v", err)
time.Sleep(time.Duration(1) * time.Second)
reader, err = d.clientset.CoreV1().Pods(namespace).GetLogs(pl.Items[0].ObjectMeta.Name, &logopt).Stream()
}
buf := new(bytes.Buffer)
buf.ReadFrom(reader)
logs := buf.String()

olog, err := d.parseLogs(tId, strings.Split(logs, "\n"), metrics)
defer file.Close()
content, err := ioutil.ReadAll(file)
logs := string(content)
olog, err := d.parseLogs(strings.Split(logs, "\n"), metrics)
return olog, err
}

func (d *FileMetricsCollector) parseLogs(tId string, logs []string, metrics []string) (*v1alpha3.ObservationLog, error) {
func (d *FileMetricsCollector) parseLogs(logs []string, metrics []string) (*v1alpha3.ObservationLog, error) {
// TODO(hougangliu): handle custom filter string
var lasterr error
olog := &v1alpha3.ObservationLog{}
mlogs := []*v1alpha3.MetricLog{}
for _, logline := range logs {
if logline == "" {
continue
}
timestamp := time.Time{}.UTC().Format(time.RFC3339)
parseStr := logline
ls := strings.SplitN(logline, " ", 2)
if len(ls) != 2 {
klog.Errorf("Error parsing log: %s", logline)
lasterr = errors.New("Error parsing log")
continue
}
_, err := time.Parse(time.RFC3339Nano, ls[0])
if err != nil {
klog.Errorf("Error parsing time %s: %v", ls[0], err)
lasterr = err
continue
klog.Warningf("Metrics will not have timestamp since %s doesn't begin with timestamp string", logline)
} else {
_, err := time.Parse(time.RFC3339Nano, ls[0])
if err != nil {
klog.Warningf("Metrics will not have timestamp since error parsing time %s: %v", ls[0], err)
} else {
parseStr = ls[1]
timestamp = ls[0]
}
}
kvpairs := strings.Fields(ls[1])

kvpairs := strings.Fields(parseStr)
for _, kv := range kvpairs {
v := strings.Split(kv, "=")
if len(v) > 2 {
Expand All @@ -110,7 +71,6 @@ func (d *FileMetricsCollector) parseLogs(tId string, logs []string, metrics []st
if metricName == "" {
continue
}
timestamp := ls[0]
mlogs = append(mlogs, &v1alpha3.MetricLog{
TimeStamp: timestamp,
Metric: &v1alpha3.Metric{
Expand Down
8 changes: 8 additions & 0 deletions pkg/webhook/v1alpha3/pod/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ const (
MasterRole = "master"
MetricsCollectorSidecar = "metrics-collector-sidecar"
MetricsCollectorSidecarImage = "image"

PyTorchJob = "PyTorchJob"
PyTorchJobWorkerContainerName = "pytorch"

TFJob = "TFJob"
TFJobWorkerContainerName = "tensorflow"

BatchJob = "Job"
)

var JobRoleMap = map[string][]string{
Expand Down
Loading