Skip to content

Commit

Permalink
Merge pull request #31 from helen-frank/helen-frank/dev-general
Browse files Browse the repository at this point in the history
feat: implement instance provider
  • Loading branch information
helen-frank authored Oct 8, 2024
2 parents 9d399b1 + 7dfc0d3 commit 5f59314
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 22 deletions.
13 changes: 11 additions & 2 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/karpenter/pkg/operator"

"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/instance"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/providers/pricing"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils/client"
)
Expand All @@ -31,7 +32,8 @@ import (
type Operator struct {
*operator.Operator

PricingProvider pricing.Provider
InstanceProvider instance.Provider
PricingProvider pricing.Provider
}

func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) {
Expand All @@ -47,9 +49,16 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
*ecsClient.RegionId,
)

instanceProvider := instance.NewDefaultProvider(
ctx,
*ecsClient.RegionId,
ecsClient,
)

return ctx, &Operator{
Operator: operator,

PricingProvider: pricingProvider,
InstanceProvider: instanceProvider,
PricingProvider: pricingProvider,
}
}
75 changes: 75 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2024 The CloudPilot AI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"context"
"errors"
"flag"
"fmt"
"os"

coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/utils/env"
)

func init() {
coreoptions.Injectables = append(coreoptions.Injectables, &Options{})
}

type optionsKey struct{}

type Options struct {
ClusterCABundle string
ClusterName string
ClusterEndpoint string
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.StringVar(&o.ClusterCABundle, "cluster-ca-bundle", env.WithDefaultString("CLUSTER_CA_BUNDLE", ""), "Cluster CA bundle for nodes to use for TLS connections with the API server. If not set, this is taken from the controller's TLS configuration.")
fs.StringVar(&o.ClusterName, "cluster-name", env.WithDefaultString("CLUSTER_NAME", ""), "[REQUIRED] The kubernetes cluster name for resource discovery.")
fs.StringVar(&o.ClusterEndpoint, "cluster-endpoint", env.WithDefaultString("CLUSTER_ENDPOINT", ""), "The external kubernetes cluster endpoint for new nodes to connect with. If not specified, will discover the cluster endpoint using DescribeCluster API.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
if err := fs.Parse(args); err != nil {
if errors.Is(err, flag.ErrHelp) {
os.Exit(0)
}
return fmt.Errorf("parsing flags, %w", err)
}
if err := o.Validate(); err != nil {
return fmt.Errorf("validating options, %w", err)
}
return nil
}

func (o *Options) ToContext(ctx context.Context) context.Context {
return ToContext(ctx, o)
}

func ToContext(ctx context.Context, opts *Options) context.Context {
return context.WithValue(ctx, optionsKey{}, opts)
}

func FromContext(ctx context.Context) *Options {
retval := ctx.Value(optionsKey{})
if retval == nil {
return nil
}
return retval.(*Options)
}
51 changes: 51 additions & 0 deletions pkg/operator/options/options_validation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2024 The CloudPilot AI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"fmt"
"net/url"

"go.uber.org/multierr"
)

func (o Options) Validate() error {
return multierr.Combine(
o.validateEndpoint(),
o.validateRequiredFields(),
)
}

func (o Options) validateEndpoint() error {
if o.ClusterEndpoint == "" {
return nil
}
endpoint, err := url.Parse(o.ClusterEndpoint)
// url.Parse() will accept a lot of input without error; make
// sure it's a real URL
if err != nil || !endpoint.IsAbs() || endpoint.Hostname() == "" {
return fmt.Errorf("%q is not a valid cluster-endpoint URL", o.ClusterEndpoint)
}
return nil
}

func (o Options) validateRequiredFields() error {
if o.ClusterName == "" {
return fmt.Errorf("missing field, cluster-name")
}
return nil
}
126 changes: 116 additions & 10 deletions pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ package instance

import (
"context"
"fmt"

ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
"go.uber.org/multierr"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"

"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/apis/v1alpha1"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/operator/options"
"github.com/cloudpilot-ai/karpenter-provider-alicloud/pkg/utils/alierrors"
)

