Skip to content

Commit

Permalink
pkg/security(ticdc): support online load tls config (#7927)
Browse files Browse the repository at this point in the history
close #7908
  • Loading branch information
CharlesCheung96 authored Dec 20, 2022
1 parent 45ee743 commit ee7f68f
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 152 deletions.
101 changes: 69 additions & 32 deletions cdc/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/retry"
security2 "github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/tempurl"
Expand Down Expand Up @@ -174,12 +174,12 @@ const retryTime = 20
func TestServerTLSWithoutCommonName(t *testing.T) {
addr := tempurl.Alloc()[len("http://"):]
// Do not specify common name
security, err := security2.NewCredential4Test("")
_, securityCfg, err := security.NewServerCredential4Test("")
require.Nil(t, err)
conf := config.GetDefaultServerConfig()
conf.Addr = addr
conf.AdvertiseAddr = addr
conf.Security = &security
conf.Security = securityCfg
config.StoreGlobalServerConfig(conf)

server, err := New([]string{"https://127.0.0.1:2379"})
Expand All @@ -205,7 +205,7 @@ func TestServerTLSWithoutCommonName(t *testing.T) {
go func() {
defer wg.Done()
err := server.tcpServer.Run(ctx)
require.Contains(t, err.Error(), "ErrTCPServerClosed")
require.ErrorContains(t, err, "ErrTCPServerClosed")
}()

// test cli sends request without a cert will success
Expand All @@ -227,12 +227,13 @@ func TestServerTLSWithoutCommonName(t *testing.T) {
require.Nil(t, err)
require.Equal(t, info.ID, captureInfo.ID)
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
require.Nil(t, err)

// test cli sends request with a cert will success
err = retry.Do(ctx, func() error {
cli, err := httputil.NewClient(&security)
cli, err := httputil.NewClient(securityCfg)
require.Nil(t, err)
resp, err := cli.Get(ctx, statusURL)
if err != nil {
Expand All @@ -247,22 +248,25 @@ func TestServerTLSWithoutCommonName(t *testing.T) {
require.Equal(t, info.ID, captureInfo.ID)
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
require.Nil(t, err)

cancel()
wg.Wait()
}

func TestServerTLSWithCommonName(t *testing.T) {
func TestServerTLSWithCommonNameAndRotate(t *testing.T) {
addr := tempurl.Alloc()[len("http://"):]
// specify a common name
security, err := security2.NewCredential4Test("test")
ca, securityCfg, err := security.NewServerCredential4Test("server")
securityCfg.CertAllowedCN = append(securityCfg.CertAllowedCN, "client1")
require.Nil(t, err)

conf := config.GetDefaultServerConfig()
conf.Addr = addr
conf.AdvertiseAddr = addr
conf.Security = &security
conf.Security = securityCfg
config.StoreGlobalServerConfig(conf)

server, err := New([]string{"https://127.0.0.1:2379"})
Expand All @@ -280,15 +284,15 @@ func TestServerTLSWithCommonName(t *testing.T) {
}()

statusURL := fmt.Sprintf("https://%s/api/v1/status", addr)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := server.tcpServer.Run(ctx)
require.Contains(t, err.Error(), "ErrTCPServerClosed")
require.ErrorContains(t, err, "ErrTCPServerClosed")
}()

// test cli sends request without a cert will fail
Expand All @@ -311,26 +315,59 @@ func TestServerTLSWithCommonName(t *testing.T) {
require.Equal(t, info.ID, captureInfo.ID)
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
require.Contains(t, err.Error(), "remote error: tls: bad certificate")
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
require.ErrorContains(t, err, "remote error: tls: bad certificate")

testTlSClient := func(securityCfg *security.Credential) error {
return retry.Do(ctx, func() error {
cli, err := httputil.NewClient(securityCfg)
require.Nil(t, err)
resp, err := cli.Get(ctx, statusURL)
if err != nil {
return err
}
decoder := json.NewDecoder(resp.Body)
captureInfo := &model.CaptureInfo{}
err = decoder.Decode(captureInfo)
require.Nil(t, err)
info, err := server.capture.Info()
require.Nil(t, err)
require.Equal(t, info.ID, captureInfo.ID)
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50),
retry.WithIsRetryableErr(cerrors.IsRetryableError))
}

// test cli sends request with a cert will success
err = retry.Do(ctx, func() error {
cli, err := httputil.NewClient(&security)
require.Nil(t, err)
resp, err := cli.Get(ctx, statusURL)
if err != nil {
return err
}
decoder := json.NewDecoder(resp.Body)
captureInfo := &model.CaptureInfo{}
err = decoder.Decode(captureInfo)
require.Nil(t, err)
info, err := server.capture.Info()
require.Nil(t, err)
require.Equal(t, info.ID, captureInfo.ID)
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
require.Nil(t, err)

// test peer success
require.NoError(t, testTlSClient(securityCfg))

// test rotate
serverCert, serverkey, err := ca.GenerateCerts("rotate")
require.NoError(t, err)
err = os.WriteFile(securityCfg.CertPath, serverCert, 0o600)
require.NoError(t, err)
err = os.WriteFile(securityCfg.KeyPath, serverkey, 0o600)
require.NoError(t, err)
// peer fail due to invalid common name `rotate`
require.ErrorContains(t, testTlSClient(securityCfg), "client certificate authentication failed")

cert, key, err := ca.GenerateCerts("client1")
require.NoError(t, err)
certPath, err := security.WriteFile("ticdc-test-client-cert", cert)
require.NoError(t, err)
keyPath, err := security.WriteFile("ticdc-test-client-key", key)
require.NoError(t, err)
require.NoError(t, testTlSClient(&security.Credential{
CAPath: securityCfg.CAPath,
CertPath: certPath,
KeyPath: keyPath,
CertAllowedCN: []string{"rotate"},
}))

cancel()
wg.Wait()
}
10 changes: 8 additions & 2 deletions engine/pkg/client/executor_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package client

import (
"os"
"testing"
"time"

Expand All @@ -34,10 +35,15 @@ func TestNewExecutorClientNotBlocking(t *testing.T) {
func TestExecutorClientFactoryIllegalCredentials(t *testing.T) {
t.Parallel()

dir := t.TempDir()
caPath := dir + "/ca.pem"
err := os.WriteFile(caPath, []byte("invalid ca pem"), 0o600)
require.NoError(t, err)

credentials := &security.Credential{
CAPath: "/dev/null", // illegal CA path to trigger an error
CAPath: caPath, // illegal CA path to trigger an error
}
factory := newExecutorClientFactory(credentials, nil)
_, err := factory.NewExecutorClient("127.0.0.1:1234")
_, err = factory.NewExecutorClient("127.0.0.1:1234")
require.Error(t, err)
}
9 changes: 4 additions & 5 deletions engine/test/e2e/e2e_dm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -387,7 +386,7 @@ func queryStatus(ctx context.Context, client *httputil.Client, jobID string, tas
return nil, err
}

respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -434,7 +433,7 @@ func getJobCfg(ctx context.Context, client *httputil.Client, jobID string) (stri
return "", err
}

respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -475,7 +474,7 @@ func getBinlogOperator(ctx context.Context, client *httputil.Client, jobID strin
return nil, err
}

respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -537,7 +536,7 @@ func getBinlogSchema(ctx context.Context, client *httputil.Client, jobID string,
return nil, err
}

respBody, err := ioutil.ReadAll(resp.Body)
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/cli/cli_changefeed_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package cli

import (
"bytes"
"io/ioutil"
"io"
"os"
"testing"

Expand Down Expand Up @@ -88,7 +88,7 @@ func TestChangefeedListCli(t *testing.T) {
// when --all=false, should contains StateNormal, StateError, StateFailed, StateStopped changefeed
os.Args = []string{"list", "--all=false"}
require.Nil(t, cmd.Execute())
out, err := ioutil.ReadAll(b)
out, err := io.ReadAll(b)
require.Nil(t, err)
require.Contains(t, string(out), "error-1")
require.Contains(t, string(out), "normal-2")
Expand All @@ -98,7 +98,7 @@ func TestChangefeedListCli(t *testing.T) {
// when --all=true, should contains all changefeed
os.Args = []string{"list", "--all=true"}
require.Nil(t, cmd.Execute())
out, err = ioutil.ReadAll(b)
out, err = io.ReadAll(b)
require.Nil(t, err)
require.Contains(t, string(out), "error-1")
require.Contains(t, string(out), "normal-2")
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/cli/cli_changefeed_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package cli

import (
"bytes"
"io/ioutil"
"io"
"os"
"testing"

Expand Down Expand Up @@ -83,7 +83,7 @@ func TestChangefeedQueryCli(t *testing.T) {
b := bytes.NewBufferString("")
cmd.SetOut(b)
require.Nil(t, o.run(cmd))
out, err := ioutil.ReadAll(b)
out, err := io.ReadAll(b)
require.Nil(t, err)
// make sure config is printed
require.Contains(t, string(out), "config")
Expand Down
3 changes: 1 addition & 2 deletions pkg/fsutil/preallocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
package fsutil

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func TestPreAllocate(t *testing.T) {
f, err := ioutil.TempFile("", "preallocate-test")
f, err := os.CreateTemp("", "preallocate-test")
defer os.Remove(f.Name())
require.Nil(t, err)

Expand Down
Loading

0 comments on commit ee7f68f

Please sign in to comment.