Skip to content

Commit

Permalink
Avoid writing to shared memory in go-routine.
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Nelson <minelson@vmware.com>
  • Loading branch information
absoludity committed Mar 1, 2022
1 parent 4738d2b commit 6483385
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,97 +50,89 @@ func (s *Server) GetAvailablePackageSummaries(ctx context.Context, request *core
return nil, statuserror.FromK8sError("get", "PackageMetadata", "", err)
}

// paginate the list of results
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, len(pkgMetadatas))

// Create a channel to receive any errors. The channel is also used as a
// Create a channel to receive any results. The channel is also used as a
// natural waitgroup to synchronize the results.
errs := make(chan error)
numRoutines := 0
type fetchResult struct {
index int
availablePackageSummary *corev1.AvailablePackageSummary
err error
}
fetchResults := make(chan fetchResult)
numFetched := 0

// TODO(agamez): DRY up this logic (cf GetInstalledPackageSummaries)
if len(pkgMetadatas) > 0 {
startAt := -1
startAt := 0
if pageSize > 0 {
startAt = int(pageSize) * pageOffset
}
for i, pkgMetadata := range pkgMetadatas {
if startAt <= i {
numRoutines++
numFetched++
go func(i int, pkgMetadata *datapackagingv1alpha1.PackageMetadata) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
errs <- statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
return
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
errs <- err
return
}
availablePackageSummary, err := s.fetchPackageSummaryForMeta(ctx, cluster, namespace, pkgMetadata)

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
errs <- statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
return
}

// append the availablePackageSummary to the slice
availablePackageSummaries[i] = availablePackageSummary
// Ensure we signal a completion.
errs <- nil
// The index of this result is relative to the page.
fetchResults <- fetchResult{i - startAt, availablePackageSummary, err}
}(i, pkgMetadata)
}
// if we've reached the end of the page, stop iterating
if pageSize > 0 && len(availablePackageSummaries) == int(pageSize) {
if pageSize > 0 && numFetched == int(pageSize) {
break
}
}
}
// Return an error if any is found. We continue only if there were no
// errors.
for i := 0; i < numRoutines; i++ {
err := <-errs
if err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
}
}

