fix stability issues (#433)
* fix stability:
  * sst truncate case
  * block writer channel size to 100
  * fix poll interval error
  * fix pprof server port error
  * fix backup query count issue
weekface authored Apr 28, 2019
1 parent 9cbb992 commit c7b154e
Showing 6 changed files with 53 additions and 77 deletions.
8 changes: 4 additions & 4 deletions tests/actions.go
@@ -17,7 +17,6 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/pingcap/tidb-operator/tests/pkg/metrics"
"io/ioutil"
"net/http"
"net/url"
@@ -32,7 +31,6 @@ import (
"github.com/golang/glog"
pingcapErrors "github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb-operator/tests/pkg/webhook"
admissionV1beta1 "k8s.io/api/admissionregistration/v1beta1"
"k8s.io/api/apps/v1beta1"
batchv1 "k8s.io/api/batch/v1"
@@ -49,7 +47,9 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/tests/pkg/blockwriter"
"github.com/pingcap/tidb-operator/tests/pkg/metrics"
"github.com/pingcap/tidb-operator/tests/pkg/util"
"github.com/pingcap/tidb-operator/tests/pkg/webhook"
)

const (
@@ -443,7 +443,7 @@ func (oa *operatorActions) CheckTidbClusterStatus(info *TidbClusterConfig) error

ns := info.Namespace
tcName := info.ClusterName
if err := wait.Poll(oa.pollInterval, DefaultPollTimeout, func() (bool, error) {
if err := wait.Poll(oa.pollInterval, 30*time.Minute, func() (bool, error) {
var tc *v1alpha1.TidbCluster
var err error
if tc, err = oa.cli.PingcapV1alpha1().TidbClusters(ns).Get(tcName, metav1.GetOptions{}); err != nil {
@@ -1524,7 +1524,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster
return true, nil
}

err := wait.Poll(oa.pollInterval, DefaultPollTimeout, fn)
err := wait.Poll(oa.pollInterval, 30*time.Minute, fn)
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}
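
For reference, a minimal sketch of how wait.Poll from k8s.io/apimachinery orders its arguments: the retry interval comes first and the overall timeout second. The durations below are placeholders, not the suite's actual DefaultPollInterval or DefaultPollTimeout values.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        start := time.Now()

        // wait.Poll(interval, timeout, condition): the first duration is how
        // often the condition runs, the second is the overall budget. Passing a
        // timeout-sized constant as the interval makes the condition fire only
        // once every few minutes, which appears to be the mistake the
        // DefaultPollInterval change in main.go corrects.
        err := wait.Poll(200*time.Millisecond, 2*time.Second, func() (bool, error) {
            // return true to stop, false to keep polling, or an error to abort
            return time.Since(start) > time.Second, nil
        })
        fmt.Println(err) // nil: the condition became true within the 2s budget
    }
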
42 changes: 27 additions & 15 deletions tests/backup/backupcase.go
@@ -19,6 +19,7 @@ import (

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/tests"
"k8s.io/apimachinery/pkg/util/wait"
)

type BackupCase struct {
@@ -110,25 +111,36 @@ func (bc *BackupCase) Run() error {
glog.Infof("cluster[%s] stop insert data", bc.srcCluster.ClusterName)
bc.operator.StopInsertDataTo(bc.srcCluster)

time.Sleep(5 * time.Second)

srcCount, err := bc.srcCluster.QueryCount()
if err != nil {
return err
}
desCount, err := bc.desCluster.QueryCount()
if err != nil {
return err
}
if srcCount != desCount {
return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

return nil
return bc.EnsureBackupDataIsCorrect()
}

func (bc *BackupCase) RunOrDie() {
if err := bc.Run(); err != nil {
panic(err)
}
}

func (bc *BackupCase) EnsureBackupDataIsCorrect() error {
fn := func() (bool, error) {
srcCount, err := bc.srcCluster.QueryCount()
if err != nil {
glog.Infof("failed to query count from src cluster: %s/%s",
bc.srcCluster.Namespace, bc.srcCluster.ClusterName)
return false, nil
}
desCount, err := bc.desCluster.QueryCount()
if err != nil {
glog.Infof("failed to query count from dest cluster: %s/%s",
bc.desCluster.Namespace, bc.desCluster.ClusterName)
return false, nil
}

if srcCount != desCount {
return false, fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

return true, nil
}

return wait.Poll(tests.DefaultPollInterval, tests.DefaultPollTimeout, fn)
}
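
QueryCount itself is not part of this diff. A minimal sketch of what such a row-count helper might look like over database/sql follows; the driver, DSN, and block_writer table name are assumptions for illustration only.

    package main

    import (
        "database/sql"
        "fmt"

        _ "github.com/go-sql-driver/mysql"
    )

    // queryCount is a hypothetical stand-in for TidbClusterConfig.QueryCount,
    // which is not shown in this diff: it opens a MySQL-protocol connection to
    // the TiDB service and counts the rows written by the block writer.
    func queryCount(dsn string) (int, error) {
        db, err := sql.Open("mysql", dsn)
        if err != nil {
            return 0, err
        }
        defer db.Close()

        var count int
        if err := db.QueryRow("SELECT COUNT(*) FROM block_writer").Scan(&count); err != nil {
            return 0, err
        }
        return count, nil
    }

    func main() {
        // illustrative DSN, not the test suite's actual connection string
        count, err := queryCount("root@tcp(demo-tidb:4000)/test")
        fmt.Println(count, err)
    }
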
4 changes: 2 additions & 2 deletions tests/cmd/stability/main.go
@@ -32,12 +32,12 @@ func main() {
logs.InitLogs()
defer logs.FlushLogs()
go func() {
glog.Info(http.ListenAndServe("localhost:6060", nil))
glog.Info(http.ListenAndServe(":6060", nil))
}()

conf := tests.ParseConfigOrDie()
cli, kubeCli := client.NewCliOrDie()
oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollTimeout, conf)
oa := tests.NewOperatorActions(cli, kubeCli, tests.DefaultPollInterval, conf)
fta := tests.NewFaultTriggerAction(cli, kubeCli, conf)
fta.CheckAndRecoverEnvOrDie()

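
A minimal sketch of the pprof pattern this hunk adjusts, assuming the binary blank-imports net/http/pprof somewhere outside this hunk. Only the bind address is the point of the fix.

    package main

    import (
        "log"
        "net/http"
        _ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
    )

    func main() {
        // ":6060" binds all interfaces, so /debug/pprof is reachable on the
        // pod IP from outside the container; "localhost:6060" would only be
        // reachable from inside the pod's own network namespace.
        log.Println(http.ListenAndServe(":6060", nil))
    }
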
65 changes: 14 additions & 51 deletions tests/failover.go
@@ -13,7 +13,6 @@ import (
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/tests/pkg/client"
"github.com/pingcap/tidb-operator/tests/pkg/ops"
"github.com/pingcap/tidb-operator/tests/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -52,7 +51,7 @@ func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterCon
return err
}
maxStoreDownTime := pdCfg.Schedule.MaxStoreDownTime.Duration
glog.Infof("failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod)
glog.Infof("truncate sst file failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod)

// find an up store
var store v1alpha1.TiKVStore
@@ -67,69 +66,33 @@
glog.Errorf("failed to find an up store")
return errors.New("no up store for truncating sst file")
} else {
glog.Infof("target store: id=%s pod=%s", store.ID, store.PodName)
glog.Infof("truncate sst file target store: id=%s pod=%s", store.ID, store.PodName)
}

// checkout pod status
podBeforeRestart, err := cli.CoreV1().Pods(info.Namespace).Get(store.PodName, metav1.GetOptions{})
glog.Infof("deleting pod: [%s/%s] and wait 1 minute for the pod to terminate", info.Namespace, store.PodName)
err = cli.CoreV1().Pods(info.Namespace).Delete(store.PodName, nil)
if err != nil {
glog.Errorf("failed to get target pod: pod=%s err=%s", store.PodName, err.Error())
return err
}

var rc int32
if c := util.GetContainerStatusFromPod(podBeforeRestart, func(status corev1.ContainerStatus) bool {
return status.Name == "tikv"
}); c != nil {
rc = c.RestartCount
} else {
glog.Errorf("failed to get container status from tikv pod")
return errors.New("failed to get container status from tikv pod")
}

oa.emitEvent(info, fmt.Sprintf("TruncateSSTFile: tikv: %s", store.PodName))
// restart tikv to ensure sst files
err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", "tikv-server")
if err != nil {
glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
return err
}

err = tikvOps.PollPod(info.Namespace, store.PodName,
func(pod *corev1.Pod, err error) (bool, error) {
if pod == nil {
glog.Warningf("pod is nil: err=%s", err.Error())
return false, nil
}
tikv := util.GetContainerStatusFromPod(pod, func(status corev1.ContainerStatus) bool {
return status.Name == "tikv"
})

if pod.Status.Phase == corev1.PodRunning && tikv != nil && tikv.RestartCount > rc {
return true, nil
}
return false, nil
})
if err != nil {
glog.Errorf("tikv process hasn't been restarted: err=%s", err.Error())
return err
}
time.Sleep(1 * time.Minute)

// truncate the sst file and wait for failover
err = tikvOps.TruncateSSTFile(ops.TruncateOptions{
Namespace: info.Namespace,
Cluster: info.ClusterName,
Store: store.ID,
})
if err != nil {
return err
}
oa.emitEvent(info, fmt.Sprintf("TruncateSSTFile: tikv: %s/%s", info.Namespace, store.PodName))

// make tikv crash
//err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", "tikv-server")
//if err != nil {
// glog.Errorf("cluster: [%s/%s] kill tikv: pod=%s err=%s",
// info.Namespace, info.ClusterName,
// store.PodName, err.Error())
// return err
//}
glog.Infof("deleting pod: [%s/%s] again", info.Namespace, store.PodName)
err = cli.CoreV1().Pods(info.Namespace).Delete(store.PodName, nil)
if err != nil {
return err
}

tikvOps.SetPoll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout)

2 changes: 1 addition & 1 deletion tests/pkg/blockwriter/blockwriter.go
@@ -29,7 +29,7 @@
)

const (
queryChanSize int = 10000
queryChanSize int = 100
)

// BlockWriterCase is for concurrent writing blocks.
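
A self-contained sketch of why shrinking the buffered query channel bounds the writer's backlog. The channel name and SQL below are illustrative, not the blockwriter package's actual code.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        const queryChanSize = 100 // matches the new, smaller buffer

        queries := make(chan string, queryChanSize)
        var wg sync.WaitGroup

        // consumer: stands in for the worker that executes each statement
        wg.Add(1)
        go func() {
            defer wg.Done()
            for q := range queries {
                _ = q // db.Exec(q) in the real block writer
            }
        }()

        // producer: once the buffer holds queryChanSize pending statements,
        // this send blocks until the consumer catches up, so the backlog of
        // unexecuted SQL (and its memory) stays bounded.
        for i := 0; i < 1000; i++ {
            queries <- fmt.Sprintf("INSERT INTO block_writer VALUES (%d)", i)
        }
        close(queries)
        wg.Wait()
    }
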
9 changes: 5 additions & 4 deletions tests/pkg/metrics/annotation_util.go
@@ -17,15 +17,16 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

//Client request grafana API on a set of resource paths.
@@ -133,11 +134,11 @@ func (cli *Client) AddAnnotation(annotation Annotation) error {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("add annotation faield, statusCode=%v", resp.Status)
}
all, err := ioutil.ReadAll(resp.Body)
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
fmt.Println(all)
// fmt.Println(all)

return nil
}
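
A minimal sketch of the response-handling pattern this hunk settles on, using a hypothetical helper name and Grafana URL. The point is to drain and close the body without printing it, so the keep-alive connection can be reused.

    package main

    import (
        "fmt"
        "io"
        "io/ioutil"
        "log"
        "net/http"
        "strings"
    )

    // postAnnotation is an illustrative helper, not the metrics package's real
    // API: it checks the status, reads the response body to EOF into
    // ioutil.Discard, and closes it without logging the raw bytes, which lets
    // http.DefaultTransport reuse the underlying connection.
    func postAnnotation(url, payload string) error {
        resp, err := http.Post(url, "application/json", strings.NewReader(payload))
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("add annotation failed, statusCode=%v", resp.Status)
        }
        // drain the body instead of printing it
        if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
            return err
        }
        return nil
    }

    func main() {
        // hypothetical Grafana endpoint, for illustration only
        log.Println(postAnnotation("http://localhost:3000/api/annotations", `{"text":"backup done"}`))
    }
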
