Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tfevent collector #792

Merged
merged 2 commits into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/metricscollector/v1alpha3/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ import (
filemc "github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/file-metricscollector"
)

var metricsFileName = flag.String("f", "", "Metrics File Name")
var metricsFileName = flag.String("path", "", "Metrics File Path")
var trialName = flag.String("t", "", "Trial Name")
var managerService = flag.String("m", "", "Katib Manager service")
var metricNames = flag.String("mn", "", "Metric names")
var managerService = flag.String("s", "", "Katib Manager service")
var metricNames = flag.String("m", "", "Metric names")
var pollInterval = flag.Duration("p", common.DefaultPollInterval, "Poll interval to check if main process of worker container exit")
var timeout = flag.Duration("timeout", common.DefaultTimeout, "Timeout to check if main process of worker container exit")
var waitAll = flag.Bool("w", common.DefaultWaitAll, "Whether wait for all other main process of container exiting")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM tensorflow/tensorflow:1.11.0
RUN pip install rfc3339 grpcio googleapis-common-protos
ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/tfevent-metricscollector/v1alpha3
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/tfevent-metricscollector/
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/metricscollector/v1alpha3/tfevent-metricscollector/
RUN pip install --no-cache-dir -r requirements.txt
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/tfevent-metricscollector/:/usr/src/app/github.com/kubeflow/katib/pkg/metricscollector/v1alpha3/common/
ENTRYPOINT ["python", "main.py"]
16 changes: 11 additions & 5 deletions cmd/metricscollector/v1alpha3/tfevent-metricscollector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import argparse
import api_pb2
import api_pb2_grpc
from pns import WaitOtherMainProcesses
from tfevent_loader import MetricsCollector
from logging import getLogger, StreamHandler, INFO

Expand All @@ -10,10 +11,9 @@ def parse_options():
description='TF-Event MetricsCollector',
add_help = True
)
parser.add_argument("-a", "--manager_addr", type = str, default = "katib-manager")
parser.add_argument("-p", "--manager_port", type = int, default = 6789 )
parser.add_argument("-s", "--manager_server_addr", type = str, default = "katib-manager:6789")
parser.add_argument("-t", "--trial_name", type = str, default = "")
parser.add_argument("-d", "--log_dir", type = str, default = "/log")
parser.add_argument("-path", "--dir_path", type = str, default = "/log")
parser.add_argument("-m", "--metric_names", type = str, default = "")
opt = parser.parse_args()
return opt
Expand All @@ -26,11 +26,17 @@ def parse_options():
logger.addHandler(handler)
logger.propagate = False
opt = parse_options()
manager_server = opt.manager_server_addr.split(':')
if len(manager_server) != 2:
raise Exception("Invalid katib manager service address: %s" % opt.manager_server_addr)

WaitOtherMainProcesses()

mc = MetricsCollector(opt.metric_names.split(','))
observation_log = mc.parse_file(opt.log_dir)
observation_log = mc.parse_file(opt.dir_path)

channel = grpc.beta.implementations.insecure_channel(manager_server[0], int(manager_server[1]))

channel = grpc.beta.implementations.insecure_channel(opt.manager_addr, opt.manager_port)
with api_pb2.beta_create_Manager_stub(channel) as client:
logger.info("In " + opt.trial_name + " " + str(len(observation_log.metric_logs)) + " metrics will be reported.")
client.ReportObservationLog(api_pb2.ReportObservationLogRequest(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
psutil==5.2.2
2 changes: 1 addition & 1 deletion cmd/tfevent-metricscollector/v1alpha2/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ def parse_options():
client.ReportObservationLog(api_pb2.ReportObservationLogRequest(
trial_name=opt.trial_name,
observation_log=observation_log
), 10)
), 10)
18 changes: 9 additions & 9 deletions examples/v1alpha3/tfjob-example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@ spec:
goal: 0.99
objectiveMetricName: accuracy_1
algorithm:
algorithmName: random
algorithmName: hyperopt-random
metricsCollectorSpec:
source:
fileSystemPath:
path: /train
kind: Directory
collector:
kind: TensorFlowEvent
parameters:
- name: --learning_rate
parameterType: double
Expand Down Expand Up @@ -47,16 +54,9 @@ spec:
command:
- "python"
- "/var/tf_mnist/mnist_with_summaries.py"
- "--log_dir=/train/{{.Trial}}"
- "--log_dir=/train/metrics"
{{- with .HyperParameters}}
{{- range .}}
- "{{.Name}}={{.Value}}"
{{- end}}
{{- end}}
volumeMounts:
- mountPath: "/train"
name: "train"
volumes:
- name: "train"
persistentVolumeClaim:
claimName: "tfevent-volume"
Empty file.
39 changes: 39 additions & 0 deletions pkg/metricscollector/v1alpha3/common/pns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
import psutil
import time