type Provider interface {
Expand All @@ -34,41 +41,140 @@ type Provider interface {
}

type DefaultProvider struct {
region string
ecsClient *ecsclient.Client
region string
}

func NewDefaultProvider(ctx context.Context, region string) *DefaultProvider {
func NewDefaultProvider(ctx context.Context, region string, ecsClient *ecsclient.Client) *DefaultProvider {
return &DefaultProvider{
region: region,
ecsClient: ecsClient,
region: region,
}
}

func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim, instanceTypes []*cloudprovider.InstanceType) (*Instance, error) {
func (p *DefaultProvider) Create(ctx context.Context, nodeClass *v1alpha1.ECSNodeClass, nodeClaim *karpv1.NodeClaim,
instanceTypes []*cloudprovider.InstanceType) (*Instance, error) {

// TODO: implement me
return nil, nil
}

func (p *DefaultProvider) Get(ctx context.Context, id string) (*Instance, error) {
describeInstancesRequest := &ecsclient.DescribeInstancesRequest{
RegionId: &p.region,
InstanceIds: tea.String("[\"" + id + "\"]"),
}
runtime := &util.RuntimeOptions{}

// TODO: implement me
return nil, nil
resp, err := p.ecsClient.DescribeInstancesWithOptions(describeInstancesRequest, runtime)
if err != nil {
return nil, err
}

if resp == nil || resp.Body == nil || resp.Body.Instances == nil {
return nil, fmt.Errorf("failed to get instance %s", id)
}

if len(resp.Body.Instances.Instance) != 1 {
return nil, fmt.Errorf("expected a single instance, %w", err)
}

return NewInstance(resp.Body.Instances.Instance[0]), nil
}

func (p *DefaultProvider) List(ctx context.Context) ([]*Instance, error) {
var instances []*Instance

describeInstancesRequest := &ecsclient.DescribeInstancesRequest{
Tag: []*ecsclient.DescribeInstancesRequestTag{
{
Key: tea.String(karpv1.NodePoolLabelKey),
},
{
Key: tea.String(v1alpha1.LabelNodeClass),
},
{
Key: tea.String("kubernetes.io/cluster"),
Value: tea.String(options.FromContext(ctx).ClusterName),
},
},
}

// TODO: implement me
return nil, nil
runtime := &util.RuntimeOptions{}

for {
// TODO: limit 1000
/* Refer https://api.aliyun.com/api/Ecs/2014-05-26/DescribeInstances
If you use one tag to filter resources, the number of resources queried under that tag cannot exceed 1000;
if you use multiple tags to filter resources, the number of resources queried with multiple tags bound at the
same time cannot exceed 1000. If the number of resources exceeds 1000, use the ListTagResources interface to query.
*/
resp, err := p.ecsClient.DescribeInstancesWithOptions(describeInstancesRequest, runtime)
if err != nil {
return nil, err
}

if resp == nil || resp.Body == nil || resp.Body.NextToken == nil || resp.Body.Instances == nil ||
*resp.Body.NextToken == "" || len(resp.Body.Instances.Instance) == 0 {
break
}

describeInstancesRequest.NextToken = resp.Body.NextToken
for i := range resp.Body.Instances.Instance {
instances = append(instances, NewInstance(resp.Body.Instances.Instance[i]))
}
}

return instances, nil
}

func (p *DefaultProvider) Delete(ctx context.Context, id string) error {
deleteInstanceRequest := &ecsclient.DeleteInstanceRequest{
InstanceId: tea.String(id),
}

runtime := &util.RuntimeOptions{}
if _, err := p.ecsClient.DeleteInstanceWithOptions(deleteInstanceRequest, runtime); err != nil {
if alierrors.IsNotFound(err) {
return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("instance already terminated"))
}

if _, e := p.Get(ctx, id); e != nil {
if cloudprovider.IsNodeClaimNotFoundError(e) {
return e
}
err = multierr.Append(err, e)
}

return fmt.Errorf("terminating instance, %w", err)
}

// TODO: implement me
return nil
}

func (p *DefaultProvider) CreateTags(ctx context.Context, id string, tags map[string]string) error {
ecsTags := make([]*ecsclient.AddTagsRequestTag, 0, len(tags))
for k, v := range tags {
ecsTags = append(ecsTags, &ecsclient.AddTagsRequestTag{
Key: tea.String(k),
Value: tea.String(v),
})
}

addTagsRequest := &ecsclient.AddTagsRequest{
RegionId: &p.region,
ResourceType: tea.String("instance"),
ResourceId: tea.String(id),
Tag: ecsTags,
}

runtime := &util.RuntimeOptions{}
if _, err := p.ecsClient.AddTagsWithOptions(addTagsRequest, runtime); err != nil {
if alierrors.IsNotFound(err) {
return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("tagging instance, %w", err))
}
return fmt.Errorf("tagging instance, %w", err)
}

// TODO: implement me
return nil
}
30 changes: 26 additions & 4 deletions pkg/providers/instance/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ limitations under the License.

package instance

import "time"
import (
ecsclient "github.com/alibabacloud-go/ecs-20140526/v4/client"
"github.com/samber/lo"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// Instance is an internal data representation of either an ecs.Instance or an ecs.FleetInstance
// Instance is an internal data representation of either an ecsclient.DescribeInstancesResponseBodyInstancesInstance
// It contains all the common data that is needed to inject into the Machine from either of these responses
type Instance struct {
LaunchTime time.Time
CreationTime string
State string
ID string
ImageID string
Expand All @@ -31,5 +35,23 @@ type Instance struct {
SecurityGroupIDs []string
SubnetID string
Tags map[string]string
EFAEnabled bool
}

func NewInstance(out *ecsclient.DescribeInstancesResponseBodyInstancesInstance) *Instance {
return &Instance{
CreationTime: *out.CreationTime,
State: *out.Status,
ID: *out.InstanceId,
ImageID: *out.ImageId,
Type: *out.InstanceType,
Zone: *out.ZoneId,
CapacityType: lo.Ternary(out.SpotStrategy != nil && *out.SpotStrategy != "NoSpot", karpv1.CapacityTypeSpot, karpv1.CapacityTypeOnDemand),
SecurityGroupIDs: lo.Map(out.SecurityGroupIds.SecurityGroupId, func(securitygroup *string, _ int) string {
return *securitygroup
}),
SubnetID: *out.VpcAttributes.VSwitchId,
Tags: lo.SliceToMap(out.Tags.Tag, func(tag *ecsclient.DescribeInstancesResponseBodyInstancesInstanceTagsTag) (string, string) {
return *tag.TagKey, *tag.TagValue
}),
}
}
Loading

0 comments on commit 5f59314

Please sign in to comment.