// TODO(agamez): the slice with make is filled with <nil>, in case of an error in the
// i goroutine, the i-th <nil> stub will remain. Check if 'errgroup' works here, but I haven't
// been able so far.
// An alternative is using channels to perform a fine-grained control... but not sure if it worths
// However, should we just return an error if so? See https://github.com/kubeapps/kubeapps/pull/3784#discussion_r754836475
// filter out <nil> values
availablePackageSummariesNilSafe := []*corev1.AvailablePackageSummary{}
availablePackageSummaries := make([]*corev1.AvailablePackageSummary, numFetched)
categories := []string{}
for _, availablePackageSummary := range availablePackageSummaries {
if availablePackageSummary != nil {
availablePackageSummariesNilSafe = append(availablePackageSummariesNilSafe, availablePackageSummary)
categories = append(categories, availablePackageSummary.Categories...)

for i := 0; i < numFetched; i++ {
fetchResult := <-fetchResults
if fetchResult.err != nil {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("unexpected error while gathering available packages: %v", err))
}
// append the availablePackageSummary to the slice
availablePackageSummaries[fetchResult.index] = fetchResult.availablePackageSummary
categories = append(categories, fetchResult.availablePackageSummary.Categories...)
}

// Only return a next page token if the request was for pagination and
// the results are a full page.
nextPageToken := ""
if pageSize > 0 && len(availablePackageSummariesNilSafe) == int(pageSize) {
if pageSize > 0 && numFetched == int(pageSize) {
nextPageToken = fmt.Sprintf("%d", pageOffset+1)
}
response := &corev1.GetAvailablePackageSummariesResponse{
AvailablePackageSummaries: availablePackageSummariesNilSafe,
AvailablePackageSummaries: availablePackageSummaries,
Categories: categories,
NextPageToken: nextPageToken,
}
return response, nil
}

func (s *Server) fetchPackageSummaryForMeta(ctx context.Context, cluster, namespace string, pkgMetadata *datapackagingv1alpha1.PackageMetadata) (*corev1.AvailablePackageSummary, error) {
// fetch the associated packages
// Use the field selector to return only Package CRs that match on the spec.refName.
// TODO(agamez): perhaps we better fetch all the packages and filter ourselves to reduce the k8s calls
fieldSelector := fmt.Sprintf("spec.refName=%s", pkgMetadata.Name)
pkgs, err := s.getPkgsWithFieldSelector(ctx, cluster, namespace, fieldSelector)
if err != nil {
return nil, statuserror.FromK8sError("get", "Package", pkgMetadata.Name, err)
}
pkgVersionsMap, err := getPkgVersionsMap(pkgs)
if err != nil {
return nil, err
}

// generate the availablePackageSummary from the fetched information
availablePackageSummary, err := s.buildAvailablePackageSummary(pkgMetadata, pkgVersionsMap, cluster)
if err != nil {
return nil, statuserror.FromK8sError("create", "AvailablePackageSummary", pkgMetadata.Name, err)
}

return availablePackageSummary, nil
}

// GetAvailablePackageVersions returns the package versions managed by the 'kapp_controller' plugin
func (s *Server) GetAvailablePackageVersions(ctx context.Context, request *corev1.GetAvailablePackageVersionsRequest) (*corev1.GetAvailablePackageVersionsResponse, error) {
// Retrieve parameters from the request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
name string
existingObjects []runtime.Object
expectedPackages []*corev1.AvailablePackageSummary
paginationOptions corev1.PaginationOptions
expectedStatusCode codes.Code
}{
{
Expand Down Expand Up @@ -454,6 +455,109 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
},
},
{
name: "it returns paginated carvel package summaries",
existingObjects: []runtime.Object{
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Classic Tetris",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "A great game for arcade gamers",
LongDescription: "A few sentences but not really a readme",
Categories: []string{"logging", "daemon-set"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tetris inc.",
},
},
&datapackagingv1alpha1.PackageMetadata{
TypeMeta: metav1.TypeMeta{
Kind: pkgMetadataResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com",
},
Spec: datapackagingv1alpha1.PackageMetadataSpec{
DisplayName: "Tombi!",
IconSVGBase64: "Tm90IHJlYWxseSBTVkcK",
ShortDescription: "An awesome game from the 90's",
LongDescription: "Tombi! is an open world platform-adventure game with RPG elements.",
Categories: []string{"platforms", "rpg"},
Maintainers: []datapackagingv1alpha1.Maintainer{{Name: "person1"}, {Name: "person2"}},
SupportDescription: "Some support information",
ProviderName: "Tombi!",
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tetris.foo.example.com.1.2.3",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tetris.foo.example.com",
Version: "1.2.3",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1984, time.June, 6, 0, 0, 0, 0, time.UTC)},
},
},
&datapackagingv1alpha1.Package{
TypeMeta: metav1.TypeMeta{
Kind: pkgResource,
APIVersion: datapackagingAPIVersion,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "tombi.foo.example.com.1.2.5",
},
Spec: datapackagingv1alpha1.PackageSpec{
RefName: "tombi.foo.example.com",
Version: "1.2.5",
Licenses: []string{"my-license"},
ReleaseNotes: "release notes",
CapactiyRequirementsDescription: "capacity description",
ReleasedAt: metav1.Time{time.Date(1997, time.December, 25, 0, 0, 0, 0, time.UTC)},
},
},
},
paginationOptions: corev1.PaginationOptions{
PageToken: "1",
PageSize: 1,
},
expectedPackages: []*corev1.AvailablePackageSummary{
{
AvailablePackageRef: &corev1.AvailablePackageReference{
Context: defaultContext,
Plugin: &pluginDetail,
Identifier: "tombi.foo.example.com",
},
Name: "tombi.foo.example.com",
DisplayName: "Tombi!",
LatestVersion: &corev1.PackageAppVersion{
PkgVersion: "1.2.5",
AppVersion: "1.2.5",
},
IconUrl: "",
ShortDescription: "An awesome game from the 90's",
Categories: []string{"platforms", "rpg"},
},
},
},
}

for _, tc := range testCases {
Expand All @@ -479,7 +583,10 @@ func TestGetAvailablePackageSummaries(t *testing.T) {
},
}

response, err := s.GetAvailablePackageSummaries(context.Background(), &corev1.GetAvailablePackageSummariesRequest{Context: defaultContext})
response, err := s.GetAvailablePackageSummaries(context.Background(), &corev1.GetAvailablePackageSummariesRequest{
Context: defaultContext,
PaginationOptions: &tc.paginationOptions,
})

if got, want := status.Code(err), tc.expectedStatusCode; got != want {
t.Fatalf("got: %d, want: %d, err: %+v", got, want, err)
Expand Down

0 comments on commit 6483385

Please sign in to comment.