diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0c076fe5..58f21170 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,7 +1,7 @@ { "ImportPath": "github.com/hyperhq/hyperd", "GoVersion": "go1.5", - "GodepVersion": "v75", + "GodepVersion": "v74", "Packages": [ ".", "github.com/hyperhq/hyperd/cmds/protoc-gen-gogo", @@ -1238,130 +1238,134 @@ "Comment": "v0.6.4", "Rev": "7151adcef72687bf95f451a2e0ba15cb19412bf2" }, + { + "ImportPath": "github.com/hyperhq/hypercontainer-utils/hlog", + "Rev": "e979ff3608ee60c3e00d73d8578b57a1eba778c6" + }, + { + "ImportPath": "github.com/hyperhq/runv/api", + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" + }, { "ImportPath": "github.com/hyperhq/runv/driverloader", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/base", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/cache", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/direct", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/multi", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/single", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/factory/template", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hyperstart/api/json", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/libvirt", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/network", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/network/ipallocator", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/network/iptables", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/network/portmapper", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/qemu", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/types", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/vbox", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/hypervisor/xen", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/lib/govbox", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" - }, - { - "ImportPath": "github.com/hyperhq/runv/lib/linuxsignal", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/lib/telnet", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/lib/term", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/lib/utils", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/hyperhq/runv/template", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" + "Comment": "v0.7.0-67-gdee0472", + "Rev": "dee04726fcd6edf1cb66424a5f0ad6fc2f121a74" }, { "ImportPath": "github.com/imdario/mergo", @@ -1667,15 +1671,6 @@ { "ImportPath": "gopkg.in/yaml.v2", "Rev": "7ad95dd0798a40da1ccdff6dff35fd177b5edf40" - }, - { - "ImportPath": "github.com/hyperhq/runv/api", - "Comment": "v0.7.0-57-ge5a94c2", - "Rev": "e5a94c2fbb26111456c5d0e2408c778f8003a304" - }, - { - "ImportPath": "github.com/hyperhq/hypercontainer-utils/hlog", - "Rev": "e979ff3608ee60c3e00d73d8578b57a1eba778c6" } ] } diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/constants.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/constants.go index 0f9bcd0d..37b37564 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/constants.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/constants.go @@ -28,4 +28,8 @@ const ( INIT_SETUPROUTE INIT_REMOVECONTAINER INIT_PROCESSASYNCEVENT + INIT_SIGNALPROCESS ) + +// "hyperstart" is the special container ID for adding processes. +const HYPERSTART_EXEC_CONTAINER = "hyperstart" diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/types.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/types.go index bb66846e..02b1a427 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/types.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hyperstart/api/json/types.go @@ -15,6 +15,12 @@ type KillCommand struct { Signal syscall.Signal `json:"signal"` } +type SignalCommand struct { + Container string `json:"container"` + Process string `json:"process"` + Signal syscall.Signal `json:"signal"` +} + type ExecCommand struct { Container string `json:"container,omitempty"` Process Process `json:"process"` diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/hypervisor.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/hypervisor.go index bc253c2c..275cf80e 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/hypervisor.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/hypervisor.go @@ -1,6 +1,8 @@ package hypervisor import ( + "fmt" + "github.com/golang/glog" "github.com/hyperhq/runv/hypervisor/network" "github.com/hyperhq/runv/hypervisor/types" @@ -38,7 +40,7 @@ func (ctx *VmContext) Launch() { ctx.loop() } -func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) { +func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, pack []byte) (*VmContext, error) { if glog.V(1) { glog.Infof("VM %s trying to reload with serialized data: %s", vmId, string(pack)) @@ -46,36 +48,16 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p pinfo, err := vmDeserialize(pack) if err != nil { - client <- &types.VmResponse{ - VmId: vmId, - Code: types.E_BAD_REQUEST, - Cause: err.Error(), - } - return + return nil, err } if pinfo.Id != vmId { - client <- &types.VmResponse{ - VmId: vmId, - Code: types.E_BAD_REQUEST, - Cause: "VM ID mismatch", - } - return + return nil, fmt.Errorf("VM ID mismatch, %v vs %v", vmId, pinfo.Id) } context, err := pinfo.vmContext(hub, client) if err != nil { - client <- &types.VmResponse{ - VmId: vmId, - Code: types.E_BAD_REQUEST, - Cause: err.Error(), - } - return - } - - client <- &types.VmResponse{ - VmId: vmId, - Code: types.E_OK, + return nil, err } context.DCtx.Associate(context) @@ -94,6 +76,7 @@ func VmAssociate(vmId string, hub chan VmEvent, client chan *types.VmResponse, p //} go context.loop() + return context, nil } func InitNetwork(bIface, bIP string, disableIptables bool) error { diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/network.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/network.go index 22201a90..b80a3016 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/network.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/network.go @@ -70,13 +70,13 @@ func (nc *NetworkContext) freeSlot(slot int) { if inf, ok := nc.eth[slot]; !ok { nc.sandbox.Log(WARNING, "Freeing an unoccupied eth slot %d", slot) return - } else { + } else if inf != nil { if _, ok := nc.idMap[inf.Id]; ok { delete(nc.idMap, inf.Id) } - nc.sandbox.Log(DEBUG, "Free slot %d of eth", slot) - delete(nc.eth, slot) } + nc.sandbox.Log(DEBUG, "Free slot %d of eth", slot) + delete(nc.eth, slot) } func (nc *NetworkContext) addInterface(inf *api.InterfaceDescription, result chan api.Result) { diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/types/types.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/types/types.go index 709c4cb9..bf9b550f 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/types/types.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/types/types.go @@ -9,7 +9,6 @@ const ( E_FAILED E_EXEC_FINISHED E_CONTAINER_FINISHED - E_BUSY E_NO_TTY E_JSON_PARSE_FAIL E_UNEXPECTED diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm.go index 4edb7034..bc0ee507 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm.go @@ -1,9 +1,11 @@ package hypervisor import ( + "bytes" "encoding/json" "errors" "fmt" + "io/ioutil" "os" "strings" "syscall" @@ -64,7 +66,6 @@ func (vm *Vm) Launch(b *BootConfig) (err error) { go ctx.Launch() vm.ctx = ctx - //} vm.Hub = vmEvent vm.clients = CreateFanout(Status, 128, false) @@ -78,15 +79,15 @@ func (vm *Vm) AssociateVm(data []byte) error { var ( PodEvent = make(chan VmEvent, 128) Status = make(chan *types.VmResponse, 128) + err error ) - VmAssociate(vm.Id, PodEvent, Status, data) - - ass := <-Status - if ass.Code != types.E_OK { - glog.Errorf("cannot associate with vm: %s, error status %d (%s)", vm.Id, ass.Code, ass.Cause) - return errors.New("load vm status failed") + vm.ctx, err = VmAssociate(vm.Id, PodEvent, Status, data) + if err != nil { + glog.Errorf("cannot associate with vm: %v", err) + return err } + // go vm.handlePodEvent(mypod) // vm.Hub = PodEvent @@ -102,85 +103,76 @@ func (vm *Vm) AssociateVm(data []byte) error { return nil } -func (vm *Vm) ReleaseVm() (int, error) { - var Response *types.VmResponse - - Status, err := vm.GetResponseChan() - if err != nil { - return -1, err - } - defer vm.ReleaseResponseChan(Status) +type matchResponse func(response *types.VmResponse) (error, bool) - if vm.ctx.current == StateRunning { - releasePodEvent := &ReleaseVMCommand{} - vm.Hub <- releasePodEvent - for { - Response = <-Status - if Response.Code == types.E_VM_SHUTDOWN || - Response.Code == types.E_OK { - break - } - if Response.Code == types.E_BUSY { - return types.E_BUSY, fmt.Errorf("VM busy") - } +func (vm *Vm) WaitResponse(match matchResponse, timeout int) chan error { + result := make(chan error) + go func() { + var timeoutChan <-chan time.Time + if timeout >= 0 { + timeoutChan = time.After(time.Duration(timeout) * time.Second) + } else { + timeoutChan = make(chan time.Time, 1) } - } - return types.E_OK, nil -} - -func (vm *Vm) WaitVm(timeout int) <-chan bool { - var ( - result = make(chan bool) - timeoutChan <-chan time.Time - ) - - if timeout >= 0 { - timeoutChan = time.After(time.Duration(timeout) * time.Second) - } else { - timeoutChan = make(chan time.Time, 1) - } - - Status, err := vm.GetResponseChan() - if err != nil { - vm.ctx.Log(ERROR, "fail to get response channel: %v", err) - return nil - } - - go func() { + Status, err := vm.GetResponseChan() + if err != nil { + result <- err + return + } defer vm.ReleaseResponseChan(Status) + for { select { case response, ok := <-Status: if !ok { - vm.ctx.Log(WARNING, "status chan broken during waiting vm, it should be closed") - result <- false + result <- fmt.Errorf("Response Chan is broken") return } - if response.Code == types.E_VM_SHUTDOWN { - vm.ctx.Log(INFO, "wait vm: vm exited") - result <- true + if err, exit := match(response); exit { + result <- err return - } case <-timeoutChan: - vm.ctx.Log(WARNING, "timeout while waiting vm") - close(result) + result <- fmt.Errorf("timeout for waiting response") return } } - }() - return result } +func (vm *Vm) ReleaseVm() error { + if vm.ctx.current != StateRunning { + return nil + } + + result := vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_VM_SHUTDOWN || response.Code == types.E_OK { + return nil, true + } + return nil, false + }, -1) + + releasePodEvent := &ReleaseVMCommand{} + vm.Hub <- releasePodEvent + return <-result +} + +func (vm *Vm) WaitVm(timeout int) <-chan error { + return vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_VM_SHUTDOWN { + return nil, true + } + return nil, false + }, timeout) +} + func (vm *Vm) WaitProcess(isContainer bool, ids []string, timeout int) <-chan *api.ProcessExit { var ( - waiting = make(map[string]struct{}) - result = make(chan *api.ProcessExit, len(ids)) - timeoutChan <-chan time.Time - waitEvent = types.E_CONTAINER_FINISHED + waiting = make(map[string]struct{}) + result = make(chan *api.ProcessExit, len(ids)) + waitEvent = types.E_CONTAINER_FINISHED ) if !isContainer { @@ -191,52 +183,40 @@ func (vm *Vm) WaitProcess(isContainer bool, ids []string, timeout int) <-chan *a waiting[id] = struct{}{} } - if timeout >= 0 { - timeoutChan = time.After(time.Duration(timeout) * time.Second) - } else { - timeoutChan = make(chan time.Time, 1) - } - - Status, err := vm.GetResponseChan() - if err != nil { - vm.ctx.Log(ERROR, "fail to get response channel: %v", err) - return nil - } - - go func() { - defer vm.ReleaseResponseChan(Status) - for len(waiting) > 0 { + resChan := vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_VM_SHUTDOWN { + return fmt.Errorf("get shutdown event"), true + } + if response.Code != waitEvent { + return nil, false + } + ps, _ := response.Data.(*types.ProcessFinished) + if _, ok := waiting[ps.Id]; ok { + result <- &api.ProcessExit{ + Id: ps.Id, + Code: int(ps.Code), + FinishedAt: time.Now().UTC(), + } select { - case response, ok := <-Status: - if !ok || response.Code == types.E_VM_SHUTDOWN { - vm.ctx.Log(WARNING, "status chan broken during waiting containers: %#v", waiting) - close(result) - return - } - if response.Code == waitEvent { - ps, _ := response.Data.(*types.ProcessFinished) - if _, ok := waiting[ps.Id]; ok { - result <- &api.ProcessExit{ - Id: ps.Id, - Code: int(ps.Code), - FinishedAt: time.Now().UTC(), - } - delete(waiting, ps.Id) - select { - case ps.Ack <- true: - vm.ctx.Log(TRACE, "got shut down msg, acked here") - default: - vm.ctx.Log(TRACE, "got shut down msg, acked somewhere") - } - } - } - case <-timeoutChan: - vm.ctx.Log(WARNING, "timeout while waiting result of containers: %#v", waiting) - close(result) - return + case ps.Ack <- true: + vm.ctx.Log(TRACE, "got shut down msg, acked here") + default: + vm.ctx.Log(TRACE, "got shut down msg, acked somewhere") } + delete(waiting, ps.Id) + if len(waiting) == 0 { + // got all of processexit event, exit + return nil, true + } + } + // continue to wait other processexit event + return nil, false + }, timeout) + + go func() { + if err := <-resChan; err != nil { + close(result) } - close(result) }() return result @@ -305,51 +285,37 @@ func (vm *Vm) InitSandbox(config *api.SandboxConfig) { } func (vm *Vm) WaitInit() api.Result { - Status, err := vm.GetResponseChan() - if err != nil { - vm.ctx.Log(ERROR, "failed to get status chan to monitor startpod: %v", err) - return api.NewResultBase(vm.Id, false, err.Error()) - } - defer vm.ReleaseResponseChan(Status) - - for { - s, ok := <-Status - if !ok { - return api.NewResultBase(vm.Id, false, "status channel broken") + if err := <-vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_OK { + return nil, true } - - switch s.Code { - case types.E_OK: - return api.NewResultBase(vm.Id, true, "set sandbox config successfully") - case types.E_FAILED, types.E_VM_SHUTDOWN: - return api.NewResultBase(vm.Id, false, "set sandbox config failed") - default: - vm.ctx.Log(DEBUG, "got message %#v while waiting start pod command finish") + if response.Code == types.E_FAILED || response.Code == types.E_VM_SHUTDOWN { + return fmt.Errorf("got failed event when wait init message"), true } + return nil, false + }, -1); err != nil { + return api.NewResultBase(vm.Id, false, err.Error()) } + return api.NewResultBase(vm.Id, true, "wait init message successfully") } func (vm *Vm) Shutdown() api.Result { if vm.ctx.current != StateRunning { return api.NewResultBase(vm.Id, false, "not in running state") } - Status, err := vm.GetResponseChan() - if err != nil { - return api.NewResultBase(vm.Id, false, "fail to get response chan") - } - defer vm.ReleaseResponseChan(Status) - vm.Hub <- &ShutdownCommand{} - for { - Response, ok := <-Status - if !ok { - return api.NewResultBase(vm.Id, false, "status channel broken") - } - glog.V(1).Infof("Got response: %d: %s", Response.Code, Response.Cause) - if Response.Code == types.E_VM_SHUTDOWN { - return api.NewResultBase(vm.Id, true, "set sandbox config successfully") + result := vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_VM_SHUTDOWN { + return nil, true } + return nil, false + }, -1) + + vm.Hub <- &ShutdownCommand{} + if err := <-result; err != nil { + return api.NewResultBase(vm.Id, false, err.Error()) } + return api.NewResultBase(vm.Id, true, "shutdown vm successfully") } // TODO: should we provide a method to force kill vm @@ -399,17 +365,21 @@ func (vm *Vm) ReadFile(container, target string) ([]byte, error) { return cmd.retMsg, err } -func (vm *Vm) KillContainer(container string, signal syscall.Signal) error { - return vm.GenericOperation("KillContainer", func(ctx *VmContext, result chan<- error) { +func (vm *Vm) SignalProcess(container, process string, signal syscall.Signal) error { + return vm.GenericOperation("SignalProcess", func(ctx *VmContext, result chan<- error) { if ctx.current != StateRunning { glog.V(1).Infof("container %s is already stopped, in %s", container, ctx.current) result <- fmt.Errorf("container %s is already stopped", container) return } - ctx.killCmd(container, signal, result) + ctx.signalProcess(container, process, signal, result) }, StateRunning, StateTerminating) } +func (vm *Vm) KillContainer(container string, signal syscall.Signal) error { + return vm.SignalProcess(container, "init", signal) +} + func (vm *Vm) AddRoute() error { return vm.GenericOperation("AddRoute", func(ctx *VmContext, result chan<- error) { routes := ctx.networks.getRoutes() @@ -513,6 +483,47 @@ func (vm *Vm) OnlineCpuMem() error { return nil } +func (vm *Vm) HyperstartExecSync(cmd []string, stdin []byte) (stdout, stderr []byte, err error) { + if len(cmd) == 0 { + return nil, nil, fmt.Errorf("'hyperstart-exec' without command") + } + + execId := fmt.Sprintf("hyperstart-exec-%s", utils.RandStr(10, "alpha")) + + var stdoutBuf, stderrBuf bytes.Buffer + tty := &TtyIO{ + Stdin: ioutil.NopCloser(bytes.NewReader(stdin)), + Stdout: &stdoutBuf, + Stderr: &stderrBuf, + } + + result := vm.WaitProcess(false, []string{execId}, -1) + if result == nil { + err = fmt.Errorf("can not wait hyperstart-exec %q", execId) + glog.Error(err) + return nil, nil, err + } + + err = vm.AddProcess(hyperstartapi.HYPERSTART_EXEC_CONTAINER, execId, false, cmd, []string{}, "/", tty) + if err != nil { + return nil, nil, err + } + + r, ok := <-result + if !ok { + err = fmt.Errorf("wait hyperstart-exec %q interrupted", execId) + glog.Error(err) + return nil, nil, err + } + + glog.V(3).Infof("hyperstart-exec %q terminated at %v with code %d", execId, r.FinishedAt, r.Code) + + if r.Code != 0 { + return stdoutBuf.Bytes(), stderrBuf.Bytes(), fmt.Errorf("exit with error code:%d", r.Code) + } + return stdoutBuf.Bytes(), stderrBuf.Bytes(), nil +} + func (vm *Vm) Exec(container, execId, cmd string, terminal bool, tty *TtyIO) error { var command []string @@ -789,42 +800,35 @@ func NewVm(vmId string, cpu, memory int, lazy bool) *Vm { } } -func GetVm(vmId string, b *BootConfig, waitStarted, lazy bool) (vm *Vm, err error) { +func GetVm(vmId string, b *BootConfig, waitStarted, lazy bool) (*Vm, error) { id := vmId if id == "" { for { id = fmt.Sprintf("vm-%s", utils.RandStr(10, "alpha")) - if _, err = os.Stat(BaseDir + "/" + id); os.IsNotExist(err) { + if _, err := os.Stat(BaseDir + "/" + id); os.IsNotExist(err) { break } } } - vm = NewVm(id, b.CPU, b.Memory, lazy) - if err = vm.Launch(b); err != nil { + vm := NewVm(id, b.CPU, b.Memory, lazy) + if err := vm.Launch(b); err != nil { return nil, err } if waitStarted { - // wait init connected - Status, err := vm.GetResponseChan() - if err != nil { - vm.Kill() - return nil, err - } - defer vm.ReleaseResponseChan(Status) - for { - vmResponse, ok := <-Status - if !ok || vmResponse.Code == types.E_FAILED { - vm.Kill() - return nil, err + if err := <-vm.WaitResponse(func(response *types.VmResponse) (error, bool) { + if response.Code == types.E_FAILED { + return fmt.Errorf("vm start failed"), true } - - if vmResponse.Code == types.E_VM_RUNNING { - break + if response.Code == types.E_VM_RUNNING { + return nil, true } + return nil, false + }, -1); err != nil { + vm.Kill() } } - return vm, nil + return vm, nil } diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm_states.go b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm_states.go index 292d3e6e..db9f8176 100644 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm_states.go +++ b/Godeps/_workspace/src/github.com/hyperhq/runv/hypervisor/vm_states.go @@ -142,11 +142,27 @@ func (ctx *VmContext) execCmd(execId string, cmd *hyperstartapi.ExecCommand, tty } } -func (ctx *VmContext) killCmd(container string, signal syscall.Signal, result chan<- error) { +func (ctx *VmContext) signalProcess(container, process string, signal syscall.Signal, result chan<- error) { + if ctx.vmHyperstartAPIVersion <= 4242 { + if process == "init" { + ctx.vm <- &hyperstartCmd{ + Code: hyperstartapi.INIT_KILLCONTAINER, + Message: hyperstartapi.KillCommand{ + Container: container, + Signal: signal, + }, + result: result, + } + } else { + result <- fmt.Errorf("only the init process of the container can be signaled") + } + return + } ctx.vm <- &hyperstartCmd{ - Code: hyperstartapi.INIT_KILLCONTAINER, - Message: hyperstartapi.KillCommand{ + Code: hyperstartapi.INIT_SIGNALPROCESS, + Message: hyperstartapi.SignalCommand{ Container: container, + Process: process, Signal: signal, }, result: result, diff --git a/Godeps/_workspace/src/github.com/hyperhq/runv/lib/linuxsignal/signal.go b/Godeps/_workspace/src/github.com/hyperhq/runv/lib/linuxsignal/signal.go deleted file mode 100644 index 2957fd22..00000000 --- a/Godeps/_workspace/src/github.com/hyperhq/runv/lib/linuxsignal/signal.go +++ /dev/null @@ -1,42 +0,0 @@ -package linuxsignal - -import "syscall" - -// Linux Signals -const ( - SIGABRT = syscall.Signal(0x6) - SIGALRM = syscall.Signal(0xe) - SIGBUS = syscall.Signal(0x7) - SIGCHLD = syscall.Signal(0x11) - SIGCLD = syscall.Signal(0x11) - SIGCONT = syscall.Signal(0x12) - SIGFPE = syscall.Signal(0x8) - SIGHUP = syscall.Signal(0x1) - SIGILL = syscall.Signal(0x4) - SIGINT = syscall.Signal(0x2) - SIGIO = syscall.Signal(0x1d) - SIGIOT = syscall.Signal(0x6) - SIGKILL = syscall.Signal(0x9) - SIGPIPE = syscall.Signal(0xd) - SIGPOLL = syscall.Signal(0x1d) - SIGPROF = syscall.Signal(0x1b) - SIGPWR = syscall.Signal(0x1e) - SIGQUIT = syscall.Signal(0x3) - SIGSEGV = syscall.Signal(0xb) - SIGSTKFLT = syscall.Signal(0x10) - SIGSTOP = syscall.Signal(0x13) - SIGSYS = syscall.Signal(0x1f) - SIGTERM = syscall.Signal(0xf) - SIGTRAP = syscall.Signal(0x5) - SIGTSTP = syscall.Signal(0x14) - SIGTTIN = syscall.Signal(0x15) - SIGTTOU = syscall.Signal(0x16) - SIGUNUSED = syscall.Signal(0x1f) - SIGURG = syscall.Signal(0x17) - SIGUSR1 = syscall.Signal(0xa) - SIGUSR2 = syscall.Signal(0xc) - SIGVTALRM = syscall.Signal(0x1a) - SIGWINCH = syscall.Signal(0x1c) - SIGXCPU = syscall.Signal(0x18) - SIGXFSZ = syscall.Signal(0x19) -) diff --git a/daemon/pod/decommission.go b/daemon/pod/decommission.go index bf5705da..a03e4ccc 100644 --- a/daemon/pod/decommission.go +++ b/daemon/pod/decommission.go @@ -71,7 +71,6 @@ func (p *XPod) Remove(force bool) error { p.status = S_POD_NONE p.statusLock.Unlock() - os.RemoveAll(path.Join(utils.HYPER_ROOT, "services", p.Id())) os.RemoveAll(path.Join(utils.HYPER_ROOT, "hosts", p.Id())) //TODO get created volumes and remove them diff --git a/daemon/pod/factory.go b/daemon/pod/factory.go index 824e4aac..9e88e378 100644 --- a/daemon/pod/factory.go +++ b/daemon/pod/factory.go @@ -58,6 +58,7 @@ type LogStatus struct { func NewPodFactory(vmFactory factory.Factory, registry *PodList, db *daemondb.DaemonDB, sd PodStorage, eng ContainerEngine, logCfg *GlobalLogConfig) *PodFactory { return &PodFactory{ sd: sd, + db: db, registry: registry, engine: eng, vmFactory: vmFactory, diff --git a/daemon/pod/networks.go b/daemon/pod/networks.go index f994240a..9b0f9137 100644 --- a/daemon/pod/networks.go +++ b/daemon/pod/networks.go @@ -34,8 +34,9 @@ func (inf *Interface) Log(level hlog.LogLevel, args ...interface{}) { } func (inf *Interface) prepare() error { - - defer inf.Log(DEBUG, "prepare inf info: %#v", inf.descript) + defer func() { + inf.Log(DEBUG, "prepare inf info: %#v", inf.descript) + }() if inf.spec.Ip == "" && inf.spec.Bridge != "" { err := fmt.Errorf("if configured a bridge, must specify the IP address") diff --git a/daemon/pod/persist.go b/daemon/pod/persist.go index fe69f620..6a14868f 100644 --- a/daemon/pod/persist.go +++ b/daemon/pod/persist.go @@ -182,7 +182,7 @@ func loadGloabalSpec(db *daemondb.DaemonDB, id string) (*types.UserPod, error) { func (p *XPod) savePodMeta() error { meta := &types.PersistPodMeta{ Id: p.Id(), - Services: p.services, + Services: p.services.get(), Labels: p.labels, } if p.info != nil { @@ -202,7 +202,7 @@ func (p *XPod) loadPodMeta() error { p.info.CreatedAt = meta.CreatedAt } p.labels = meta.Labels - p.services = meta.Services + p.services = newServices(p, meta.Services) return nil } diff --git a/daemon/pod/pod.go b/daemon/pod/pod.go index e616b481..d307524e 100644 --- a/daemon/pod/pod.go +++ b/daemon/pod/pod.go @@ -57,7 +57,7 @@ type XPod struct { containers map[string]*Container volumes map[string]*Volume interfaces map[string]*Interface - services []*apitypes.UserService + services *Services portMappings []*apitypes.PortMapping labels map[string]string resourceLock *sync.Mutex @@ -373,10 +373,6 @@ func (p *XPod) updatePodInfo() error { return nil } -func (p *XPod) HasServiceContainer() bool { - return p.globalSpec.Type == "service-discovery" || len(p.services) > 0 -} - func (p *XPod) ContainerLogger(id string) logger.Logger { c, ok := p.containers[id] if ok { diff --git a/daemon/pod/provision.go b/daemon/pod/provision.go index 825f17bb..d0c1411d 100644 --- a/daemon/pod/provision.go +++ b/daemon/pod/provision.go @@ -7,7 +7,6 @@ import ( "time" "github.com/hyperhq/hypercontainer-utils/hlog" - "github.com/hyperhq/hyperd/servicediscovery" apitypes "github.com/hyperhq/hyperd/types" "github.com/hyperhq/hyperd/utils" runv "github.com/hyperhq/runv/api" @@ -205,9 +204,6 @@ func (p *XPod) Start() error { } if p.status == S_POD_RUNNING { - if err := p.setupServiceInf(); err != nil { - return err - } if err := p.startAll(); err != nil { return err } @@ -334,10 +330,6 @@ func (p *XPod) releaseNames(containers []*apitypes.UserContainer) { // This function will do resource op and update the spec. and won't // access sandbox. func (p *XPod) initResources(spec *apitypes.UserPod, allowCreate bool) error { - if sc := p.ParseServiceDiscovery(spec); sc != nil { - spec.Containers = append([]*apitypes.UserContainer{sc}, spec.Containers...) - } - for _, cspec := range spec.Containers { c, err := newContainer(p, cspec, allowCreate) if err != nil { @@ -362,13 +354,13 @@ func (p *XPod) initResources(spec *apitypes.UserPod, allowCreate bool) error { p.interfaces[nspec.Ifname] = inf } - p.services = spec.Services + p.services = newServices(p, spec.Services) p.portMappings = spec.Portmappings return nil } -// prepareResources() will allocate IP, generate service discovery config file etc. +// prepareResources() will allocate IP. // This apply for creating and restart a stopped pod. func (p *XPod) prepareResources() error { var ( @@ -377,15 +369,6 @@ func (p *XPod) prepareResources() error { //generate /etc/hosts p.factory.hosts.Do() - // gernerate service discovery config - if len(p.services) > 0 { - if err = servicediscovery.PrepareServices(p.services, p.Id()); err != nil { - p.Log(ERROR, "PrepareServices failed %v", err) - return err - } - p.globalSpec.Type = "service-discovery" - } - defer func() { if err != nil { for _, inf := range p.interfaces { @@ -430,6 +413,10 @@ func (p *XPod) addResourcesToSandbox() error { future.Add(ic, c.addToSandbox) } + if p.services.size() != 0 { + future.Add("serivce", p.services.apply) + } + if err := future.Wait(ProvisionTimeout); err != nil { p.Log(ERROR, "error during add resources to sandbox: %v", err) return err @@ -452,31 +439,6 @@ func (p *XPod) startAll() error { return nil } -// only necessary for startup with service -func (p *XPod) setupServiceInf() error { - if len(p.services) == 0 || p.sandbox == nil { - return nil - } - var existing = make(map[string]bool) - for _, srv := range p.services { - if existing[srv.ServiceIP] { - continue - } - p.Log(DEBUG, "init service ip %s", srv.ServiceIP) - existing[srv.ServiceIP] = true - desc := &runv.InterfaceDescription{ - Id: srv.ServiceIP, - Lo: true, - Ip: srv.ServiceIP, - } - if err := p.sandbox.AddNic(desc); err != nil { - p.Log(ERROR, "failed to inf for init service %#v: %v", srv, err) - return err - } - } - return nil -} - func (p *XPod) sandboxShareDir() string { if p.sandbox == nil { // the /dev/null is not a dir, then, can not create or open it diff --git a/daemon/pod/sandbox.go b/daemon/pod/sandbox.go index fa4288e1..34dcb021 100644 --- a/daemon/pod/sandbox.go +++ b/daemon/pod/sandbox.go @@ -1,12 +1,9 @@ package pod import ( - "time" - "github.com/hyperhq/hypercontainer-utils/hlog" "github.com/hyperhq/runv/factory" "github.com/hyperhq/runv/hypervisor" - runvtypes "github.com/hyperhq/runv/hypervisor/types" ) const ( @@ -65,17 +62,9 @@ func dissociateSandbox(sandbox *hypervisor.Vm, retry int) error { return nil } - rval, err := sandbox.ReleaseVm() + err := sandbox.ReleaseVm() if err != nil { hlog.Log(WARNING, "SB[%s] failed to release sandbox: %v", sandbox.Id, err) - if rval == runvtypes.E_BUSY && retry < maxReleaseRetry { - retry++ - hlog.Log(DEBUG, "SB[%s] retry release %d", sandbox.Id, retry) - time.AfterFunc(100*time.Millisecond, func() { - dissociateSandbox(sandbox, retry) - }) - return nil - } hlog.Log(INFO, "SB[%s] shutdown because of failed release", sandbox.Id) sandbox.Kill() return err diff --git a/daemon/pod/servicediscovery.go b/daemon/pod/servicediscovery.go index abfc1117..45ece893 100644 --- a/daemon/pod/servicediscovery.go +++ b/daemon/pod/servicediscovery.go @@ -2,188 +2,309 @@ package pod import ( "fmt" - "path" + "strings" - "github.com/hyperhq/hyperd/servicediscovery" + "github.com/hyperhq/hypercontainer-utils/hlog" apitypes "github.com/hyperhq/hyperd/types" - "github.com/hyperhq/hyperd/utils" ) -func (p *XPod) ParseServiceDiscovery(spec *apitypes.UserPod) *apitypes.UserContainer { - var serviceType string = "service-discovery" - - if len(spec.Services) == 0 || spec.Type == serviceType { - return nil - } +const ( + // set default mode to masquerading + // Others are `i` tunneling and `g` gatewaying + DEFAULT_MODE string = "m" + DEFAULT_WEITHT int = 1 + // Others are wrr|lc|wlc|lblc|lblcr|dh|sh|sed|nq + DEFAULT_SCHEDULER = "rr" +) - spec.Type = serviceType +type Services struct { + p *XPod - return p.generateServiceContainer(spec.Services) + spec []*apitypes.UserService } -func (p *XPod) generateServiceContainer(srvs []*apitypes.UserService) *apitypes.UserContainer { - var serviceDir string = path.Join(utils.HYPER_ROOT, "services", p.Id()) - - /* PrepareServices will check service volume */ - serviceVolume := &apitypes.UserVolume{ - Name: "service-volume", - Source: serviceDir, - Format: "vfs", - Fstype: "dir", - } - - serviceVolRef := &apitypes.UserVolumeReference{ - Volume: "service-volume", - Path: servicediscovery.ServiceVolume, - ReadOnly: false, - Detail: serviceVolume, +func newServices(p *XPod, spec []*apitypes.UserService) *Services { + return &Services{ + p: p, + spec: spec, } +} - return &apitypes.UserContainer{ - Name: ServiceDiscoveryContainerName(p.Id()), - Image: servicediscovery.ServiceImage, - Command: []string{"haproxy", "-D", "-f", "/usr/local/etc/haproxy/haproxy.cfg", "-p", "/var/run/haproxy.pid"}, - Volumes: []*apitypes.UserVolumeReference{serviceVolRef}, - Type: apitypes.UserContainer_SERVICE, - StopSignal: "KILL", - } +func (s *Services) LogPrefix() string { + return fmt.Sprintf("%s[Serv] ", s.p.LogPrefix()) } -func ServiceDiscoveryContainerName(podName string) string { - return podName + "-" + utils.RandStr(10, "alpha") + "-service-discovery" +func (s *Services) Log(level hlog.LogLevel, args ...interface{}) { + hlog.HLog(level, s, 1, args...) } -func (p *XPod) GetServices() ([]*apitypes.UserService, error) { - return p.services, nil +type serviceKey struct { + IP string + Port int32 + Protocol string } -func (p *XPod) NewServiceContainer(srvs []*apitypes.UserService) error { - if !p.IsRunning() { - err := fmt.Errorf("unable to add service container when pod is not running") - p.Log(ERROR, err) - return err +func generateIPVSCmd(service *apitypes.UserService, op string) ([]byte, error) { + if service == nil { + return nil, nil } - p.resourceLock.Lock() - defer p.resourceLock.Unlock() - - p.Log(INFO, "create new service container") + var ( + cmd string + protoFlag string + ) + cmds := []byte{} + DEFAULT_POSTFIX := fmt.Sprintf("-%s -w %d", DEFAULT_MODE, DEFAULT_WEITHT) - p.services = srvs + if strings.ToLower(service.Protocol) == "tcp" { + protoFlag = "-t" + } else if strings.ToLower(service.Protocol) == "udp" { + protoFlag = "-u" + } else { + return nil, fmt.Errorf("unsupported service protocol type: %s", service.Protocol) + } + sConf := fmt.Sprintf("%s %s:%d", protoFlag, service.ServiceIP, service.ServicePort) + switch op { + case "add": + if service.ServiceIP == "" || service.ServicePort == 0 { + return nil, fmt.Errorf("invlide service format, missing service IP or Port") + } + cmd = fmt.Sprintf("-A %s -s %s\n", sConf, DEFAULT_SCHEDULER) + cmds = append(cmds, cmd...) + for _, b := range service.Hosts { + cmd = fmt.Sprintf("-a %s -r %s:%d %s\n", sConf, b.HostIP, b.HostPort, DEFAULT_POSTFIX) + cmds = append(cmds, cmd...) + } + case "del": + cmd = fmt.Sprintf("-D %s\n", sConf) + cmds = append(cmds, cmd...) + default: + return nil, fmt.Errorf("undefined operation type: %s", op) + } + return cmds, nil +} - if err := p.setupServiceInf(); err != nil { - p.Log(ERROR, "failed to create service interfaces: %v", err) - return err +// add will only add services in list that don't exist, else failed +func (s *Services) add(newServs []*apitypes.UserService) error { + var err error + // check if adding service conflict with existing ones + exist := make(map[serviceKey]bool, s.size()) + for _, srv := range s.spec { + exist[serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol}] = true } - sc := p.generateServiceContainer(srvs) - cid, err := p.doContainerCreate(sc) - if err != nil { - p.Log(ERROR, "failed to create service container") - return err + for _, srv := range newServs { + if exist[serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol}] { + err = fmt.Errorf("service %v conflicts with existing ones", newServs) + s.Log(ERROR, err) + return err + } + exist[serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol}] = true } - err = p.ContainerStart(cid) - if err != nil { - p.Log(ERROR, "failed to start service container") - return err + + // if pod is running, convert service to patch and send to vm + if s.p.IsRunning() { + if err = s.commit(newServs, "add"); err != nil { + return err + } } + s.spec = append(s.spec, newServs...) return nil } -func (p *XPod) UpdateService(srvs []*apitypes.UserService) error { - if p.globalSpec.Type != "service-discovery" { - p.globalSpec.Type = "service-discovery" - p.Log(INFO, "change pod type to service discovery") - return p.NewServiceContainer(srvs) +func (s *Services) del(srvs []*apitypes.UserService) error { + var err error + tbd := make(map[serviceKey]bool, len(srvs)) + for _, srv := range srvs { + tbd[serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol}] = true + } + target := make([]*apitypes.UserService, 0, len(srvs)) + remain := make([]*apitypes.UserService, 0, s.size()) + for _, srv := range s.spec { + if tbd[serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol}] { + s.Log(TRACE, "remove serivce %#v", srv) + target = append(target, srv) + } else { + remain = append(remain, srv) + } } - p.Log(INFO, "update service %v", srvs) - p.resourceLock.Lock() - defer p.resourceLock.Unlock() - if p.IsRunning() { - sc := p.getServiceContainer() - p.Log(DEBUG, "apply service to service container") - if err := servicediscovery.ApplyServices(p.sandbox, sc, srvs); err != nil { - p.Log(ERROR, "failed to update services %#v: %v", srvs, err) + if s.p.IsRunning() { + if err = s.commit(target, "del"); err != nil { return err } } - p.services = srvs + s.spec = remain + return nil } -func (p *XPod) AddService(srvs []*apitypes.UserService) error { - if p.globalSpec.Type != "service-discovery" { - p.globalSpec.Type = "service-discovery" - p.Log(INFO, "change pod type to service discovery") - return p.NewServiceContainer(srvs) +// update removes services in list that already exist, and add with new ones +// or just add new ones if they are not exist already +func (s *Services) update(srvs []*apitypes.UserService) error { + var err error + // check if update service list conflicts + tbd := make(map[serviceKey]bool, len(srvs)) + for _, srv := range srvs { + key := serviceKey{srv.ServiceIP, srv.ServicePort, srv.Protocol} + if tbd[key] { + err = fmt.Errorf("given service list conflict: %v", srv) + s.Log(ERROR, err) + return err + } + tbd[key] = true } - p.resourceLock.Lock() - defer p.resourceLock.Unlock() - target := append(p.services, srvs...) + if s.p.IsRunning() { + if err = s.commit(srvs, "update"); err != nil { + return err + } + } + s.spec = srvs + + return nil +} + +func (s *Services) apply() error { + return s.commit(s.spec, "add") +} - if p.IsRunning() { - sc := p.getServiceContainer() - if err := servicediscovery.ApplyServices(p.sandbox, sc, target); err != nil { - p.Log(ERROR, "failed to update services %#v: %v", target, err) +func (s *Services) commit(srvs []*apitypes.UserService, operation string) error { + var ( + err error + patch []byte + ) + if operation == "update" { + // clear all rules first + patch = append(patch, []byte("-C\n")...) + operation = "add" + } + // generate patch + for _, srv := range srvs { + cmd, err := generateIPVSCmd(srv, operation) + if err != nil { + s.Log(ERROR, "faild to generate IPVS command: %v", err) return err } + patch = append(patch, cmd...) + } + // send to vm + if err = s.commitToVm(patch); err != nil { + s.Log(ERROR, "faild to apply IPVS service patch: %v", err) + return err } - p.services = target + return nil } -func (p *XPod) DeleteService(srvs []*apitypes.UserService) error { - if p.globalSpec.Type != "service-discovery" { - err := fmt.Errorf("pod does not support service discovery") - p.Log(ERROR, err) +func (s *Services) commitToVm(patch []byte) error { + s.Log(TRACE, "commit IPVS service patch: \n%s", string(patch)) + + saveData, err := s.getFromVm() + if err != nil { return err } - p.resourceLock.Lock() - defer p.resourceLock.Unlock() - tbd := make(map[struct { - IP string - Port int32 - }]bool, len(srvs)) - for _, srv := range srvs { - tbd[struct { - IP string - Port int32 - }{srv.ServiceIP, srv.ServicePort}] = true - } - target := make([]*apitypes.UserService, 0, len(p.services)) - for _, srv := range p.services { - if tbd[struct { - IP string - Port int32 - }{srv.ServiceIP, srv.ServicePort}] { - p.Log(TRACE, "remove service %#v", srv) - continue + clear := func() error { + cmd := []string{"ipvsadm", "-C"} + _, stderr, err := s.p.sandbox.HyperstartExecSync(cmd, nil) + if err != nil { + s.Log(ERROR, "clear ipvs rules failed: %v, %s", err, stderr) + return err + } + + return nil + } + + apply := func(rules []byte) error { + cmd := []string{"ipvsadm", "-R"} + _, stderr, err := s.p.sandbox.HyperstartExecSync(cmd, rules) + if err != nil { + s.Log(ERROR, "apply ipvs rules failed: %v, %s", err, stderr) + return err } - target = append(target, srv) + + return nil } - if p.IsRunning() { - sc := p.getServiceContainer() - if err := servicediscovery.ApplyServices(p.sandbox, sc, target); err != nil { - p.Log(ERROR, "failed to update services %#v: %v", target, err) + if err = apply(patch); err != nil { + // restore original ipvs services + err1 := clear() + if err1 != nil { + s.Log(ERROR, "restore original ipvs services failed in clear stage: %v", err1) return err } + err1 = apply(saveData) + if err1 != nil { + s.Log(ERROR, "restore original ipvs services failed in apply stage: %v", err1) + } + return err } - p.services = target + return nil } -func (p *XPod) getServiceContainer() string { - if p.globalSpec.Type != "service-discovery" { - return "" +func (s *Services) getFromVm() ([]byte, error) { + cmd := []string{"ipvsadm", "-Ln"} + stdout, stderr, err := s.p.sandbox.HyperstartExecSync(cmd, nil) + if err != nil { + s.Log(ERROR, "get ipvs service from vm failed: %v, %s", err, stderr) + return nil, err } - for _, c := range p.containers { - if c.spec.Type == apitypes.UserContainer_SERVICE { - return c.Id() - } + + return stdout, nil +} + +func (s *Services) size() int { + if s.spec == nil { + return 0 + } + return len(s.spec) +} + +func (s *Services) get() []*apitypes.UserService { + return s.spec +} + +func (p *XPod) GetServices() ([]*apitypes.UserService, error) { + p.resourceLock.Lock() + defer p.resourceLock.Unlock() + + return p.services.get(), nil +} + +func (p *XPod) UpdateService(srvs []*apitypes.UserService) error { + p.resourceLock.Lock() + defer p.resourceLock.Unlock() + + if err := p.services.update(srvs); err != nil { + p.Log(ERROR, "failed to update services: %v", err) + return err + } + + return p.savePodMeta() +} + +func (p *XPod) AddService(srvs []*apitypes.UserService) error { + p.resourceLock.Lock() + defer p.resourceLock.Unlock() + + if err := p.services.add(srvs); err != nil { + p.Log(ERROR, "failed to add services: %v", err) + return err + } + + return p.savePodMeta() +} + +func (p *XPod) DeleteService(srvs []*apitypes.UserService) error { + p.resourceLock.Lock() + defer p.resourceLock.Unlock() + + if err := p.services.del(srvs); err != nil { + p.Log(ERROR, "failed to delete service: %v", err) + return err } - return "" + + return p.savePodMeta() } diff --git a/integration/hyper_test.go b/integration/hyper_test.go index 4cf16d93..dfcb0662 100644 --- a/integration/hyper_test.go +++ b/integration/hyper_test.go @@ -254,9 +254,6 @@ func (s *TestSuite) TestPullImage(c *C) { } func (s *TestSuite) TestAddListDeleteService(c *C) { - err := s.client.PullImage("haproxy", "1.4", nil) - c.Assert(err, IsNil) - spec := types.UserPod{ Containers: []*types.UserContainer{ { diff --git a/servicediscovery/servicediscovery.go b/servicediscovery/servicediscovery.go deleted file mode 100644 index b6e37a0f..00000000 --- a/servicediscovery/servicediscovery.go +++ /dev/null @@ -1,232 +0,0 @@ -package servicediscovery - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "strconv" - "strings" - - "github.com/golang/glog" - apitypes "github.com/hyperhq/hyperd/types" - "github.com/hyperhq/hyperd/utils" - "github.com/hyperhq/runv/hypervisor" - "github.com/hyperhq/runv/lib/linuxsignal" -) - -var ( - ServiceVolume string = "/usr/local/etc/haproxy/" - ServiceImage string = "haproxy:1.5" - ServiceConfig string = "haproxy.cfg" -) - -func UpdateLoopbackAddress(vm *hypervisor.Vm, container string, oldServices, newServices []*apitypes.UserService) error { - addedIPs := make([]string, 0, 1) - deletedIPs := make([]string, 0, 1) - - for _, n := range newServices { - found := 0 - for _, o := range oldServices { - if n.ServiceIP == o.ServiceIP { - found = 1 - } - } - if found == 0 { - addedIPs = append(addedIPs, n.ServiceIP) - } - } - - for _, o := range oldServices { - found := 0 - for _, n := range newServices { - if n.ServiceIP == o.ServiceIP { - found = 1 - } - } - if found == 0 { - deletedIPs = append(deletedIPs, o.ServiceIP) - } - } - - for _, ip := range addedIPs { - err := SetupLoopbackAddress(vm, container, ip, "add") - if err != nil { - return err - } - } - - for _, ip := range deletedIPs { - err := SetupLoopbackAddress(vm, container, ip, "del") - if err != nil { - return err - } - } - - return nil -} - -// Setup lo ip address -// options for operation: add or del -func SetupLoopbackAddress(vm *hypervisor.Vm, container, ip, operation string) error { - execId := fmt.Sprintf("exec-%s", utils.RandStr(10, "alpha")) - command := "ip addr " + operation + " dev lo " + ip + "/32" - execcmd, err := json.Marshal(strings.Split(command, " ")) - if err != nil { - return err - } - - result := vm.WaitProcess(false, []string{execId}, 60) - if result == nil { - return fmt.Errorf("can not wait %s, id: %s", command, execId) - } - - if err := vm.Exec(container, execId, string(execcmd), false, &hypervisor.TtyIO{}); err != nil { - return err - } - - r, ok := <-result - if !ok { - return fmt.Errorf("exec failed %s: %s", command, execId) - } - if r.Code != 0 { - return fmt.Errorf("exec %s on container %s failed with exit code %d", command, container, r.Code) - } - - return nil -} - -func ApplyServices(vm *hypervisor.Vm, container string, services []*apitypes.UserService) error { - // Update lo ip addresses - oldServices, err := GetServices(vm, container) - if err != nil { - return err - } - err = UpdateLoopbackAddress(vm, container, oldServices, services) - if err != nil { - return err - } - - // Update haproxy config - config := path.Join(ServiceVolume, ServiceConfig) - vm.WriteFile(container, config, GenerateServiceConfig(services)) - - return vm.KillContainer(container, linuxsignal.SIGHUP) -} - -func GetServices(vm *hypervisor.Vm, container string) ([]*apitypes.UserService, error) { - var services []*apitypes.UserService - config := path.Join(ServiceVolume, ServiceConfig) - - data, err := vm.ReadFile(container, config) - if err != nil { - return nil, err - } - - // if there's no data read, token will be empty and this method will return an empty service list - token := bytes.Split(data, []byte("\n")) - - for _, tok := range token { - first := bytes.Split(tok, []byte(" ")) - reader := bytes.NewReader(tok) - if len(first) > 0 { - var t1, t2, t3, t4 string - if string(first[0][:]) == "frontend" { - s := &apitypes.UserService{ - Protocol: "TCP", - } - - _, err := fmt.Fscanf(reader, "%s %s %s", &t1, &t2, &t3) - if err != nil { - return nil, err - } - - hostport := strings.Split(t3, ":") - s.ServiceIP = hostport[0] - port, err := strconv.ParseInt(hostport[1], 10, 32) - if err != nil { - return nil, err - } - s.ServicePort = int32(port) - - services = append(services, s) - } else if string(first[0][:]) == "\tserver" { - var idx int - var h = &apitypes.UserServiceBackend{} - _, err := fmt.Fscanf(reader, "%s %s %s %s", &t1, &t2, &t3, &t4) - if err != nil { - return nil, err - } - - hostport := strings.Split(t3, ":") - h.HostIP = hostport[0] - port, err := strconv.ParseInt(hostport[1], 10, 32) - if err != nil { - return nil, err - } - h.HostPort = int32(port) - - idxs := strings.Split(t2, "-") - idxLong, err := strconv.ParseInt(idxs[1], 10, 32) - if err != nil { - return nil, err - } - idx = int(idxLong) - - services[idx].Hosts = append(services[idx].Hosts, h) - } - } - } - return services, nil -} - -func GenerateServiceConfig(services []*apitypes.UserService) []byte { - data := []byte{} - - globalConfig := fmt.Sprintf("global\n\t#chroot\t/var/lib/haproxy\n\tpidfile\t/var/run/haproxy.pid\n\tmaxconn\t4000\n\t#user\thaproxy\n\t#group\thaproxy\n\tdaemon\ndefaults\n\tmode\ttcp\n\tretries\t3\n\ttimeout queue\t1m\n\ttimeout connect\t10s\n\ttimeout client\t1m\n\ttimeout server\t1m\n\ttimeout check\t10s\n\tmaxconn\t3000\n") - - data = append(data, globalConfig...) - for idx, srv := range services { - front := fmt.Sprintf("frontend front%d %s:%d\n\tdefault_backend\tback%d\n", - idx, srv.ServiceIP, srv.ServicePort, idx) - data = append(data, front...) - back := fmt.Sprintf("backend back%d\n\tbalance\troundrobin\n", idx) - data = append(data, back...) - for hostid, host := range srv.Hosts { - back := fmt.Sprintf("\tserver back-%d-%d %s:%d check\n", - idx, hostid, host.HostIP, host.HostPort) - data = append(data, back...) - } - } - - glog.V(1).Infof("haproxy config: %s", data[:]) - return data -} - -func checkHaproxyConfig(services []*apitypes.UserService, config string) error { - var err error - glog.V(1).Infof("haproxy config: %s\n", config) - if _, err = os.Stat(config); err != nil && os.IsNotExist(err) { - /* Generate haproxy config from service and write to config */ - return ioutil.WriteFile(config, GenerateServiceConfig(services), 0644) - } - return err -} - -func PrepareServices(services []*apitypes.UserService, podId string) error { - var serviceDir string = path.Join(utils.HYPER_ROOT, "services", podId) - var config string = path.Join(serviceDir, ServiceConfig) - var err error - - if len(services) == 0 { - return nil - } - - if err = os.MkdirAll(serviceDir, 0755); err != nil && !os.IsExist(err) { - return err - } - - return checkHaproxyConfig(services, config) -} diff --git a/types/validate.go b/types/validate.go index 3cbfb917..f97a4672 100644 --- a/types/validate.go +++ b/types/validate.go @@ -57,6 +57,14 @@ func (pod *UserPod) Validate() error { return errors.New("Files name does not unique") } } + + uniq, sset := keySet(pod.Services) + if !uniq { + if len(sset) > 0 { + return errors.New("Services IP:Port@Protocol combination does not unique") + } + } + var permReg = regexp.MustCompile("0[0-7]{3}") for idx, container := range pod.Containers { @@ -138,6 +146,9 @@ func (vol UserVolume) key() string { return vol.Name } func (vol UserVolumeReference) key() string { return vol.Volume } func (f UserFile) key() string { return f.Name } func (env EnvironmentVar) key() string { return env.Env } +func (srv UserService) key() string { + return fmt.Sprintf("%s:%s@%s", srv.ServiceIP, srv.ServicePort, strings.ToLower(srv.Protocol)) +} func InterfaceSlice(slice interface{}) ([]interface{}, error) { s := reflect.ValueOf(slice)