Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #951 from obourdon/fix_4_issue_950
Browse files Browse the repository at this point in the history
Add tasks creation and activation when some found in auto_discover_path
  • Loading branch information
pittma authored Jun 22, 2016
2 parents b234694 + dfc1e3c commit 0995570
Show file tree
Hide file tree
Showing 20 changed files with 982 additions and 357 deletions.
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions control/control_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ func (pc *ControlGRPCServer) MatchQueryToNamespaces(ctx context.Context, r *rpc.
return reply, nil
}

func (pc *ControlGRPCServer) GetAutodiscoverPaths(ctx context.Context, _ *common.Empty) (*rpc.GetAutodiscoverPathsReply, error) {
paths := pc.control.GetAutodiscoverPaths()
reply := &rpc.GetAutodiscoverPathsReply{
Paths: paths,
}
return reply, nil
}

//-------- util ---------------

func convertNSS(nss []core.Namespace) []*rpc.ArrString {
Expand Down
90 changes: 90 additions & 0 deletions core/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
"errors"
"time"

"github.com/intelsdi-x/snap/pkg/schedule"
)

type Schedule struct {
Type string `json:"type,omitempty"`
Interval string `json:"interval,omitempty"`
StartTimestamp *int64 `json:"start_timestamp,omitempty"`
StopTimestamp *int64 `json:"stop_timestamp,omitempty"`
}

func makeSchedule(s Schedule) (schedule.Schedule, error) {
switch s.Type {
case "simple":
d, err := time.ParseDuration(s.Interval)
if err != nil {
return nil, err
}
sch := schedule.NewSimpleSchedule(d)

err = sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
case "windowed":
d, err := time.ParseDuration(s.Interval)
if err != nil {
return nil, err
}

var start, stop *time.Time
if s.StartTimestamp != nil {
t := time.Unix(*s.StartTimestamp, 0)
start = &t
}
if s.StopTimestamp != nil {
t := time.Unix(*s.StopTimestamp, 0)
stop = &t
}
sch := schedule.NewWindowedSchedule(
d,
start,
stop,
)

err = sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
case "cron":
if s.Interval == "" {
return nil, errors.New("missing cron entry")
}
sch := schedule.NewCronSchedule(s.Interval)

err := sch.Validate()
if err != nil {
return nil, err
}
return sch, nil
default:
return nil, errors.New("unknown schedule type " + s.Type)
}
}
85 changes: 85 additions & 0 deletions core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ limitations under the License.
package core

import (
"encoding/json"
"errors"
"io"
"io/ioutil"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -150,3 +154,84 @@ func SetTaskID(id string) TaskOption {
type TaskErrors interface {
Errors() []serror.SnapError
}

type TaskCreationRequest struct {
Name string `json:"name"`
Deadline string `json:"deadline"`
Workflow *wmap.WorkflowMap `json:"workflow"`
Schedule Schedule `json:"schedule"`
Start bool `json:"start"`
}

// Function used to create a task according to content (1st parameter)
// . Content can be retrieved from a configuration file or a HTTP REST request body
// . Mode is used to specify if the created task should start right away or not
// . function pointer is responsible for effectively creating and returning the created task
func CreateTaskFromContent(body io.ReadCloser,
mode *bool,
fp func(sch schedule.Schedule,
wfMap *wmap.WorkflowMap,
startOnCreate bool,
opts ...TaskOption) (Task, TaskErrors)) (Task, error) {

tr, err := marshalTask(body)
if err != nil {
return nil, err
}

sch, err := makeSchedule(tr.Schedule)
if err != nil {
return nil, err
}

var opts []TaskOption
if tr.Deadline != "" {
dl, err := time.ParseDuration(tr.Deadline)
if err != nil {
return nil, err
}
opts = append(opts, TaskDeadlineDuration(dl))
}

if tr.Name != "" {
opts = append(opts, SetTaskName(tr.Name))
}
opts = append(opts, OptionStopOnFailure(10))

if mode == nil {
mode = &tr.Start
}
if fp == nil {
return nil, errors.New("Missing workflow creation routine")
}
task, errs := fp(sch, tr.Workflow, *mode, opts...)
if errs != nil && len(errs.Errors()) != 0 {
var errMsg string
for _, e := range errs.Errors() {
errMsg = errMsg + e.Error() + " -- "
}
return nil, errors.New(errMsg[:len(errMsg)-4])
}
return task, nil
}

func marshalTask(body io.ReadCloser) (*TaskCreationRequest, error) {
var tr TaskCreationRequest
errCode, err := MarshalBody(&tr, body)
if errCode != 0 && err != nil {
return nil, err
}
return &tr, nil
}

func MarshalBody(in interface{}, body io.ReadCloser) (int, error) {
b, err := ioutil.ReadAll(body)
if err != nil {
return 500, err
}
err = json.Unmarshal(b, in)
if err != nil {
return 400, err
}
return 0, nil
}
Loading

0 comments on commit 0995570

Please sign in to comment.