diff --git a/app/build.yaml b/app/build.yaml index 2167c8adca..2404b0950b 100644 --- a/app/build.yaml +++ b/app/build.yaml @@ -6,6 +6,7 @@ targets: - '$package$' - 'lib/$lib$' - 'lib/account/models.dart' + - 'lib/database/**' - 'lib/admin/models.dart' - 'lib/dartdoc/models.dart' - 'lib/frontend/handlers/pubapi.dart' diff --git a/app/lib/admin/tools/delete_all_staging.dart b/app/lib/admin/tools/delete_all_staging.dart index 7acc609d9a..6fd0617614 100644 --- a/app/lib/admin/tools/delete_all_staging.dart +++ b/app/lib/admin/tools/delete_all_staging.dart @@ -55,6 +55,7 @@ Future executeDeleteAllStaging(List args) async { dbService.query(): 500, dbService.query(): 500, dbService.query(): 500, + // ignore: deprecated_member_use_from_same_package dbService.query(): 100, }; diff --git a/app/lib/database/model.dart b/app/lib/database/model.dart new file mode 100644 index 0000000000..6c11037a1a --- /dev/null +++ b/app/lib/database/model.dart @@ -0,0 +1,89 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:convert' show json; + +import 'package:gcloud/service_scope.dart' as ss; +import 'package:json_annotation/json_annotation.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; +import 'package:postgres/postgres.dart'; +import 'package:pub_dev/service/secret/backend.dart'; +import 'package:pub_dev/shared/exceptions.dart'; +import 'package:pub_dev/task/models.dart' + show AbortedTokenInfo, PackageVersionStateInfo; +import 'package:typed_sql/typed_sql.dart'; + +export 'package:typed_sql/typed_sql.dart' hide AuthenticationException; + +part 'model.g.dart'; +part 'model.task.dart'; + +final _log = Logger('database'); + +/// Sets the active [database]. +void registerDatabase(Database value) => + ss.register(#_database, value); + +/// The active [Database]. +Database get database => + ss.lookup(#_database) as Database; + +abstract final class PrimaryDatabase extends Schema { + Table get tasks; + + Table get taskDependencies; +} + +Future setupDatabaseConnection() async { + final connectionString = + await secretBackend.lookup(SecretKey.databaseConnectionString); + if (connectionString == null) { + _log.shout('Missing database connection string from secrets'); + throw AssertionError('Missing database connection string from secrets'); + } + final u = Uri.parse(connectionString); + if (!u.isScheme('postgres')) { + _log.shout('Invalid database connection string scheme: ${u.scheme}'); + throw AssertionError( + 'Invalid database connection string scheme: ${u.scheme}'); + } + + if (u.host.isEmpty) { + throw AssertionError('Missing database host'); + } + + var database = u.path; + if (database.startsWith('/')) { + database = database.substring(1); + } + + String? username, password; + final userInfo = u.userInfo; + if (userInfo.isNotEmpty) { + if (userInfo.split(':').length != 2) { + throw AssertionError( + 'Invalid database connection string: unable to parse username/password', + ); + } + username = userInfo.split(':').firstOrNull; + password = userInfo.split(':').lastOrNull; + } + + return DatabaseAdapter.postgres(Pool.withEndpoints( + [ + Endpoint( + host: u.host, + port: u.port == 0 ? 
5432 : u.port, + database: database, + username: username, + password: password, + ), + ], + settings: PoolSettings( + applicationName: 'pub-dev', + maxConnectionCount: 10, + ), + )); +} diff --git a/app/lib/database/model.g.dart b/app/lib/database/model.g.dart new file mode 100644 index 0000000000..9a878d5478 --- /dev/null +++ b/app/lib/database/model.g.dart @@ -0,0 +1,904 @@ +// GENERATED CODE - DO NOT MODIFY BY HAND + +part of 'model.dart'; + +// ************************************************************************** +// JsonSerializableGenerator +// ************************************************************************** + +TaskState _$TaskStateFromJson(Map json) => TaskState( + versions: (json['versions'] as Map).map( + (k, e) => MapEntry( + k, PackageVersionStateInfo.fromJson(e as Map)), + ), + abortedTokens: (json['abortedTokens'] as List) + .map((e) => AbortedTokenInfo.fromJson(e as Map)) + .toList(), + ); + +Map _$TaskStateToJson(TaskState instance) => { + 'versions': instance.versions, + 'abortedTokens': instance.abortedTokens, + }; + +// ************************************************************************** +// Generator: _TypedSqlBuilder +// ************************************************************************** + +/// Extension methods for a [Database] operating on [PrimaryDatabase]. +extension PrimaryDatabaseSchema on Database { + static const _$tables = [_$Task._$table, _$TaskDependency._$table]; + + Table get tasks => ExposedForCodeGen.declareTable( + this, + _$Task._$table, + ); + + Table get taskDependencies => ExposedForCodeGen.declareTable( + this, + _$TaskDependency._$table, + ); + + /// Create tables defined in [PrimaryDatabase]. + /// + /// Calling this on an empty database will create the tables + /// defined in [PrimaryDatabase]. In production it's often better to + /// use [createPrimaryDatabaseTables] and manage migrations using + /// external tools. + /// + /// This method is mostly useful for testing. + /// + /// > [!WARNING] + /// > If the database is **not empty** behavior is undefined, most + /// > likely this operation will fail. + Future createTables() async => ExposedForCodeGen.createTables( + context: this, + tables: _$tables, + ); +} + +/// Get SQL [DDL statements][1] for tables defined in [PrimaryDatabase]. +/// +/// This returns a SQL script with multiple DDL statements separated by `;` +/// using the specified [dialect]. +/// +/// Executing these statements in an empty database will create the tables +/// defined in [PrimaryDatabase]. In practice, this method is often used for +/// printing the DDL statements, such that migrations can be managed by +/// external tools. 
+/// +/// [1]: https://en.wikipedia.org/wiki/Data_definition_language +String createPrimaryDatabaseTables(SqlDialect dialect) => + ExposedForCodeGen.createTableSchema( + dialect: dialect, + tables: PrimaryDatabaseSchema._$tables, + ); + +final class _$Task extends Task { + _$Task._( + this.runtimeVersion, + this.package, + this.state, + this.pendingAt, + this.lastDependencyChanged, + this.finished, + ); + + @override + final String runtimeVersion; + + @override + final String package; + + @override + final TaskState state; + + @override + final DateTime pendingAt; + + @override + final DateTime lastDependencyChanged; + + @override + final DateTime finished; + + static const _$table = ( + tableName: 'tasks', + columns: [ + 'runtimeVersion', + 'package', + 'state', + 'pendingAt', + 'lastDependencyChanged', + 'finished' + ], + columnInfo: <({ + ColumnType type, + bool isNotNull, + Object? defaultValue, + bool autoIncrement, + })>[ + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.dateTime, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.dateTime, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.dateTime, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ) + ], + primaryKey: ['runtimeVersion', 'package'], + unique: >[], + foreignKeys: <({ + String name, + List columns, + String referencedTable, + List referencedColumns, + })>[], + readRow: _$Task._$fromDatabase, + ); + + static Task? _$fromDatabase(RowReader row) { + final runtimeVersion = row.readString(); + final package = row.readString(); + final state = ExposedForCodeGen.customDataTypeOrNull( + row.readString(), + TaskState.fromDatabase, + ); + final pendingAt = row.readDateTime(); + final lastDependencyChanged = row.readDateTime(); + final finished = row.readDateTime(); + if (runtimeVersion == null && + package == null && + state == null && + pendingAt == null && + lastDependencyChanged == null && + finished == null) { + return null; + } + return _$Task._(runtimeVersion!, package!, state!, pendingAt!, + lastDependencyChanged!, finished!); + } + + @override + String toString() => + 'Task(runtimeVersion: "$runtimeVersion", package: "$package", state: "$state", pendingAt: "$pendingAt", lastDependencyChanged: "$lastDependencyChanged", finished: "$finished")'; +} + +/// Extension methods for table defined in [Task]. +extension TableTaskExt on Table { + /// Insert row into the `tasks` table. + /// + /// Returns a [InsertSingle] statement on which `.execute` must be + /// called for the row to be inserted. + InsertSingle insert({ + required Expr runtimeVersion, + required Expr package, + required Expr state, + required Expr pendingAt, + required Expr lastDependencyChanged, + required Expr finished, + }) => + ExposedForCodeGen.insertInto( + table: this, + values: [ + runtimeVersion, + package, + state, + pendingAt, + lastDependencyChanged, + finished, + ], + ); + + /// Delete a single row from the `tasks` table, specified by + /// _primary key_. + /// + /// Returns a [DeleteSingle] statement on which `.execute()` must be + /// called for the row to be deleted. 
+ /// + /// To delete multiple rows, using `.where()` to filter which rows + /// should be deleted. If you wish to delete all rows, use + /// `.where((_) => toExpr(true)).delete()`. + DeleteSingle delete( + String runtimeVersion, + String package, + ) => + ExposedForCodeGen.deleteSingle( + byKey(runtimeVersion, package), + _$Task._$table, + ); +} + +/// Extension methods for building queries against the `tasks` table. +extension QueryTaskExt on Query<(Expr,)> { + /// Lookup a single row in `tasks` table using the _primary key_. + /// + /// Returns a [QuerySingle] object, which returns at-most one row, + /// when `.fetch()` is called. + QuerySingle<(Expr,)> byKey( + String runtimeVersion, + String package, + ) => + where((task) => + task.runtimeVersion.equalsValue(runtimeVersion) & + task.package.equalsValue(package)).first; + + /// Update all rows in the `tasks` table matching this [Query]. + /// + /// The changes to be applied to each row matching this [Query] are + /// defined using the [updateBuilder], which is given an [Expr] + /// representation of the row being updated and a `set` function to + /// specify which fields should be updated. The result of the `set` + /// function should always be returned from the `updateBuilder`. + /// + /// Returns an [Update] statement on which `.execute()` must be called + /// for the rows to be updated. + /// + /// **Example:** decrementing `1` from the `value` field for each row + /// where `value > 0`. + /// ```dart + /// await db.mytable + /// .where((row) => row.value > toExpr(0)) + /// .update((row, set) => set( + /// value: row.value - toExpr(1), + /// )) + /// .execute(); + /// ``` + /// + /// > [!WARNING] + /// > The `updateBuilder` callback does not make the update, it builds + /// > the expressions for updating the rows. You should **never** invoke + /// > the `set` function more than once, and the result should always + /// > be returned immediately. + Update update( + UpdateSet Function( + Expr task, + UpdateSet Function({ + Expr runtimeVersion, + Expr package, + Expr state, + Expr pendingAt, + Expr lastDependencyChanged, + Expr finished, + }) set, + ) updateBuilder) => + ExposedForCodeGen.update( + this, + _$Task._$table, + (task) => updateBuilder( + task, + ({ + Expr? runtimeVersion, + Expr? package, + Expr? state, + Expr? pendingAt, + Expr? lastDependencyChanged, + Expr? finished, + }) => + ExposedForCodeGen.buildUpdate([ + runtimeVersion, + package, + state, + pendingAt, + lastDependencyChanged, + finished, + ]), + ), + ); + + /// Delete all rows in the `tasks` table matching this [Query]. + /// + /// Returns a [Delete] statement on which `.execute()` must be called + /// for the rows to be deleted. + Delete delete() => ExposedForCodeGen.delete(this, _$Task._$table); +} + +/// Extension methods for building point queries against the `tasks` table. +extension QuerySingleTaskExt on QuerySingle<(Expr,)> { + /// Update the row (if any) in the `tasks` table matching this + /// [QuerySingle]. + /// + /// The changes to be applied to the row matching this [QuerySingle] are + /// defined using the [updateBuilder], which is given an [Expr] + /// representation of the row being updated and a `set` function to + /// specify which fields should be updated. The result of the `set` + /// function should always be returned from the `updateBuilder`. + /// + /// Returns an [UpdateSingle] statement on which `.execute()` must be + /// called for the row to be updated. 
The resulting statement will + /// **not** fail, if there are no rows matching this query exists. + /// + /// **Example:** decrementing `1` from the `value` field the row with + /// `id = 1`. + /// ```dart + /// await db.mytable + /// .byKey(1) + /// .update((row, set) => set( + /// value: row.value - toExpr(1), + /// )) + /// .execute(); + /// ``` + /// + /// > [!WARNING] + /// > The `updateBuilder` callback does not make the update, it builds + /// > the expressions for updating the rows. You should **never** invoke + /// > the `set` function more than once, and the result should always + /// > be returned immediately. + UpdateSingle update( + UpdateSet Function( + Expr task, + UpdateSet Function({ + Expr runtimeVersion, + Expr package, + Expr state, + Expr pendingAt, + Expr lastDependencyChanged, + Expr finished, + }) set, + ) updateBuilder) => + ExposedForCodeGen.updateSingle( + this, + _$Task._$table, + (task) => updateBuilder( + task, + ({ + Expr? runtimeVersion, + Expr? package, + Expr? state, + Expr? pendingAt, + Expr? lastDependencyChanged, + Expr? finished, + }) => + ExposedForCodeGen.buildUpdate([ + runtimeVersion, + package, + state, + pendingAt, + lastDependencyChanged, + finished, + ]), + ), + ); + + /// Delete the row (if any) in the `tasks` table matching this [QuerySingle]. + /// + /// Returns a [DeleteSingle] statement on which `.execute()` must be called + /// for the row to be deleted. The resulting statement will **not** + /// fail, if there are no rows matching this query exists. + DeleteSingle delete() => + ExposedForCodeGen.deleteSingle(this, _$Task._$table); +} + +/// Extension methods for expressions on a row in the `tasks` table. +extension ExpressionTaskExt on Expr { + /// Runtime version this [Task] belongs to. + Expr get runtimeVersion => + ExposedForCodeGen.field(this, 0, ExposedForCodeGen.text); + + Expr get package => + ExposedForCodeGen.field(this, 1, ExposedForCodeGen.text); + + Expr get state => + ExposedForCodeGen.field(this, 2, TaskStateExt._exprType); + + /// Next [DateTime] at which point some package version becomes pending. + Expr get pendingAt => + ExposedForCodeGen.field(this, 3, ExposedForCodeGen.dateTime); + + /// Last [DateTime] a dependency was updated. + Expr get lastDependencyChanged => + ExposedForCodeGen.field(this, 4, ExposedForCodeGen.dateTime); + + /// The last time the a worker completed with a failure or success. + Expr get finished => + ExposedForCodeGen.field(this, 5, ExposedForCodeGen.dateTime); + + /// Get [SubQuery] of rows from the `taskDependencies` table which + /// reference this row. + /// + /// This returns a [SubQuery] of [TaskDependency] rows, + /// where [TaskDependency.runtimeVersion], [TaskDependency.package] + /// references [Task.runtimeVersion], [Task.package] + /// in this row. + SubQuery<(Expr,)> get dependencies => + ExposedForCodeGen.subqueryTable(_$TaskDependency._$table).where((r) => + r.runtimeVersion.equals(runtimeVersion) & r.package.equals(package)); +} + +extension ExpressionNullableTaskExt on Expr { + /// Runtime version this [Task] belongs to. + Expr get runtimeVersion => + ExposedForCodeGen.field(this, 0, ExposedForCodeGen.text); + + Expr get package => + ExposedForCodeGen.field(this, 1, ExposedForCodeGen.text); + + Expr get state => + ExposedForCodeGen.field(this, 2, TaskStateExt._exprType); + + /// Next [DateTime] at which point some package version becomes pending. 
+ Expr get pendingAt => + ExposedForCodeGen.field(this, 3, ExposedForCodeGen.dateTime); + + /// Last [DateTime] a dependency was updated. + Expr get lastDependencyChanged => + ExposedForCodeGen.field(this, 4, ExposedForCodeGen.dateTime); + + /// The last time the a worker completed with a failure or success. + Expr get finished => + ExposedForCodeGen.field(this, 5, ExposedForCodeGen.dateTime); + + /// Get [SubQuery] of rows from the `taskDependencies` table which + /// reference this row. + /// + /// This returns a [SubQuery] of [TaskDependency] rows, + /// where [TaskDependency.runtimeVersion], [TaskDependency.package] + /// references [Task.runtimeVersion], [Task.package] + /// in this row, if any. + /// + /// If this row is `NULL` the subquery is always be empty. + SubQuery<(Expr,)> get dependencies => + ExposedForCodeGen.subqueryTable(_$TaskDependency._$table).where((r) => + r.runtimeVersion.equalsUnlessNull(runtimeVersion).asNotNull() & + r.package.equalsUnlessNull(package).asNotNull()); + + /// Check if the row is not `NULL`. + /// + /// This will check if _primary key_ fields in this row are `NULL`. + /// + /// If this is a reference lookup by subquery it might be more efficient + /// to check if the referencing field is `NULL`. + Expr isNotNull() => runtimeVersion.isNotNull() & package.isNotNull(); + + /// Check if the row is `NULL`. + /// + /// This will check if _primary key_ fields in this row are `NULL`. + /// + /// If this is a reference lookup by subquery it might be more efficient + /// to check if the referencing field is `NULL`. + Expr isNull() => isNotNull().not(); +} + +extension InnerJoinTaskTaskDependencyExt + on InnerJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. + /// + /// This will match rows where [Task.runtimeVersion] = [TaskDependency.runtimeVersion] and [Task.package] = [TaskDependency.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + a.runtimeVersion.equals(b.runtimeVersion) & a.package.equals(b.package)); +} + +extension LeftJoinTaskTaskDependencyExt + on LeftJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. + /// + /// This will match rows where [Task.runtimeVersion] = [TaskDependency.runtimeVersion] and [Task.package] = [TaskDependency.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + a.runtimeVersion.equals(b.runtimeVersion) & a.package.equals(b.package)); +} + +extension RightJoinTaskTaskDependencyExt + on RightJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. + /// + /// This will match rows where [Task.runtimeVersion] = [TaskDependency.runtimeVersion] and [Task.package] = [TaskDependency.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + a.runtimeVersion.equals(b.runtimeVersion) & a.package.equals(b.package)); +} + +final class _$TaskDependency extends TaskDependency { + _$TaskDependency._( + this.runtimeVersion, + this.package, + this.dependency, + ); + + @override + final String runtimeVersion; + + @override + final String package; + + @override + final String dependency; + + static const _$table = ( + tableName: 'taskDependencies', + columns: ['runtimeVersion', 'package', 'dependency'], + columnInfo: <({ + ColumnType type, + bool isNotNull, + Object? 
defaultValue, + bool autoIncrement, + })>[ + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ), + ( + type: ExposedForCodeGen.text, + isNotNull: true, + defaultValue: null, + autoIncrement: false, + ) + ], + primaryKey: ['runtimeVersion', 'package', 'dependency'], + unique: >[], + foreignKeys: <({ + String name, + List columns, + String referencedTable, + List referencedColumns, + })>[ + ( + name: 'task', + columns: ['runtimeVersion', 'package'], + referencedTable: 'tasks', + referencedColumns: ['runtimeVersion', 'package'], + ) + ], + readRow: _$TaskDependency._$fromDatabase, + ); + + static TaskDependency? _$fromDatabase(RowReader row) { + final runtimeVersion = row.readString(); + final package = row.readString(); + final dependency = row.readString(); + if (runtimeVersion == null && package == null && dependency == null) { + return null; + } + return _$TaskDependency._(runtimeVersion!, package!, dependency!); + } + + @override + String toString() => + 'TaskDependency(runtimeVersion: "$runtimeVersion", package: "$package", dependency: "$dependency")'; +} + +/// Extension methods for table defined in [TaskDependency]. +extension TableTaskDependencyExt on Table { + /// Insert row into the `taskDependencies` table. + /// + /// Returns a [InsertSingle] statement on which `.execute` must be + /// called for the row to be inserted. + InsertSingle insert({ + required Expr runtimeVersion, + required Expr package, + required Expr dependency, + }) => + ExposedForCodeGen.insertInto( + table: this, + values: [ + runtimeVersion, + package, + dependency, + ], + ); + + /// Delete a single row from the `taskDependencies` table, specified by + /// _primary key_. + /// + /// Returns a [DeleteSingle] statement on which `.execute()` must be + /// called for the row to be deleted. + /// + /// To delete multiple rows, using `.where()` to filter which rows + /// should be deleted. If you wish to delete all rows, use + /// `.where((_) => toExpr(true)).delete()`. + DeleteSingle delete( + String runtimeVersion, + String package, + String dependency, + ) => + ExposedForCodeGen.deleteSingle( + byKey(runtimeVersion, package, dependency), + _$TaskDependency._$table, + ); +} + +/// Extension methods for building queries against the `taskDependencies` table. +extension QueryTaskDependencyExt on Query<(Expr,)> { + /// Lookup a single row in `taskDependencies` table using the _primary key_. + /// + /// Returns a [QuerySingle] object, which returns at-most one row, + /// when `.fetch()` is called. + QuerySingle<(Expr,)> byKey( + String runtimeVersion, + String package, + String dependency, + ) => + where((taskDependency) => + taskDependency.runtimeVersion.equalsValue(runtimeVersion) & + taskDependency.package.equalsValue(package) & + taskDependency.dependency.equalsValue(dependency)).first; + + /// Update all rows in the `taskDependencies` table matching this [Query]. + /// + /// The changes to be applied to each row matching this [Query] are + /// defined using the [updateBuilder], which is given an [Expr] + /// representation of the row being updated and a `set` function to + /// specify which fields should be updated. The result of the `set` + /// function should always be returned from the `updateBuilder`. + /// + /// Returns an [Update] statement on which `.execute()` must be called + /// for the rows to be updated. 
+ /// + /// **Example:** decrementing `1` from the `value` field for each row + /// where `value > 0`. + /// ```dart + /// await db.mytable + /// .where((row) => row.value > toExpr(0)) + /// .update((row, set) => set( + /// value: row.value - toExpr(1), + /// )) + /// .execute(); + /// ``` + /// + /// > [!WARNING] + /// > The `updateBuilder` callback does not make the update, it builds + /// > the expressions for updating the rows. You should **never** invoke + /// > the `set` function more than once, and the result should always + /// > be returned immediately. + Update update( + UpdateSet Function( + Expr taskDependency, + UpdateSet Function({ + Expr runtimeVersion, + Expr package, + Expr dependency, + }) set, + ) updateBuilder) => + ExposedForCodeGen.update( + this, + _$TaskDependency._$table, + (taskDependency) => updateBuilder( + taskDependency, + ({ + Expr? runtimeVersion, + Expr? package, + Expr? dependency, + }) => + ExposedForCodeGen.buildUpdate([ + runtimeVersion, + package, + dependency, + ]), + ), + ); + + /// Delete all rows in the `taskDependencies` table matching this [Query]. + /// + /// Returns a [Delete] statement on which `.execute()` must be called + /// for the rows to be deleted. + Delete delete() => + ExposedForCodeGen.delete(this, _$TaskDependency._$table); +} + +/// Extension methods for building point queries against the `taskDependencies` table. +extension QuerySingleTaskDependencyExt on QuerySingle<(Expr,)> { + /// Update the row (if any) in the `taskDependencies` table matching this + /// [QuerySingle]. + /// + /// The changes to be applied to the row matching this [QuerySingle] are + /// defined using the [updateBuilder], which is given an [Expr] + /// representation of the row being updated and a `set` function to + /// specify which fields should be updated. The result of the `set` + /// function should always be returned from the `updateBuilder`. + /// + /// Returns an [UpdateSingle] statement on which `.execute()` must be + /// called for the row to be updated. The resulting statement will + /// **not** fail, if there are no rows matching this query exists. + /// + /// **Example:** decrementing `1` from the `value` field the row with + /// `id = 1`. + /// ```dart + /// await db.mytable + /// .byKey(1) + /// .update((row, set) => set( + /// value: row.value - toExpr(1), + /// )) + /// .execute(); + /// ``` + /// + /// > [!WARNING] + /// > The `updateBuilder` callback does not make the update, it builds + /// > the expressions for updating the rows. You should **never** invoke + /// > the `set` function more than once, and the result should always + /// > be returned immediately. + UpdateSingle update( + UpdateSet Function( + Expr taskDependency, + UpdateSet Function({ + Expr runtimeVersion, + Expr package, + Expr dependency, + }) set, + ) updateBuilder) => + ExposedForCodeGen.updateSingle( + this, + _$TaskDependency._$table, + (taskDependency) => updateBuilder( + taskDependency, + ({ + Expr? runtimeVersion, + Expr? package, + Expr? dependency, + }) => + ExposedForCodeGen.buildUpdate([ + runtimeVersion, + package, + dependency, + ]), + ), + ); + + /// Delete the row (if any) in the `taskDependencies` table matching this [QuerySingle]. + /// + /// Returns a [DeleteSingle] statement on which `.execute()` must be called + /// for the row to be deleted. The resulting statement will **not** + /// fail, if there are no rows matching this query exists. 
+ DeleteSingle delete() => + ExposedForCodeGen.deleteSingle(this, _$TaskDependency._$table); +} + +/// Extension methods for expressions on a row in the `taskDependencies` table. +extension ExpressionTaskDependencyExt on Expr { + Expr get runtimeVersion => + ExposedForCodeGen.field(this, 0, ExposedForCodeGen.text); + + Expr get package => + ExposedForCodeGen.field(this, 1, ExposedForCodeGen.text); + + /// Name of a package that is either a direct or transitive dependency of + /// [package]. + Expr get dependency => + ExposedForCodeGen.field(this, 2, ExposedForCodeGen.text); + + /// Do a subquery lookup of the row from table + /// `tasks` referenced in + /// [runtimeVersion], [package]. + /// + /// The gets the row from table `tasks` where + /// [Task.runtimeVersion], [Task.package] + /// is equal to [runtimeVersion], [package]. + Expr get task => ExposedForCodeGen.subqueryTable(_$Task._$table) + .where((r) => + r.runtimeVersion.equals(runtimeVersion) & r.package.equals(package)) + .first + .asNotNull(); +} + +extension ExpressionNullableTaskDependencyExt on Expr { + Expr get runtimeVersion => + ExposedForCodeGen.field(this, 0, ExposedForCodeGen.text); + + Expr get package => + ExposedForCodeGen.field(this, 1, ExposedForCodeGen.text); + + /// Name of a package that is either a direct or transitive dependency of + /// [package]. + Expr get dependency => + ExposedForCodeGen.field(this, 2, ExposedForCodeGen.text); + + /// Do a subquery lookup of the row from table + /// `tasks` referenced in + /// [runtimeVersion], [package]. + /// + /// The gets the row from table `tasks` where + /// [Task.runtimeVersion], [Task.package] + /// is equal to [runtimeVersion], [package], if any. + /// + /// If this row is `NULL` the subquery is always return `NULL`. + Expr get task => ExposedForCodeGen.subqueryTable(_$Task._$table) + .where((r) => + r.runtimeVersion.equalsUnlessNull(runtimeVersion).asNotNull() & + r.package.equalsUnlessNull(package).asNotNull()) + .first; + + /// Check if the row is not `NULL`. + /// + /// This will check if _primary key_ fields in this row are `NULL`. + /// + /// If this is a reference lookup by subquery it might be more efficient + /// to check if the referencing field is `NULL`. + Expr isNotNull() => + runtimeVersion.isNotNull() & package.isNotNull() & dependency.isNotNull(); + + /// Check if the row is `NULL`. + /// + /// This will check if _primary key_ fields in this row are `NULL`. + /// + /// If this is a reference lookup by subquery it might be more efficient + /// to check if the referencing field is `NULL`. + Expr isNull() => isNotNull().not(); +} + +extension InnerJoinTaskDependencyTaskExt + on InnerJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. + /// + /// This will match rows where [TaskDependency.runtimeVersion] = [Task.runtimeVersion] and [TaskDependency.package] = [Task.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + b.runtimeVersion.equals(a.runtimeVersion) & b.package.equals(a.package)); +} + +extension LeftJoinTaskDependencyTaskExt + on LeftJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. + /// + /// This will match rows where [TaskDependency.runtimeVersion] = [Task.runtimeVersion] and [TaskDependency.package] = [Task.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + b.runtimeVersion.equals(a.runtimeVersion) & b.package.equals(a.package)); +} + +extension RightJoinTaskDependencyTaskExt + on RightJoin<(Expr,), (Expr,)> { + /// Join using the `task` _foreign key_. 
+ /// + /// This will match rows where [TaskDependency.runtimeVersion] = [Task.runtimeVersion] and [TaskDependency.package] = [Task.package]. + Query<(Expr, Expr)> usingTask() => on((a, b) => + b.runtimeVersion.equals(a.runtimeVersion) & b.package.equals(a.package)); +} + +/// Wrap this [TaskState] as [Expr] for use queries with +/// `package:typed_sql`. +extension TaskStateExt on TaskState { + static final _exprType = ExposedForCodeGen.customDataType( + ExposedForCodeGen.text, + TaskState.fromDatabase, + ); + + /// Wrap this [TaskState] as [Expr] for use queries with + /// `package:typed_sql`. + Expr get asExpr => ExposedForCodeGen.literalCustomDataType( + this, + _exprType, + ).asNotNull(); +} + +/// Wrap this [TaskState] as [Expr] for use queries with +/// `package:typed_sql`. +extension TaskStateNullableExt on TaskState? { + /// Wrap this [TaskState] as [Expr] for use queries with + /// `package:typed_sql`. + Expr get asExpr => ExposedForCodeGen.literalCustomDataType( + this, + TaskStateExt._exprType, + ); +} diff --git a/app/lib/database/model.task.dart b/app/lib/database/model.task.dart new file mode 100644 index 0000000000..e7be80050b --- /dev/null +++ b/app/lib/database/model.task.dart @@ -0,0 +1,65 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of 'model.dart'; + +@PrimaryKey(['runtimeVersion', 'package']) +abstract final class Task extends Row { + /// Runtime version this [Task] belongs to. + String get runtimeVersion; + String get package; + + TaskState get state; + + /// Next [DateTime] at which point some package version becomes pending. + DateTime get pendingAt; + + /// Last [DateTime] a dependency was updated. + DateTime get lastDependencyChanged; + + /// The last time a worker completed with a failure or success. + DateTime get finished; +} + +@PrimaryKey(['runtimeVersion', 'package', 'dependency']) +@ForeignKey( + ['runtimeVersion', 'package'], + table: 'tasks', + fields: ['runtimeVersion', 'package'], + name: 'task', + as: 'dependencies', +) +abstract final class TaskDependency extends Row { + String get runtimeVersion; + String get package; + + /// Name of a package that is either a direct or transitive dependency of + /// [package]. + String get dependency; +} + +@immutable +@JsonSerializable() +final class TaskState implements CustomDataType { + /// Scheduling state for all versions of this package. + final Map<String, PackageVersionStateInfo> versions; + + /// The list of tokens that were removed from this [versions]. + /// When a worker reports back using one of these tokens, they will + /// receive a [TaskAbortedException]. + final List<AbortedTokenInfo> abortedTokens; + + TaskState({ + required this.versions, + required this.abortedTokens, + }); + + factory TaskState.fromJson(Map<String, dynamic> m) => _$TaskStateFromJson(m); + Map<String, dynamic> toJson() => _$TaskStateToJson(this); + + factory TaskState.fromDatabase(String value) => + TaskState.fromJson(json.decode(value) as Map<String, dynamic>); + @override + String toDatabase() => json.encode(toJson()); +} diff --git a/app/lib/fake/backend/fake_database.dart b/app/lib/fake/backend/fake_database.dart new file mode 100644 index 0000000000..7bbac33dde --- /dev/null +++ b/app/lib/fake/backend/fake_database.dart @@ -0,0 +1,80 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved.
Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; +import 'dart:isolate' show Isolate; + +import 'package:logging/logging.dart'; +import 'package:pub_dev/database/model.dart'; +import 'package:pub_dev/shared/env_config.dart'; + +DatabaseAdapter createFakeDatabaseAdaptor() { + return DatabaseAdapter.withLogging( + DatabaseAdapter.fromFuture(_createFakeDatabaseAdaptor()), + Logger('sql').info); +} + +Future<DatabaseAdapter> _createFakeDatabaseAdaptor() async { + if (envConfig.isContinuesIntegration) { + // Use default environment variables. + return DatabaseAdapter.postgresTestDatabase(); + } + + final lib = Isolate.resolvePackageUriSync(Uri.parse('package:pub_dev/')); + if (lib == null) { + throw StateError('Cannot resolve package:pub_dev/ to find .dart_tool/'); + } + + await Directory('/tmp/pub_dev_postgres/run/').create(recursive: true); + final socketPath = '/tmp/pub_dev_postgres/run/.s.PGSQL.5432'; + final lockFile = File('/tmp/pub_dev_postgres/.lock'); + if (!await _tryConnect(socketPath)) { + final lock = await lockFile.open(mode: FileMode.append); + try { + await lock.lock(FileLock.blockingExclusive); + + if (!await _tryConnect(socketPath)) { + // Start tool launching the postgres docker instance in a detached + // background process that can keep running. + final tool = lib.resolve('../../tool/run_postgres_test_server.sh'); + await Process.start( + tool.toFilePath(), + ['--quiet'], + workingDirectory: lib.resolve('../../').toFilePath(), + mode: ProcessStartMode.detached, + ); + + do { + await Future.delayed(Duration(milliseconds: 300)); + } while (!await _tryConnect(socketPath)); + } + } finally { + try { + await lock.unlock(); + } finally { + await lock.close(); + } + } + } + + return DatabaseAdapter.postgresTestDatabase( + host: socketPath, + database: 'postgres', + user: 'postgres', + password: 'postgres', + ); +} + +Future<bool> _tryConnect(String socketPath) async { + try { + final s = await Socket.connect( + InternetAddress(socketPath, type: InternetAddressType.unix), + 5432, + ); + s.destroy(); + return true; + } on SocketException { + return false; + } +} diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index 425b4c3ced..b3e9ca5d44 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -19,6 +19,7 @@ import 'package:meta/meta.dart'; import 'package:pana/src/dartdoc/pub_dartdoc_data.dart'; import 'package:path/path.dart' as p; import 'package:pool/pool.dart'; +import 'package:pub_dev/database/model.dart'; import 'package:pub_dev/shared/monitoring.dart'; import 'package:retry/retry.dart'; @@ -40,7 +41,6 @@ import '../shared/storage.dart'; import '../shared/versions.dart'; import '../task/backend.dart'; import '../task/global_lock.dart'; -import '../task/models.dart'; import 'dart_sdk_mem_index.dart'; import 'flutter_sdk_mem_index.dart'; @@ -85,10 +85,11 @@ void registerSearchIndex(SearchIndex index) => /// Datastore-related access methods for the search service class SearchBackend { + final Database db; final DatastoreDB _db; final VersionedJsonStorage _snapshotStorage; - SearchBackend(this._db, Bucket snapshotBucket) + SearchBackend(this.db, this._db, Bucket snapshotBucket) : _snapshotStorage = VersionedJsonStorage(snapshotBucket, 'snapshot/'); /// Runs a forever loop and tries to get a global lock.
@@ -255,11 +256,14 @@ class SearchBackend { addResult(p.name!, p.updated!); } - final q3 = _db.query() - ..filter('finished >=', updatedThreshold) - ..order('-finished'); - await for (final s in q3.run()) { - addResult(s.package, s.finished); + final q2 = db.tasks + .where((task) => + task.runtimeVersion.equalsValue(runtimeVersion) & + task.finished.isAfterValue(updatedThreshold)) + .orderBy((task) => [(task.finished, Order.descending)]) + .select((task) => (task.package, task.finished)); + await for (final (package, finished) in q2.stream()) { + addResult(package, finished); } return results; diff --git a/app/lib/service/secret/models.dart b/app/lib/service/secret/models.dart index 871614b80a..84aca63150 100644 --- a/app/lib/service/secret/models.dart +++ b/app/lib/service/secret/models.dart @@ -16,6 +16,8 @@ class Secret extends db.Model { abstract class SecretKey { static const String redisConnectionString = 'redis-connection-string'; + static const String databaseConnectionString = 'database-connection-string'; + /// OAuth client secret. static const String oauthClientSecret = 'oauth-client-secret'; @@ -38,6 +40,7 @@ abstract class SecretKey { /// List of all keys. static const values = [ redisConnectionString, + databaseConnectionString, oauthClientSecret, announcement, uploadRestriction, diff --git a/app/lib/service/services.dart b/app/lib/service/services.dart index c34bdaeb10..a6dab03cf3 100644 --- a/app/lib/service/services.dart +++ b/app/lib/service/services.dart @@ -14,6 +14,9 @@ import 'package:gcloud/service_scope.dart'; import 'package:gcloud/storage.dart'; import 'package:googleapis_auth/auth_io.dart' as auth; import 'package:logging/logging.dart'; +import 'package:pub_dev/database/model.dart'; +import 'package:pub_dev/fake/backend/fake_database.dart' + show createFakeDatabaseAdaptor; import 'package:pub_dev/package/api_export/api_exporter.dart'; import 'package:pub_dev/search/handlers.dart'; import 'package:pub_dev/service/async_queue/async_queue.dart'; @@ -87,6 +90,15 @@ Future withServices(FutureOr Function() fn) async { final retryingAuthClient = httpRetryClient(innerClient: authClient); registerScopeExitCallback(() async => retryingAuthClient.close()); + registerSecretBackend(GcpSecretBackend(authClient)); + + final databaseAdapter = await setupDatabaseConnection(); + registerDatabase(Database( + databaseAdapter, + SqlDialect.postgres(), + )); + registerScopeExitCallback(() => databaseAdapter.close()); + // override storageService with retrying http client registerStorageService( Storage(retryingAuthClient, activeConfiguration.projectId)); @@ -105,7 +117,6 @@ Future withServices(FutureOr Function() fn) async { : loggingEmailSender, ); registerUploadSigner(await createUploadSigner(retryingAuthClient)); - registerSecretBackend(GcpSecretBackend(authClient)); // Configure a CloudCompute pool for later use in TaskBackend // @@ -179,6 +190,14 @@ Future withFakeServices({ } // register fake services that would have external dependencies + final databaseAdapter = createFakeDatabaseAdaptor(); + registerDatabase(Database( + databaseAdapter, + SqlDialect.postgres(), + )); + await database.createTables(); + registerScopeExitCallback(() => databaseAdapter.close()); + registerSecretBackend(FakeSecretBackend({})); registerAuthProvider(FakeAuthProvider()); registerScopeExitCallback(authProvider.close); @@ -261,7 +280,7 @@ Future _withPubServices(FutureOr Function() fn) async { registerIndexUpdater(IndexUpdater(dbService)); registerPublisherBackend(PublisherBackend(dbService)); 
registerScoreCardBackend(ScoreCardBackend(dbService)); - registerSearchBackend(SearchBackend(dbService, + registerSearchBackend(SearchBackend(database, dbService, storageService.bucket(activeConfiguration.searchSnapshotBucketName!))); registerSearchClient(SearchClient()); registerSearchAdapter(SearchAdapter()); @@ -280,6 +299,7 @@ Future _withPubServices(FutureOr Function() fn) async { storageService.bucket(activeConfiguration.publicPackagesBucketName!), )); registerTaskBackend(TaskBackend( + database, dbService, storageService.bucket(activeConfiguration.taskResultBucketName!), )); diff --git a/app/lib/shared/env_config.dart b/app/lib/shared/env_config.dart index f0e7d7dbb4..d5f18247af 100644 --- a/app/lib/shared/env_config.dart +++ b/app/lib/shared/env_config.dart @@ -37,6 +37,11 @@ class _EnvConfig { /// Youtube API key to use (skips Datastore secret). late final youtubeApiKey = Platform.environment['YOUTUBE_API_KEY']; + /// True, if the process is running in a CI environment. + late final isContinuesIntegration = !['false', '0', ''].contains( + Platform.environment['CI']?.toLowerCase() ?? '', + ); + /// Drives the logging environment in certain tests. /// **Examples**: /// * `DEBUG='*'`, will show output from all loggers. diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 2a899d1550..6c53827798 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -1,3 +1,7 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + import 'dart:async'; import 'dart:convert'; import 'dart:io'; @@ -18,12 +22,14 @@ import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange; import 'package:logging/logging.dart' show Logger; import 'package:pana/models.dart' show Summary; import 'package:pool/pool.dart' show Pool; +import 'package:pub_dev/database/model.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/package/models.dart'; import 'package:pub_dev/package/upload_signer_service.dart'; import 'package:pub_dev/scorecard/backend.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/exceptions.dart'; +import 'package:pub_dev/shared/parallel_foreach.dart'; import 'package:pub_dev/shared/redis_cache.dart'; import 'package:pub_dev/shared/storage.dart'; import 'package:pub_dev/shared/utils.dart' @@ -34,8 +40,6 @@ import 'package:pub_dev/shared/versions.dart' gcBeforeRuntimeVersion, shouldGCVersion, acceptedRuntimeVersions; -import 'package:pub_dev/shared/versions.dart' as shared_versions - show runtimeVersion; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; @@ -43,10 +47,10 @@ import 'package:pub_dev/task/handlers.dart'; import 'package:pub_dev/task/models.dart' show AbortedTokenInfo, - PackageState, PackageStateInfo, PackageVersionStateInfo, PackageVersionStatus, + derivePendingAt, initialTimestamp, maxTaskExecutionTime; import 'package:pub_dev/task/scheduler.dart'; @@ -80,6 +84,7 @@ void registerTaskBackend(TaskBackend backend) => TaskBackend get taskBackend => ss.lookup(#_taskBackend) as TaskBackend; class TaskBackend { + final Database db; final DatastoreDB _db; final Bucket _bucket; @@ -94,7 +99,7 @@ class TaskBackend { /// `null` when not started yet. Completer? 
_stopped; - TaskBackend(this._db, this._bucket); + TaskBackend(this.db, this._db, this._bucket); /// Start continuous background processes for scheduling of tasks. /// @@ -165,7 +170,7 @@ class TaskBackend { // kill overdue VMs. try { await lock.withClaim((claim) async { - await schedule(claim, taskWorkerCloudCompute, _db, + await schedule(claim, taskWorkerCloudCompute, _db, db, abort: aborted); }, abort: aborted); } catch (e, st) { @@ -219,7 +224,7 @@ class TaskBackend { /// Track all package versions. /// /// This will synchronize any changes from [Package] and [PackageVersion] - /// entities to [PackageState] entities. + /// entities to [Task] entities. /// /// This is intended to run as a background tasks that is called once per /// day or so. @@ -228,8 +233,8 @@ class TaskBackend { // [PackageState] entities that shouldn't exist. final packageNames = {}; - // Allow a little concurrency - final pool = Pool(10); + const concurrency = 10; + // Track error / stackTrace, so we can re-throw the first error, when this // backfill task is done. We want to bubble up so that background task is // not registered as having completed successfully. @@ -237,55 +242,44 @@ class TaskBackend { StackTrace? stackTrace; // For each package we should ensure state is tracked - await for (final p in _db.packages.listAllNames()) { + await _db.packages.listAllNames().parallelForEach(concurrency, (p) async { packageNames.add(p.name); - - scheduleMicrotask(() async { - await pool.withResource(() async { - try { - await trackPackage(p.name, updateDependents: false); - } catch (e, st) { - _log.severe('failed to track state for "${p.name}"', e, st); - if (error == null) { - error = e; // save [e] for later, if this is the first failure - stackTrace = st; - } - } - }); - }); - } - - // Check that all [PackageState] entities have a matching [Package] entity. - await for (final state in _db.tasks.listAllForCurrentRuntime()) { - if (!packageNames.contains(state.package)) { - final r = await pool.request(); - - scheduleMicrotask(() async { - try { - // Lookup the package to ensure it really doesn't exist - if (await _db.packages.exists(state.package)) { - // package may have been created recently - // no need to delete [PackageState] - } else { - // no package entry, deleting is needed - await _db.tasks.delete(state.package); - } - } catch (e, st) { - _log.severe('failed to untrack "${state.package}"', e, st); - if (error == null) { - error = e; // save [e] for later, if this is the first failure - stackTrace = st; - } - } finally { - r.release(); // always release to avoid deadlock - } - }); + try { + await trackPackage(p.name, updateDependents: false); + } catch (e, st) { + _log.severe('failed to track state for "${p.name}"', e, st); + if (error == null) { + error = e; // save [e] for later, if this is the first failure + stackTrace = st; + } } - } + }); - // Wait for all ongoing microtasks started above to complete. - await pool.close(); - await pool.done; + // Check that all [Task] rows have a matching [Package] entity. + await db.tasks + .where((task) => task.runtimeVersion.equalsValue(runtimeVersion)) + .select((task) => (task.package,)) + .stream() + .parallelForEach(concurrency, (package) async { + if (packageNames.contains(package)) { + return; + } + try { + // Lookup the package to ensure it really doesn't exist! 
+ // The package may have been created recently, if so there is no need + // to delete [PackageState] + if (!await _db.packages.exists(package)) { + // no package entry, deleting is needed + await db.tasks.delete(runtimeVersion, package).execute(); + } + } catch (e, st) { + _log.severe('failed to untrack "$package"', e, st); + if (error == null) { + error = e; // save [e] for later, if this is the first failure + stackTrace = st; + } + } + }); // If we had any error, we rethrow to ensure that any background task // calling this method won't register completion as successful. @@ -350,8 +344,10 @@ class TaskBackend { seen.removeWhere((_, updated) => updated.isBefore(since)); // Wait until aborted or 10 minutes before scanning again! - await abort.future - .timeoutWithClock(Duration(minutes: 10), onTimeout: () => null); + await abort.future.timeoutWithClock( + Duration(minutes: 10), + onTimeout: () => null, + ); } } @@ -359,73 +355,74 @@ class TaskBackend { String packageName, { bool updateDependents = false, }) async { - var lastVersionCreated = initialTimestamp; - String? latestVersion; - final changed = await withRetryTransaction(_db, (tx) async { - // Lookup Package and PackageVersion in the same transaction. + // Lookup Package and PackageVersion in the same transaction. + final (package, packageVersions) = + await withRetryTransaction(_db, (tx) async { final packageFuture = tx.packages.lookupOrNull(packageName); final packageVersionsFuture = tx.versions.listVersionsOfPackage(packageName); - final stateFuture = tx.tasks.lookupOrNull(packageName); + // Ensure we await all futures! - await Future.wait([packageFuture, packageVersionsFuture, stateFuture]); - final state = await stateFuture; + await Future.wait([ + packageFuture, + packageVersionsFuture, + ]); final package = await packageFuture; final packageVersions = await packageVersionsFuture; + return (package, packageVersions); + }); - if (package == null) { - return false; // assume package was deleted! - } - latestVersion = package.latestVersion; + if (package == null) { + return; // assume package was deleted! + } - // Update the timestamp for when the last version was published. - // This is used if we need to update dependents. - lastVersionCreated = packageVersions.map((pv) => pv.created!).max; + if (package.isNotVisible) { + await db.tasks.delete(runtimeVersion, packageName).execute(); + await _purgeCache(packageName, package.latestVersion); + return; + } - // If package is not visible, we should remove it! - if (package.isNotVisible) { - await tx.tasks - .deleteAllStates(packageName, currentRuntimeKey: state?.key); + // Determined the set of versions to track + final versions = _versionsToTrack(package, packageVersions) + .map( + (v) => v.canonicalizedVersion, // add extra sanity! 
+ ) + .toList(); + + final changed = await db.transact(() async { + final task = await db.tasks.byKey(runtimeVersion, packageName).fetch(); + if (task == null) { + await db.tasks + .insert( + runtimeVersion: runtimeVersion.asExpr, + package: packageName.asExpr, + state: TaskState( + versions: { + for (final version in versions) + version: PackageVersionStateInfo( + scheduled: DateTime.utc(0), + attempts: 0, + ), + }, + abortedTokens: [], + ).asExpr, + lastDependencyChanged: DateTime.utc(0).asExpr, + pendingAt: DateTime.utc(0).asExpr, + finished: DateTime.utc(0).asExpr, + ) + .execute(); return true; } - // Determined the set of versions to track - final versions = _versionsToTrack(package, packageVersions).map( - (v) => v.canonicalizedVersion, // add extra sanity! - ); - - // Ensure we have PackageState entity - if (state == null) { - // Create [PackageState] entity to track the package - _log.info('Started state tracking for $packageName'); - await tx.tasks.insert( - PackageState() - ..setId(runtimeVersion, packageName) - ..runtimeVersion = runtimeVersion - ..versions = { - for (final version in versions) - version: PackageVersionStateInfo( - scheduled: initialTimestamp, - attempts: 0, - ), - } - ..dependencies = [] - ..lastDependencyChanged = initialTimestamp - ..finished = initialTimestamp - ..derivePendingAt(), - ); - return true; // no more work for this package, state is synced - } - // List versions that not tracked, but should be final untrackedVersions = [ - ...versions.whereNot(state.versions!.containsKey), + ...versions.whereNot(task.state.versions.containsKey), ]; // List of versions that are tracked, but don't exist. These have // probably been deselected by _versionsToTrack. final deselectedVersions = [ - ...state.versions!.keys.whereNot(versions.contains), + ...task.state.versions.keys.whereNot(versions.contains), ]; // There should never be an overlap between versions untracked and @@ -442,41 +439,55 @@ class TaskBackend { return false; } - state.abortedTokens = [ - ...state.versions!.entries - .where((e) => deselectedVersions.contains(e.key)) - .map((e) => e.value) - .where((vs) => vs.secretToken != null) - .map( - (vs) => AbortedTokenInfo( - token: vs.secretToken!, - expires: vs.scheduled.add(maxTaskExecutionTime), + final state = TaskState( + abortedTokens: [ + ...task.state.versions.entries + .where((e) => deselectedVersions.contains(e.key)) + .map((e) => e.value) + .where((vs) => vs.secretToken != null) + .map( + (vs) => AbortedTokenInfo( + token: vs.secretToken!, + expires: vs.scheduled.add(maxTaskExecutionTime), + ), ), - ), - ...?state.abortedTokens, - ].where((t) => t.isNotExpired).take(50).toList(); - - // Make changes! - state.versions! 
- // Remove versions that have been deselected - ..removeWhere((v, _) => deselectedVersions.contains(v)) - // Add versions we should be tracking - ..addAll({ - for (final v in untrackedVersions) - v: PackageVersionStateInfo( - scheduled: initialTimestamp, - attempts: 0, - ), - }); - state.derivePendingAt(); + ...task.state.abortedTokens, + ].where((t) => t.isNotExpired).take(50).toList(), + versions: Map.fromEntries([ + ...task.state.versions.entries + .where((e) => !deselectedVersions.contains(e.key)), + ...untrackedVersions.map( + (v) => MapEntry( + v, + PackageVersionStateInfo( + scheduled: initialTimestamp, + attempts: 0, + )), + ), + ]), + ); - _log.info('Update state tracking for $packageName'); - await tx.tasks.update(state); + final pendingAt = derivePendingAt( + state: state, + lastDependencyChanged: task.lastDependencyChanged, + ); + + await db.tasks + .byKey(runtimeVersion, packageName) + .update((task, set) => set( + state: state.asExpr, + pendingAt: pendingAt.asExpr, + )) + .execute(); return true; }); + // Update the timestamp for when the last version was published. + // This is used if we need to update dependents. + final lastVersionCreated = packageVersions.map((pv) => pv.created!).max; + if (changed) { - await _purgeCache(packageName, latestVersion); + await _purgeCache(packageName, package.latestVersion); } if (updateDependents && @@ -488,9 +499,12 @@ class TaskBackend { } } - /// Garbage collect [PackageState] and results from old runtimeVersions. + /// Garbage collect [Task] and results from old runtimeVersions. Future garbageCollect() async { - await _db.tasks.deleteBeforeGcRuntime(); + await db.tasks + .where((task) => task.runtimeVersion < gcBeforeRuntimeVersion.asExpr) + .delete() + .execute(); // Limit to 50 concurrent deletion requests final pool = Pool(50); @@ -536,52 +550,42 @@ class TaskBackend { await pool.done; } - /// Update [PackageState.lastDependencyChanged] for all packages with + /// Update [Task.lastDependencyChanged] for all packages with /// dependency on [package] to at-least [publishedAt]. Future _updateLastDependencyChangedForDependents( String package, DateTime publishedAt, ) async { - // Max concurrency of 20! - final pool = Pool(20); - - // Query for [PackageState] that has [package] listed in [dependencies]. - // Notice that datastore query logic for `dependencies = package` means - // entities where: - // (A) `dependencies` is equal to `package` (won't happen here). - // (B) `dependencies` is a list of strings containing `packages`, - // this is the matching logic we leverage here. - // - // We only update [PackageState] to have [lastDependencyChanged], this - // ensures that there is no risk of indefinite propagation. - final stream = _db.tasks.listDependenciesOfPackage(package, publishedAt); - await for (final state in stream) { - final r = await pool.request(); - - // Schedule a microtask that attempts to update [lastDependencyChanged], - // and logs any failures before always releasing the [r]. - scheduleMicrotask(() async { - try { - final changed = await _db.tasks - .updateDependencyChanged(state.package, publishedAt); - if (changed) { - await _purgeCache(state.package); - } - } catch (e, st) { - _log.warning( - 'failed to propagate lastDependencyChanged for ${state.package}', - e, - st, - ); - } finally { - r.release(); // always release to avoid deadlocks - } - }); - } - // Close the pool -- no more resources requested. - await pool.close(); - // Wait for all resources to be released. 
- await pool.done; + await db.tasks + .join(db.taskDependencies) + .usingTask() + .where((task, dependency) => + dependency.package.equalsValue(package) & + dependency.runtimeVersion.equalsValue(runtimeVersion) & + task.lastDependencyChanged.isBeforeValue(publishedAt)) + .select((task, dependency) => (task.package, task.state)) + .stream() + .parallelForEach(20, (row) async { + final (package, state) = row; + try { + await db.tasks + .byKey(runtimeVersion, package) + .update((_, set) => set( + lastDependencyChanged: publishedAt.asExpr, + pendingAt: derivePendingAt( + state: state, + lastDependencyChanged: publishedAt, + ).asExpr, + )) + .execute(); + } catch (e, st) { + _log.warning( + 'failed to propagate lastDependencyChanged to "$package"', + e, + st, + ); + } + }); } // Handles POST `/api/tasks/$package/$version/upload` @@ -598,12 +602,12 @@ class TaskBackend { throw AuthenticationException.authenticationRequired(); } - final state = await _db.tasks.lookupOrNull(package); - if (state == null) { + final task = await db.tasks.byKey(runtimeVersion, package).fetch(); + if (task == null) { throw NotFoundException.resource('$package/$version'); } final versionState = - _authorizeWorkerCallback(package, version, state, token); + _authorizeWorkerCallback(package, version, task, token); // Set expiration of signed URLs to remaining execution time + 5 min to // allow for clock skew. @@ -668,57 +672,75 @@ class TaskBackend { await _gzippedTaskResult(index, 'summary.json'), ); final hasDocIndexHtml = index.lookup('doc/index.html') != null; - await withRetryTransaction(_db, (tx) async { - final state = await tx.tasks.lookupOrNull(package); - if (state == null) { + + await db.transact(() async { + final task = await db.tasks.byKey(runtimeVersion, package).fetch(); + if (task == null) { throw NotFoundException.resource('$package/$version'); } final versionState = - _authorizeWorkerCallback(package, version, state, token); - - // Update dependencies, if pana summary has dependencies - if (summary != null && summary.allDependencies != null) { - final updatedDependencies = _updatedDependencies( - state.dependencies, - summary.allDependencies, - // for logging only - package: package, - version: version, - ); - // Only update if new dependencies have been discovered. - // This avoids unnecessary churn on datastore when there is no changes. - if (state.dependencies != updatedDependencies && - !{...?state.dependencies}.containsAll(updatedDependencies)) { - state.dependencies = updatedDependencies; - } + _authorizeWorkerCallback(package, version, task, token); + + final newDependencies = _newDependencies( + await db.taskDependencies + .where((d) => + d.package.equalsValue(package) & + d.runtimeVersion.equalsValue(runtimeVersion)) + .select((d) => (d.dependency,)) + .fetch(), + summary?.allDependencies ?? 
[], + // for logging only + package: package, + version: version, + ); + + for (final d in newDependencies) { + await db.taskDependencies + .insert( + runtimeVersion: runtimeVersion.asExpr, + package: package.asExpr, + dependency: d.asExpr, + ) + .execute(); } zone = versionState.zone!; instance = versionState.instance!; - // Remove instanceName, zone, secretToken, and set attempts = 0 - state.versions![version] = PackageVersionStateInfo( - scheduled: versionState.scheduled, - docs: hasDocIndexHtml, - pana: summary != null, - finished: true, - attempts: 0, - instance: null, // version is no-longer running on this instance - secretToken: null, // TODO: Consider retaining this for idempotency - zone: null, + final state = TaskState( + abortedTokens: task.state.abortedTokens, + versions: { + ...task.state.versions, + version: PackageVersionStateInfo( + scheduled: versionState.scheduled, + docs: hasDocIndexHtml, + pana: summary != null, + finished: true, + attempts: 0, + instance: null, // version is no-longer running on this instance + secretToken: null, // TODO: Consider retaining this for idempotency + zone: null, + ), + }, ); // Determine if something else was running on the instance - isInstanceDone = state.versions!.values.none( + isInstanceDone = state.versions.values.none( (v) => v.instance == instance, ); - // Ensure that we update [state.pendingAt], otherwise it might be - // re-scheduled way too soon. - state.derivePendingAt(); - state.finished = clock.now().toUtc(); - - await tx.tasks.update(state); + // Update task row and derive new pendingAt + await db.tasks + .byKey(runtimeVersion, package) + .update((_, set) => set( + state: state.asExpr, + pendingAt: derivePendingAt( + state: state, + lastDependencyChanged: task.lastDependencyChanged, + ).asExpr, + finished: clock.now().toUtc().asExpr, + )) + .execute(); }); // Clearing the state cache after the update. @@ -969,19 +991,25 @@ class TaskBackend { /// Get the most up-to-date status information for a package that has already been analyzed. Future packageStatus(String package) async { final status = await cache.taskPackageStatus(package).get(() async { - for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull(package, runtimeVersion: rt); - // skip states where the entry was created, but no analysis has not finished yet - if (state == null || state.hasNeverFinished) { - continue; - } - return PackageStateInfo( - runtimeVersion: state.runtimeVersion!, - package: package, - versions: state.versions ?? {}, - ); + final (rt, state) = await db.tasks + .where((task) => + task.package.equalsValue(package) & + task.finished.isAfterValue(DateTime.utc(0)) & + acceptedRuntimeVersions + .map((rv) => task.runtimeVersion.equalsValue(rv)) + .reduce((a, b) => a | b)) + .orderBy((task) => [(task.runtimeVersion, Order.descending)]) + .select((task) => (task.runtimeVersion, task.state)) + .first + .fetchOrNulls(); + if (rt == null || state == null) { + return PackageStateInfo.empty(package: package); } - return PackageStateInfo.empty(package: package); + return PackageStateInfo( + runtimeVersion: rt, + package: package, + versions: state.versions, + ); }); return status ?? 
PackageStateInfo.empty(package: package); } @@ -1003,12 +1031,18 @@ class TaskBackend { Future Function(Payload payload) processPayload, ) async { await backfillTrackingState(); - await for (final state in _db.tasks.listAll()) { + + final packages = await db.tasks + .where((task) => task.runtimeVersion.equalsValue(runtimeVersion)) + .select((task) => (task.package,)) + .fetch(); + + for (final pkg in packages) { final zone = taskWorkerCloudCompute.zones.first; // ignore: invalid_use_of_visible_for_testing_member final payload = await updatePackageStateWithPendingVersions( - _db, - state, + db, + pkg, zone, taskWorkerCloudCompute.generateInstanceName(), ); @@ -1023,7 +1057,12 @@ class TaskBackend { Future adminBumpPriority(String packageName) async { // Ensure we're up-to-date. await trackPackage(packageName); - await _db.tasks.bumpPriority(packageName); + await db.tasks + .byKey(runtimeVersion, packageName) + .update((task, set) => set( + pendingAt: DateTime.utc(0).asExpr, + )) + .execute(); } /// Returns the latest version of the [package] which has a finished analysis. @@ -1032,23 +1071,20 @@ class TaskBackend { Future latestFinishedVersion(String package) async { final cachedValue = await cache.latestFinishedVersion(package).get(() async { - for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull(package, runtimeVersion: rt); - // skip states where the entry was created, but no analysis has not finished yet - if (state == null || state.hasNeverFinished) { - continue; - } - final bestVersion = state.versions?.entries - .where((e) => e.value.finished) - .map((e) => Version.parse(e.key)) - .latestVersion; - if (bestVersion != null) { - // sanity check: the version is not deleted - final pv = await packageBackend.lookupPackageVersion( - package, bestVersion.toString()); - if (pv != null) { - return bestVersion.toString(); - } + // Note that this ONLY considers other accepted runtimeVersions if nothing has + // finished for this package in the current runtimeVersion! + final status = await packageStatus(package); + final bestVersion = status.versions.entries + .where((e) => e.value.finished) + .map((e) => Version.parse(e.key)) + .latestVersion; + + if (bestVersion != null) { + // sanity check: the version is not deleted + final pv = await packageBackend.lookupPackageVersion( + package, bestVersion.toString()); + if (pv != null) { + return bestVersion.toString(); } } return ''; @@ -1073,45 +1109,39 @@ class TaskBackend { final cachedValue = await cache.closestFinishedVersion(package, version).get(() async { final semanticVersion = Version.parse(version); - for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull(package, runtimeVersion: rt); - // Skip states where the entry was created, but the analysis has not finished yet. - if (state == null || state.hasNeverFinished) { - continue; - } - List? candidates; - if (preferDocsCompleted) { - final finishedDocCandidates = state.versions?.entries - .where((e) => e.value.docs) - .map((e) => Version.parse(e.key)) - .toList(); - if (finishedDocCandidates != null && - finishedDocCandidates.isNotEmpty) { - candidates = finishedDocCandidates; - } - } - candidates ??= state.versions?.entries - .where((e) => e.value.finished) + // Note that this ONLY considers other accepted runtimeVersions if nothing has + // finished for this package in the current runtimeVersion! + final status = await packageStatus(package); + + List?
candidates; + if (preferDocsCompleted) { + final finishedDocCandidates = status.versions.entries + .where((e) => e.value.docs) .map((e) => Version.parse(e.key)) .toList(); - if (candidates == null || candidates.isEmpty) { - continue; + if (finishedDocCandidates.isNotEmpty) { + candidates = finishedDocCandidates; } - if (candidates.contains(semanticVersion)) { - return version; - } - final newerCandidates = - candidates.where((e) => isNewer(semanticVersion, e)).toList(); - if (newerCandidates.isNotEmpty) { - // Return the earliest finished that is newer than [version]. - return newerCandidates - .reduce((a, b) => isNewer(a, b) ? a : b) - .toString(); - } - return candidates.latestVersion!.toString(); } - return ''; + candidates ??= status.versions.entries + .where((e) => e.value.finished) + .map((e) => Version.parse(e.key)) + .toList(); + + if (candidates.contains(semanticVersion)) { + return version; + } + + final newerCandidates = + candidates.where((e) => isNewer(semanticVersion, e)).toList(); + if (newerCandidates.isNotEmpty) { + // Return the earliest finished that is newer than [version]. + return newerCandidates + .reduce((a, b) => isNewer(a, b) ? a : b) + .toString(); + } + return candidates.latestVersion?.toString() ?? ''; }); return (cachedValue == null || cachedValue.isEmpty) ? null : cachedValue; } @@ -1140,18 +1170,15 @@ String? _extractBearerToken(shelf.Request request) { PackageVersionStateInfo _authorizeWorkerCallback( String package, String version, - PackageState state, + Task task, String token, ) { // fixed-time verification of aborted tokens - final isKnownAbortedToken = state.abortedTokens - ?.map((t) => t.isAuthorized(token)) - .fold(false, (a, b) => a || b); - if (isKnownAbortedToken ?? false) { + if (task.state.abortedTokens.any((t) => t.isAuthorized(token))) { throw TaskAbortedException('$package/$version has been aborted.'); } - final versionState = state.versions![version]; + final versionState = task.state.versions[version]; if (versionState == null) { throw NotFoundException.resource('$package/$version'); } @@ -1216,20 +1243,12 @@ List _versionsToTrack( }.nonNulls.where(visibleVersions.contains).toList(); } -List _updatedDependencies( - List? dependencies, - List? discoveredDependencies, { +List _newDependencies( + List existing, + List discoveredDependencies, { required String package, required String version, }) { - dependencies ??= []; - discoveredDependencies ??= []; - - // If discoveredDependencies is in dependencies, then we're done. - if (dependencies.toSet().containsAll(discoveredDependencies)) { - return dependencies; - } - // Check if any of the dependencies returned have invalid names, if this is // the case, then we should ignore the entire result! final hasBadDependencies = discoveredDependencies.any((dep) { @@ -1249,142 +1268,13 @@ List _updatedDependencies( } }); if (hasBadDependencies) { - return dependencies; // no changes! + return []; } - // An indexed property cannot be larger than 1500 bytes, strings counts as - // length + 1, so we prefer newly [discoveredDependencies] and then choose - // [dependencies], after which we just pick the dependencies we can get while - // staying below 1500 bytes. - var size = 0; + // Consider at-most 500 dependencies return discoveredDependencies - .followedBy(dependencies.whereNot(discoveredDependencies.contains)) - .takeWhile((p) => (size += p.length + 1) < 1500) - .sorted(); -} - -/// Low-level, narrowly typed data access methods for [PackageState] entity. 
-extension TaskDatastoreDBExt on DatastoreDB { - _TaskDataAccess get tasks => _TaskDataAccess(this); -} - -extension TaskTransactionWrapperExt on TransactionWrapper { - _TaskTransactionDataAcccess get tasks => _TaskTransactionDataAcccess(this); -} - -final class _TaskDataAccess { - final DatastoreDB _db; - - _TaskDataAccess(this._db); - - Future lookupOrNull( - String package, { - String? runtimeVersion, - }) async { - final key = PackageState.createKey(_db.emptyKey, - runtimeVersion ?? shared_versions.runtimeVersion, package); - return await _db.lookupOrNull(key); - } - - Future delete(String package) async { - final key = PackageState.createKey(_db.emptyKey, runtimeVersion, package); - await _db.commit(deletes: [key]); - } - - // GC the old [PackageState] entities - Future deleteBeforeGcRuntime() async { - await _db.deleteWithQuery( - _db.query() - ..filter('runtimeVersion <', gcBeforeRuntimeVersion), - ); - } - - Stream listAll() { - return _db.query().run(); - } - - Stream<({String package})> listAllForCurrentRuntime() async* { - final query = _db.query() - ..filter('runtimeVersion =', runtimeVersion); - await for (final ps in query.run()) { - yield (package: ps.package); - } - } - - Stream<({String package})> listDependenciesOfPackage( - String package, DateTime publishedAt) async* { - final query = _db.query() - ..filter('dependencies =', package) - ..filter('lastDependencyChanged <', publishedAt); - await for (final ps in query.run()) { - yield (package: ps.package); - } - } - - /// Returns whether the entry has been updated. - Future updateDependencyChanged( - String package, DateTime publishedAt) async { - return await withRetryTransaction(_db, (tx) async { - // Reload [state] within a transaction to avoid overwriting changes - // made by others trying to update state for another package. - final s = await tx.lookupValue( - PackageState.createKey(_db.emptyKey, runtimeVersion, package)); - if (s.lastDependencyChanged!.isBefore(publishedAt)) { - tx.insert( - s - ..lastDependencyChanged = publishedAt - ..derivePendingAt(), - ); - return true; - } - return false; - }); - } - - Future bumpPriority(String packageName) async { - await withRetryTransaction(_db, (tx) async { - final stateKey = - PackageState.createKey(_db.emptyKey, runtimeVersion, packageName); - final state = await tx.lookupOrNull(stateKey); - if (state != null) { - state.pendingAt = initialTimestamp; - tx.insert(state); - } - }); - } -} - -class _TaskTransactionDataAcccess { - final TransactionWrapper _tx; - - _TaskTransactionDataAcccess(this._tx); - - Future lookupOrNull(String name, - {String? runtimeVersion}) async { - final key = PackageState.createKey( - _tx.emptyKey, runtimeVersion ?? shared_versions.runtimeVersion, name); - return await _tx.lookupOrNull(key); - } - - Future deleteAllStates(String name, {Key? 
currentRuntimeKey}) async { - if (currentRuntimeKey != null) { - _tx.delete(currentRuntimeKey); - } - // also delete earlier runtime versions - for (final rv - in acceptedRuntimeVersions.where((rv) => rv != runtimeVersion)) { - final s = await lookupOrNull(name, runtimeVersion: rv); - if (s != null) { - _tx.delete(s.key); - } - } - } - - Future insert(PackageState state) async { - _tx.insert(state); - } - - Future update(PackageState state) async { - _tx.insert(state); - } + .sorted() + .take(500) + .whereNot(existing.contains) + .toList(); } diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index b070c561e5..ccf578ae7e 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -7,7 +7,9 @@ import 'dart:convert' show json; import 'package:clock/clock.dart'; import 'package:json_annotation/json_annotation.dart'; import 'package:pub_dev/admin/actions/actions.dart'; +import 'package:pub_dev/database/model.dart'; import 'package:pub_dev/shared/utils.dart'; +import 'package:pub_semver/pub_semver.dart'; import '../shared/datastore.dart' as db; import '../shared/versions.dart' as shared_versions; @@ -71,6 +73,7 @@ Duration taskRetryDelay(int attempts) => /// /// * `id`, is the `runtimeVersion / packageName`. /// * `PackageState` entities never have a parent. +@Deprecated('use db') @db.Kind(name: 'PackageState', idType: db.IdType.String) class PackageState extends db.ExpandoModel { /// Create a key for [runtimeVersion] and [packageName] using the Datastore's empty key. @@ -211,7 +214,69 @@ class PackageState extends db.ExpandoModel { '\n)'; } -/// State of a given `version` within a [PackageState]. +/// Derive [Task.pendingAt] using [TaskState] and [lastDependencyChanged]. +/// +/// When updating PackageState the pendingAt property is set to the minimum of: +/// * `scheduled + 31 days` for any version, +/// * `scheduled + 24 hours` for any version where `dependencyChanged > scheduled` +/// * `scheduled + 3 hours * attempts^2` for any version where `attempts > 0 && attempts < 3`. +DateTime derivePendingAt({ + required TaskState state, + required DateTime lastDependencyChanged, +}) { + return [ + // scheduled + 31 days + ...state.versions.values.map((v) => v.scheduled.add(taskRetriggerInterval)), + // scheduled + 24 hours, where scheduled < lastDependencyChanged + ...state.versions.values + .where((v) => v.scheduled.isBefore(lastDependencyChanged)) + .map((v) => v.scheduled.add(taskDependencyRetriggerCoolOff)), + // scheduled + 3 hours * attempts^2, where attempts > 0 && attempts < 3 + ...state.versions.values + .where((v) => v.attempts > 0 && v.attempts < taskRetryLimit) + .map((v) => v.scheduled.add(taskRetryDelay(v.attempts))), + // Pick the minimum of the candidates, default scheduling in year 3k + // if there is no date before that. + ].fold(DateTime(3000), (a, b) => a.isBefore(b) ? a : b); +} + +/// Return a list of pending versions for this package. +/// +/// When scheduling analysis of a package we piggyback along versions that +/// are going to be pending soon too. Hence, we return a version if: +/// * `now - scheduled > 21 days`, +/// * `lastDependencyChanged > scheduled`, or, +/// * `attempts > 0 && attempts < 3 && now - scheduled > 3 hours * attempts^2` +List derivePendingVersions({ + DateTime? at, + required TaskState state, + required DateTime lastDependencyChanged, +}) { + final at_ = at ?? 
clock.now(); + Duration timeSince(DateTime past) => at_.difference(past); + + return state.versions.entries + .where( + // NOTE: Any changes here must be reflected in [derivePendingAt] + (e) => + // If scheduled more than 21 days ago + timeSince(e.value.scheduled) > minTaskRetriggerInterval || + // If a dependency has changed since it was last scheduled + lastDependencyChanged.isAfter(e.value.scheduled) || + // If: + // - attempts > 0 (analysis is not done, and has been started) + // - no more than 3 attempts have been done, + // - now - scheduled > 3 hours * attempts^2 + (e.value.attempts > 0 && + e.value.attempts < taskRetryLimit && + timeSince(e.value.scheduled) > + taskRetryDelay(e.value.attempts)), + ) + .map((e) => Version.parse(e.key)) + .toList(); +} + +/// State of a given `version` within a [Task]. @JsonSerializable() class PackageVersionStateInfo { PackageVersionStatus get status { @@ -414,7 +479,7 @@ enum PackageVersionStatus { failed, } -/// Tracks a token that was removed from the [PackageState], but a worker +/// Tracks a token that was removed from the [Task], but a worker /// may still use it to report a completed task. Such workers may receive /// an error code that says they shouldn't really panic on the rejection. @JsonSerializable() diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 3e036fbfa2..5a4f864d4d 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -3,9 +3,11 @@ import 'dart:convert'; import 'dart:math'; import 'package:_pub_shared/data/task_payload.dart'; +import 'package:basics/basics.dart'; import 'package:clock/clock.dart'; import 'package:logging/logging.dart' show Logger; import 'package:meta/meta.dart'; +import 'package:pub_dev/database/model.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; @@ -15,18 +17,18 @@ import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/models.dart'; -import 'package:pub_semver/pub_semver.dart'; final _log = Logger('pub.task.schedule'); const _maxInstancesPerIteration = 50; // Later consider something like: 50; -/// Schedule tasks from [PackageState] while [claim] is valid, and [abort] have +/// Schedule [Task]s while [claim] is valid, and [abort] has /// not been resolved. Future schedule( GlobalLockClaim claim, CloudCompute compute, - DatastoreDB db, { + DatastoreDB db, + Database db2, { required Completer abort, }) async { /// Sleep [delay] time [since] timestamp, or now if not given.
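To make the scheduling rules above concrete, here is a small illustrative sketch (editorial, not part of the patch) of how the new derivePendingAt and derivePendingVersions helpers from app/lib/task/models.dart are expected to combine. It assumes the duration constants keep the values quoted in the doc comments (31 days, 24 hours, 21 days, 3 hours × attempts²) and that TaskState and PackageVersionStateInfo are importable via the paths shown in this diff.

import 'package:pub_dev/database/model.dart' show TaskState;
import 'package:pub_dev/task/models.dart';

void main() {
  final now = DateTime.now().toUtc();
  final state = TaskState(
    abortedTokens: [],
    versions: {
      // Analyzed 25 days ago, no failed attempts: already past the 21-day
      // piggyback threshold, so derivePendingVersions reports it; its own
      // re-trigger candidate for pendingAt is scheduled + 31 days.
      '1.2.3': PackageVersionStateInfo(
        scheduled: now.subtract(const Duration(days: 25)),
        attempts: 0,
      ),
      // One failed attempt an hour ago: the retry candidate is
      // scheduled + 3 hours * 1^2, i.e. roughly two hours from now.
      '2.0.0-dev': PackageVersionStateInfo(
        scheduled: now.subtract(const Duration(hours: 1)),
        attempts: 1,
      ),
    },
  );
  // No dependency has changed since either version was scheduled.
  final lastDependencyChanged = now.subtract(const Duration(days: 30));

  // Minimum over all candidates: the retry of '2.0.0-dev', ~now + 2 hours.
  final pendingAt = derivePendingAt(
    state: state,
    lastDependencyChanged: lastDependencyChanged,
  );

  // Versions piggybacked onto an instance started right now: only '1.2.3'
  // qualifies (scheduled more than 21 days ago; the retry is not due yet).
  final pending = derivePendingVersions(
    at: now,
    state: state,
    lastDependencyChanged: lastDependencyChanged,
  );
  print('pendingAt: $pendingAt, pending now: $pending');
}

The scheduler hunk below relies on exactly this split: it selects Task rows with pendingAt <= now, and updatePackageStateWithPendingVersions then uses the pending versions to build the worker payload.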
@@ -145,23 +147,28 @@ Future schedule( // Schedule analysis for some packages var pendingPackagesReviewed = 0; - await Future.wait(await (db.query() - ..filter('runtimeVersion =', runtimeVersion) - ..filter('pendingAt <=', clock.now()) - ..order('pendingAt') - ..limit(min( - _maxInstancesPerIteration, - max(0, activeConfiguration.maxTaskInstances - instances), - ))) - .run() - .map>((state) async { + + final tasks = await db2.tasks + .where((task) => task.runtimeVersion.equalsValue(runtimeVersion)) + .where((task) => task.pendingAt.isBeforeValue(clock.now())) + .orderBy((task) => [(task.pendingAt, Order.ascending)]) + .limit(min( + _maxInstancesPerIteration, + max(0, activeConfiguration.maxTaskInstances - instances), + )) + .fetch(); + await Future.wait(tasks.map>((task) async { pendingPackagesReviewed += 1; final instanceName = compute.generateInstanceName(); final zone = pickZone(); final payload = await updatePackageStateWithPendingVersions( - db, state, zone, instanceName); + db2, + task.package, + zone, + instanceName, + ); if (payload == null) { return; } @@ -180,7 +187,7 @@ Future schedule( await purgePackageCache(payload.package); _log.info( 'creating instance $instanceName in $zone for ' - 'package:${state.package}', + 'package:${task.package}', ); await compute.createInstance( zone: zone, @@ -229,23 +236,42 @@ Future schedule( banZone(zone, minutes: 15); } finally { if (rollbackPackageState) { - // Restore the state of the PackageState for versions that were + // Restore the state of the Task.state for versions that were // suppose to run on the instance we just failed to create. // If this doesn't work, we'll eventually retry. Hence, correctness // does not hinge on this transaction being successful. - await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull(state.key); - if (s == null) { + await db2.transact(() async { + final (lastDependencyChanged, state) = await db2.tasks + .byKey(runtimeVersion, task.package) + .select((task) => ( + task.lastDependencyChanged, + task.state, + )) + .fetchOrNulls(); + if (lastDependencyChanged == null || state == null) { return; // Presumably, the package was deleted. } - s.versions!.addEntries( - s.versions!.entries - .where((e) => e.value.instance == instanceName) - .map((e) => MapEntry(e.key, state.versions![e.key]!)), + final restoredState = TaskState( + abortedTokens: state.abortedTokens, + versions: { + ...state.versions, + ...state.versions + .whereValue((v) => v.instance == instanceName) + .map((v, _) => MapEntry(v, task.state.versions[v]!)), + }, ); - s.derivePendingAt(); - tx.insert(s); + + await db2.tasks + .byKey(runtimeVersion, task.package) + .update((_, set) => set( + state: restoredState.asExpr, + pendingAt: derivePendingAt( + state: restoredState, + lastDependencyChanged: lastDependencyChanged, + ).asExpr, + )) + .execute(); }); } } @@ -275,21 +301,24 @@ Future schedule( /// will be pending soon. @visibleForTesting Future updatePackageStateWithPendingVersions( - DatastoreDB db, - PackageState state, + Database db, + String package, String zone, String instanceName, ) async { - return await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull(state.key); - if (s == null) { + return await db.transact(() async { + final task = await db.tasks.byKey(runtimeVersion, package).fetch(); + if (task == null) { // presumably the package was deleted. 
return null; } final now = clock.now(); - final pendingVersions = - s.pendingVersions(at: now).map(Version.parse).toList(); + final pendingVersions = derivePendingVersions( + state: task.state, + lastDependencyChanged: task.lastDependencyChanged, + at: now, + ); if (pendingVersions.isEmpty) { // do not schedule anything return null; @@ -304,8 +333,10 @@ Future updatePackageStateWithPendingVersions( // - 3.0.0-dev1 // - 2.7.0-beta // - 1.0.0-dev - pendingVersions - .sort((a, b) => compareSemanticVersionsDesc(a, b, true, true)); + pendingVersions.sort( + (a, b) => compareSemanticVersionsDesc(a, b, true, true), + ); + // Promote the first prerelease version to the second position, e.g. // - 2.5.0 // - 3.0.0-dev2 @@ -326,28 +357,37 @@ Future updatePackageStateWithPendingVersions( } } - // Update PackageState - s.versions!.addAll({ + // Create updated TaskState + final state = TaskState(abortedTokens: task.state.abortedTokens, versions: { for (final v in pendingVersions.map((v) => v.toString())) v: PackageVersionStateInfo( scheduled: now, - attempts: s.versions![v]!.attempts + 1, + attempts: task.state.versions[v]!.attempts + 1, zone: zone, instance: instanceName, secretToken: createUuid(), - finished: s.versions![v]!.finished, + finished: task.state.versions[v]!.finished, ), }); - s.derivePendingAt(); - tx.insert(s); + + await db.tasks + .byKey(runtimeVersion, package) + .update((_, set) => set( + state: state.asExpr, + pendingAt: derivePendingAt( + state: state, + lastDependencyChanged: task.lastDependencyChanged, + ).asExpr, + )) + .execute(); // Create payload return Payload( - package: s.package, + package: package, pubHostedUrl: activeConfiguration.defaultServiceBaseUrl, versions: pendingVersions.map((v) => VersionTokenPair( version: v.toString(), - token: s.versions![v.toString()]!.secretToken!, + token: state.versions[v.toString()]!.secretToken!, )), ); }); diff --git a/app/pubspec.yaml b/app/pubspec.yaml index 7b9340b14b..7593ebd26b 100644 --- a/app/pubspec.yaml +++ b/app/pubspec.yaml @@ -52,6 +52,8 @@ dependencies: ulid: '2.0.1' tar: '2.0.0' api_builder: + typed_sql: ^0.1.3 + postgres: ^3.5.6 dev_dependencies: build_runner: '^2.0.0' diff --git a/pubspec.lock b/pubspec.lock index 21aa70e3d7..3e72157b8a 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -73,6 +73,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.2" + buffer: + dependency: transitive + description: + name: buffer + sha256: "389da2ec2c16283c8787e0adaede82b1842102f8c8aae2f49003a766c5c6b3d1" + url: "https://pub.dev" + source: hosted + version: "1.2.3" build: dependency: transitive description: @@ -657,6 +665,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.5.1" + postgres: + dependency: transitive + description: + name: postgres + sha256: "9aaa6f4872956adef653535a4e2133b167465c1a68c22b9bd0744ef1244e9393" + url: "https://pub.dev" + source: hosted + version: "3.5.6" protobuf: dependency: transitive description: @@ -713,6 +729,22 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.0" + sasl_scram: + dependency: transitive + description: + name: sasl_scram + sha256: a47207a436eb650f8fdcf54a2e2587b850dc3caef9973ce01f332b07a6fc9cb9 + url: "https://pub.dev" + source: hosted + version: "0.1.1" + saslprep: + dependency: transitive + description: + name: saslprep + sha256: "3d421d10be9513bf4459c17c5e70e7b8bc718c9fc5ad4ba5eb4f5fd27396f740" + url: "https://pub.dev" + source: hosted + version: "1.0.3" sass: dependency: transitive description: @@ -825,6 +857,14 @@ packages: url: 
"https://pub.dev" source: hosted version: "7.0.0" + sqlite3: + dependency: transitive + description: + name: sqlite3 + sha256: "310af39c40dd0bb2058538333c9d9840a2725ae0b9f77e4fd09ad6696aa8f66e" + url: "https://pub.dev" + source: hosted + version: "2.7.5" stack_trace: dependency: transitive description: @@ -921,6 +961,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.4.0" + typed_sql: + dependency: transitive + description: + name: typed_sql + sha256: b9cf7444aa0c02705fd0faf8156280aae75574bb614f4c3893128247e7839204 + url: "https://pub.dev" + source: hosted + version: "0.1.3" ulid: dependency: transitive description: @@ -929,6 +977,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.0.1" + unorm_dart: + dependency: transitive + description: + name: unorm_dart + sha256: "23d8bf65605401a6a32cff99435fed66ef3dab3ddcad3454059165df46496a3b" + url: "https://pub.dev" + source: hosted + version: "0.3.0" uuid: dependency: transitive description: diff --git a/tool/run_postgres_test_server.sh b/tool/run_postgres_test_server.sh new file mode 100755 index 0000000000..9f7dfd6203 --- /dev/null +++ b/tool/run_postgres_test_server.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash + +# Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +# for details. All rights reserved. Use of this source code is governed by a +# BSD-style license that can be found in the LICENSE file. + +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +ROOT="${SCRIPT_DIR}/.." + +# Create directory for exposing sockets +LOCK_DIR="/tmp/pub_dev_postgres/" +SOCKET_DIR="/tmp/pub_dev_postgres/run/" +mkdir -p "$SOCKET_DIR" + +# Use an extra lock file to avoid every creating more than one docker container +LOCKFILE="$LOCK_DIR/.docker.lock" +touch "$LOCKFILE" + +CONTAINER_ID=$(( + flock -ox 200 + if ! docker inspect 'pub_dev_postgres' > /dev/null 2>&1; then + docker run \ + --detach \ + --rm \ + --name pub_dev_postgres \ + -e POSTGRES_PASSWORD=postgres \ + -v "$SOCKET_DIR":/var/run/postgresql/ \ + --mount type=tmpfs,destination=/var/lib/postgresql/data \ + postgres:17 \ + postgres \ + -c fsync=off \ + -c synchronous_commit=off \ + -c full_page_writes=off \ + -c wal_level=minimal \ + -c max_wal_senders=0 \ + -c archive_mode=off + fi +) 200>"$LOCKFILE") + +if [ -n "$CONTAINER_ID" ]; then + if [ "$1" != '--quiet' ]; then + echo 'Started postgres test database' + echo 'Will terminate it in 6 hours, if you do not close this script' + fi + trap "/bin/bash -c 'docker kill "$CONTAINER_ID" > /dev/null 2>&1'" EXIT + sleep 6h; +else + if [ "$1" != '--quiet' ]; then + echo 'Found postgres test database already running!' + echo 'If you want to restart it, you can use with:' + echo 'docker kill pub_dev_postgres' + fi +fi