Skip to content

Commit

Permalink
Merge branch 'master' of github.com:kubernetes/kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
liyinan926 committed Aug 9, 2017
2 parents 33ba8aa + a212b8d commit 5f19c47
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 16 deletions.
8 changes: 4 additions & 4 deletions pkg/cloudprovider/providers/openstack/openstack_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (os *OpenStack) NewNetworkV2() (*gophercloud.ServiceClient, error) {
Region: os.region,
})
if err != nil {
glog.Warningf("Failed to find network v2 endpoint: %v", err)
glog.Warningf("Failed to find network v2 endpoint for region %s: %v", os.region, err)
return nil, err
}
return network, nil
Expand All @@ -39,7 +39,7 @@ func (os *OpenStack) NewComputeV2() (*gophercloud.ServiceClient, error) {
Region: os.region,
})
if err != nil {
glog.Warningf("Failed to find compute v2 endpoint: %v", err)
glog.Warningf("Failed to find compute v2 endpoint for region %s: %v", os.region, err)
return nil, err
}
return compute, nil
Expand All @@ -50,7 +50,7 @@ func (os *OpenStack) NewBlockStorageV1() (*gophercloud.ServiceClient, error) {
Region: os.region,
})
if err != nil {
glog.Errorf("Unable to initialize cinder v1 client for region: %s", os.region)
glog.Errorf("Unable to initialize cinder v1 client for region %s: %v", os.region, err)
return nil, err
}
return storage, nil
Expand All @@ -61,7 +61,7 @@ func (os *OpenStack) NewBlockStorageV2() (*gophercloud.ServiceClient, error) {
Region: os.region,
})
if err != nil {
glog.Errorf("Unable to initialize cinder v2 client for region: %s", os.region)
glog.Errorf("Unable to initialize cinder v2 client for region %s: %v", os.region, err)
return nil, err
}
return storage, nil
Expand Down
24 changes: 22 additions & 2 deletions pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ const (

activeStatus = "ACTIVE"
errorStatus = "ERROR"

ServiceAnnotationLoadBalancerFloatingNetworkId = "loadbalancer.openstack.org/floating-network-id"
)

// LoadBalancer implementation for LBaaS v1
Expand Down Expand Up @@ -581,6 +583,21 @@ func nodeAddressForLB(node *v1.Node) (string, error) {
return addrs[0].Address, nil
}

//getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
func getStringFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting string) string {
glog.V(4).Infof("getStringFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
if annotationValue, ok := service.Annotations[annotationKey]; ok {
//if there is an annotation for this setting, set the "setting" var to it
// annotationValue can be empty, it is working as designed
// it makes possible for instance provisioning loadbalancer without floatingip
glog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
return annotationValue
}
//if there is no annotation, set "settings" var to the value from cloud config
glog.V(4).Infof("Could not find a Service Annotation; falling back on cloud-config setting: %v = %v", annotationKey, defaultSetting)
return defaultSetting
}

// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region. We should take
// a list of regions (from config) and query/create loadbalancers in
Expand All @@ -598,6 +615,9 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}

floatingPool := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkId, lbaas.opts.FloatingNetworkId)
glog.V(4).Infof("EnsureLoadBalancer using floatingPool: %v", floatingPool)

// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
for _, port := range ports {
Expand Down Expand Up @@ -827,10 +847,10 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("Error getting floating ip for port %s: %v", portID, err)
}
if floatIP == nil && lbaas.opts.FloatingNetworkId != "" {
if floatIP == nil && floatingPool != "" {
glog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: lbaas.opts.FloatingNetworkId,
FloatingNetworkID: floatingPool,
PortID: portID,
}
floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
Expand Down
20 changes: 10 additions & 10 deletions pkg/cloudprovider/providers/openstack/openstack_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,7 @@ func (os *OpenStack) AttachDisk(instanceID, volumeID string) (string, error) {
if err != nil {
return "", err
}
if volume.Status != VolumeAvailableStatus {
errmsg := fmt.Sprintf("volume %s status is %s, not %s, can not be attached to instance %s.", volume.Name, volume.Status, VolumeAvailableStatus, instanceID)
glog.Errorf(errmsg)
return "", errors.New(errmsg)
}

cClient, err := os.NewComputeV2()
if err != nil {
return "", err
Expand All @@ -230,11 +226,9 @@ func (os *OpenStack) AttachDisk(instanceID, volumeID string) (string, error) {
glog.V(4).Infof("Disk %s is already attached to instance %s", volumeID, instanceID)
return volume.ID, nil
}
glog.V(2).Infof("Disk %s is attached to a different instance (%s), detaching", volumeID, volume.AttachedServerId)
err = os.DetachDisk(volume.AttachedServerId, volumeID)
if err != nil {
return "", err
}
errmsg := fmt.Sprintf("Disk %s is attached to a different instance (%s)", volumeID, volume.AttachedServerId)
glog.V(2).Infof(errmsg)
return "", errors.New(errmsg)
}

startTime := time.Now()
Expand All @@ -258,6 +252,12 @@ func (os *OpenStack) DetachDisk(instanceID, volumeID string) error {
if err != nil {
return err
}
if volume.Status == VolumeAvailableStatus {
// "available" is fine since that means the volume is detached from instance already.
glog.V(2).Infof("volume: %s has been detached from compute: %s ", volume.ID, instanceID)
return nil
}

if volume.Status != VolumeInUseStatus {
errmsg := fmt.Sprintf("can not detach volume %s, its status is %s.", volume.Name, volume.Status)
glog.Errorf(errmsg)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package(default_visibility = ["//visibility:public"])

licenses(["notice"])

load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_test(
name = "go_default_test",
srcs = ["openstack_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
)

go_library(
name = "go_default_library",
srcs = ["openstack.go"],
tags = ["automanaged"],
deps = [
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package openstack

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/golang/glog"
"github.com/gophercloud/gophercloud/openstack"

restclient "k8s.io/client-go/rest"
)

func init() {
if err := restclient.RegisterAuthProviderPlugin("openstack", newOpenstackAuthProvider); err != nil {
glog.Fatalf("Failed to register openstack auth plugin: %s", err)
}
}

// DefaultTTLDuration is the time before a token gets expired.
const DefaultTTLDuration = 10 * time.Minute

// openstackAuthProvider is an authprovider for openstack. this provider reads
// the environment variables to determine the client identity, and generates a
// token which will be inserted into the request header later.
type openstackAuthProvider struct {
ttl time.Duration

tokenGetter TokenGetter
}

// TokenGetter returns a bearer token that can be inserted into request.
type TokenGetter interface {
Token() (string, error)
}

type tokenGetter struct{}

// Token creates a token by authenticate with keystone.
func (*tokenGetter) Token() (string, error) {
options, err := openstack.AuthOptionsFromEnv()
if err != nil {
return "", fmt.Errorf("failed to read openstack env vars: %s", err)
}
client, err := openstack.AuthenticatedClient(options)
if err != nil {
return "", fmt.Errorf("authentication failed: %s", err)
}
return client.TokenID, nil
}

// cachedGetter caches a token until it gets expired, after the expiration, it will
// generate another token and cache it.
type cachedGetter struct {
mutex sync.Mutex
tokenGetter TokenGetter

token string
born time.Time
ttl time.Duration
}

// Token returns the current available token, create a new one if expired.
func (c *cachedGetter) Token() (string, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

var err error
// no token or exceeds the TTL
if c.token == "" || time.Now().Sub(c.born) > c.ttl {
c.token, err = c.tokenGetter.Token()
if err != nil {
return "", fmt.Errorf("failed to get token: %s", err)
}
c.born = time.Now()
}
return c.token, nil
}

// tokenRoundTripper implements the RoundTripper interface: adding the bearer token
// into the request header.
type tokenRoundTripper struct {
http.RoundTripper

tokenGetter TokenGetter
}

// RoundTrip adds the bearer token into the request.
func (t *tokenRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
// if the authorization header already present, use it.
if req.Header.Get("Authorization") != "" {
return t.RoundTripper.RoundTrip(req)
}

token, err := t.tokenGetter.Token()
if err == nil {
req.Header.Set("Authorization", "Bearer "+token)
} else {
glog.V(4).Infof("failed to get token: %s", err)
}

return t.RoundTripper.RoundTrip(req)
}

// newOpenstackAuthProvider creates an auth provider which works with openstack
// environment.
func newOpenstackAuthProvider(clusterAddress string, config map[string]string, persister restclient.AuthProviderConfigPersister) (restclient.AuthProvider, error) {
var ttlDuration time.Duration
var err error

ttl, found := config["ttl"]
if !found {
ttlDuration = DefaultTTLDuration
// persist to config
config["ttl"] = ttlDuration.String()
if err = persister.Persist(config); err != nil {
return nil, fmt.Errorf("failed to persist config: %s", err)
}
} else {
ttlDuration, err = time.ParseDuration(ttl)
if err != nil {
return nil, fmt.Errorf("failed to parse ttl config: %s", err)
}
}

// TODO: read/persist client configuration(OS_XXX env vars) in config

return &openstackAuthProvider{
ttl: ttlDuration,
tokenGetter: &tokenGetter{},
}, nil
}

func (oap *openstackAuthProvider) WrapTransport(rt http.RoundTripper) http.RoundTripper {
return &tokenRoundTripper{
RoundTripper: rt,
tokenGetter: &cachedGetter{
tokenGetter: oap.tokenGetter,
ttl: oap.ttl,
},
}
}

func (oap *openstackAuthProvider) Login() error { return nil }
Loading

0 comments on commit 5f19c47

Please sign in to comment.