-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Table addition validation #2469
base: main
Are you sure you want to change the base?
Conversation
66a17c0
to
80d31a7
Compare
@@ -190,3 +196,93 @@ func (h *FlowRequestHandler) CheckIfMirrorNameExists(ctx context.Context, mirror | |||
|
|||
return nameExists.Bool, nil | |||
} | |||
|
|||
func (h *FlowRequestHandler) ValidateTableAdditions(ctx context.Context, req *protos.ValidateTableAdditionsRequest) (*protos.ValidateCDCMirrorResponse, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we want to split the API into validating and then doing the actual operation? it seems more reasonable to have the existing endpoint check and then error out, would map better to the ClickPipes UI as well
Requires publication name as string and table values in the form of (schema::text, table::text) pairs. | ||
The schema and table names should be quoted and escaped. | ||
*/ | ||
func (c *PostgresConnector) CheckIfTablesAreInPublication(ctx context.Context, pubName string, tableValues []string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can share code with the check done in AddTablesToPublication
err = chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | ||
TableMappings: req.ConnectionConfigs.TableMappings, | ||
TableNameSchemaMapping: res, | ||
SyncedAtColName: req.ConnectionConfigs.SyncedAtColName, | ||
Resync: req.ConnectionConfigs.Resync, | ||
DoInitialSnapshot: req.ConnectionConfigs.DoInitialSnapshot, | ||
}) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err = chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | |
TableMappings: req.ConnectionConfigs.TableMappings, | |
TableNameSchemaMapping: res, | |
SyncedAtColName: req.ConnectionConfigs.SyncedAtColName, | |
Resync: req.ConnectionConfigs.Resync, | |
DoInitialSnapshot: req.ConnectionConfigs.DoInitialSnapshot, | |
}) | |
if err != nil { | |
if err := chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | |
TableMappings: req.ConnectionConfigs.TableMappings, | |
TableNameSchemaMapping: res, | |
SyncedAtColName: req.ConnectionConfigs.SyncedAtColName, | |
Resync: req.ConnectionConfigs.Resync, | |
DoInitialSnapshot: req.ConnectionConfigs.DoInitialSnapshot, | |
}); err != nil { |
err = chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | ||
TableMappings: req.AddedTables, | ||
TableNameSchemaMapping: res, | ||
SyncedAtColName: req.SyncedAtColName, | ||
// TODO: Implement resync and cdc-only for table addition | ||
Resync: false, | ||
DoInitialSnapshot: true, | ||
}) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err = chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | |
TableMappings: req.AddedTables, | |
TableNameSchemaMapping: res, | |
SyncedAtColName: req.SyncedAtColName, | |
// TODO: Implement resync and cdc-only for table addition | |
Resync: false, | |
DoInitialSnapshot: true, | |
}) | |
if err != nil { | |
if err := chPeer.CheckDestinationTables(ctx, connclickhouse.ClickHouseDestinationCheckInput{ | |
TableMappings: req.AddedTables, | |
TableNameSchemaMapping: res, | |
SyncedAtColName: req.SyncedAtColName, | |
// TODO: Implement resync and cdc-only for table addition | |
Resync: false, | |
DoInitialSnapshot: true, | |
}); err != nil { |
return nil, errors.New("source peer config is not postgres") | ||
} | ||
|
||
pgPeer, err := connpostgres.NewPostgresConnector(ctx, nil, sourcePeerConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to stop creating postgres connectors like this in endpoints
See #2525 where I put validation behind a connector interface
publicationErr := pgPeer.CheckIfTablesAreInPublication(ctx, req.PublicationName, addedTableValues) | ||
if publicationErr != nil { | ||
h.alerter.LogNonFlowWarning(ctx, telemetry.EditMirror, req.FlowJobName, publicationErr.Error()) | ||
return nil, publicationErr | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
publicationErr := pgPeer.CheckIfTablesAreInPublication(ctx, req.PublicationName, addedTableValues) | |
if publicationErr != nil { | |
h.alerter.LogNonFlowWarning(ctx, telemetry.EditMirror, req.FlowJobName, publicationErr.Error()) | |
return nil, publicationErr | |
} | |
if err := pgPeer.CheckIfTablesAreInPublication(ctx, req.PublicationName, addedTableValues); err != nil { | |
h.alerter.LogNonFlowWarning(ctx, telemetry.EditMirror, req.FlowJobName, err.Error()) | |
return nil, err | |
} |
I know it seems nice to avoid shadowing, but what happens is someone makes a change to the error message or something, assumes the variable is called err
, & ends up having their code reference some always-nil err
from earlier in the code
This PR introduces validation for table addition. The validation consists of:
A new Flow API endpoint has been made for this, and the UI sends a request to this if there are additional tables selected.