Skip to content

Commit

Permalink
fix(pubsub): pipe revision ID in name in DeleteSchemaRevision (#7519)
Browse files Browse the repository at this point in the history
Since `RevisionID` is deprecated in `DeleteSchemaRevisionRequest`, we need to pass in the revisionID directly in the schema name field instead. This PR also updates the fake to reflect that `RevisionID` is not being used.
  • Loading branch information
hongalex authored Mar 10, 2023
1 parent 1e821bd commit e211635
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 16 deletions.
28 changes: 17 additions & 11 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,20 +1544,26 @@ func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRe
return ret.(*pb.Schema), err
}

if sc, ok := s.schemas[req.Name]; ok {
if len(sc) == 1 {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q@%q", req.Name, req.RevisionId)
schemaPath := strings.Split(req.Name, "@")
if len(schemaPath) != 2 {
return nil, status.Errorf(codes.InvalidArgument, "could not parse revision ID from schema name: %q", req.Name)
}
schemaName := schemaPath[0]
revID := schemaPath[1]
schemaRevisions, ok := s.schemas[schemaName]
if ok {
if len(schemaRevisions) == 1 {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q", req.Name)
}
}

schema := s.schemas[req.Name]
for i, sc := range schema {
if sc.RevisionId == req.RevisionId {
s.schemas[req.Name] = append(schema[:i], schema[i+1:]...)
return schema[len(schema)-1], nil
for i, sc := range schemaRevisions {
if sc.RevisionId == revID {
s.schemas[schemaName] = append(schemaRevisions[:i], schemaRevisions[i+1:]...)
return schemaRevisions[len(schemaRevisions)-1], nil
}
}
}
return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)

return nil, status.Errorf(codes.NotFound, "schema %q not found", req.Name)
}

func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error) {
Expand Down
8 changes: 4 additions & 4 deletions pubsub/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func NewSchemaClient(ctx context.Context, projectID string, opts ...option.Clien

// SchemaConfig is a reference to a PubSub schema.
type SchemaConfig struct {
// The name of the schema populated by the server. This field is read-only.
// Name of the schema.
// Format is `projects/{project}/schemas/{schema}`
Name string

// The type of the schema definition.
Expand Down Expand Up @@ -264,10 +265,9 @@ func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID

// DeleteSchemaRevision deletes a specific schema revision.
func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
schemaPath := fmt.Sprintf("projects/%s/schemas/%s@%s", c.projectID, schemaID, revisionID)
schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{
Name: schemaPath,
RevisionId: revisionID,
Name: schemaPath,
})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pubsub/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestSchema_SchemaRevisions(t *testing.T) {
got = append(got, sc)
}
if gotLen, wantLen := len(got), 2; gotLen != wantLen {
t.Errorf("lListSchemaRevisions() got %d revisions, want: %d", gotLen, wantLen)
t.Errorf("ListSchemaRevisions() got %d revisions, want: %d", gotLen, wantLen)
}
}

Expand Down

0 comments on commit e211635

Please sign in to comment.