def GetOtherMainProcesses():
    """Collect the PIDs of the other top-level processes visible to us.

    A process is selected when its parent PID is 0, it is not PID 1 and it
    is not the current process.

    Returns:
        set: PIDs (ints) of the processes that passed the filter.
    """
    this_pid = os.getpid()
    pids = set()
    for proc in psutil.process_iter():
        pid = proc.pid
        if pid == 1 or pid == this_pid:
            # ignore the pause container and our own pid
            continue
        try:
            ppid = proc.ppid()
        except psutil.NoSuchProcess:
            # The process exited between being listed and being inspected;
            # there is nothing left to wait for, so skip it instead of crashing.
            continue
        if ppid != 0:
            # ignore non-root processes (anything that has a real parent)
            continue
        pids.add(pid)
    return pids

def WaitPIDs(pids, poll_interval_seconds=1, timeout_seconds=0, is_wait_all=False):
    """Block until watched processes have exited, or until the timeout elapses.

    A PID counts as exited when the directory /proc/<pid> no longer exists.

    Args:
        pids: iterable of integer PIDs to watch.
        poll_interval_seconds: pause between two scans; must be > 0.
        timeout_seconds: give up once the accumulated poll intervals reach
            this value; a value <= 0 disables the timeout.
        is_wait_all: when False, return as soon as any watched PID has exited;
            when True, return only after every watched PID has exited.

    Returns:
        None.

    Raises:
        Exception: if poll_interval_seconds is not positive.
    """
    remaining = set(pids)
    if poll_interval_seconds <= 0:
        raise Exception("Poll interval seconds must be a positive integer")
    # Elapsed time is approximated by summing the poll intervals slept so far.
    elapsed = 0
    while remaining and (timeout_seconds <= 0 or elapsed < timeout_seconds):
        exited = set(pid for pid in remaining if not os.path.isdir("/proc/%d" % pid))
        if exited:
            if not is_wait_all:
                return
            remaining -= exited
            if not remaining:
                # Everything we were waiting for is gone: return right away
                # rather than sleeping one more poll interval for nothing.
                return
        time.sleep(poll_interval_seconds)
        elapsed += poll_interval_seconds

def WaitOtherMainProcesses(poll_interval_seconds=1, timeout_seconds=0, is_wait_all=False):
    """Wait on the processes reported by GetOtherMainProcesses().

    The three arguments are forwarded unchanged to WaitPIDs(); the result of
    WaitPIDs() is returned as-is.
    """
    other_pids = GetOtherMainProcesses()
    return WaitPIDs(
        other_pids,
        poll_interval_seconds=poll_interval_seconds,
        timeout_seconds=timeout_seconds,
        is_wait_all=is_wait_all
    )
24 changes: 14 additions & 10 deletions pkg/webhook/v1alpha3/pod/inject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error)
mutatedPod.Spec.ServiceAccountName = pod.Spec.ServiceAccountName
mutatedPod.Spec.ShareProcessNamespace = pointer.BoolPtr(true)

if mountFile := getMountFile(trial.Spec.MetricsCollector); mountFile != "" {
wrapWorkerContainer(mutatedPod, kind, mountFile, trial.Spec.MetricsCollector)
if err = mutateVolume(mutatedPod, kind, mountFile); err != nil {
if mountPath := getMountPath(trial.Spec.MetricsCollector); mountPath != "" {
wrapWorkerContainer(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector)
if err = mutateVolume(mutatedPod, kind, mountPath, trial.Spec.MetricsCollector.Source.FileSystemPath.Kind); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -203,17 +203,17 @@ func (s *sidecarInjector) getMetricsCollectorImage(cKind common.CollectorKind) (
}

func getMetricsCollectorArgs(trialName, metricName string, mc trialsv1alpha3.MetricsCollectorSpec) []string {
args := []string{"-t", trialName, "-m", katibmanagerv1alpha3.GetManagerAddr(), "-mn", metricName}
if mountFile := getMountFile(mc); mountFile != "" {
args = append(args, "-f", mountFile)
args := []string{"-t", trialName, "-m", metricName, "-s", katibmanagerv1alpha3.GetManagerAddr()}
if mountPath := getMountPath(mc); mountPath != "" {
args = append(args, "-path", mountPath)
}
return args
}

func getMountFile(mc trialsv1alpha3.MetricsCollectorSpec) string {
func getMountPath(mc trialsv1alpha3.MetricsCollectorSpec) string {
if mc.Collector.Kind == common.StdOutCollector {
return common.DefaultFilePath
} else if mc.Collector.Kind == common.FileCollector {
} else if mc.Collector.Kind == common.FileCollector || mc.Collector.Kind == common.TfEventCollector {
return mc.Source.FileSystemPath.Path
} else {
return ""
Expand Down Expand Up @@ -273,16 +273,20 @@ func isWorkerContainer(jobKind string, index int, c v1.Container) bool {
return false
}

func mutateVolume(pod *v1.Pod, jobKind, mountFile string) error {
func mutateVolume(pod *v1.Pod, jobKind, mountPath string, pathKind common.FileSystemKind) error {
metricsVol := v1.Volume{
Name: common.MetricsVolume,
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
}
dir := mountPath
if pathKind == common.FileKind {
dir = filepath.Dir(mountPath)
}
vm := v1.VolumeMount{
Name: metricsVol.Name,
MountPath: filepath.Dir(mountFile),
MountPath: dir,
}
index_list := []int{}
for i, c := range pod.Spec.Containers {
Expand Down