Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import { getDebugTableInfo } from '../replication/replication-utils.js';
import { KEEPALIVE_STATEMENT, PUBLICATION_NAME } from '../replication/WalStream.js';
import * as types from '../types/types.js';
import { getApplicationName } from '../utils/application-name.js';
import { CustomTypeRegistry } from '../types/registry.js';
import { PostgresTypeResolver } from '../types/resolver.js';

export class PostgresRouteAPIAdapter implements api.RouteAPI {
private typeCache: PostgresTypeResolver;
connectionTag: string;
// TODO this should probably be configurable one day
publicationName = PUBLICATION_NAME;
Expand All @@ -31,6 +34,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
connectionTag?: string,
private config?: types.ResolvedConnectionConfig
) {
this.typeCache = new PostgresTypeResolver(config?.typeRegistry ?? new CustomTypeRegistry(), pool);
this.connectionTag = connectionTag ?? sync_rules.DEFAULT_TAG;
}

Expand Down Expand Up @@ -297,6 +301,7 @@ LEFT JOIN (
SELECT
attrelid,
attname,
atttypid,
format_type(atttypid, atttypmod) as data_type,
(SELECT typname FROM pg_catalog.pg_type WHERE oid = atttypid) as pg_type,
attnum,
Expand All @@ -311,6 +316,7 @@ LEFT JOIN (
)
GROUP BY schemaname, tablename, quoted_name`
);
await this.typeCache.fetchTypesForSchema();
const rows = pgwire.pgwireRows(results);

let schemas: Record<string, service_types.DatabaseSchema> = {};
Expand All @@ -332,9 +338,11 @@ GROUP BY schemaname, tablename, quoted_name`
if (pg_type.startsWith('_')) {
pg_type = `${pg_type.substring(1)}[]`;
}

const knownType = this.typeCache.registry.lookupType(Number(column.atttypid));
table.columns.push({
name: column.attname,
sqlite_type: sync_rules.expressionTypeFromPostgresType(pg_type).typeFlags,
sqlite_type: sync_rules.ExpressionType.fromTypeText(knownType.sqliteType()).typeFlags,
type: column.data_type,
internal_type: column.data_type,
pg_type: pg_type
Expand Down
2 changes: 0 additions & 2 deletions modules/module-postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
export * from './module/PostgresModule.js';

export * as pg_utils from './utils/pgwire_utils.js';
14 changes: 10 additions & 4 deletions modules/module-postgres/src/module/PostgresModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ import { WalStreamReplicator } from '../replication/WalStreamReplicator.js';
import * as types from '../types/types.js';
import { PostgresConnectionConfig } from '../types/types.js';
import { getApplicationName } from '../utils/application-name.js';
import { CustomTypeRegistry } from '../types/registry.js';

export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
private customTypes: CustomTypeRegistry = new CustomTypeRegistry();

constructor() {
super({
name: 'Postgres',
Expand Down Expand Up @@ -48,7 +51,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
protected createReplicator(context: system.ServiceContext): replication.AbstractReplicator {
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
const syncRuleProvider = new ConfigurationFileSyncRulesProvider(context.configuration.sync_rules);
const connectionFactory = new ConnectionManagerFactory(normalisedConfig);
const connectionFactory = new ConnectionManagerFactory(normalisedConfig, this.customTypes);

return new WalStreamReplicator({
id: this.getDefaultId(normalisedConfig.database),
Expand All @@ -66,7 +69,8 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
private resolveConfig(config: types.PostgresConnectionConfig): types.ResolvedConnectionConfig {
return {
...config,
...types.normalizeConnectionConfig(config)
...types.normalizeConnectionConfig(config),
typeRegistry: this.customTypes
};
}

Expand All @@ -75,7 +79,8 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
const connectionManager = new PgManager(normalisedConfig, {
idleTimeout: 30_000,
maxSize: 1,
applicationName: getApplicationName()
applicationName: getApplicationName(),
registry: this.customTypes
});

try {
Expand Down Expand Up @@ -106,7 +111,8 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
const connectionManager = new PgManager(normalizedConfig, {
idleTimeout: 30_000,
maxSize: 1,
applicationName: getApplicationName()
applicationName: getApplicationName(),
registry: new CustomTypeRegistry()
});
const connection = await connectionManager.snapshotConnection();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ import { PgManager } from './PgManager.js';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
import { PgPoolOptions } from '@powersync/service-jpgwire';
import { logger } from '@powersync/lib-services-framework';
import { CustomTypeRegistry } from '../types/registry.js';

export class ConnectionManagerFactory {
private readonly connectionManagers: PgManager[];
public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig;

constructor(dbConnectionConfig: NormalizedPostgresConnectionConfig) {
constructor(
dbConnectionConfig: NormalizedPostgresConnectionConfig,
private readonly registry: CustomTypeRegistry
) {
this.dbConnectionConfig = dbConnectionConfig;
this.connectionManagers = [];
}

create(poolOptions: PgPoolOptions) {
const manager = new PgManager(this.dbConnectionConfig, poolOptions);
const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry });
this.connectionManagers.push(manager);
return manager;
}
Expand Down
16 changes: 12 additions & 4 deletions modules/module-postgres/src/replication/PgManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ import * as pgwire from '@powersync/service-jpgwire';
import semver from 'semver';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
import { getApplicationName } from '../utils/application-name.js';
import { PostgresTypeResolver } from '../types/resolver.js';
import { getServerVersion } from '../utils/postgres_version.js';
import { CustomTypeRegistry } from '../types/registry.js';

export interface PgManagerOptions extends pgwire.PgPoolOptions {
registry: CustomTypeRegistry;
}

/**
* Shorter timeout for snapshot connections than for replication connections.
Expand All @@ -14,14 +21,17 @@ export class PgManager {
*/
public readonly pool: pgwire.PgClient;

public readonly types: PostgresTypeResolver;

private connectionPromises: Promise<pgwire.PgConnection>[] = [];

constructor(
public options: NormalizedPostgresConnectionConfig,
public poolOptions: pgwire.PgPoolOptions
public poolOptions: PgManagerOptions
) {
// The pool is lazy - no connections are opened until a query is performed.
this.pool = pgwire.connectPgWirePool(this.options, poolOptions);
this.types = new PostgresTypeResolver(poolOptions.registry, this.pool);
}

public get connectionTag() {
Expand All @@ -41,9 +51,7 @@ export class PgManager {
* @returns The Postgres server version in a parsed Semver instance
*/
async getServerVersion(): Promise<semver.SemVer | null> {
const result = await this.pool.query(`SHOW server_version;`);
// The result is usually of the form "16.2 (Debian 16.2-1.pgdg120+2)"
return semver.coerce(result.rows[0][0].split(' ')[0]);
return await getServerVersion(this.pool);
}

/**
Expand Down
9 changes: 9 additions & 0 deletions modules/module-postgres/src/replication/PgRelation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,12 @@ export function getPgOutputRelation(source: PgoutputRelation): storage.SourceEnt
replicaIdColumns: getReplicaIdColumns(source)
} satisfies storage.SourceEntityDescriptor;
}

export function referencedColumnTypeIds(source: PgoutputRelation): number[] {
const oids = new Set<number>();
for (const column of source.columns) {
oids.add(column.typeOid);
}

return [...oids];
}
79 changes: 49 additions & 30 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ import {
TablePattern,
toSyncRulesRow
} from '@powersync/service-sync-rules';
import * as pg_utils from '../utils/pgwire_utils.js';

import { PgManager } from './PgManager.js';
import { getPgOutputRelation, getRelId } from './PgRelation.js';
import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js';
import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js';
import { ReplicationMetric } from '@powersync/service-types';
import {
Expand Down Expand Up @@ -189,28 +188,30 @@ export class WalStream {

let tableRows: any[];
const prefix = tablePattern.isWildcard ? tablePattern.tablePrefix : undefined;
if (tablePattern.isWildcard) {
const result = await db.query({
statement: `SELECT c.oid AS relid, c.relname AS table_name

{
let query = `
SELECT
c.oid AS relid,
c.relname AS table_name,
(SELECT
json_agg(DISTINCT a.atttypid)
FROM pg_attribute a
WHERE a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = c.oid)
AS column_types
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1
AND c.relkind = 'r'
AND c.relname LIKE $2`,
params: [
{ type: 'varchar', value: schema },
{ type: 'varchar', value: tablePattern.tablePattern }
]
});
tableRows = pgwire.pgwireRows(result);
} else {
AND c.relkind = 'r'`;

if (tablePattern.isWildcard) {
query += ' AND c.relname LIKE $2';
} else {
query += ' AND c.relname = $2';
}

const result = await db.query({
statement: `SELECT c.oid AS relid, c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = $1
AND c.relkind = 'r'
AND c.relname = $2`,
statement: query,
params: [
{ type: 'varchar', value: schema },
{ type: 'varchar', value: tablePattern.tablePattern }
Expand All @@ -219,6 +220,7 @@ export class WalStream {

tableRows = pgwire.pgwireRows(result);
}

let result: storage.SourceTable[] = [];

for (let row of tableRows) {
Expand Down Expand Up @@ -258,16 +260,18 @@ export class WalStream {

const cresult = await getReplicationIdentityColumns(db, relid);

const table = await this.handleRelation(
const columnTypes = (JSON.parse(row.column_types) as string[]).map((e) => Number(e));
const table = await this.handleRelation({
batch,
{
descriptor: {
name,
schema,
objectId: relid,
replicaIdColumns: cresult.replicationColumns
} as SourceEntityDescriptor,
false
);
snapshot: false,
referencedTypeIds: columnTypes
});

result.push(table);
}
Expand Down Expand Up @@ -683,7 +687,14 @@ WHERE oid = $1::regclass`,
}
}

async handleRelation(batch: storage.BucketStorageBatch, descriptor: SourceEntityDescriptor, snapshot: boolean) {
async handleRelation(options: {
batch: storage.BucketStorageBatch;
descriptor: SourceEntityDescriptor;
snapshot: boolean;
referencedTypeIds: number[];
}) {
const { batch, descriptor, snapshot, referencedTypeIds } = options;

if (!descriptor.objectId && typeof descriptor.objectId != 'number') {
throw new ReplicationAssertionError(`objectId expected, got ${typeof descriptor.objectId}`);
}
Expand All @@ -699,6 +710,9 @@ WHERE oid = $1::regclass`,
// Drop conflicting tables. This includes for example renamed tables.
await batch.drop(result.dropTables);

// Ensure we have a description for custom types referenced in the table.
await this.connections.types.fetchTypes(referencedTypeIds);

// Snapshot if:
// 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
// 2. Snapshot is not already done, AND:
Expand Down Expand Up @@ -789,7 +803,7 @@ WHERE oid = $1::regclass`,

if (msg.tag == 'insert') {
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const baseRecord = pg_utils.constructAfterRecord(msg);
const baseRecord = this.connections.types.constructAfterRecord(msg);
return await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
Expand All @@ -802,8 +816,8 @@ WHERE oid = $1::regclass`,
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
// "before" may be null if the replica id columns are unchanged
// It's fine to treat that the same as an insert.
const before = pg_utils.constructBeforeRecord(msg);
const after = pg_utils.constructAfterRecord(msg);
const before = this.connections.types.constructBeforeRecord(msg);
const after = this.connections.types.constructAfterRecord(msg);
return await batch.save({
tag: storage.SaveOperationTag.UPDATE,
sourceTable: table,
Expand All @@ -814,7 +828,7 @@ WHERE oid = $1::regclass`,
});
} else if (msg.tag == 'delete') {
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
const before = pg_utils.constructBeforeRecord(msg)!;
const before = this.connections.types.constructBeforeRecord(msg)!;

return await batch.save({
tag: storage.SaveOperationTag.DELETE,
Expand Down Expand Up @@ -955,7 +969,12 @@ WHERE oid = $1::regclass`,

for (const msg of messages) {
if (msg.tag == 'relation') {
await this.handleRelation(batch, getPgOutputRelation(msg), true);
await this.handleRelation({
batch,
descriptor: getPgOutputRelation(msg),
snapshot: true,
referencedTypeIds: referencedColumnTypeIds(msg)
});
} else if (msg.tag == 'begin') {
// This may span multiple transactions in the same chunk, or even across chunks.
skipKeepalive = true;
Expand Down
Loading