*: fix tpr client
There is a known issue with TPRs in client-go:
  kubernetes/client-go#8

Workarounds:
- Include the `Metadata` field in the object explicitly (see the sketch after
  the code snippet below).
- Copy the solution from the client-go TPR examples to work around a known
  problem with third-party resources and the ugorji codec:
```go
// ClusterCopy and ClusterListCopy carry the same fields as Cluster and
// ClusterList but none of their methods, so the json.Unmarshal calls below
// use the default decoder instead of recursing back into UnmarshalJSON.
type ClusterListCopy ClusterList
type ClusterCopy Cluster

func (c *Cluster) UnmarshalJSON(data []byte) error {
	tmp := ClusterCopy{}
	err := json.Unmarshal(data, &tmp)
	if err != nil {
		return err
	}
	tmp2 := Cluster(tmp)
	*c = tmp2
	return nil
}

func (cl *ClusterList) UnmarshalJSON(data []byte) error {
	tmp := ClusterListCopy{}
	err := json.Unmarshal(data, &tmp)
	if err != nil {
		return err
	}
	tmp2 := ClusterList(tmp)
	*cl = tmp2
	return nil
}
```
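
For the first workaround, here is a rough, self-contained sketch of what declaring `Metadata` as an explicit field looks like. The real types live in the `spec` package and use client-go's `v1.ObjectMeta`; the simplified `ObjectMeta` stand-in, its field set, and the sample JSON below are illustrative assumptions, not the repository's actual definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for client-go's v1.ObjectMeta (the real type has many
// more fields); used here only to keep the example self-contained.
type ObjectMeta struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
	UID       string `json:"uid"`
}

// Cluster declares Metadata as an explicit, tagged field rather than embedding
// ObjectMeta. This is why the rest of the diff switches call sites to
// cluster.Metadata.Name, cluster.Metadata.Namespace, and cluster.Metadata.UID.
type Cluster struct {
	Metadata ObjectMeta      `json:"metadata"`
	Spec     json.RawMessage `json:"spec"`
}

func main() {
	data := []byte(`{"metadata":{"name":"example","namespace":"default"},"spec":{"size":3}}`)
	var c Cluster
	if err := json.Unmarshal(data, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Metadata.Namespace, c.Metadata.Name) // default example
}
```

With the field spelled out and tagged, the standard `encoding/json` decoder populates the metadata reliably, which is what the accessor changes throughout this commit depend on.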
hongchaodeng committed Feb 14, 2017
1 parent c7d5e7b commit 1ba20bc
Showing 17 changed files with 162 additions and 126 deletions.
2 changes: 1 addition & 1 deletion client/experimentalclient/client.go
@@ -91,7 +91,7 @@ func NewOperator(namespace string) (Operator, error) {

func (o *operator) Create(ctx context.Context, name string, cspec spec.ClusterSpec) error {
cluster := &spec.Cluster{
ObjectMeta: v1.ObjectMeta{
Metadata: v1.ObjectMeta{
Name: name,
},
Spec: cspec,
20 changes: 10 additions & 10 deletions pkg/cluster/backup_manager.go
@@ -56,16 +56,16 @@ func (bm *backupManager) setupStorage() (s backupstorage.Storage, err error) {
b := cl.Spec.Backup
switch b.StorageType {
case spec.BackupStorageTypePersistentVolume, spec.BackupStorageTypeDefault:
s, err = backupstorage.NewPVStorage(c.KubeCli, cl.Name, cl.Namespace, c.PVProvisioner, *b)
s, err = backupstorage.NewPVStorage(c.KubeCli, cl.Metadata.Name, cl.Metadata.Namespace, c.PVProvisioner, *b)
case spec.BackupStorageTypeS3:
s, err = backupstorage.NewS3Storage(c.S3Context, c.KubeCli, cl.Name, cl.Namespace, *b)
s, err = backupstorage.NewS3Storage(c.S3Context, c.KubeCli, cl.Metadata.Name, cl.Metadata.Namespace, *b)
}
return s, err
}

func (bm *backupManager) setup() error {
r := bm.cluster.Spec.Restore
restoreSameNameCluster := r != nil && r.BackupClusterName == bm.cluster.Name
restoreSameNameCluster := r != nil && r.BackupClusterName == bm.cluster.Metadata.Name

// There is only one case that we don't need to create underlying storage.
// That is, the storage already exists and we are restoring cluster from it.
@@ -77,7 +77,7 @@ func (bm *backupManager) setup() error {

if r != nil {
bm.logger.Infof("restoring cluster from existing backup (%s)", r.BackupClusterName)
if bm.cluster.Name != r.BackupClusterName {
if bm.cluster.Metadata.Name != r.BackupClusterName {
if err := bm.s.Clone(r.BackupClusterName); err != nil {
return err
}
@@ -89,13 +89,13 @@

func (bm *backupManager) runSidecar() error {
cl, c := bm.cluster, bm.config
podSpec, err := k8sutil.MakeBackupPodSpec(cl.Name, cl.Spec.Backup)
podSpec, err := k8sutil.MakeBackupPodSpec(cl.Metadata.Name, cl.Spec.Backup)
if err != nil {
return err
}
switch cl.Spec.Backup.StorageType {
case spec.BackupStorageTypeDefault, spec.BackupStorageTypePersistentVolume:
podSpec = k8sutil.PodSpecWithPV(podSpec, cl.Name)
podSpec = k8sutil.PodSpecWithPV(podSpec, cl.Metadata.Name)
case spec.BackupStorageTypeS3:
podSpec = k8sutil.PodSpecWithS3(podSpec, c.S3Context)
}
@@ -110,8 +110,8 @@ func (bm *backupManager) runSidecar() error {
}

func (bm *backupManager) createBackupReplicaSet(podSpec v1.PodSpec) error {
rs := k8sutil.NewBackupReplicaSetManifest(bm.cluster.Name, podSpec, bm.cluster.AsOwner())
_, err := bm.config.KubeCli.Extensions().ReplicaSets(bm.cluster.Namespace).Create(rs)
rs := k8sutil.NewBackupReplicaSetManifest(bm.cluster.Metadata.Name, podSpec, bm.cluster.AsOwner())
_, err := bm.config.KubeCli.Extensions().ReplicaSets(bm.cluster.Metadata.Namespace).Create(rs)
if err != nil {
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
return err
@@ -121,8 +121,8 @@ func (bm *backupManager) createBackupReplicaSet(podSpec v1.PodSpec) error {
}

func (bm *backupManager) createBackupService() error {
svc := k8sutil.NewBackupServiceManifest(bm.cluster.Name, bm.cluster.AsOwner())
_, err := bm.config.KubeCli.Core().Services(bm.cluster.Namespace).Create(svc)
svc := k8sutil.NewBackupServiceManifest(bm.cluster.Metadata.Name, bm.cluster.AsOwner())
_, err := bm.config.KubeCli.Core().Services(bm.cluster.Metadata.Namespace).Create(svc)
if err != nil {
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
return err
39 changes: 20 additions & 19 deletions pkg/cluster/cluster.go
@@ -90,15 +90,15 @@ type Cluster struct {
}

func New(config Config, e *spec.Cluster, stopC <-chan struct{}, wg *sync.WaitGroup) *Cluster {
lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", e.Name)
lg := logrus.WithField("pkg", "cluster").WithField("cluster-name", e.Metadata.Name)
c := &Cluster{
logger: lg,
config: config,
cluster: e,
eventCh: make(chan *clusterEvent, 100),
stopCh: make(chan struct{}),
status: e.Status.Copy(),
gc: garbagecollection.New(config.KubeCli, e.Namespace),
gc: garbagecollection.New(config.KubeCli, e.Metadata.Namespace),
}

wg.Add(1)
@@ -161,7 +161,7 @@ func (c *Cluster) create() error {
return fmt.Errorf("cluster create: failed to update cluster phase (%v): %v", spec.ClusterPhaseCreating, err)
}

c.gc.CollectCluster(c.cluster.Name, c.cluster.UID)
c.gc.CollectCluster(c.cluster.Metadata.Name, c.cluster.Metadata.UID)

if c.bm != nil {
if err := c.bm.setup(); err != nil {
@@ -333,7 +333,7 @@ func isFatalError(err error) bool {
}

func (c *Cluster) makeSeedMember() *etcdutil.Member {
etcdName := etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter)
etcdName := etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter)
return &etcdutil.Member{Name: etcdName}
}

@@ -365,7 +365,7 @@ func (c *Cluster) Update(e *spec.Cluster) {
}

func (c *Cluster) delete() {
c.gc.CollectCluster(c.cluster.Name, garbagecollection.NullUID)
c.gc.CollectCluster(c.cluster.Metadata.Name, garbagecollection.NullUID)

if c.bm != nil {
if err := c.bm.cleanup(); err != nil {
@@ -375,7 +375,7 @@ func (c *Cluster) delete() {
}

func (c *Cluster) createClientServiceLB() error {
if _, err := k8sutil.CreateEtcdService(c.config.KubeCli, c.cluster.Name, c.cluster.Namespace, c.cluster.AsOwner()); err != nil {
if _, err := k8sutil.CreateEtcdService(c.config.KubeCli, c.cluster.Metadata.Name, c.cluster.Metadata.Namespace, c.cluster.AsOwner()); err != nil {
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
return err
}
@@ -384,7 +384,7 @@ func (c *Cluster) createClientServiceLB() error {
}

func (c *Cluster) deleteClientServiceLB() error {
err := c.config.KubeCli.Core().Services(c.cluster.Namespace).Delete(c.cluster.Name, nil)
err := c.config.KubeCli.Core().Services(c.cluster.Metadata.Namespace).Delete(c.cluster.Metadata.Name, nil)
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
@@ -399,11 +399,11 @@ func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Me
token = uuid.New()
}

pod := k8sutil.MakeEtcdPod(m, members.PeerURLPairs(), c.cluster.Name, state, token, c.cluster.Spec, c.cluster.AsOwner())
pod := k8sutil.MakeEtcdPod(m, members.PeerURLPairs(), c.cluster.Metadata.Name, state, token, c.cluster.Spec, c.cluster.AsOwner())
if needRecovery {
k8sutil.AddRecoveryToPod(pod, c.cluster.Name, m.Name, token, c.cluster.Spec)
k8sutil.AddRecoveryToPod(pod, c.cluster.Metadata.Name, m.Name, token, c.cluster.Spec)
}
p, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
p, err := c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).Create(pod)
if err != nil {
return err
}
@@ -413,14 +413,14 @@ func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Me
// Before that, this member is "partitioned".
// Failure case 2: service belongs to previous pod and waits to be GC-ed. On such case, we are OK to return on this method.
// Once the service is GC-ed, it's the same as case 1, and we relies on liveness probe to delete the pod.
svc := k8sutil.NewMemberServiceManifest(m.Name, c.cluster.Name, metatypes.OwnerReference{
svc := k8sutil.NewMemberServiceManifest(m.Name, c.cluster.Metadata.Name, metatypes.OwnerReference{
// The Pod result from kubecli doesn't contain TypeMeta.
APIVersion: "v1",
Kind: "Pod",
Name: p.Name,
UID: p.UID,
})
if _, err := k8sutil.CreateMemberService(c.config.KubeCli, c.cluster.Namespace, svc); err != nil {
if _, err := k8sutil.CreateMemberService(c.config.KubeCli, c.cluster.Metadata.Namespace, svc); err != nil {
if !k8sutil.IsKubernetesResourceAlreadyExistError(err) {
return err
}
@@ -429,14 +429,14 @@ func (c *Cluster) createPodAndService(members etcdutil.MemberSet, m *etcdutil.Me
}

func (c *Cluster) removePodAndService(name string) error {
err := c.config.KubeCli.Core().Services(c.cluster.Namespace).Delete(name, nil)
err := c.config.KubeCli.Core().Services(c.cluster.Metadata.Namespace).Delete(name, nil)
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
return err
}
}

err = c.config.KubeCli.Core().Pods(c.cluster.Namespace).Delete(
err = c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).Delete(
name,
api.NewDeleteOptions(podTerminationGracePeriod),
)
@@ -449,7 +449,7 @@ func (c *Cluster) removePodAndService(name string) error {
}

func (c *Cluster) pollPods() ([]*v1.Pod, []*v1.Pod, error) {
podList, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Name))
podList, err := c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).List(k8sutil.ClusterListOpt(c.cluster.Metadata.Name))
if err != nil {
return nil, nil, fmt.Errorf("failed to list running pods: %v", err)
}
@@ -462,8 +462,9 @@ func (c *Cluster) pollPods() ([]*v1.Pod, []*v1.Pod, error) {
c.logger.Warningf("pollPods: ignore pod %v: no owner", pod.Name)
continue
}
if pod.OwnerReferences[0].UID != c.cluster.UID {
c.logger.Warningf("pollPods: ignore pod %v: owner (%v) is not %v", pod.Name, pod.OwnerReferences[0].UID, c.cluster.UID)
if pod.OwnerReferences[0].UID != c.cluster.Metadata.UID {
c.logger.Warningf("pollPods: ignore pod %v: owner (%v) is not %v",
pod.Name, pod.OwnerReferences[0].UID, c.cluster.Metadata.UID)
continue
}
switch pod.Status.Phase {
@@ -484,7 +485,7 @@ func (c *Cluster) updateStatus() error {

newCluster := c.cluster
newCluster.Status = c.status
newCluster, err := k8sutil.UpdateClusterTPRObject(c.config.KubeCli.Core().GetRESTClient(), c.cluster.GetNamespace(), newCluster)
newCluster, err := k8sutil.UpdateClusterTPRObject(c.config.KubeCli.Core().GetRESTClient(), c.cluster.Metadata.Namespace, newCluster)
if err != nil {
return err
}
@@ -501,7 +502,7 @@ func (c *Cluster) reportFailedStatus() {
return true, nil
}
if apierrors.IsConflict(err) {
cl, err := k8sutil.GetClusterTPRObject(c.config.KubeCli.Core().GetRESTClient(), c.cluster.Namespace, c.cluster.Name)
cl, err := k8sutil.GetClusterTPRObject(c.config.KubeCli.Core().GetRESTClient(), c.cluster.Metadata.Namespace, c.cluster.Metadata.Name)
if err != nil {
// Update (PUT) with UID set will return conflict even if object is deleted.
// Because it will check UID first and return something like: "Precondition failed: UID in precondition: 0xc42712c0f0, UID in object meta: ".
6 changes: 3 additions & 3 deletions pkg/cluster/reconcile.go
@@ -128,7 +128,7 @@ func (c *Cluster) addOneMember() error {
}
defer etcdcli.Close()

newMemberName := etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter)
newMemberName := etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter)
newMember := &etcdutil.Member{Name: newMemberName}
ctx, _ := context.WithTimeout(context.Background(), constants.DefaultRequestTimeout)
resp, err := etcdcli.MemberAdd(ctx, []string{newMember.PeerAddr()})
@@ -195,7 +195,7 @@ func (c *Cluster) disasterRecovery(left etcdutil.MemberSet) error {
backupNow := false
if len(left) > 0 {
c.logger.Infof("pods are still running (%v). Will try to make a latest backup from one of them.", left)
err := requestBackup(c.cluster.Name)
err := requestBackup(c.cluster.Metadata.Name)
if err != nil {
c.logger.Errorln(err)
} else {
@@ -207,7 +207,7 @@ func (c *Cluster) disasterRecovery(left etcdutil.MemberSet) error {
} else {
// We don't return error if backupnow failed. Instead, we ask if there is previous backup.
// If so, we can still continue. Otherwise, it's fatal error.
exist, err := checkBackupExist(c.cluster.Name, c.cluster.Spec.Version)
exist, err := checkBackupExist(c.cluster.Metadata.Name, c.cluster.Spec.Version)
if err != nil {
c.logger.Errorln(err)
return err
18 changes: 9 additions & 9 deletions pkg/cluster/self_hosted.go
@@ -29,16 +29,16 @@ import (
func (c *Cluster) addOneSelfHostedMember() error {
c.status.AppendScalingUpCondition(c.members.Size(), c.cluster.Spec.Size)

newMemberName := etcdutil.CreateMemberName(c.cluster.Name, c.memberCounter)
newMemberName := etcdutil.CreateMemberName(c.cluster.Metadata.Name, c.memberCounter)
c.memberCounter++

peerURL := "http://$(MY_POD_IP):2380"
initialCluster := append(c.members.PeerURLPairs(), newMemberName+"="+peerURL)

pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Name, "existing", "", c.cluster.Spec, c.cluster.AsOwner())
pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Metadata.Name, "existing", "", c.cluster.Spec, c.cluster.AsOwner())
pod = k8sutil.PodWithAddMemberInitContainer(pod, c.members.ClientURLs(), newMemberName, []string{peerURL}, c.cluster.Spec)

_, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Create(pod)
_, err := c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).Create(pod)
if err != nil {
return err
}
@@ -67,12 +67,12 @@ func (c *Cluster) addOneSelfHostedMember() error {
}

func (c *Cluster) newSelfHostedSeedMember() error {
newMemberName := fmt.Sprintf("%s-%04d", c.cluster.Name, c.memberCounter)
newMemberName := fmt.Sprintf("%s-%04d", c.cluster.Metadata.Name, c.memberCounter)
c.memberCounter++
initialCluster := []string{newMemberName + "=http://$(MY_POD_IP):2380"}

pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Name, "new", uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
_, err := k8sutil.CreateAndWaitPod(c.config.KubeCli, c.cluster.Namespace, pod, 30*time.Second)
pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Metadata.Name, "new", uuid.New(), c.cluster.Spec, c.cluster.AsOwner())
_, err := k8sutil.CreateAndWaitPod(c.config.KubeCli, c.cluster.Metadata.Namespace, pod, 30*time.Second)
if err != nil {
return err
}
@@ -101,15 +101,15 @@ func (c *Cluster) migrateBootMember() error {
}

// create the member inside Kubernetes for migration
newMemberName := fmt.Sprintf("%s-%04d", c.cluster.Name, c.memberCounter)
newMemberName := fmt.Sprintf("%s-%04d", c.cluster.Metadata.Name, c.memberCounter)
c.memberCounter++

peerURL := "http://$(MY_POD_IP):2380"
initialCluster = append(initialCluster, newMemberName+"="+peerURL)

pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Name, "existing", "", c.cluster.Spec, c.cluster.AsOwner())
pod := k8sutil.MakeSelfHostedEtcdPod(newMemberName, initialCluster, c.cluster.Metadata.Name, "existing", "", c.cluster.Spec, c.cluster.AsOwner())
pod = k8sutil.PodWithAddMemberInitContainer(pod, []string{endpoint}, newMemberName, []string{peerURL}, c.cluster.Spec)
pod, err = k8sutil.CreateAndWaitPod(c.config.KubeCli, c.cluster.Namespace, pod, 30*time.Second)
pod, err = k8sutil.CreateAndWaitPod(c.config.KubeCli, c.cluster.Metadata.Namespace, pod, 30*time.Second)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/cluster/upgrade.go
@@ -24,14 +24,14 @@ import (
func (c *Cluster) upgradeOneMember(m *etcdutil.Member) error {
c.status.AppendUpgradingCondition(c.cluster.Spec.Version, m.Name)

pod, err := c.config.KubeCli.Core().Pods(c.cluster.Namespace).Get(m.Name)
pod, err := c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).Get(m.Name)
if err != nil {
return fmt.Errorf("fail to get pod (%s): %v", m.Name, err)
}
c.logger.Infof("upgrading the etcd member %v from %s to %s", m.Name, k8sutil.GetEtcdVersion(pod), c.cluster.Spec.Version)
pod.Spec.Containers[0].Image = k8sutil.MakeEtcdImage(c.cluster.Spec.Version)
k8sutil.SetEtcdVersion(pod, c.cluster.Spec.Version)
_, err = c.config.KubeCli.Core().Pods(c.cluster.Namespace).Update(pod)
_, err = c.config.KubeCli.Core().Pods(c.cluster.Metadata.Namespace).Update(pod)
if err != nil {
return fmt.Errorf("fail to update the etcd member (%s): %v", m.Name, err)
}