Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions internal/migration_acceptance_tests/column_cases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,58 @@ var columnAcceptanceTestCases = []acceptanceTestCase{
`,
},
},
{
name: "Incompatible type conversion (integer to timestamptz) - Issue #179",
oldSchemaDDL: []string{
`
CREATE TABLE activity(
id INT PRIMARY KEY,
ts INTEGER NOT NULL
);
`,
},
newSchemaDDL: []string{
`
CREATE TABLE activity(
id INT PRIMARY KEY,
ts TIMESTAMPTZ NOT NULL
);
`,
},
expectedPlanDDL: []string{
"ALTER TABLE \"public\".\"activity\" DROP COLUMN \"ts\"",
"ALTER TABLE \"public\".\"activity\" ADD COLUMN \"ts\" timestamp with time zone NOT NULL",
},
expectedHazardTypes: []diff.MigrationHazardType{
diff.MigrationHazardTypeDeletesData,
},
},
{
name: "Incompatible type conversion (timestamp to integer) - Issue #179 reverse",
oldSchemaDDL: []string{
`
CREATE TABLE activity(
id INT PRIMARY KEY,
ts TIMESTAMP NOT NULL
);
`,
},
newSchemaDDL: []string{
`
CREATE TABLE activity(
id INT PRIMARY KEY,
ts INTEGER NOT NULL
);
`,
},
expectedPlanDDL: []string{
"ALTER TABLE \"public\".\"activity\" DROP COLUMN \"ts\"",
"ALTER TABLE \"public\".\"activity\" ADD COLUMN \"ts\" integer NOT NULL",
},
expectedHazardTypes: []diff.MigrationHazardType{
diff.MigrationHazardTypeDeletesData,
},
},
}

func TestColumnTestCases(t *testing.T) {
Expand Down
237 changes: 229 additions & 8 deletions pkg/diff/sql_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,44 @@ func (csg *columnSQLVertexGenerator) Alter(diff columnDiff) ([]Statement, error)
var stmts []Statement
alterColumnPrefix := fmt.Sprintf("%s ALTER COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(newColumn.Name))

// Check if we have an incompatible type conversion that requires drop/add
hasIncompatibleTypeConversion := !strings.EqualFold(oldColumn.Type, newColumn.Type) &&
!isTypeConversionCompatible(oldColumn.Type, newColumn.Type)

if hasIncompatibleTypeConversion {
// For incompatible types, we need to drop and recreate the column
// This will lose all data in the column, but will handle all property changes at once

// Drop the column
dropStmt := Statement{
DDL: fmt.Sprintf("%s DROP COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(oldColumn.Name)),
Timeout: statementTimeoutDefault,
LockTimeout: lockTimeoutDefault,
Hazards: []MigrationHazard{
{
Type: MigrationHazardTypeDeletesData,
Message: fmt.Sprintf("Converting column from %s to %s requires dropping and recreating the column. All existing data in this column will be permanently lost.", oldColumn.Type, newColumn.Type),
},
},
}

// Add the column back with the new type and all new properties
columnDef, err := buildColumnDefinition(newColumn)
if err != nil {
return nil, fmt.Errorf("building column definition for incompatible type conversion: %w", err)
}

addStmt := Statement{
DDL: fmt.Sprintf("%s ADD COLUMN %s", alterTablePrefix(csg.tableName), columnDef),
Timeout: statementTimeoutDefault,
LockTimeout: lockTimeoutDefault,
}

return []Statement{dropStmt, addStmt}, nil
}

// For compatible type conversions or no type change, handle each property individually

// Adding a "NOT NULL" constraint must come before updating a column to be an identity column, otherwise
// the add statement will fail because a column must be non-nullable to become an identity column.
if oldColumn.IsNullable != newColumn.IsNullable && !newColumn.IsNullable {
Expand Down Expand Up @@ -1233,6 +1271,8 @@ func (csg *columnSQLVertexGenerator) Alter(diff columnDiff) ([]Statement, error)

if !strings.EqualFold(oldColumn.Type, newColumn.Type) ||
!strings.EqualFold(oldColumn.Collation.GetFQEscapedName(), newColumn.Collation.GetFQEscapedName()) {

// We already handled incompatible conversions above, so this must be compatible
stmts = append(stmts,
[]Statement{
csg.generateTypeTransformationStatement(
Expand Down Expand Up @@ -1273,12 +1313,177 @@ func (csg *columnSQLVertexGenerator) Alter(diff columnDiff) ([]Statement, error)
return stmts, nil
}

// TypeCoercionInfo represents information about type coercion capabilities
type TypeCoercionInfo struct {
// IsCoercible indicates if the types can be cast at all
IsCoercible bool
// IsBinaryCoercible indicates if the conversion doesn't require a table rewrite
// (i.e., the binary representation is compatible)
IsBinaryCoercible bool
}

// GetTypeCoercionInfo analyzes the coercion capabilities between two PostgreSQL types
func GetTypeCoercionInfo(oldType, newType string) TypeCoercionInfo {
// Normalize type names to lowercase for comparison
oldType = strings.ToLower(strings.TrimSpace(oldType))
newType = strings.ToLower(strings.TrimSpace(newType))

// If types are the same, they're fully compatible
if oldType == newType {
return TypeCoercionInfo{IsCoercible: true, IsBinaryCoercible: true}
}

// Handle type aliases and normalize to canonical forms
oldType = normalizeTypeName(oldType)
newType = normalizeTypeName(newType)

// Check again after normalization
if oldType == newType {
return TypeCoercionInfo{IsCoercible: true, IsBinaryCoercible: true}
}

// Binary coercible conversions (no table rewrite needed)
binaryCoerciblePairs := map[string][]string{
// Integer type family - these are binary compatible in most cases
"smallint": {"integer", "bigint"},
"integer": {"bigint"},
"int2": {"int4", "int8"},
"int4": {"int8"},

// Text type family - these are binary compatible
"varchar": {"text"},
"character varying": {"text"},
"char": {"varchar", "text"},
"character": {"varchar", "text"},

// Numeric precision changes that are binary compatible
"real": {"double precision"},
"float4": {"float8"},
}

// Check for binary coercible conversions
if compatibleTypes, exists := binaryCoerciblePairs[oldType]; exists {
for _, compatibleType := range compatibleTypes {
if newType == compatibleType {
return TypeCoercionInfo{IsCoercible: true, IsBinaryCoercible: true}
}
}
}

// Coercible but requires table rewrite (explicit casts exist)
coerciblePairs := map[string][]string{
// Numeric conversions that require rewrite (including downsizing and string conversions)
"bigint": {"integer", "smallint", "text", "varchar", "timestamp", "timestamptz"}, // Downsizing + string + special timestamp
"int8": {"int4", "int2"},
"integer": {"smallint", "text", "varchar"},
"int4": {"int2"},
"double precision": {"real", "text", "varchar"},
"float8": {"float4"},
"smallint": {"text", "varchar"},
"numeric": {"text", "varchar"},
"real": {"text", "varchar"},

// String to numeric (with potential data loss)
"text": {"integer", "bigint", "smallint", "numeric", "real", "double precision"},
"varchar": {"integer", "bigint", "smallint", "numeric", "real", "double precision"},

// Date/time conversions
"date": {"timestamp", "timestamptz"},
"timestamp": {"date", "timestamptz"},
"timestamptz": {"date", "timestamp"},
}

// Check for coercible conversions
if compatibleTypes, exists := coerciblePairs[oldType]; exists {
for _, compatibleType := range compatibleTypes {
if newType == compatibleType {
return TypeCoercionInfo{IsCoercible: true, IsBinaryCoercible: false}
}
}
}

// Known incompatible conversions (no cast exists)
incompatiblePairs := map[string][]string{
// Integer types to timestamp types (except bigint which has special handling)
"integer": {"timestamp", "timestamptz", "boolean", "uuid"},
"smallint": {"timestamp", "timestamptz", "boolean", "uuid"},

// Timestamp types to integer types (except to bigint which might work)
"timestamp": {"integer", "smallint", "uuid"},
"timestamptz": {"integer", "smallint", "uuid"},

// Text/varchar to bytea and vice versa
"text": {"bytea", "uuid"},
"varchar": {"bytea", "uuid"},
"bytea": {"text", "varchar"},

// Boolean to/from numeric types
"boolean": {"integer", "bigint", "smallint", "numeric", "real", "double precision"},
"bigint": {"boolean", "uuid"},
"numeric": {"boolean"},
"real": {"boolean"},
"double precision": {"boolean"},

// UUID to/from other types
"uuid": {"integer", "bigint", "smallint", "text", "varchar", "timestamp", "timestamptz"},
}

// Check for incompatible conversions
if incompatibleTypes, exists := incompatiblePairs[oldType]; exists {
for _, incompatibleType := range incompatibleTypes {
if newType == incompatibleType {
return TypeCoercionInfo{IsCoercible: false, IsBinaryCoercible: false}
}
}
}

// For unknown type combinations, assume they're coercible but require rewrite
// This is a conservative approach that errs on the side of caution
return TypeCoercionInfo{IsCoercible: true, IsBinaryCoercible: false}
}

// normalizeTypeName converts type aliases to their canonical forms
func normalizeTypeName(typeName string) string {
typeAliases := map[string]string{
"int": "integer",
"int2": "smallint",
"int4": "integer",
"int8": "bigint",
"float4": "real",
"float8": "double precision",
"bool": "boolean",
"character varying": "varchar",
"character": "char",
"timestamp without time zone": "timestamp",
"timestamp with time zone": "timestamptz",
}

if canonical, exists := typeAliases[typeName]; exists {
return canonical
}
return typeName
}

// IsTypeConversionCompatible checks if a type conversion from oldType to newType
// can be performed using ALTER COLUMN ... SET DATA TYPE with a cast.
// Returns false for conversions that require DROP/ADD approach.
func IsTypeConversionCompatible(oldType, newType string) bool {
info := GetTypeCoercionInfo(oldType, newType)
return info.IsCoercible
}

// isTypeConversionCompatible is the internal version used within the package
func isTypeConversionCompatible(oldType, newType string) bool {
return IsTypeConversionCompatible(oldType, newType)
}

func (csg *columnSQLVertexGenerator) generateTypeTransformationStatement(
col schema.Column,
oldType string,
newType string,
newTypeCollation schema.SchemaQualifiedName,
) Statement {
// Special case for bigint to timestamp conversion with custom logic
if strings.EqualFold(oldType, "bigint") &&
strings.EqualFold(newType, "timestamp without time zone") {
return Statement{
Expand All @@ -1301,11 +1506,34 @@ func (csg *columnSQLVertexGenerator) generateTypeTransformationStatement(
}
}

// Get type coercion information to provide appropriate hazards
coercionInfo := GetTypeCoercionInfo(oldType, newType)

collationModifier := ""
if !newTypeCollation.IsEmpty() {
collationModifier = fmt.Sprintf("COLLATE %s ", newTypeCollation.GetFQEscapedName())
}

var hazards []MigrationHazard

if coercionInfo.IsBinaryCoercible {
// Binary coercible types don't require table rewrite, so no ACCESS_EXCLUSIVE lock hazard
// This addresses issue #52
hazards = []MigrationHazard{{
Type: MigrationHazardTypeAcquiresAccessExclusiveLock,
Message: "This will acquire a brief ACCESS EXCLUSIVE lock to update the table metadata, " +
"but no table rewrite is required since the types are binary compatible.",
}}
} else {
// Non-binary coercible types require table rewrite
hazards = []MigrationHazard{{
Type: MigrationHazardTypeAcquiresAccessExclusiveLock,
Message: "This will completely lock the table while the data is being re-written. " +
"The duration of this conversion depends on the size of your data since a " +
"table rewrite is required to convert between these types.",
}}
}

return Statement{
DDL: fmt.Sprintf("%s SET DATA TYPE %s %susing %s::%s",
csg.alterColumnPrefix(col),
Expand All @@ -1316,14 +1544,7 @@ func (csg *columnSQLVertexGenerator) generateTypeTransformationStatement(
),
Timeout: statementTimeoutDefault,
LockTimeout: lockTimeoutDefault,
Hazards: []MigrationHazard{{
Type: MigrationHazardTypeAcquiresAccessExclusiveLock,
Message: "This will completely lock the table while the data is being re-written. " +
"The duration of this conversion depends on if the type conversion is trivial " +
"or not. A non-trivial conversion will require a table rewrite. A trivial " +
"conversion is one where the binary values are coercible and the column " +
"contents are not changing.",
}},
Hazards: hazards,
}
}

Expand Down
Loading