diff --git a/br/pkg/lightning/backend/local/mockserver/import_server.go b/br/pkg/lightning/backend/local/mockserver/import_server.go new file mode 100644 index 0000000000000..37defe7955062 --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/import_server.go @@ -0,0 +1,160 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockserver + +import ( + "context" + "fmt" + "io" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "google.golang.org/grpc/metadata" +) + +type WriteServer struct{} + +func (w *WriteServer) SendAndClose(response *import_sstpb.WriteResponse) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) Recv() (*import_sstpb.WriteRequest, error) { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SetHeader(md metadata.MD) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SendHeader(md metadata.MD) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SetTrailer(md metadata.MD) { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) Context() context.Context { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) SendMsg(m interface{}) error { + //TODO implement me + panic("implement me") +} + +func (w *WriteServer) RecvMsg(m interface{}) error { + //TODO implement me + panic("implement me") +} + +type MockImportSSTServer struct{} + +func (m *MockImportSSTServer) SwitchMode(ctx context.Context, request *import_sstpb.SwitchModeRequest) (*import_sstpb.SwitchModeResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) GetMode(ctx context.Context, request *import_sstpb.GetModeRequest) (*import_sstpb.GetModeResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Upload(server import_sstpb.ImportSST_UploadServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Ingest(ctx context.Context, request *import_sstpb.IngestRequest) (*import_sstpb.IngestResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Compact(ctx context.Context, request *import_sstpb.CompactRequest) (*import_sstpb.CompactResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) SetDownloadSpeedLimit(ctx context.Context, request *import_sstpb.SetDownloadSpeedLimitRequest) (*import_sstpb.SetDownloadSpeedLimitResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Download(ctx context.Context, request *import_sstpb.DownloadRequest) (*import_sstpb.DownloadResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Write(server import_sstpb.ImportSST_WriteServer) error { + fmt.Println("Write") + for { + request, err := server.Recv() + if err == io.EOF { + fmt.Println("EOF") + return nil + } + if err != nil { + fmt.Printf("err: %v\n", err) + return err + } + if meta := request.GetMeta(); meta != nil { + fmt.Printf("meta: %v\n", meta.String()) + } + if batch := request.GetBatch(); batch != nil { + fmt.Printf("chunk: %v\n", batch.String()) + } + err = server.SendAndClose(&import_sstpb.WriteResponse{ + Metas: []*import_sstpb.SSTMeta{ + { + Uuid: []byte("got"), + }, + }, + }) + if err != nil { + fmt.Printf("err: %v\n", err) + return err + } + } +} + +func (m *MockImportSSTServer) RawWrite(server import_sstpb.ImportSST_RawWriteServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) MultiIngest(ctx context.Context, request *import_sstpb.MultiIngestRequest) (*import_sstpb.IngestResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) DuplicateDetect(request *import_sstpb.DuplicateDetectRequest, server import_sstpb.ImportSST_DuplicateDetectServer) error { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) Apply(ctx context.Context, request *import_sstpb.ApplyRequest) (*import_sstpb.ApplyResponse, error) { + //TODO implement me + panic("implement me") +} + +func (m *MockImportSSTServer) ClearFiles(ctx context.Context, request *import_sstpb.ClearRequest) (*import_sstpb.ClearResponse, error) { + //TODO implement me + panic("implement me") +} diff --git a/br/pkg/lightning/backend/local/mockserver/main/main.go b/br/pkg/lightning/backend/local/mockserver/main/main.go new file mode 100644 index 0000000000000..47cb4628bc891 --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/main/main.go @@ -0,0 +1,42 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "net" + "os" + + "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockserver" + "google.golang.org/grpc" +) + +func main() { + portStr := os.Args[1] + lis, err := net.Listen("tcp", ":"+portStr) + if err != nil { + log.Fatal(err) + } + fmt.Println(lis.Addr().String()) + fmt.Printf("PID: %d\n", os.Getpid()) + + server := grpc.NewServer() + import_sstpb.RegisterImportSSTServer(server, &mockserver.MockImportSSTServer{}) + if err := server.Serve(lis); err != nil { + log.Fatal(err) + } +} diff --git a/br/pkg/lightning/backend/local/mockserver/main2/main.go b/br/pkg/lightning/backend/local/mockserver/main2/main.go new file mode 100644 index 0000000000000..7306ba71a43c1 --- /dev/null +++ b/br/pkg/lightning/backend/local/mockserver/main2/main.go @@ -0,0 +1,60 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "os" + + sst "github.com/pingcap/kvproto/pkg/import_sstpb" + "github.com/pingcap/log" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func main() { + ctx := context.Background() + + conn, err := grpc.DialContext(ctx, os.Args[1], grpc.WithInsecure()) + if err != nil { + log.Fatal("fail to dial", zap.Error(err)) + } + for { + client := sst.NewImportSSTClient(conn) + stream, err := client.Write(ctx) + if err != nil { + log.Error("fail to write", zap.Error(err)) + continue + } + err = stream.Send(&sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: &sst.SSTMeta{ + Uuid: []byte("test"), + }, + }, + }) + if err != nil { + log.Error("fail to send", zap.Error(err)) + continue + } + resp, err := stream.CloseAndRecv() + if err != nil { + log.Error("fail to close and recv", zap.Error(err)) + continue + } + fmt.Printf("resp: %v\n", resp) + } +} diff --git a/br/pkg/lightning/backend/local/region_job_test.go b/br/pkg/lightning/backend/local/region_job_test.go index fd508372ef9f9..a342e61893729 100644 --- a/br/pkg/lightning/backend/local/region_job_test.go +++ b/br/pkg/lightning/backend/local/region_job_test.go @@ -16,6 +16,7 @@ package local import ( "context" + "fmt" "sync" "testing" "time" @@ -25,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/stretchr/testify/require" + "google.golang.org/grpc" ) func TestIsIngestRetryable(t *testing.T) { @@ -257,3 +259,27 @@ func TestRegionJobRetryer(t *testing.T) { cancel() jobWg.Wait() } + +func TestWriteWhenServerIsGone(t *testing.T) { + //t.Skip("need mock server") + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "[::]:40401", grpc.WithInsecure()) + require.NoError(t, err) + for { + client := sst.NewImportSSTClient(conn) + stream, err := client.Write(ctx) + require.NoError(t, err) + err = stream.Send(&sst.WriteRequest{ + Chunk: &sst.WriteRequest_Meta{ + Meta: &sst.SSTMeta{ + Uuid: []byte("test"), + }, + }, + }) + require.NoError(t, err) + resp, err := stream.CloseAndRecv() + require.NoError(t, err) + fmt.Printf("resp: %v\n", resp) + } +}