From 41db812474b3f81e02a0ef82761d6d0f006afc34 Mon Sep 17 00:00:00 2001 From: Leonardo Luz Almeida Date: Tue, 15 Mar 2022 15:06:21 -0400 Subject: [PATCH] feat: remove shared repo volume between repo-server and cmp-server (#8600) feat: remove shared repo volume between repo-server and cmp-server (#8600) Signed-off-by: Leonardo Luz Almeida --- cmpserver/apiclient/plugin.pb.go | 1276 ++++++++--------- cmpserver/plugin/plugin.go | 153 +- cmpserver/plugin/plugin.proto | 52 +- cmpserver/plugin/plugin_test.go | 173 ++- cmpserver/server.go | 12 +- common/common.go | 40 + docs/user-guide/config-management-plugins.md | 8 +- reposerver/repository/repository.go | 167 ++- test/cmp/plugin.yaml | 4 +- test/e2e/testdata/cmp-fileName/plugin.yaml | 4 +- test/testutil.go | 13 + util/app/discovery/discovery.go | 43 +- util/cmp/stream.go | 298 ++++ util/cmp/stream_test.go | 97 ++ util/cmp/testdata/app/README.md | 1 + .../applicationset/latest/kustomization.yaml | 4 + .../applicationset/stable/kustomization.yaml | 4 + util/cmp/testdata/example.tar.gz | Bin 0 -> 322 bytes util/helm/client.go | 18 +- util/io/files/tar.go | 199 +++ util/io/files/tar_test.go | 238 +++ util/io/files/testdata/app/README.md | 1 + .../applicationset/latest/kustomization.yaml | 4 + .../app/applicationset/readme-symlink | 1 + .../applicationset/stable/kustomization.yaml | 4 + .../testdata/symlink-exploit/tar_test_symlink | 1 + util/io/files/util.go | 70 + util/io/files/util_test.go | 91 ++ 28 files changed, 2158 insertions(+), 818 deletions(-) create mode 100644 util/cmp/stream.go create mode 100644 util/cmp/stream_test.go create mode 100644 util/cmp/testdata/app/README.md create mode 100644 util/cmp/testdata/app/applicationset/latest/kustomization.yaml create mode 100644 util/cmp/testdata/app/applicationset/stable/kustomization.yaml create mode 100644 util/cmp/testdata/example.tar.gz create mode 100644 util/io/files/tar.go create mode 100644 util/io/files/tar_test.go create mode 100644 util/io/files/testdata/app/README.md create mode 100644 util/io/files/testdata/app/applicationset/latest/kustomization.yaml create mode 120000 util/io/files/testdata/app/applicationset/readme-symlink create mode 100644 util/io/files/testdata/app/applicationset/stable/kustomization.yaml create mode 120000 util/io/files/testdata/symlink-exploit/tar_test_symlink create mode 100644 util/io/files/util.go create mode 100644 util/io/files/util_test.go diff --git a/cmpserver/apiclient/plugin.pb.go b/cmpserver/apiclient/plugin.pb.go index e05ce2cb974fd..bcffb01cf55bc 100644 --- a/cmpserver/apiclient/plugin.pb.go +++ b/cmpserver/apiclient/plugin.pb.go @@ -27,31 +27,126 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -// ManifestRequest is a query for manifest generation. -type ManifestRequest struct { - // Name of the application for which the request is triggered - AppName string `protobuf:"bytes,1,opt,name=appName,proto3" json:"appName,omitempty"` - AppPath string `protobuf:"bytes,2,opt,name=appPath,proto3" json:"appPath,omitempty"` - RepoPath string `protobuf:"bytes,3,opt,name=repoPath,proto3" json:"repoPath,omitempty"` - NoCache bool `protobuf:"varint,4,opt,name=noCache,proto3" json:"noCache,omitempty"` +// AppStreamRequest is the request object used to send the application's +// files over a stream. +type AppStreamRequest struct { + // Types that are valid to be assigned to Request: + // *AppStreamRequest_Metadata + // *AppStreamRequest_File + Request isAppStreamRequest_Request `protobuf_oneof:"request"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AppStreamRequest) Reset() { *m = AppStreamRequest{} } +func (m *AppStreamRequest) String() string { return proto.CompactTextString(m) } +func (*AppStreamRequest) ProtoMessage() {} +func (*AppStreamRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_b21875a7079a06ed, []int{0} +} +func (m *AppStreamRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AppStreamRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AppStreamRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AppStreamRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AppStreamRequest.Merge(m, src) +} +func (m *AppStreamRequest) XXX_Size() int { + return m.Size() +} +func (m *AppStreamRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AppStreamRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AppStreamRequest proto.InternalMessageInfo + +type isAppStreamRequest_Request interface { + isAppStreamRequest_Request() + MarshalTo([]byte) (int, error) + Size() int +} + +type AppStreamRequest_Metadata struct { + Metadata *ManifestRequestMetadata `protobuf:"bytes,1,opt,name=metadata,proto3,oneof" json:"metadata,omitempty"` +} +type AppStreamRequest_File struct { + File *File `protobuf:"bytes,2,opt,name=file,proto3,oneof" json:"file,omitempty"` +} + +func (*AppStreamRequest_Metadata) isAppStreamRequest_Request() {} +func (*AppStreamRequest_File) isAppStreamRequest_Request() {} + +func (m *AppStreamRequest) GetRequest() isAppStreamRequest_Request { + if m != nil { + return m.Request + } + return nil +} + +func (m *AppStreamRequest) GetMetadata() *ManifestRequestMetadata { + if x, ok := m.GetRequest().(*AppStreamRequest_Metadata); ok { + return x.Metadata + } + return nil +} + +func (m *AppStreamRequest) GetFile() *File { + if x, ok := m.GetRequest().(*AppStreamRequest_File); ok { + return x.File + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*AppStreamRequest) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*AppStreamRequest_Metadata)(nil), + (*AppStreamRequest_File)(nil), + } +} + +// ManifestRequestMetadata defines the metada related to the file being sent +// to the CMP server. +type ManifestRequestMetadata struct { + // appName refers to the ArgoCD Application name + AppName string `protobuf:"bytes,1,opt,name=appName,proto3" json:"appName,omitempty"` + // appRelPath points to the application relative path inside the tarball + AppRelPath string `protobuf:"bytes,2,opt,name=appRelPath,proto3" json:"appRelPath,omitempty"` + // checksum is used to verify the integrity of the file + Checksum string `protobuf:"bytes,3,opt,name=checksum,proto3" json:"checksum,omitempty"` + // size relates to the file size in bytes + Size_ int64 `protobuf:"varint,4,opt,name=size,proto3" json:"size,omitempty"` + // env is a list with the environment variables needed to generate manifests Env []*EnvEntry `protobuf:"bytes,5,rep,name=env,proto3" json:"env,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *ManifestRequest) Reset() { *m = ManifestRequest{} } -func (m *ManifestRequest) String() string { return proto.CompactTextString(m) } -func (*ManifestRequest) ProtoMessage() {} -func (*ManifestRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_b21875a7079a06ed, []int{0} +func (m *ManifestRequestMetadata) Reset() { *m = ManifestRequestMetadata{} } +func (m *ManifestRequestMetadata) String() string { return proto.CompactTextString(m) } +func (*ManifestRequestMetadata) ProtoMessage() {} +func (*ManifestRequestMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_b21875a7079a06ed, []int{1} } -func (m *ManifestRequest) XXX_Unmarshal(b []byte) error { +func (m *ManifestRequestMetadata) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *ManifestRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *ManifestRequestMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_ManifestRequest.Marshal(b, m, deterministic) + return xxx_messageInfo_ManifestRequestMetadata.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -61,47 +156,47 @@ func (m *ManifestRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, err return b[:n], nil } } -func (m *ManifestRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ManifestRequest.Merge(m, src) +func (m *ManifestRequestMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_ManifestRequestMetadata.Merge(m, src) } -func (m *ManifestRequest) XXX_Size() int { +func (m *ManifestRequestMetadata) XXX_Size() int { return m.Size() } -func (m *ManifestRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ManifestRequest.DiscardUnknown(m) +func (m *ManifestRequestMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_ManifestRequestMetadata.DiscardUnknown(m) } -var xxx_messageInfo_ManifestRequest proto.InternalMessageInfo +var xxx_messageInfo_ManifestRequestMetadata proto.InternalMessageInfo -func (m *ManifestRequest) GetAppName() string { +func (m *ManifestRequestMetadata) GetAppName() string { if m != nil { return m.AppName } return "" } -func (m *ManifestRequest) GetAppPath() string { +func (m *ManifestRequestMetadata) GetAppRelPath() string { if m != nil { - return m.AppPath + return m.AppRelPath } return "" } -func (m *ManifestRequest) GetRepoPath() string { +func (m *ManifestRequestMetadata) GetChecksum() string { if m != nil { - return m.RepoPath + return m.Checksum } return "" } -func (m *ManifestRequest) GetNoCache() bool { +func (m *ManifestRequestMetadata) GetSize_() int64 { if m != nil { - return m.NoCache + return m.Size_ } - return false + return 0 } -func (m *ManifestRequest) GetEnv() []*EnvEntry { +func (m *ManifestRequestMetadata) GetEnv() []*EnvEntry { if m != nil { return m.Env } @@ -123,7 +218,7 @@ func (m *EnvEntry) Reset() { *m = EnvEntry{} } func (m *EnvEntry) String() string { return proto.CompactTextString(m) } func (*EnvEntry) ProtoMessage() {} func (*EnvEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_b21875a7079a06ed, []int{1} + return fileDescriptor_b21875a7079a06ed, []int{2} } func (m *EnvEntry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -178,7 +273,7 @@ func (m *ManifestResponse) Reset() { *m = ManifestResponse{} } func (m *ManifestResponse) String() string { return proto.CompactTextString(m) } func (*ManifestResponse) ProtoMessage() {} func (*ManifestResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_b21875a7079a06ed, []int{2} + return fileDescriptor_b21875a7079a06ed, []int{3} } func (m *ManifestResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -221,61 +316,6 @@ func (m *ManifestResponse) GetSourceType() string { return "" } -type RepositoryRequest struct { - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` - Env []*EnvEntry `protobuf:"bytes,2,rep,name=env,proto3" json:"env,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *RepositoryRequest) Reset() { *m = RepositoryRequest{} } -func (m *RepositoryRequest) String() string { return proto.CompactTextString(m) } -func (*RepositoryRequest) ProtoMessage() {} -func (*RepositoryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_b21875a7079a06ed, []int{3} -} -func (m *RepositoryRequest) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *RepositoryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_RepositoryRequest.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *RepositoryRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_RepositoryRequest.Merge(m, src) -} -func (m *RepositoryRequest) XXX_Size() int { - return m.Size() -} -func (m *RepositoryRequest) XXX_DiscardUnknown() { - xxx_messageInfo_RepositoryRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_RepositoryRequest proto.InternalMessageInfo - -func (m *RepositoryRequest) GetPath() string { - if m != nil { - return m.Path - } - return "" -} - -func (m *RepositoryRequest) GetEnv() []*EnvEntry { - if m != nil { - return m.Env - } - return nil -} - type RepositoryResponse struct { IsSupported bool `protobuf:"varint,1,opt,name=isSupported,proto3" json:"isSupported,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -323,24 +363,25 @@ func (m *RepositoryResponse) GetIsSupported() bool { return false } -type ConfigRequest struct { +type File struct { + Chunk []byte `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } -func (m *ConfigRequest) Reset() { *m = ConfigRequest{} } -func (m *ConfigRequest) String() string { return proto.CompactTextString(m) } -func (*ConfigRequest) ProtoMessage() {} -func (*ConfigRequest) Descriptor() ([]byte, []int) { +func (m *File) Reset() { *m = File{} } +func (m *File) String() string { return proto.CompactTextString(m) } +func (*File) ProtoMessage() {} +func (*File) Descriptor() ([]byte, []int) { return fileDescriptor_b21875a7079a06ed, []int{5} } -func (m *ConfigRequest) XXX_Unmarshal(b []byte) error { +func (m *File) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *ConfigRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *File) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_ConfigRequest.Marshal(b, m, deterministic) + return xxx_messageInfo_File.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalToSizedBuffer(b) @@ -350,119 +391,70 @@ func (m *ConfigRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error return b[:n], nil } } -func (m *ConfigRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConfigRequest.Merge(m, src) +func (m *File) XXX_Merge(src proto.Message) { + xxx_messageInfo_File.Merge(m, src) } -func (m *ConfigRequest) XXX_Size() int { +func (m *File) XXX_Size() int { return m.Size() } -func (m *ConfigRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ConfigRequest.DiscardUnknown(m) +func (m *File) XXX_DiscardUnknown() { + xxx_messageInfo_File.DiscardUnknown(m) } -var xxx_messageInfo_ConfigRequest proto.InternalMessageInfo - -type ConfigResponse struct { - AllowConcurrency bool `protobuf:"varint,1,opt,name=allowConcurrency,proto3" json:"allowConcurrency,omitempty"` - LockRepo bool `protobuf:"varint,2,opt,name=lockRepo,proto3" json:"lockRepo,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *ConfigResponse) Reset() { *m = ConfigResponse{} } -func (m *ConfigResponse) String() string { return proto.CompactTextString(m) } -func (*ConfigResponse) ProtoMessage() {} -func (*ConfigResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_b21875a7079a06ed, []int{6} -} -func (m *ConfigResponse) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *ConfigResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_ConfigResponse.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *ConfigResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConfigResponse.Merge(m, src) -} -func (m *ConfigResponse) XXX_Size() int { - return m.Size() -} -func (m *ConfigResponse) XXX_DiscardUnknown() { - xxx_messageInfo_ConfigResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_ConfigResponse proto.InternalMessageInfo - -func (m *ConfigResponse) GetAllowConcurrency() bool { - if m != nil { - return m.AllowConcurrency - } - return false -} +var xxx_messageInfo_File proto.InternalMessageInfo -func (m *ConfigResponse) GetLockRepo() bool { +func (m *File) GetChunk() []byte { if m != nil { - return m.LockRepo + return m.Chunk } - return false + return nil } func init() { - proto.RegisterType((*ManifestRequest)(nil), "plugin.ManifestRequest") + proto.RegisterType((*AppStreamRequest)(nil), "plugin.AppStreamRequest") + proto.RegisterType((*ManifestRequestMetadata)(nil), "plugin.ManifestRequestMetadata") proto.RegisterType((*EnvEntry)(nil), "plugin.EnvEntry") proto.RegisterType((*ManifestResponse)(nil), "plugin.ManifestResponse") - proto.RegisterType((*RepositoryRequest)(nil), "plugin.RepositoryRequest") proto.RegisterType((*RepositoryResponse)(nil), "plugin.RepositoryResponse") - proto.RegisterType((*ConfigRequest)(nil), "plugin.ConfigRequest") - proto.RegisterType((*ConfigResponse)(nil), "plugin.ConfigResponse") + proto.RegisterType((*File)(nil), "plugin.File") } func init() { proto.RegisterFile("cmpserver/plugin/plugin.proto", fileDescriptor_b21875a7079a06ed) } var fileDescriptor_b21875a7079a06ed = []byte{ - // 501 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x53, 0xdd, 0x6e, 0xd3, 0x30, - 0x18, 0x25, 0x6b, 0x37, 0xda, 0x6f, 0x82, 0x16, 0x8b, 0x9f, 0x10, 0xb1, 0xaa, 0xca, 0x55, 0x85, - 0x44, 0x23, 0x0a, 0x42, 0x5c, 0xa2, 0x55, 0x53, 0x25, 0xd0, 0x50, 0x95, 0x71, 0x81, 0xb8, 0xf3, - 0xdc, 0x6f, 0xa9, 0x59, 0x6a, 0x1b, 0xc7, 0x09, 0xea, 0xd3, 0xf0, 0x3a, 0x5c, 0xf2, 0x08, 0xa8, - 0x4f, 0xc1, 0x25, 0x4a, 0x62, 0x37, 0x65, 0x45, 0xbb, 0x8a, 0xcf, 0x39, 0xf6, 0xd1, 0x39, 0x5f, - 0x6c, 0x38, 0x61, 0x2b, 0x95, 0xa1, 0x2e, 0x50, 0x47, 0x2a, 0xcd, 0x13, 0x2e, 0xec, 0x67, 0xac, - 0xb4, 0x34, 0x92, 0x1c, 0xd5, 0x28, 0x08, 0xaf, 0xdf, 0x66, 0x63, 0x2e, 0x23, 0xaa, 0x78, 0xc4, - 0xa4, 0xc6, 0xa8, 0x78, 0x19, 0x25, 0x28, 0x50, 0x53, 0x83, 0x8b, 0x7a, 0x6f, 0xf8, 0xc3, 0x83, - 0xde, 0x39, 0x15, 0xfc, 0x0a, 0x33, 0x13, 0xe3, 0xb7, 0x1c, 0x33, 0x43, 0x7c, 0xb8, 0x4b, 0x95, - 0xfa, 0x48, 0x57, 0xe8, 0x7b, 0x43, 0x6f, 0xd4, 0x8d, 0x1d, 0xb4, 0xca, 0x9c, 0x9a, 0xa5, 0x7f, - 0xb0, 0x55, 0x4a, 0x48, 0x02, 0xe8, 0x68, 0x54, 0xb2, 0x92, 0x5a, 0x95, 0xb4, 0xc5, 0xe5, 0x29, - 0x21, 0xa7, 0x94, 0x2d, 0xd1, 0x6f, 0x0f, 0xbd, 0x51, 0x27, 0x76, 0x90, 0x84, 0xd0, 0x42, 0x51, - 0xf8, 0x87, 0xc3, 0xd6, 0xe8, 0x78, 0xd2, 0x1f, 0xdb, 0x16, 0x67, 0xa2, 0x38, 0x13, 0x46, 0xaf, - 0xe3, 0x52, 0x0c, 0x5f, 0x43, 0xc7, 0x11, 0x84, 0x40, 0x5b, 0x34, 0xb1, 0xaa, 0x35, 0x79, 0x08, - 0x87, 0x05, 0x4d, 0x73, 0xb4, 0x89, 0x6a, 0x10, 0xce, 0xa1, 0xdf, 0xd4, 0xca, 0x94, 0x14, 0x19, - 0x92, 0x67, 0xd0, 0x5d, 0x59, 0x2e, 0xf3, 0xbd, 0x61, 0x6b, 0xd4, 0x8d, 0x1b, 0x82, 0x0c, 0x00, - 0x32, 0x99, 0x6b, 0x86, 0x9f, 0xd6, 0xca, 0x99, 0xed, 0x30, 0xe1, 0x07, 0x78, 0x10, 0xa3, 0x92, - 0x19, 0x37, 0x52, 0xaf, 0xdd, 0xa8, 0x08, 0xb4, 0x55, 0x59, 0xd9, 0x06, 0x2a, 0xd7, 0xae, 0xd4, - 0xc1, 0x6d, 0xa5, 0xde, 0x00, 0xd9, 0x35, 0xb3, 0x01, 0x87, 0x70, 0xcc, 0xb3, 0x8b, 0x5c, 0x29, - 0xa9, 0x0d, 0x2e, 0x2a, 0xd3, 0x4e, 0xbc, 0x4b, 0x85, 0x3d, 0xb8, 0x37, 0x95, 0xe2, 0x8a, 0x27, - 0x36, 0x40, 0xf8, 0x19, 0xee, 0x3b, 0xc2, 0x9a, 0x3c, 0x87, 0x3e, 0x4d, 0x53, 0xf9, 0x7d, 0x2a, - 0x05, 0xcb, 0xb5, 0x46, 0xc1, 0xd6, 0xd6, 0x69, 0x8f, 0x2f, 0xff, 0x5a, 0x2a, 0xd9, 0x75, 0x19, - 0xa5, 0x6a, 0xdc, 0x89, 0xb7, 0x78, 0xf2, 0xc7, 0x83, 0x93, 0xda, 0xfa, 0x9c, 0x0a, 0x9a, 0xe0, - 0x0a, 0x85, 0x99, 0x57, 0x5d, 0x2e, 0x50, 0x17, 0x9c, 0x21, 0x99, 0x41, 0x7f, 0x66, 0xaf, 0x93, - 0x9b, 0x35, 0x79, 0xe2, 0xfa, 0xde, 0xb8, 0x54, 0x81, 0xbf, 0x2f, 0xd4, 0x81, 0xc3, 0x3b, 0xe4, - 0x7d, 0x79, 0x07, 0x0d, 0x5b, 0x36, 0x23, 0x21, 0x4f, 0xdd, 0xf6, 0xbd, 0x99, 0x07, 0xc1, 0xff, - 0xa4, 0xad, 0xd7, 0x29, 0xf4, 0x66, 0x68, 0x83, 0xd6, 0xf1, 0xc9, 0x23, 0x77, 0xe0, 0x9f, 0xd1, - 0x05, 0x8f, 0x6f, 0xd2, 0xce, 0xe3, 0xf4, 0xdd, 0xcf, 0xcd, 0xc0, 0xfb, 0xb5, 0x19, 0x78, 0xbf, - 0x37, 0x03, 0xef, 0xcb, 0x24, 0xe1, 0x66, 0x99, 0x5f, 0x8e, 0x99, 0x5c, 0x45, 0x54, 0x27, 0x52, - 0x69, 0xf9, 0xb5, 0x5a, 0xbc, 0x60, 0x8b, 0xa8, 0x98, 0x44, 0xcd, 0x63, 0xa4, 0x8a, 0xb3, 0x94, - 0xa3, 0x30, 0x97, 0x47, 0xd5, 0xeb, 0x7a, 0xf5, 0x37, 0x00, 0x00, 0xff, 0xff, 0x6e, 0xfe, 0x31, - 0x8a, 0xaa, 0x03, 0x00, 0x00, + // 506 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x53, 0xc1, 0x8e, 0xd3, 0x3c, + 0x10, 0xde, 0xfc, 0xed, 0xee, 0xb6, 0xb3, 0x2b, 0xfd, 0x95, 0x85, 0x44, 0x54, 0xed, 0x96, 0x2a, + 0xa7, 0x5e, 0x68, 0x44, 0x41, 0x88, 0x0b, 0x12, 0x2c, 0x5a, 0x58, 0x81, 0x8a, 0x2a, 0x97, 0x13, + 0x37, 0xaf, 0x3b, 0x4d, 0x4d, 0x13, 0xdb, 0xd8, 0x4e, 0xa4, 0x72, 0xe2, 0x6d, 0x78, 0x05, 0x1e, + 0x81, 0x23, 0x8f, 0x80, 0xfa, 0x24, 0x28, 0x4e, 0xd2, 0x56, 0xac, 0xe0, 0x94, 0x99, 0x6f, 0x66, + 0x3e, 0x7f, 0x9f, 0x33, 0x86, 0x4b, 0x9e, 0x69, 0x8b, 0xa6, 0x40, 0x13, 0xeb, 0x34, 0x4f, 0x84, + 0xac, 0x3f, 0x63, 0x6d, 0x94, 0x53, 0xe4, 0xa4, 0xca, 0xfa, 0xd1, 0xfa, 0x99, 0x1d, 0x0b, 0x15, + 0x33, 0x2d, 0x62, 0xae, 0x0c, 0xc6, 0xc5, 0xa3, 0x38, 0x41, 0x89, 0x86, 0x39, 0x5c, 0x54, 0xbd, + 0xd1, 0xd7, 0x00, 0x7a, 0x2f, 0xb5, 0x9e, 0x3b, 0x83, 0x2c, 0xa3, 0xf8, 0x39, 0x47, 0xeb, 0xc8, + 0x73, 0xe8, 0x64, 0xe8, 0xd8, 0x82, 0x39, 0x16, 0x06, 0xc3, 0x60, 0x74, 0x36, 0x79, 0x30, 0xae, + 0x4f, 0x98, 0x32, 0x29, 0x96, 0x68, 0x5d, 0xdd, 0x3a, 0xad, 0xdb, 0x6e, 0x8e, 0xe8, 0x6e, 0x84, + 0x44, 0xd0, 0x5e, 0x8a, 0x14, 0xc3, 0xff, 0xfc, 0xe8, 0x79, 0x33, 0xfa, 0x5a, 0xa4, 0x78, 0x73, + 0x44, 0x7d, 0xed, 0xaa, 0x0b, 0xa7, 0xa6, 0xa2, 0x88, 0xbe, 0x05, 0x70, 0xff, 0x2f, 0xb4, 0x24, + 0x84, 0x53, 0xa6, 0xf5, 0x7b, 0x96, 0xa1, 0x17, 0xd2, 0xa5, 0x4d, 0x4a, 0x06, 0x00, 0x4c, 0x6b, + 0x8a, 0xe9, 0x8c, 0xb9, 0x95, 0x3f, 0xaa, 0x4b, 0x0f, 0x10, 0xd2, 0x87, 0x0e, 0x5f, 0x21, 0x5f, + 0xdb, 0x3c, 0x0b, 0x5b, 0xbe, 0xba, 0xcb, 0x09, 0x81, 0xb6, 0x15, 0x5f, 0x30, 0x6c, 0x0f, 0x83, + 0x51, 0x8b, 0xfa, 0x98, 0x44, 0xd0, 0x42, 0x59, 0x84, 0xc7, 0xc3, 0xd6, 0xe8, 0x6c, 0xd2, 0x6b, + 0x34, 0x5f, 0xcb, 0xe2, 0x5a, 0x3a, 0xb3, 0xa1, 0x65, 0x31, 0x7a, 0x02, 0x9d, 0x06, 0x28, 0x39, + 0xe4, 0x5e, 0x96, 0x8f, 0xc9, 0x3d, 0x38, 0x2e, 0x58, 0x9a, 0x63, 0x2d, 0xa7, 0x4a, 0xa2, 0x19, + 0xf4, 0xf6, 0xf6, 0xac, 0x56, 0xd2, 0x22, 0xb9, 0x80, 0x6e, 0x56, 0x63, 0x36, 0x0c, 0x86, 0xad, + 0x51, 0x97, 0xee, 0x81, 0xd2, 0x9b, 0x55, 0xb9, 0xe1, 0xf8, 0x61, 0xa3, 0x1b, 0xb2, 0x03, 0x24, + 0x7a, 0x0a, 0x84, 0xa2, 0x56, 0x56, 0x38, 0x65, 0x36, 0x3b, 0xce, 0x21, 0x9c, 0x09, 0x3b, 0xcf, + 0xb5, 0x56, 0xc6, 0xe1, 0xc2, 0x0b, 0xeb, 0xd0, 0x43, 0x28, 0xba, 0x80, 0x76, 0xf9, 0x13, 0x4a, + 0x9d, 0x7c, 0x95, 0xcb, 0xb5, 0xef, 0x39, 0xa7, 0x55, 0x32, 0xf9, 0x1e, 0xc0, 0xe5, 0x2b, 0x25, + 0x97, 0x22, 0x99, 0x32, 0xc9, 0x12, 0xcc, 0x50, 0xba, 0x99, 0xbf, 0x86, 0x39, 0x9a, 0x42, 0x70, + 0x24, 0x6f, 0xa1, 0xf7, 0xa6, 0xde, 0x9f, 0xc6, 0x11, 0x09, 0x9b, 0xab, 0xfa, 0x73, 0x8b, 0xfa, + 0xe1, 0xdd, 0x9d, 0xa9, 0x94, 0x46, 0x47, 0xa3, 0x80, 0xbc, 0x83, 0xff, 0xa7, 0xcc, 0xf1, 0xd5, + 0xde, 0xc8, 0x3f, 0xa8, 0xfa, 0x4d, 0xe5, 0xae, 0xed, 0x92, 0xec, 0xea, 0xc5, 0x8f, 0xed, 0x20, + 0xf8, 0xb9, 0x1d, 0x04, 0xbf, 0xb6, 0x83, 0xe0, 0xe3, 0x24, 0x11, 0x6e, 0x95, 0xdf, 0x8e, 0xb9, + 0xca, 0x62, 0x66, 0x12, 0xa5, 0x8d, 0xfa, 0xe4, 0x83, 0x87, 0x7c, 0x11, 0x17, 0x93, 0x78, 0xff, + 0x7a, 0x98, 0x16, 0x3c, 0x15, 0x28, 0xdd, 0xed, 0x89, 0x7f, 0x0e, 0x8f, 0x7f, 0x07, 0x00, 0x00, + 0xff, 0xff, 0x75, 0x01, 0x97, 0x9d, 0x5b, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -477,12 +469,11 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ConfigManagementPluginServiceClient interface { - // GenerateManifest generates manifest for application in specified repo name and revision - GenerateManifest(ctx context.Context, in *ManifestRequest, opts ...grpc.CallOption) (*ManifestResponse, error) - // MatchRepository returns whether or not the given path is supported by the plugin - MatchRepository(ctx context.Context, in *RepositoryRequest, opts ...grpc.CallOption) (*RepositoryResponse, error) - // Get configuration of the plugin - GetPluginConfig(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) + // GenerateManifests receive a stream containing a tgz archive with all required files necessary + // to generate manifests + GenerateManifest(ctx context.Context, opts ...grpc.CallOption) (ConfigManagementPluginService_GenerateManifestClient, error) + // MatchRepository returns whether or not the given application is supported by the plugin + MatchRepository(ctx context.Context, opts ...grpc.CallOption) (ConfigManagementPluginService_MatchRepositoryClient, error) } type configManagementPluginServiceClient struct { @@ -493,137 +484,248 @@ func NewConfigManagementPluginServiceClient(cc *grpc.ClientConn) ConfigManagemen return &configManagementPluginServiceClient{cc} } -func (c *configManagementPluginServiceClient) GenerateManifest(ctx context.Context, in *ManifestRequest, opts ...grpc.CallOption) (*ManifestResponse, error) { - out := new(ManifestResponse) - err := c.cc.Invoke(ctx, "/plugin.ConfigManagementPluginService/GenerateManifest", in, out, opts...) +func (c *configManagementPluginServiceClient) GenerateManifest(ctx context.Context, opts ...grpc.CallOption) (ConfigManagementPluginService_GenerateManifestClient, error) { + stream, err := c.cc.NewStream(ctx, &_ConfigManagementPluginService_serviceDesc.Streams[0], "/plugin.ConfigManagementPluginService/GenerateManifest", opts...) if err != nil { return nil, err } - return out, nil + x := &configManagementPluginServiceGenerateManifestClient{stream} + return x, nil } -func (c *configManagementPluginServiceClient) MatchRepository(ctx context.Context, in *RepositoryRequest, opts ...grpc.CallOption) (*RepositoryResponse, error) { - out := new(RepositoryResponse) - err := c.cc.Invoke(ctx, "/plugin.ConfigManagementPluginService/MatchRepository", in, out, opts...) - if err != nil { +type ConfigManagementPluginService_GenerateManifestClient interface { + Send(*AppStreamRequest) error + CloseAndRecv() (*ManifestResponse, error) + grpc.ClientStream +} + +type configManagementPluginServiceGenerateManifestClient struct { + grpc.ClientStream +} + +func (x *configManagementPluginServiceGenerateManifestClient) Send(m *AppStreamRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *configManagementPluginServiceGenerateManifestClient) CloseAndRecv() (*ManifestResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(ManifestResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } - return out, nil + return m, nil } -func (c *configManagementPluginServiceClient) GetPluginConfig(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) { - out := new(ConfigResponse) - err := c.cc.Invoke(ctx, "/plugin.ConfigManagementPluginService/GetPluginConfig", in, out, opts...) +func (c *configManagementPluginServiceClient) MatchRepository(ctx context.Context, opts ...grpc.CallOption) (ConfigManagementPluginService_MatchRepositoryClient, error) { + stream, err := c.cc.NewStream(ctx, &_ConfigManagementPluginService_serviceDesc.Streams[1], "/plugin.ConfigManagementPluginService/MatchRepository", opts...) if err != nil { return nil, err } - return out, nil + x := &configManagementPluginServiceMatchRepositoryClient{stream} + return x, nil +} + +type ConfigManagementPluginService_MatchRepositoryClient interface { + Send(*AppStreamRequest) error + CloseAndRecv() (*RepositoryResponse, error) + grpc.ClientStream +} + +type configManagementPluginServiceMatchRepositoryClient struct { + grpc.ClientStream +} + +func (x *configManagementPluginServiceMatchRepositoryClient) Send(m *AppStreamRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *configManagementPluginServiceMatchRepositoryClient) CloseAndRecv() (*RepositoryResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(RepositoryResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // ConfigManagementPluginServiceServer is the server API for ConfigManagementPluginService service. type ConfigManagementPluginServiceServer interface { - // GenerateManifest generates manifest for application in specified repo name and revision - GenerateManifest(context.Context, *ManifestRequest) (*ManifestResponse, error) - // MatchRepository returns whether or not the given path is supported by the plugin - MatchRepository(context.Context, *RepositoryRequest) (*RepositoryResponse, error) - // Get configuration of the plugin - GetPluginConfig(context.Context, *ConfigRequest) (*ConfigResponse, error) + // GenerateManifests receive a stream containing a tgz archive with all required files necessary + // to generate manifests + GenerateManifest(ConfigManagementPluginService_GenerateManifestServer) error + // MatchRepository returns whether or not the given application is supported by the plugin + MatchRepository(ConfigManagementPluginService_MatchRepositoryServer) error } // UnimplementedConfigManagementPluginServiceServer can be embedded to have forward compatible implementations. type UnimplementedConfigManagementPluginServiceServer struct { } -func (*UnimplementedConfigManagementPluginServiceServer) GenerateManifest(ctx context.Context, req *ManifestRequest) (*ManifestResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GenerateManifest not implemented") -} -func (*UnimplementedConfigManagementPluginServiceServer) MatchRepository(ctx context.Context, req *RepositoryRequest) (*RepositoryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method MatchRepository not implemented") +func (*UnimplementedConfigManagementPluginServiceServer) GenerateManifest(srv ConfigManagementPluginService_GenerateManifestServer) error { + return status.Errorf(codes.Unimplemented, "method GenerateManifest not implemented") } -func (*UnimplementedConfigManagementPluginServiceServer) GetPluginConfig(ctx context.Context, req *ConfigRequest) (*ConfigResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetPluginConfig not implemented") +func (*UnimplementedConfigManagementPluginServiceServer) MatchRepository(srv ConfigManagementPluginService_MatchRepositoryServer) error { + return status.Errorf(codes.Unimplemented, "method MatchRepository not implemented") } func RegisterConfigManagementPluginServiceServer(s *grpc.Server, srv ConfigManagementPluginServiceServer) { s.RegisterService(&_ConfigManagementPluginService_serviceDesc, srv) } -func _ConfigManagementPluginService_GenerateManifest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ManifestRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ConfigManagementPluginServiceServer).GenerateManifest(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/plugin.ConfigManagementPluginService/GenerateManifest", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ConfigManagementPluginServiceServer).GenerateManifest(ctx, req.(*ManifestRequest)) - } - return interceptor(ctx, in, info, handler) +func _ConfigManagementPluginService_GenerateManifest_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ConfigManagementPluginServiceServer).GenerateManifest(&configManagementPluginServiceGenerateManifestServer{stream}) } -func _ConfigManagementPluginService_MatchRepository_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(RepositoryRequest) - if err := dec(in); err != nil { +type ConfigManagementPluginService_GenerateManifestServer interface { + SendAndClose(*ManifestResponse) error + Recv() (*AppStreamRequest, error) + grpc.ServerStream +} + +type configManagementPluginServiceGenerateManifestServer struct { + grpc.ServerStream +} + +func (x *configManagementPluginServiceGenerateManifestServer) SendAndClose(m *ManifestResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *configManagementPluginServiceGenerateManifestServer) Recv() (*AppStreamRequest, error) { + m := new(AppStreamRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(ConfigManagementPluginServiceServer).MatchRepository(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/plugin.ConfigManagementPluginService/MatchRepository", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ConfigManagementPluginServiceServer).MatchRepository(ctx, req.(*RepositoryRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil +} + +func _ConfigManagementPluginService_MatchRepository_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ConfigManagementPluginServiceServer).MatchRepository(&configManagementPluginServiceMatchRepositoryServer{stream}) +} + +type ConfigManagementPluginService_MatchRepositoryServer interface { + SendAndClose(*RepositoryResponse) error + Recv() (*AppStreamRequest, error) + grpc.ServerStream +} + +type configManagementPluginServiceMatchRepositoryServer struct { + grpc.ServerStream +} + +func (x *configManagementPluginServiceMatchRepositoryServer) SendAndClose(m *RepositoryResponse) error { + return x.ServerStream.SendMsg(m) } -func _ConfigManagementPluginService_GetPluginConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ConfigRequest) - if err := dec(in); err != nil { +func (x *configManagementPluginServiceMatchRepositoryServer) Recv() (*AppStreamRequest, error) { + m := new(AppStreamRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(ConfigManagementPluginServiceServer).GetPluginConfig(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/plugin.ConfigManagementPluginService/GetPluginConfig", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ConfigManagementPluginServiceServer).GetPluginConfig(ctx, req.(*ConfigRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } var _ConfigManagementPluginService_serviceDesc = grpc.ServiceDesc{ ServiceName: "plugin.ConfigManagementPluginService", HandlerType: (*ConfigManagementPluginServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GenerateManifest", - Handler: _ConfigManagementPluginService_GenerateManifest_Handler, - }, + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "MatchRepository", - Handler: _ConfigManagementPluginService_MatchRepository_Handler, + StreamName: "GenerateManifest", + Handler: _ConfigManagementPluginService_GenerateManifest_Handler, + ClientStreams: true, }, { - MethodName: "GetPluginConfig", - Handler: _ConfigManagementPluginService_GetPluginConfig_Handler, + StreamName: "MatchRepository", + Handler: _ConfigManagementPluginService_MatchRepository_Handler, + ClientStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "cmpserver/plugin/plugin.proto", } -func (m *ManifestRequest) Marshal() (dAtA []byte, err error) { +func (m *AppStreamRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AppStreamRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AppStreamRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Request != nil { + { + size := m.Request.Size() + i -= size + if _, err := m.Request.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *AppStreamRequest_Metadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AppStreamRequest_Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Metadata != nil { + { + size, err := m.Metadata.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintPlugin(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} +func (m *AppStreamRequest_File) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AppStreamRequest_File) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.File != nil { + { + size, err := m.File.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintPlugin(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} +func (m *ManifestRequestMetadata) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -633,12 +735,12 @@ func (m *ManifestRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *ManifestRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *ManifestRequestMetadata) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *ManifestRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *ManifestRequestMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -661,27 +763,22 @@ func (m *ManifestRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0x2a } } - if m.NoCache { - i-- - if m.NoCache { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } + if m.Size_ != 0 { + i = encodeVarintPlugin(dAtA, i, uint64(m.Size_)) i-- dAtA[i] = 0x20 } - if len(m.RepoPath) > 0 { - i -= len(m.RepoPath) - copy(dAtA[i:], m.RepoPath) - i = encodeVarintPlugin(dAtA, i, uint64(len(m.RepoPath))) + if len(m.Checksum) > 0 { + i -= len(m.Checksum) + copy(dAtA[i:], m.Checksum) + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Checksum))) i-- dAtA[i] = 0x1a } - if len(m.AppPath) > 0 { - i -= len(m.AppPath) - copy(dAtA[i:], m.AppPath) - i = encodeVarintPlugin(dAtA, i, uint64(len(m.AppPath))) + if len(m.AppRelPath) > 0 { + i -= len(m.AppRelPath) + copy(dAtA[i:], m.AppRelPath) + i = encodeVarintPlugin(dAtA, i, uint64(len(m.AppRelPath))) i-- dAtA[i] = 0x12 } @@ -779,54 +876,6 @@ func (m *ManifestResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *RepositoryRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *RepositoryRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *RepositoryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.XXX_unrecognized != nil { - i -= len(m.XXX_unrecognized) - copy(dAtA[i:], m.XXX_unrecognized) - } - if len(m.Env) > 0 { - for iNdEx := len(m.Env) - 1; iNdEx >= 0; iNdEx-- { - { - size, err := m.Env[iNdEx].MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintPlugin(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x12 - } - } - if len(m.Path) > 0 { - i -= len(m.Path) - copy(dAtA[i:], m.Path) - i = encodeVarintPlugin(dAtA, i, uint64(len(m.Path))) - i-- - dAtA[i] = 0xa - } - return len(dAtA) - i, nil -} - func (m *RepositoryResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -864,34 +913,7 @@ func (m *RepositoryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *ConfigRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *ConfigRequest) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *ConfigRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.XXX_unrecognized != nil { - i -= len(m.XXX_unrecognized) - copy(dAtA[i:], m.XXX_unrecognized) - } - return len(dAtA) - i, nil -} - -func (m *ConfigResponse) Marshal() (dAtA []byte, err error) { +func (m *File) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -901,12 +923,12 @@ func (m *ConfigResponse) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *ConfigResponse) MarshalTo(dAtA []byte) (int, error) { +func (m *File) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *ConfigResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *File) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -915,25 +937,12 @@ func (m *ConfigResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } - if m.LockRepo { + if len(m.Chunk) > 0 { + i -= len(m.Chunk) + copy(dAtA[i:], m.Chunk) + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Chunk))) i-- - if m.LockRepo { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i-- - dAtA[i] = 0x10 - } - if m.AllowConcurrency { - i-- - if m.AllowConcurrency { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i-- - dAtA[i] = 0x8 + dAtA[i] = 0xa } return len(dAtA) - i, nil } @@ -949,7 +958,46 @@ func encodeVarintPlugin(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *ManifestRequest) Size() (n int) { +func (m *AppStreamRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Request != nil { + n += m.Request.Size() + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *AppStreamRequest_Metadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Metadata != nil { + l = m.Metadata.Size() + n += 1 + l + sovPlugin(uint64(l)) + } + return n +} +func (m *AppStreamRequest_File) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.File != nil { + l = m.File.Size() + n += 1 + l + sovPlugin(uint64(l)) + } + return n +} +func (m *ManifestRequestMetadata) Size() (n int) { if m == nil { return 0 } @@ -959,16 +1007,16 @@ func (m *ManifestRequest) Size() (n int) { if l > 0 { n += 1 + l + sovPlugin(uint64(l)) } - l = len(m.AppPath) + l = len(m.AppRelPath) if l > 0 { n += 1 + l + sovPlugin(uint64(l)) } - l = len(m.RepoPath) + l = len(m.Checksum) if l > 0 { n += 1 + l + sovPlugin(uint64(l)) } - if m.NoCache { - n += 2 + if m.Size_ != 0 { + n += 1 + sovPlugin(uint64(m.Size_)) } if len(m.Env) > 0 { for _, e := range m.Env { @@ -1024,80 +1072,165 @@ func (m *ManifestResponse) Size() (n int) { return n } -func (m *RepositoryRequest) Size() (n int) { +func (m *RepositoryResponse) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = len(m.Path) + if m.IsSupported { + n += 2 + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *File) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Chunk) if l > 0 { n += 1 + l + sovPlugin(uint64(l)) } - if len(m.Env) > 0 { - for _, e := range m.Env { - l = e.Size() - n += 1 + l + sovPlugin(uint64(l)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovPlugin(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozPlugin(x uint64) (n int) { + return sovPlugin(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *AppStreamRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AppStreamRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AppStreamRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPlugin + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &ManifestRequestMetadata{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Request = &AppStreamRequest_Metadata{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field File", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPlugin + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &File{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Request = &AppStreamRequest_File{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPlugin(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthPlugin + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy } } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - -func (m *RepositoryResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.IsSupported { - n += 2 - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - -func (m *ConfigRequest) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} -func (m *ConfigResponse) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if m.AllowConcurrency { - n += 2 - } - if m.LockRepo { - n += 2 - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) + if iNdEx > l { + return io.ErrUnexpectedEOF } - return n -} - -func sovPlugin(x uint64) (n int) { - return (math_bits.Len64(x|1) + 6) / 7 -} -func sozPlugin(x uint64) (n int) { - return sovPlugin(uint64((x << 1) ^ uint64((int64(x) >> 63)))) + return nil } -func (m *ManifestRequest) Unmarshal(dAtA []byte) error { +func (m *ManifestRequestMetadata) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1120,10 +1253,10 @@ func (m *ManifestRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: ManifestRequest: wiretype end group for non-group") + return fmt.Errorf("proto: ManifestRequestMetadata: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: ManifestRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: ManifestRequestMetadata: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -1160,7 +1293,7 @@ func (m *ManifestRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field AppPath", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field AppRelPath", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1188,11 +1321,11 @@ func (m *ManifestRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.AppPath = string(dAtA[iNdEx:postIndex]) + m.AppRelPath = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RepoPath", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Checksum", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1220,13 +1353,13 @@ func (m *ManifestRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.RepoPath = string(dAtA[iNdEx:postIndex]) + m.Checksum = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex case 4: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field NoCache", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Size_", wireType) } - var v int + m.Size_ = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPlugin @@ -1236,12 +1369,11 @@ func (m *ManifestRequest) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= int(b&0x7F) << shift + m.Size_ |= int64(b&0x7F) << shift if b < 0x80 { break } } - m.NoCache = bool(v != 0) case 5: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Env", wireType) @@ -1528,123 +1660,6 @@ func (m *ManifestResponse) Unmarshal(dAtA []byte) error { } return nil } -func (m *RepositoryRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPlugin - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: RepositoryRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: RepositoryRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Path", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPlugin - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthPlugin - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthPlugin - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Path = string(dAtA[iNdEx:postIndex]) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Env", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPlugin - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPlugin - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPlugin - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Env = append(m.Env, &EnvEntry{}) - if err := m.Env[len(m.Env)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipPlugin(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthPlugin - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *RepositoryResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1716,58 +1731,7 @@ func (m *RepositoryResponse) Unmarshal(dAtA []byte) error { } return nil } -func (m *ConfigRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPlugin - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: ConfigRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: ConfigRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - default: - iNdEx = preIndex - skippy, err := skipPlugin(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthPlugin - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *ConfigResponse) Unmarshal(dAtA []byte) error { +func (m *File) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1790,17 +1754,17 @@ func (m *ConfigResponse) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: ConfigResponse: wiretype end group for non-group") + return fmt.Errorf("proto: File: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: ConfigResponse: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: File: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field AllowConcurrency", wireType) + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunk", wireType) } - var v int + var byteLen int for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPlugin @@ -1810,32 +1774,26 @@ func (m *ConfigResponse) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - v |= int(b&0x7F) << shift + byteLen |= int(b&0x7F) << shift if b < 0x80 { break } } - m.AllowConcurrency = bool(v != 0) - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LockRepo", wireType) + if byteLen < 0 { + return ErrInvalidLengthPlugin } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPlugin - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthPlugin + } + if postIndex > l { + return io.ErrUnexpectedEOF } - m.LockRepo = bool(v != 0) + m.Chunk = append(m.Chunk[:0], dAtA[iNdEx:postIndex]...) + if m.Chunk == nil { + m.Chunk = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPlugin(dAtA[iNdEx:]) diff --git a/cmpserver/plugin/plugin.go b/cmpserver/plugin/plugin.go index 2f150bd532939..717fea0250cd6 100644 --- a/cmpserver/plugin/plugin.go +++ b/cmpserver/plugin/plugin.go @@ -13,7 +13,10 @@ import ( "github.com/argoproj/pkg/rand" + "github.com/argoproj/argo-cd/v2/common" "github.com/argoproj/argo-cd/v2/util/buffered_context" + "github.com/argoproj/argo-cd/v2/util/cmp" + "github.com/argoproj/argo-cd/v2/util/io/files" "github.com/argoproj/gitops-engine/pkg/utils/kube" "github.com/mattn/go-zglob" @@ -42,6 +45,19 @@ func NewService(initConstants CMPServerInitConstants) *Service { } } +func (s *Service) Init() error { + workDir := common.GetCMPWorkDir() + err := os.RemoveAll(workDir) + if err != nil { + return fmt.Errorf("error removing workdir %q: %s", workDir, err) + } + err = os.MkdirAll(workDir, 0700) + if err != nil { + return fmt.Errorf("error creating workdir %q: %s", workDir, err) + } + return nil +} + func runCommand(ctx context.Context, command Command, path string, env []string) (string, error) { if len(command.Command) == 0 { return "", fmt.Errorf("Command is empty") @@ -128,11 +144,43 @@ func environ(envVars []*apiclient.EnvEntry) []string { } // GenerateManifest runs generate command from plugin config file and returns generated manifest files -func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error) { - bufferedCtx, cancel := buffered_context.WithEarlierDeadline(ctx, cmpTimeoutBuffer) +func (s *Service) GenerateManifest(stream apiclient.ConfigManagementPluginService_GenerateManifestServer) error { + ctx, cancel := buffered_context.WithEarlierDeadline(stream.Context(), cmpTimeoutBuffer) defer cancel() + workDir, err := files.CreateTempDir(common.GetCMPWorkDir()) + if err != nil { + return fmt.Errorf("error creating temp dir: %s", err) + } + defer func() { + if err := os.RemoveAll(workDir); err != nil { + // we panic here as the workDir may contain sensitive information + panic(fmt.Sprintf("error removing generate manifest workdir: %s", err)) + } + }() - if deadline, ok := bufferedCtx.Deadline(); ok { + metadata, err := cmp.ReceiveRepoStream(ctx, stream, workDir) + if err != nil { + return fmt.Errorf("generate manifest error receiving stream: %s", err) + } + + appPath := filepath.Clean(filepath.Join(workDir, metadata.AppRelPath)) + if !strings.HasPrefix(appPath, workDir) { + return fmt.Errorf("illegal appPath: out of workDir bound") + } + response, err := s.generateManifest(ctx, appPath, metadata.GetEnv()) + if err != nil { + return fmt.Errorf("error generating manifests: %s", err) + } + err = stream.SendAndClose(response) + if err != nil { + return fmt.Errorf("error sending manifest response: %s", err) + } + return nil +} + +// generateManifest runs generate command from plugin config file and returns generated manifest files +func (s *Service) generateManifest(ctx context.Context, appDir string, envEntries []*apiclient.EnvEntry) (*apiclient.ManifestResponse, error) { + if deadline, ok := ctx.Deadline(); ok { log.Infof("Generating manifests with deadline %v from now", time.Until(deadline)) } else { log.Info("Generating manifests with no request-level timeout") @@ -140,15 +188,15 @@ func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestReq config := s.initConstants.PluginConfig - env := append(os.Environ(), environ(q.Env)...) + env := append(os.Environ(), environ(envEntries)...) if len(config.Spec.Init.Command) > 0 { - _, err := runCommand(bufferedCtx, config.Spec.Init, q.AppPath, env) + _, err := runCommand(ctx, config.Spec.Init, appDir, env) if err != nil { return &apiclient.ManifestResponse{}, err } } - out, err := runCommand(bufferedCtx, config.Spec.Generate, q.AppPath, env) + out, err := runCommand(ctx, config.Spec.Generate, appDir, env) if err != nil { return &apiclient.ManifestResponse{}, err } @@ -163,61 +211,86 @@ func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestReq }, err } -// MatchRepository checks whether the application repository type is supported by config management plugin server -func (s *Service) MatchRepository(ctx context.Context, q *apiclient.RepositoryRequest) (*apiclient.RepositoryResponse, error) { - bufferedCtx, cancel := buffered_context.WithEarlierDeadline(ctx, cmpTimeoutBuffer) +// MatchRepository receives the application stream and checks whether +// its repository type is supported by the config management plugin +// server. +//The checks are implemented in the following order: +// 1. If spec.Discover.FileName is provided it finds for a name match in Applications files +// 2. If spec.Discover.Find.Glob is provided if finds for a glob match in Applications files +// 3. Otherwise it runs the spec.Discover.Find.Command +func (s *Service) MatchRepository(stream apiclient.ConfigManagementPluginService_MatchRepositoryServer) error { + bufferedCtx, cancel := buffered_context.WithEarlierDeadline(stream.Context(), cmpTimeoutBuffer) defer cancel() - var repoResponse apiclient.RepositoryResponse + workDir, err := files.CreateTempDir(common.GetCMPWorkDir()) + if err != nil { + return fmt.Errorf("error creating match repository workdir: %s", err) + } + defer func() { + if err := os.RemoveAll(workDir); err != nil { + // we panic here as the workDir may contain sensitive information + panic(fmt.Sprintf("error removing match repository workdir: %s", err)) + } + }() + + _, err = cmp.ReceiveRepoStream(bufferedCtx, stream, workDir) + if err != nil { + return fmt.Errorf("match repository error receiving stream: %s", err) + } + + isSupported, err := s.matchRepository(bufferedCtx, workDir) + if err != nil { + return fmt.Errorf("match repository error: %s", err) + } + repoResponse := &apiclient.RepositoryResponse{IsSupported: isSupported} + + err = stream.SendAndClose(repoResponse) + if err != nil { + return fmt.Errorf("error sending match repository response: %s", err) + } + return nil +} + +func (s *Service) matchRepository(ctx context.Context, workdir string) (bool, error) { config := s.initConstants.PluginConfig if config.Spec.Discover.FileName != "" { log.Debugf("config.Spec.Discover.FileName is provided") - pattern := strings.TrimSuffix(q.Path, "/") + "/" + strings.TrimPrefix(config.Spec.Discover.FileName, "/") + pattern := filepath.Join(workdir, config.Spec.Discover.FileName) matches, err := filepath.Glob(pattern) - if err != nil || len(matches) == 0 { - log.Debugf("Could not find match for pattern %s. Error is %v.", pattern, err) - return &repoResponse, err - } else if len(matches) > 0 { - repoResponse.IsSupported = true - return &repoResponse, nil + if err != nil { + e := fmt.Errorf("error finding filename match for pattern %q: %s", pattern, err) + log.Debug(e) + return false, e } + return len(matches) > 0, nil } if config.Spec.Discover.Find.Glob != "" { log.Debugf("config.Spec.Discover.Find.Glob is provided") - pattern := strings.TrimSuffix(q.Path, "/") + "/" + strings.TrimPrefix(config.Spec.Discover.Find.Glob, "/") + pattern := filepath.Join(workdir, config.Spec.Discover.Find.Glob) // filepath.Glob doesn't have '**' support hence selecting third-party lib // https://github.com/golang/go/issues/11862 matches, err := zglob.Glob(pattern) - if err != nil || len(matches) == 0 { - log.Debugf("Could not find match for pattern %s. Error is %v.", pattern, err) - return &repoResponse, err - } else if len(matches) > 0 { - repoResponse.IsSupported = true - return &repoResponse, nil + if err != nil { + e := fmt.Errorf("error finding glob match for pattern %q: %s", pattern, err) + log.Debug(e) + return false, e } + + if len(matches) > 0 { + return true, nil + } + return false, nil } log.Debugf("Going to try runCommand.") - find, err := runCommand(bufferedCtx, config.Spec.Discover.Find.Command, q.Path, os.Environ()) + find, err := runCommand(ctx, config.Spec.Discover.Find.Command, workdir, os.Environ()) if err != nil { - return &repoResponse, err + return false, fmt.Errorf("error running find command: %s", err) } - var isSupported bool if find != "" { - isSupported = true + return true, nil } - return &apiclient.RepositoryResponse{ - IsSupported: isSupported, - }, nil -} - -// GetPluginConfig returns plugin config -func (s *Service) GetPluginConfig(ctx context.Context, q *apiclient.ConfigRequest) (*apiclient.ConfigResponse, error) { - config := s.initConstants.PluginConfig - return &apiclient.ConfigResponse{ - AllowConcurrency: config.Spec.AllowConcurrency, - LockRepo: config.Spec.LockRepo, - }, nil + return false, nil } diff --git a/cmpserver/plugin/plugin.proto b/cmpserver/plugin/plugin.proto index b7e143772de8a..2940f8a5b0326 100644 --- a/cmpserver/plugin/plugin.proto +++ b/cmpserver/plugin/plugin.proto @@ -5,13 +5,27 @@ package plugin; import "k8s.io/api/core/v1/generated.proto"; -// ManifestRequest is a query for manifest generation. -message ManifestRequest { - // Name of the application for which the request is triggered +// AppStreamRequest is the request object used to send the application's +// files over a stream. +message AppStreamRequest { + oneof request { + ManifestRequestMetadata metadata = 1; + File file = 2; + } +} + +// ManifestRequestMetadata defines the metada related to the file being sent +// to the CMP server. +message ManifestRequestMetadata { + // appName refers to the ArgoCD Application name string appName = 1; - string appPath = 2; - string repoPath = 3; - bool noCache = 4; + // appRelPath points to the application relative path inside the tarball + string appRelPath = 2; + // checksum is used to verify the integrity of the file + string checksum = 3; + // size relates to the file size in bytes + int64 size = 4; + // env is a list with the environment variables needed to generate manifests repeated EnvEntry env = 5; } @@ -28,34 +42,22 @@ message ManifestResponse { string sourceType = 2; } -message RepositoryRequest { - string path = 1; - repeated EnvEntry env = 2; -} - message RepositoryResponse { bool isSupported = 1; } -message ConfigRequest { -} - -message ConfigResponse { - bool allowConcurrency = 1; - bool lockRepo = 2; +message File { + bytes chunk = 1; } // ConfigManagementPlugin Service service ConfigManagementPluginService { - // GenerateManifest generates manifest for application in specified repo name and revision - rpc GenerateManifest(ManifestRequest) returns (ManifestResponse) { - } - - // MatchRepository returns whether or not the given path is supported by the plugin - rpc MatchRepository(RepositoryRequest) returns (RepositoryResponse) { + // GenerateManifests receive a stream containing a tgz archive with all required files necessary + // to generate manifests + rpc GenerateManifest(stream AppStreamRequest) returns (ManifestResponse) { } - // Get configuration of the plugin - rpc GetPluginConfig(ConfigRequest) returns (ConfigResponse) { + // MatchRepository returns whether or not the given application is supported by the plugin + rpc MatchRepository(stream AppStreamRequest) returns (RepositoryResponse) { } } diff --git a/cmpserver/plugin/plugin_test.go b/cmpserver/plugin/plugin_test.go index b8f7bd240c8a7..cb00510385fb5 100644 --- a/cmpserver/plugin/plugin_test.go +++ b/cmpserver/plugin/plugin_test.go @@ -2,14 +2,15 @@ package plugin import ( "context" - "os" + "path/filepath" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/argoproj/argo-cd/v2/cmpserver/apiclient" + "github.com/argoproj/argo-cd/v2/test" ) func newService(configFilePath string) (*Service, error) { @@ -28,19 +29,164 @@ func newService(configFilePath string) (*Service, error) { return service, nil } +type pluginOpt func(*CMPServerInitConstants) + +func withDiscover(d Discover) pluginOpt { + return func(cic *CMPServerInitConstants) { + cic.PluginConfig.Spec.Discover = d + } +} + +func buildPluginConfig(opts ...pluginOpt) *CMPServerInitConstants { + cic := &CMPServerInitConstants{ + PluginConfig: PluginConfig{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigManagementPlugin", + APIVersion: "argoproj.io/v1alpha1", + }, + Metadata: metav1.ObjectMeta{ + Name: "some-plugin", + }, + Spec: PluginConfigSpec{ + Version: "v1.0", + }, + }, + } + for _, opt := range opts { + opt(cic) + } + return cic +} + func TestMatchRepository(t *testing.T) { - configFilePath := "./testdata/ksonnet/config" - service, err := newService(configFilePath) - require.NoError(t, err) + type fixture struct { + service *Service + path string + } + setup := func(t *testing.T, opts ...pluginOpt) *fixture { + t.Helper() + cic := buildPluginConfig(opts...) + path := filepath.Join(test.GetTestDir(t), "testdata", "kustomize") + s := NewService(*cic) + return &fixture{ + service: s, + path: path, + } + } + t.Run("will match plugin by filename", func(t *testing.T) { + // given + d := Discover{ + FileName: "kustomization.yaml", + } + f := setup(t, withDiscover(d)) - q := apiclient.RepositoryRequest{} - path, err := os.Getwd() - require.NoError(t, err) - q.Path = path + // when + match, err := f.service.matchRepository(context.Background(), f.path) - res1, err := service.MatchRepository(context.Background(), &q) - require.NoError(t, err) - require.True(t, res1.IsSupported) + // then + assert.NoError(t, err) + assert.True(t, match) + }) + t.Run("will not match plugin by filename if file not found", func(t *testing.T) { + // given + d := Discover{ + FileName: "not_found.yaml", + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.NoError(t, err) + assert.False(t, match) + }) + t.Run("will match plugin by glob", func(t *testing.T) { + // given + d := Discover{ + Find: Find{ + Glob: "**/*/plugin.yaml", + }, + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.NoError(t, err) + assert.True(t, match) + }) + t.Run("will not match plugin by glob if not found", func(t *testing.T) { + // given + d := Discover{ + Find: Find{ + Glob: "**/*/not_found.yaml", + }, + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.NoError(t, err) + assert.False(t, match) + }) + t.Run("will match plugin by command when returns any output", func(t *testing.T) { + // given + d := Discover{ + Find: Find{ + Command: Command{ + Command: []string{"echo", "test"}, + }, + }, + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.NoError(t, err) + assert.True(t, match) + }) + t.Run("will not match plugin by command when returns no output", func(t *testing.T) { + // given + d := Discover{ + Find: Find{ + Command: Command{ + Command: []string{"echo"}, + }, + }, + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.NoError(t, err) + assert.False(t, match) + }) + t.Run("will not match plugin by command when command fails", func(t *testing.T) { + // given + d := Discover{ + Find: Find{ + Command: Command{ + Command: []string{"cat", "nil"}, + }, + }, + } + f := setup(t, withDiscover(d)) + + // when + match, err := f.service.matchRepository(context.Background(), f.path) + + // then + assert.Error(t, err) + assert.False(t, match) + }) } func Test_Negative_ConfigFile_DoesnotExist(t *testing.T) { @@ -55,8 +201,7 @@ func TestGenerateManifest(t *testing.T) { service, err := newService(configFilePath) require.NoError(t, err) - q := apiclient.ManifestRequest{} - res1, err := service.GenerateManifest(context.Background(), &q) + res1, err := service.generateManifest(context.Background(), "", nil) require.NoError(t, err) require.NotNil(t, res1) diff --git a/cmpserver/server.go b/cmpserver/server.go index 43a73c72ddca7..68d8c84b4c7da 100644 --- a/cmpserver/server.go +++ b/cmpserver/server.go @@ -1,6 +1,7 @@ package cmpserver import ( + "fmt" "net" "os" "os/signal" @@ -72,7 +73,8 @@ func (a *ArgoCDCMPServer) Run() { signal.Notify(a.stopCh, syscall.SIGINT, syscall.SIGTERM) go a.Shutdown(config.Address()) - grpcServer := a.CreateGRPC() + grpcServer, err := a.CreateGRPC() + errors.CheckError(err) err = grpcServer.Serve(listener) errors.CheckError(err) @@ -82,12 +84,16 @@ func (a *ArgoCDCMPServer) Run() { } // CreateGRPC creates new configured grpc server -func (a *ArgoCDCMPServer) CreateGRPC() *grpc.Server { +func (a *ArgoCDCMPServer) CreateGRPC() (*grpc.Server, error) { server := grpc.NewServer(a.opts...) versionpkg.RegisterVersionServiceServer(server, version.NewServer(nil, func() (bool, error) { return true, nil })) pluginService := plugin.NewService(a.initConstants) + err := pluginService.Init() + if err != nil { + return nil, fmt.Errorf("error initializing plugin service: %s", err) + } apiclient.RegisterConfigManagementPluginServiceServer(server, pluginService) healthService := health.NewServer() @@ -96,7 +102,7 @@ func (a *ArgoCDCMPServer) CreateGRPC() *grpc.Server { // Register reflection service on gRPC server. reflection.Register(server) - return server + return server, nil } func (a *ArgoCDCMPServer) Shutdown(address string) { diff --git a/common/common.go b/common/common.go index 5df7091330d7c..686b084658eaa 100644 --- a/common/common.go +++ b/common/common.go @@ -2,7 +2,11 @@ package common import ( "os" + "path/filepath" + "strconv" "time" + + "github.com/sirupsen/logrus" ) // Default service addresses and URLS of Argo CD internal services @@ -200,6 +204,19 @@ const ( EnvMaxCookieNumber = "ARGOCD_MAX_COOKIE_NUMBER" // EnvPluginSockFilePath allows to override the pluginSockFilePath for repo server and cmp server EnvPluginSockFilePath = "ARGOCD_PLUGINSOCKFILEPATH" + // EnvCMPChunkSize defines the chunk size in bytes used when sending files to the cmp server + EnvCMPChunkSize = "ARGOCD_CMP_CHUNK_SIZE" + // EnvCMPWorkDir defines the full path of the work directory used by the CMP server + EnvCMPWorkDir = "ARGOCD_CMP_WORKDIR" +) + +// Config Management Plugin related constants +const ( + // DefaultCMPChunkSize defines chunk size in bytes used when sending files to the cmp server + DefaultCMPChunkSize = 1024 + + // DefaultCMPWorkDirName defines the work directory name used by the cmp-server + DefaultCMPWorkDirName = "_cmp_server" ) const ( @@ -235,3 +252,26 @@ func GetPluginSockFilePath() string { return pluginSockFilePath } } + +// GetCMPChunkSize will return the env var EnvCMPChunkSize value if defined or DefaultCMPChunkSize otherwise. +// If EnvCMPChunkSize is defined but not a valid int, DefaultCMPChunkSize will be returned +func GetCMPChunkSize() int { + if chunkSizeStr := os.Getenv(EnvCMPChunkSize); chunkSizeStr != "" { + chunkSize, err := strconv.Atoi(chunkSizeStr) + if err != nil { + logrus.Warnf("invalid env var value for %s: not a valid int: %s. Default value will be used.", EnvCMPChunkSize, err) + return DefaultCMPChunkSize + } + return chunkSize + } + return DefaultCMPChunkSize +} + +// GetCMPWorkDir will return the full path of the work directory used by the CMP server. +// This directory and all it's contents will be deleted durring CMP bootstrap. +func GetCMPWorkDir() string { + if workDir := os.Getenv(EnvCMPWorkDir); workDir != "" { + return filepath.Join(workDir, DefaultCMPWorkDirName) + } + return filepath.Join(os.TempDir(), DefaultCMPWorkDirName) +} diff --git a/docs/user-guide/config-management-plugins.md b/docs/user-guide/config-management-plugins.md index 82c12ab2eddad..72f0820b32c57 100644 --- a/docs/user-guide/config-management-plugins.md +++ b/docs/user-guide/config-management-plugins.md @@ -82,9 +82,9 @@ application repository is supported by the plugin or not. command: [sh, -c, find . -name env.yaml] ``` -If `discover.fileName` is not provided, the `discover.find.command` is executed in order to determine whether an -application repository is supported by the plugin or not. The `find` command should return a non-error exit code when -the application source type is supported. +If `discover.fileName` is not provided, the `discover.find.command` is executed in order to determine whether an +application repository is supported by the plugin or not. The `find` command should return a non-error exit code +and produce output to stdout when the application source type is supported. If your plugin makes use of `git` (e.g. `git crypt`), it is advised to set `lockRepo` to `true` so that your plugin will have exclusive access to the repository at the time it is executed. Otherwise, two applications synced at the same time may result in a race condition and sync failure. @@ -137,8 +137,6 @@ containers: name: var-files - mountPath: /home/argocd/cmp-server/plugins name: plugins - - mountPath: /tmp - name: tmp # Remove this volumeMount if you've chosen to bake the config file into the sidecar image. - mountPath: /home/argocd/cmp-server/config/plugin.yaml subPath: plugin.yaml diff --git a/reposerver/repository/repository.go b/reposerver/repository/repository.go index 573193a7d597a..9c897af055693 100644 --- a/reposerver/repository/repository.go +++ b/reposerver/repository/repository.go @@ -28,6 +28,7 @@ import ( gogit "github.com/go-git/go-git/v5" "github.com/google/go-jsonnet" "github.com/google/uuid" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" log "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" "google.golang.org/grpc/codes" @@ -45,6 +46,7 @@ import ( "github.com/argoproj/argo-cd/v2/util/app/discovery" argopath "github.com/argoproj/argo-cd/v2/util/app/path" "github.com/argoproj/argo-cd/v2/util/argo" + "github.com/argoproj/argo-cd/v2/util/cmp" executil "github.com/argoproj/argo-cd/v2/util/exec" "github.com/argoproj/argo-cd/v2/util/git" "github.com/argoproj/argo-cd/v2/util/glob" @@ -360,31 +362,98 @@ func (s *Service) GenerateManifest(ctx context.Context, q *apiclient.ManifestReq return ok, err } + tarConcluded := false + var promise *ManifestResponsePromise + operation := func(repoRoot, commitSHA, cacheKey string, ctxSrc operationContextSrc) error { - res, err = s.runManifestGen(ctx, repoRoot, commitSHA, cacheKey, ctxSrc, q) - return err + promise = s.runManifestGen(ctx, repoRoot, commitSHA, cacheKey, ctxSrc, q) + // The fist channel to send the message will resume this operation. + // The main purpose for using channels here is to be able to unlock + // the repository as soon as the lock in not required anymore. In + // case of CMP the repo is compressed (tgz) and sent to the cmp-server + // for manifest generation. + select { + case err := <-promise.errCh: + return err + case resp := <-promise.responseCh: + res = resp + case tarDone := <-promise.tarDoneCh: + tarConcluded = tarDone + } + return nil } settings := operationSettings{sem: s.parallelismLimitSemaphore, noCache: q.NoCache, noRevisionCache: q.NoRevisionCache, allowConcurrent: q.ApplicationSource.AllowsConcurrentProcessing()} - err = s.runRepoOperation(ctx, q.Revision, q.Repo, q.ApplicationSource, q.VerifySignature, cacheFn, operation, settings) + // if the tarDoneCh message is sent it means that the manifest + // generation is being managed by the cmp-server. In this case + // we have to wait for the responseCh to send the manifest + // response. + if tarConcluded && res == nil { + select { + case resp := <-promise.responseCh: + res = resp + case err := <-promise.errCh: + return nil, err + } + } + return res, err } +type ManifestResponsePromise struct { + responseCh <-chan *apiclient.ManifestResponse + tarDoneCh <-chan bool + errCh <-chan error +} + +func NewManifestResponsePromise(responseCh <-chan *apiclient.ManifestResponse, tarDoneCh <-chan bool, errCh chan error) *ManifestResponsePromise { + return &ManifestResponsePromise{ + responseCh: responseCh, + tarDoneCh: tarDoneCh, + errCh: errCh, + } +} + +type generateManifestCh struct { + responseCh chan<- *apiclient.ManifestResponse + tarDoneCh chan<- bool + errCh chan<- error +} + // runManifestGen will be called by runRepoOperation if: // - the cache does not contain a value for this key // - or, the cache does contain a value for this key, but it is an expired manifest generation entry // - or, NoCache is true // Returns a ManifestResponse, or an error, but not both -func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error) { +func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest) *ManifestResponsePromise { + + responseCh := make(chan *apiclient.ManifestResponse) + tarDoneCh := make(chan bool) + errCh := make(chan error) + responsePromise := NewManifestResponsePromise(responseCh, tarDoneCh, errCh) + + channels := &generateManifestCh{ + responseCh: responseCh, + tarDoneCh: tarDoneCh, + errCh: errCh, + } + go s.runManifestGenAsync(ctx, repoRoot, commitSHA, cacheKey, opContextSrc, q, channels) + return responsePromise +} + +func (s *Service) runManifestGenAsync(ctx context.Context, repoRoot, commitSHA, cacheKey string, opContextSrc operationContextSrc, q *apiclient.ManifestRequest, ch *generateManifestCh) { + defer func() { + close(ch.errCh) + close(ch.responseCh) + }() var manifestGenResult *apiclient.ManifestResponse opContext, err := opContextSrc() if err == nil { - manifestGenResult, err = GenerateManifests(ctx, opContext.appPath, repoRoot, commitSHA, q, false, s.gitCredsStore) + manifestGenResult, err = GenerateManifests(ctx, opContext.appPath, repoRoot, commitSHA, q, false, s.gitCredsStore, WithCMPTarDoneChannel(ch.tarDoneCh)) } if err != nil { - // If manifest generation error caching is enabled if s.initConstants.PauseGenerationAfterFailedGenerationAttempts > 0 { @@ -394,7 +463,8 @@ func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cache cacheErr := s.cache.GetManifests(cacheKey, q.ApplicationSource, q, q.Namespace, q.TrackingMethod, q.AppLabelKey, q.AppName, innerRes) if cacheErr != nil && cacheErr != reposervercache.ErrCacheMiss { log.Warnf("manifest cache set error %s: %v", q.ApplicationSource.String(), cacheErr) - return nil, cacheErr + ch.errCh <- cacheErr + return } // If this is the first error we have seen, store the time (we only use the first failure, as this @@ -409,11 +479,13 @@ func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cache cacheErr = s.cache.SetManifests(cacheKey, q.ApplicationSource, q, q.Namespace, q.TrackingMethod, q.AppLabelKey, q.AppName, innerRes) if cacheErr != nil { log.Warnf("manifest cache set error %s: %v", q.ApplicationSource.String(), cacheErr) - return nil, cacheErr + ch.errCh <- cacheErr + return } } - return nil, err + ch.errCh <- err + return } // Otherwise, no error occurred, so ensure the manifest generation error data in the cache entry is reset before we cache the value manifestGenCacheEntry := cache.CachedManifestResponse{ @@ -429,7 +501,7 @@ func (s *Service) runManifestGen(ctx context.Context, repoRoot, commitSHA, cache if err != nil { log.Warnf("manifest cache set error %s/%s: %v", q.ApplicationSource.String(), cacheKey, err) } - return manifestGenCacheEntry.ManifestResponse, nil + ch.responseCh <- manifestGenCacheEntry.ManifestResponse } // getManifestCacheEntry returns false if the 'generate manifests' operation should be run by runRepoOperation, e.g.: @@ -766,8 +838,31 @@ func getRepoCredential(repoCredentials []*v1alpha1.RepoCreds, repoURL string) *v return nil } +type GenerateManifestOpt func(*generateManifestOpt) +type generateManifestOpt struct { + cmpTarDoneCh chan<- bool +} + +func newGenerateManifestOpt(opts ...GenerateManifestOpt) *generateManifestOpt { + o := &generateManifestOpt{} + for _, opt := range opts { + opt(o) + } + return o +} + +// WithCMPTarDoneChannel defines the channel to be used to signalize when the tarball +// generation is concluded when generating manifests with the CMP server. This is used +// to unlock the git repo as soon as possible. +func WithCMPTarDoneChannel(ch chan<- bool) GenerateManifestOpt { + return func(o *generateManifestOpt) { + o.cmpTarDoneCh = ch + } +} + // GenerateManifests generates manifests from a path -func GenerateManifests(ctx context.Context, appPath, repoRoot, revision string, q *apiclient.ManifestRequest, isLocal bool, gitCredsStore git.CredsStore) (*apiclient.ManifestResponse, error) { +func GenerateManifests(ctx context.Context, appPath, repoRoot, revision string, q *apiclient.ManifestRequest, isLocal bool, gitCredsStore git.CredsStore, opts ...GenerateManifestOpt) (*apiclient.ManifestResponse, error) { + opt := newGenerateManifestOpt(opts...) var targetObjs []*unstructured.Unstructured var dest *v1alpha1.ApplicationDestination @@ -796,7 +891,7 @@ func GenerateManifests(ctx context.Context, appPath, repoRoot, revision string, if q.ApplicationSource.Plugin != nil && q.ApplicationSource.Plugin.Name != "" { targetObjs, err = runConfigManagementPlugin(appPath, repoRoot, env, q, q.Repo.GetGitCreds(gitCredsStore)) } else { - targetObjs, err = runConfigManagementPluginSidecars(ctx, appPath, repoRoot, env, q, q.Repo.GetGitCreds(gitCredsStore)) + targetObjs, err = runConfigManagementPluginSidecars(ctx, appPath, repoRoot, env, q, q.Repo.GetGitCreds(gitCredsStore), opt.cmpTarDoneCh) if err != nil { err = fmt.Errorf("plugin sidecar failed. %s", err.Error()) } @@ -1203,7 +1298,7 @@ func getPluginEnvs(envVars *v1alpha1.Env, q *apiclient.ManifestRequest, creds gi return env, nil } -func runConfigManagementPluginSidecars(ctx context.Context, appPath, repoPath string, envVars *v1alpha1.Env, q *apiclient.ManifestRequest, creds git.Creds) ([]*unstructured.Unstructured, error) { +func runConfigManagementPluginSidecars(ctx context.Context, appPath, repoPath string, envVars *v1alpha1.Env, q *apiclient.ManifestRequest, creds git.Creds, tarDoneCh chan<- bool) ([]*unstructured.Unstructured, error) { // detect config management plugin server (sidecar) conn, cmpClient, err := discovery.DetectConfigManagementPlugin(ctx, appPath) if err != nil { @@ -1211,31 +1306,14 @@ func runConfigManagementPluginSidecars(ctx context.Context, appPath, repoPath st } defer io.Close(conn) - config, err := cmpClient.GetPluginConfig(context.Background(), &pluginclient.ConfigRequest{}) - if err != nil { - return nil, err - } - if config.LockRepo { - manifestGenerateLock.Lock(repoPath) - defer manifestGenerateLock.Unlock(repoPath) - } else if !config.AllowConcurrency { - manifestGenerateLock.Lock(appPath) - defer manifestGenerateLock.Unlock(appPath) - } - // generate manifests using commands provided in plugin config file in detected cmp-server sidecar env, err := getPluginEnvs(envVars, q, creds) if err != nil { return nil, err } - - cmpManifests, err := cmpClient.GenerateManifest(ctx, &pluginclient.ManifestRequest{ - AppPath: appPath, - RepoPath: repoPath, - Env: toEnvEntry(env), - }) + cmpManifests, err := generateManifestsCMP(ctx, appPath, repoPath, env, cmpClient, tarDoneCh) if err != nil { - return nil, err + return nil, fmt.Errorf("error generating manifests in cmp: %s", err) } var manifests []*unstructured.Unstructured for _, manifestString := range cmpManifests.Manifests { @@ -1248,16 +1326,23 @@ func runConfigManagementPluginSidecars(ctx context.Context, appPath, repoPath st return manifests, nil } -func toEnvEntry(envVars []string) []*pluginclient.EnvEntry { - envEntry := make([]*pluginclient.EnvEntry, 0) - for _, env := range envVars { - pair := strings.Split(env, "=") - if len(pair) != 2 { - continue - } - envEntry = append(envEntry, &pluginclient.EnvEntry{Name: pair[0], Value: pair[1]}) +// generateManifestsCMP will send the appPath files to the cmp-server over a gRPC stream. +// The cmp-server will generate the manifests. Returns a response object with the generated +// manifests. +func generateManifestsCMP(ctx context.Context, appPath, repoPath string, env []string, cmpClient pluginclient.ConfigManagementPluginServiceClient, tarDoneCh chan<- bool) (*pluginclient.ManifestResponse, error) { + generateManifestStream, err := cmpClient.GenerateManifest(ctx, grpc_retry.Disable()) + if err != nil { + return nil, fmt.Errorf("error getting generateManifestStream: %s", err) + } + opts := []cmp.SenderOption{ + cmp.WithTarDoneChan(tarDoneCh), } - return envEntry + err = cmp.SendRepoStream(generateManifestStream.Context(), appPath, repoPath, generateManifestStream, env, opts...) + if err != nil { + return nil, fmt.Errorf("error sending file to cmp-server: %s", err) + } + + return generateManifestStream.CloseAndRecv() } func (s *Service) GetAppDetails(ctx context.Context, q *apiclient.RepoServerAppDetailsQuery) (*apiclient.RepoAppDetailsResponse, error) { diff --git a/test/cmp/plugin.yaml b/test/cmp/plugin.yaml index 7ea861a5b4566..1184b715d284d 100644 --- a/test/cmp/plugin.yaml +++ b/test/cmp/plugin.yaml @@ -5,9 +5,9 @@ metadata: spec: version: v1.0 generate: - command: [sh, -c, 'echo "{\"kind\": \"ConfigMap\", \"apiVersion\": \"v1\", \"metadata\": { \"name\": \"$ARGOCD_APP_NAME\", \"namespace\": \"$ARGOCD_APP_NAMESPACE\", \"annotations\": {\"Foo\": \"$FOO\", \"KubeVersion\": \"$KUBE_VERSION\", \"KubeApiVersion\": \"$KUBE_API_VERSIONS\",\"Bar\": \"baz\"}}}"'] + command: [sh, -c, 'kustomize build'] discover: find: - command: [sh, -c, 'echo ">>> $PWD"'] + glob: "**/kustomization.yaml" allowConcurrency: true lockRepo: false diff --git a/test/e2e/testdata/cmp-fileName/plugin.yaml b/test/e2e/testdata/cmp-fileName/plugin.yaml index 31ce21110ae8f..766278c79e773 100644 --- a/test/e2e/testdata/cmp-fileName/plugin.yaml +++ b/test/e2e/testdata/cmp-fileName/plugin.yaml @@ -7,6 +7,4 @@ spec: generate: command: [sh, -c, 'echo "{\"kind\": \"ConfigMap\", \"apiVersion\": \"v1\", \"metadata\": { \"name\": \"$ARGOCD_APP_NAME\", \"namespace\": \"$ARGOCD_APP_NAMESPACE\", \"annotations\": {\"Foo\": \"$FOO\", \"KubeVersion\": \"$KUBE_VERSION\", \"KubeApiVersion\": \"$KUBE_API_VERSIONS\",\"Bar\": \"baz\"}}}"'] discover: - fileName: "./subdir/s*.yaml" - allowConcurrency: true - lockRepo: false \ No newline at end of file + fileName: "subdir/s*.yaml" diff --git a/test/testutil.go b/test/testutil.go index e0815b831a8ec..5835aa2333035 100644 --- a/test/testutil.go +++ b/test/testutil.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "log" "net" + "os" + "testing" "time" "github.com/ghodss/yaml" @@ -80,3 +82,14 @@ func YamlToUnstructured(yamlStr string) *unstructured.Unstructured { } return &unstructured.Unstructured{Object: obj} } + +// GetTestDir will return the full directory path of the +// calling test file. +func GetTestDir(t *testing.T) string { + t.Helper() + cwd, err := os.Getwd() + if err != nil { + t.Fatal(err) + } + return cwd +} diff --git a/util/app/discovery/discovery.go b/util/app/discovery/discovery.go index 74a3eaac386d7..5ebcd0b10bbae 100644 --- a/util/app/discovery/discovery.go +++ b/util/app/discovery/discovery.go @@ -7,12 +7,13 @@ import ( "path/filepath" "strings" - "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" - + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" log "github.com/sirupsen/logrus" pluginclient "github.com/argoproj/argo-cd/v2/cmpserver/apiclient" "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/cmp" "github.com/argoproj/argo-cd/v2/util/io" "github.com/argoproj/argo-cd/v2/util/kustomize" ) @@ -28,11 +29,11 @@ func IsManifestGenerationEnabled(sourceType v1alpha1.ApplicationSourceType, enab return enabled } -func Discover(ctx context.Context, root string, enableGenerateManifests map[string]bool) (map[string]string, error) { +func Discover(ctx context.Context, repoPath string, enableGenerateManifests map[string]bool) (map[string]string, error) { apps := make(map[string]string) // Check if it is CMP - conn, _, err := DetectConfigManagementPlugin(ctx, root) + conn, _, err := DetectConfigManagementPlugin(ctx, repoPath) if err == nil { // Found CMP io.Close(conn) @@ -41,14 +42,14 @@ func Discover(ctx context.Context, root string, enableGenerateManifests map[stri return apps, nil } - err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(repoPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } - dir, err := filepath.Rel(root, filepath.Dir(path)) + dir, err := filepath.Rel(repoPath, filepath.Dir(path)) if err != nil { return err } @@ -81,7 +82,7 @@ func AppType(ctx context.Context, path string, enableGenerateManifests map[strin // 3. check isSupported(path)? // 4.a if no then close connection // 4.b if yes then return conn for detected plugin -func DetectConfigManagementPlugin(ctx context.Context, appPath string) (io.Closer, pluginclient.ConfigManagementPluginServiceClient, error) { +func DetectConfigManagementPlugin(ctx context.Context, repoPath string) (io.Closer, pluginclient.ConfigManagementPluginServiceClient, error) { var conn io.Closer var cmpClient pluginclient.ConfigManagementPluginServiceClient @@ -105,13 +106,13 @@ func DetectConfigManagementPlugin(ctx context.Context, appPath string) (io.Close continue } - resp, err := cmpClient.MatchRepository(ctx, &pluginclient.RepositoryRequest{Path: appPath}) + isSupported, err := matchRepositoryCMP(ctx, repoPath, cmpClient) if err != nil { - log.Errorf("repository %s is not the match because %v", appPath, err) + log.Errorf("repository %s is not the match because %v", repoPath, err) continue } - if !resp.IsSupported { + if !isSupported { log.Debugf("Reponse from socket file %s is not supported", file.Name()) io.Close(conn) } else { @@ -122,7 +123,27 @@ func DetectConfigManagementPlugin(ctx context.Context, appPath string) (io.Close } if !connFound { - return nil, nil, fmt.Errorf("Couldn't find cmp-server plugin supporting repository %s", appPath) + return nil, nil, fmt.Errorf("Couldn't find cmp-server plugin supporting repository %s", repoPath) } return conn, cmpClient, err } + +// matchRepositoryCMP will send the repoPath to the cmp-server. The cmp-server will +// inspect the files and return true if the repo is supported for manifest generation. +// Will return false otherwise. +func matchRepositoryCMP(ctx context.Context, repoPath string, client pluginclient.ConfigManagementPluginServiceClient) (bool, error) { + matchRepoStream, err := client.MatchRepository(ctx, grpc_retry.Disable()) + if err != nil { + return false, fmt.Errorf("error getting stream client: %s", err) + } + + err = cmp.SendRepoStream(ctx, repoPath, repoPath, matchRepoStream, []string{}) + if err != nil { + return false, fmt.Errorf("error sending stream: %s", err) + } + resp, err := matchRepoStream.CloseAndRecv() + if err != nil { + return false, fmt.Errorf("error receiving stream response: %s", err) + } + return resp.GetIsSupported(), nil +} diff --git a/util/cmp/stream.go b/util/cmp/stream.go new file mode 100644 index 0000000000000..5147e5fb8add6 --- /dev/null +++ b/util/cmp/stream.go @@ -0,0 +1,298 @@ +package cmp + +import ( + "bufio" + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strings" + + log "github.com/sirupsen/logrus" + + pluginclient "github.com/argoproj/argo-cd/v2/cmpserver/apiclient" + "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/util/io/files" +) + +// StreamSender defines the contract to send App files over stream +type StreamSender interface { + Send(*pluginclient.AppStreamRequest) error +} + +// StreamReceiver defines the contract for receiving Application's files +// over gRPC stream +type StreamReceiver interface { + Recv() (*pluginclient.AppStreamRequest, error) +} + +// ReceiveRepoStream will receive the repository files and save them +// in destDir. Will return the stream metadata if no error. Metadata +// will be nil in case of errors. +func ReceiveRepoStream(ctx context.Context, receiver StreamReceiver, destDir string) (*pluginclient.ManifestRequestMetadata, error) { + header, err := receiver.Recv() + if err != nil { + return nil, fmt.Errorf("error receiving stream header: %w", err) + } + if header == nil || header.GetMetadata() == nil { + return nil, fmt.Errorf("error getting stream metadata: metadata is nil") + } + metadata := header.GetMetadata() + + tgzFile, err := receiveFile(ctx, receiver, metadata.GetChecksum(), destDir) + if err != nil { + return nil, fmt.Errorf("error receiving tgz file: %w", err) + } + err = files.Untgz(destDir, tgzFile) + if err != nil { + return nil, fmt.Errorf("error decompressing tgz file: %w", err) + } + err = os.Remove(tgzFile.Name()) + if err != nil { + log.Warnf("error removing the tgz file %q: %s", tgzFile.Name(), err) + } + return metadata, nil +} + +// SenderOption defines the function type to by used by specific options +type SenderOption func(*senderOption) + +type senderOption struct { + chunkSize int + tarDoneChan chan<- bool +} + +func newSenderOption(opts ...SenderOption) *senderOption { + so := &senderOption{ + chunkSize: common.GetCMPChunkSize(), + } + for _, opt := range opts { + opt(so) + } + return so +} + +// WithChunkSize defines the chunk size used while sending files over +// the gRPC stream. Will only overwrite the DefaultChunkSize if the +// given size is greater than zero. +func WithChunkSize(size int) SenderOption { + return func(opt *senderOption) { + if size > 0 { + opt.chunkSize = size + } + } +} + +func WithTarDoneChan(ch chan<- bool) SenderOption { + return func(opt *senderOption) { + opt.tarDoneChan = ch + } +} + +// SendRepoStream will compress the files under the given repoPath and send +// them using the plugin stream sender. +func SendRepoStream(ctx context.Context, appPath, repoPath string, sender StreamSender, env []string, opts ...SenderOption) error { + opt := newSenderOption(opts...) + + // compress all files in appPath in tgz + tgz, checksum, err := compressFiles(repoPath) + if err != nil { + return fmt.Errorf("error compressing repo files: %w", err) + } + defer closeAndDelete(tgz) + if opt.tarDoneChan != nil { + opt.tarDoneChan <- true + close(opt.tarDoneChan) + } + + fi, err := tgz.Stat() + if err != nil { + return fmt.Errorf("error getting tgz stat: %w", err) + } + appRelPath, err := files.RelativePath(appPath, repoPath) + if err != nil { + return fmt.Errorf("error building app relative path: %s", err) + } + // send metadata first + mr := appMetadataRequest(filepath.Base(appPath), appRelPath, env, checksum, fi.Size()) + err = sender.Send(mr) + if err != nil { + return fmt.Errorf("error sending generate manifest metadata to cmp-server: %w", err) + } + + // send the compressed file + err = sendFile(ctx, sender, tgz, opt) + if err != nil { + return fmt.Errorf("error sending tgz file to cmp-server: %w", err) + } + return nil +} + +// sendFile will send the file over the gRPC stream using a +// buffer. +func sendFile(ctx context.Context, sender StreamSender, file *os.File, opt *senderOption) error { + reader := bufio.NewReader(file) + chunk := make([]byte, opt.chunkSize) + for { + if ctx != nil { + if err := ctx.Err(); err != nil { + return fmt.Errorf("client stream context error: %w", err) + } + } + n, err := reader.Read(chunk) + if n > 0 { + fr := appFileRequest(chunk[:n]) + if e := sender.Send(fr); e != nil { + return fmt.Errorf("error sending stream: %w", err) + } + } + if err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("buffer reader error: %w", err) + } + } + return nil +} + +func closeAndDelete(f *os.File) { + if f == nil { + return + } + if err := f.Close(); err != nil { + log.Warnf("error closing file %q: %s", f.Name(), err) + } + if err := os.Remove(f.Name()); err != nil { + log.Warnf("error removing file %q: %s", f.Name(), err) + } +} + +// compressFiles will create a tgz file with all contents of appPath +// directory excluding the .git folder. Returns the file alongside +// its sha256 hash to be used as checksum. It is the responsibility +// of the caller to close the file. +func compressFiles(appPath string) (*os.File, string, error) { + excluded := []string{".git"} + appName := filepath.Base(appPath) + tempDir, err := files.CreateTempDir(os.TempDir()) + if err != nil { + return nil, "", fmt.Errorf("error creating tempDir for compressing files: %s", err) + } + tgzFile, err := ioutil.TempFile(tempDir, appName) + if err != nil { + return nil, "", fmt.Errorf("error creating app temp tgz file: %w", err) + } + hasher := sha256.New() + err = files.Tgz(appPath, excluded, tgzFile, hasher) + if err != nil { + closeAndDelete(tgzFile) + return nil, "", fmt.Errorf("error creating app tgz file: %w", err) + } + checksum := hex.EncodeToString(hasher.Sum(nil)) + hasher.Reset() + + // reposition the offset to the beginning of the file for proper reads + _, err = tgzFile.Seek(0, io.SeekStart) + if err != nil { + closeAndDelete(tgzFile) + return nil, "", fmt.Errorf("error processing tgz file: %w", err) + } + return tgzFile, checksum, nil +} + +// receiveFile will receive the file from the gRPC stream and save it in the dst folder. +// Returns error if checksum doesn't match the one provided in the fileMetadata. +// It is responsibility of the caller to close the returned file. +func receiveFile(ctx context.Context, receiver StreamReceiver, checksum, dst string) (*os.File, error) { + fileBuffer := bytes.Buffer{} + hasher := sha256.New() + for { + if ctx != nil { + if err := ctx.Err(); err != nil { + return nil, fmt.Errorf("stream context error: %w", err) + } + } + req, err := receiver.Recv() + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("stream Recv error: %w", err) + } + f := req.GetFile() + if f == nil { + return nil, fmt.Errorf("stream request file is nil") + } + _, err = fileBuffer.Write(f.Chunk) + if err != nil { + return nil, fmt.Errorf("error writing file buffer: %w", err) + } + _, err = hasher.Write(f.Chunk) + if err != nil { + return nil, fmt.Errorf("error writing hasher: %w", err) + } + } + if hex.EncodeToString(hasher.Sum(nil)) != checksum { + return nil, fmt.Errorf("file checksum validation error") + } + + file, err := ioutil.TempFile(dst, "") + if err != nil { + return nil, fmt.Errorf("error creating file: %w", err) + } + _, err = fileBuffer.WriteTo(file) + if err != nil { + closeAndDelete(file) + return nil, fmt.Errorf("error writing file: %w", err) + } + _, err = file.Seek(0, io.SeekStart) + if err != nil { + closeAndDelete(file) + return nil, fmt.Errorf("seek error: %w", err) + } + return file, nil +} + +// appFileRequest build the file payload for the ManifestRequest +func appFileRequest(chunk []byte) *pluginclient.AppStreamRequest { + return &pluginclient.AppStreamRequest{ + Request: &pluginclient.AppStreamRequest_File{ + File: &pluginclient.File{ + Chunk: chunk, + }, + }, + } +} + +// appMetadataRequest build the metadata payload for the ManifestRequest +func appMetadataRequest(appName, appRelPath string, env []string, checksum string, size int64) *pluginclient.AppStreamRequest { + return &pluginclient.AppStreamRequest{ + Request: &pluginclient.AppStreamRequest_Metadata{ + Metadata: &pluginclient.ManifestRequestMetadata{ + AppName: appName, + AppRelPath: appRelPath, + Checksum: checksum, + Size_: size, + Env: toEnvEntry(env), + }, + }, + } +} + +func toEnvEntry(envVars []string) []*pluginclient.EnvEntry { + envEntry := make([]*pluginclient.EnvEntry, 0) + for _, env := range envVars { + pair := strings.Split(env, "=") + if len(pair) != 2 { + continue + } + envEntry = append(envEntry, &pluginclient.EnvEntry{Name: pair[0], Value: pair[1]}) + } + return envEntry +} diff --git a/util/cmp/stream_test.go b/util/cmp/stream_test.go new file mode 100644 index 0000000000000..fb1b5137567a4 --- /dev/null +++ b/util/cmp/stream_test.go @@ -0,0 +1,97 @@ +package cmp_test + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + pluginclient "github.com/argoproj/argo-cd/v2/cmpserver/apiclient" + "github.com/argoproj/argo-cd/v2/test" + "github.com/argoproj/argo-cd/v2/util/cmp" + "github.com/argoproj/argo-cd/v2/util/io/files" +) + +type streamMock struct { + messages chan *pluginclient.AppStreamRequest + done chan bool +} + +func (m *streamMock) Recv() (*pluginclient.AppStreamRequest, error) { + select { + case message := <-m.messages: + return message, nil + case <-m.done: + return nil, io.EOF + case <-time.After(500 * time.Millisecond): + return nil, fmt.Errorf("timeout receiving message mock") + } +} + +func (m *streamMock) Send(message *pluginclient.AppStreamRequest) error { + m.messages <- message + return nil +} + +func newStreamMock() *streamMock { + messagesCh := make(chan *pluginclient.AppStreamRequest) + doneCh := make(chan bool) + return &streamMock{ + messages: messagesCh, + done: doneCh, + } +} + +func TestReceiveApplicationStream(t *testing.T) { + t.Run("will receive the application stream successfully", func(t *testing.T) { + // given + streamMock := newStreamMock() + appDir := filepath.Join(getTestDataDir(t), "app") + workdir, err := files.CreateTempDir("") + require.NoError(t, err) + defer func() { + close(streamMock.messages) + os.RemoveAll(workdir) + }() + go streamMock.sendFile(context.Background(), t, appDir, streamMock, []string{"env1", "env2"}) + + // when + env, err := cmp.ReceiveRepoStream(context.Background(), streamMock, workdir) + + // then + require.NoError(t, err) + assert.NotEmpty(t, workdir) + files, err := ioutil.ReadDir(workdir) + require.NoError(t, err) + require.Equal(t, 2, len(files)) + names := []string{} + for _, f := range files { + names = append(names, f.Name()) + } + assert.Contains(t, names, "README.md") + assert.Contains(t, names, "applicationset") + assert.NotNil(t, env) + }) +} + +func (m *streamMock) sendFile(ctx context.Context, t *testing.T, basedir string, sender cmp.StreamSender, env []string) { + t.Helper() + defer func() { + m.done <- true + }() + err := cmp.SendRepoStream(ctx, basedir, basedir, sender, env) + require.NoError(t, err) +} + +// getTestDataDir will return the full path of the testdata dir +// under the running test folder. +func getTestDataDir(t *testing.T) string { + return filepath.Join(test.GetTestDir(t), "testdata") +} diff --git a/util/cmp/testdata/app/README.md b/util/cmp/testdata/app/README.md new file mode 100644 index 0000000000000..f3954314c1026 --- /dev/null +++ b/util/cmp/testdata/app/README.md @@ -0,0 +1 @@ +# Readme diff --git a/util/cmp/testdata/app/applicationset/latest/kustomization.yaml b/util/cmp/testdata/app/applicationset/latest/kustomization.yaml new file mode 100644 index 0000000000000..70a8b02729e73 --- /dev/null +++ b/util/cmp/testdata/app/applicationset/latest/kustomization.yaml @@ -0,0 +1,4 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: + - https://raw.githubusercontent.com/argoproj/applicationset/master/manifests/install.yaml \ No newline at end of file diff --git a/util/cmp/testdata/app/applicationset/stable/kustomization.yaml b/util/cmp/testdata/app/applicationset/stable/kustomization.yaml new file mode 100644 index 0000000000000..705a670cc2aad --- /dev/null +++ b/util/cmp/testdata/app/applicationset/stable/kustomization.yaml @@ -0,0 +1,4 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +resources: + - https://raw.githubusercontent.com/argoproj/applicationset/v0.3.0/manifests/install.yaml \ No newline at end of file diff --git a/util/cmp/testdata/example.tar.gz b/util/cmp/testdata/example.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..f4a04b2f539c26279a970e04802f1cb9ffe59c1f GIT binary patch literal 322 zcmV-I0loeoiwFP!00000|LoPxPJ=)Y2XM|j1&L?(0}u>16MHgVdhkAlrCqxpWM`<+ zr#GTD+QfQj+G?`DkW3&1hRpoKe0FntKV!B_jze*WgY+jC-G