Skip to content

Commit

Permalink
Prepare for Mysql and Sqlite support
Browse files Browse the repository at this point in the history
  • Loading branch information
m-barthelemy committed Apr 5, 2020
1 parent 8f0b699 commit 0602485
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 23 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,27 @@ app.migrations.add(JobModelMigrate())

Load the `QueuesFluentDriver` driver:
```swift
app.queues.use(.fluent(.psql))
app.queues.use(.fluent(.psql, dbType: .postgres))
```


 

### Using a custom Database
You can optionally create a dedicated Database, set to `isdefault: false` and with a custom `DatabaseID` and use it for your Queues.
In that case you would initialize the Queues configuration like this:

```swift
let queuesDb = DatabaseID(string: "my_queues_db")
app.databases.use(.postgres(configuration: dbConfig), as: queuesDb, isDefault: false)
app.queues.use(.fluent(queuesDb))
app.queues.use(.fluent(queuesDb, dbType: .postgres))
```

### Customizing the jobs table name
By default the `JobModelMigrate` migration will create a table named `jobs`. You can customize the name during the migration :
```swift
app.migrations.add(JobModelMigrate(schema: "vapor_queues"))
```

## Caveats

Expand Down
34 changes: 25 additions & 9 deletions Sources/QueuesFluentDriver/FluentQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import SQLKit
struct FluentQueue {
let db: Database?
let context: QueueContext
let useForUpdateSkipLocked: Bool
let dbType: QueuesFluentDbType
let useSoftDeletes: Bool = true
}

extension FluentQueue: Queue {
static let model = JobModel(id: UUID.generateRandom(), key: "", data: Data())
static let model = JobModel(id: UUID.generateRandom(), key: "")

func get(_ id: JobIdentifier) -> EventLoopFuture<JobData> {
guard let database = db else {
Expand All @@ -25,8 +25,8 @@ extension FluentQueue: Queue {
.first()
.unwrap(or: QueuesFluentError.missingJob(id))
.flatMapThrowing { job in
let jobData = try JSONDecoder().decode(JobData.self, from: job.data)
return jobData
let jobData = job.data//try JSONDecoder().decode(JobData.self, from: job.data)
return jobData!
}
}

Expand All @@ -38,7 +38,7 @@ extension FluentQueue: Queue {
return database.eventLoop.makeFailedFuture(QueuesFluentError.invalidIdentifier)
}
do {
let data = try JSONEncoder().encode(jobStorage)
let data = jobStorage //try JSONEncoder().encode(jobStorage)
return JobModel(id: uuid, key: key, data: data).save(on: database).map { return }
}
catch {
Expand Down Expand Up @@ -95,18 +95,18 @@ extension FluentQueue: Queue {
}
let db = database as! SQLDatabase

var subQuery = db
var selectQuery = db
.select ()
.column ("\(Self.model.$id.key)")
.from (JobModel.schema)
.where ("\(Self.model.$state.key)", SQLBinaryOperator.equal, JobState.pending)
.orderBy("\(Self.model.$createdAt.path.first!)")
.limit (1)

if (self.useForUpdateSkipLocked) {
subQuery = subQuery.lockingClause(SQLForUpdateSkipLocked.forUpdateSkipLocked)
if (self.dbType != .sqlite) {
selectQuery = selectQuery.lockingClause(SQLForUpdateSkipLocked.forUpdateSkipLocked)
}
let subQueryGroup = SQLGroupExpression.init(subQuery.query)
let subQueryGroup = SQLGroupExpression.init(selectQuery.query)

let query = db
.update(JobModel.schema)
Expand All @@ -119,6 +119,22 @@ extension FluentQueue: Queue {
.orWhere(SQLReturning.returning(column: Self.model.$id.key))
.query

// UPDATE `jobs`
// SET `state` = ?, `updated_at` = ?
// WHERE `id` = (SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED)
// OR 1=2
// RETURNING "id"

// -- should be --

// BEGIN TRANSACTION
// SELECT `id` FROM `jobs` WHERE `state` = ? ORDER BY `created_at` ASC LIMIT 1 FOR UPDATE SKIP LOCKED;
// UPDATE `jobs`
// SET
// `state` = ?,
// `updated_at` = ?
// WHERE `id` = xxxxxxx;
// COMMIT
let (sql, binds) = db.serialize(query)

/*let driver = dbDriver()
Expand Down
20 changes: 15 additions & 5 deletions Sources/QueuesFluentDriver/FluentQueuesDriver.swift
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
import Fluent
import Queues


public enum QueuesFluentDbType {
case postgres
case mysql
case sqlite
}

public struct FluentQueuesDriver {
let databaseId: DatabaseID
let useSkipLocked: Bool
let dbType: QueuesFluentDbType
let useSoftDeletes: Bool

init(on databaseId: DatabaseID, useSoftDeletes: Bool, useSkipLocked: Bool) {
init(on databaseId: DatabaseID, dbType: QueuesFluentDbType, useSoftDeletes: Bool) {
self.databaseId = databaseId
self.useSkipLocked = useSkipLocked
self.dbType = dbType
self.useSoftDeletes = useSoftDeletes
}
}

extension FluentQueuesDriver: QueuesDriver {
public func makeQueue(with context: QueueContext) -> Queue {
let db = context.application.databases.database(databaseId, logger: context.logger, on: context.eventLoop)
let db = context
.application
.databases
.database(databaseId, logger: context.logger, on: context.eventLoop)
return FluentQueue(
db: db,
context: context,
useForUpdateSkipLocked: self.useSkipLocked
dbType: self.dbType
)
}

Expand Down
7 changes: 4 additions & 3 deletions Sources/QueuesFluentDriver/JobModel.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import Fluent
import Queues

enum JobState: String, Codable, CaseIterable {
/// Job created but should NOT be picked up for execution yet
Expand Down Expand Up @@ -27,7 +28,8 @@ class JobModel: Model {

/// The Job data
@Field(key: "data")
var data: Data
var data: JobData?
//var data: Data

/// The current state of the Job
@Field(key: "state")
Expand All @@ -45,10 +47,9 @@ class JobModel: Model {
var deletedAt: Date?


init(id: UUID, key: String, data: Data) {
init(id: UUID, key: String, data: JobData? = nil) {
self.id = id
self.key = key
self.data = data
self.state = .initial
}
}
6 changes: 5 additions & 1 deletion Sources/QueuesFluentDriver/JobModelMigrate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import Fluent
public struct JobModelMigrate: Migration {
public init() {}

public init(schema: String) {
JobModel.schema = schema
}

public func prepare(on database: Database) -> EventLoopFuture<Void> {
let model = FluentQueue.model
return database.schema(JobModel.schema)
.field(model.$id.key, .uuid, .identifier(auto: false))
.field(model.$key.key, .string, .required)
.field(model.$data.key, .data, .required)
.field(model.$data.key, .json, .required)
.field(model.$state.key, .string, .required)
.field(model.$createdAt.path.first!, .datetime)
.field(model.$updatedAt.path.first!, .datetime)
Expand Down
6 changes: 3 additions & 3 deletions Sources/QueuesFluentDriver/Queues.Provider+fluent.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import Queues

extension Application.Queues.Provider {
/// `database`: the Fluent `Database` already configured in your application.
/// `dbType`: one of `.psql`, `.mysq;`, `.sqlite`
/// `useSoftDeletes`: if set to `false`, really delete completed jobs insetad of using Fluent's default SoftDelete feature.
/// `useSkipLocked`: whether to use the `FOR UPDATE SKIP LOCKED` SQL feature.
/// **WARNING**: if set to `false`, with any database engine other than Sqlite, a given job could be picked by multiple workers, unless you only have one single Queues worker/process.
/// `useSkipLocked` is `true` by default and is supported on Mysql >= 8.0.1, MariaDB >= 10.3, Postgres >= 9.5, Oracle >= 9i(?).
/// Sqlite doesn't have nor need it since it uses full table locking on update. Other dbs are just too weird (SQL Server).
public static func fluent(_ databaseId: DatabaseID, useSoftDeletes: Bool = true, useSkipLocked: Bool = true) -> Self {
public static func fluent(_ dbId: DatabaseID, dbType: QueuesFluentDbType, useSoftDeletes: Bool = true) -> Self {
.init {
$0.queues.use(custom:
FluentQueuesDriver(on: databaseId, useSoftDeletes: useSoftDeletes, useSkipLocked: useSkipLocked)
FluentQueuesDriver(on: dbId, dbType: dbType, useSoftDeletes: useSoftDeletes)
)
}
}
Expand Down
4 changes: 4 additions & 0 deletions Sources/QueuesFluentDriver/QueuesFluentError.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import Queues

enum QueuesFluentError: Error {
/// Couldn't find a job with this Id
case missingJob(_ id: JobIdentifier)
/// The JobIdentifier is not a valid UUID
case invalidIdentifier
/// Error encoding the jon Payload to JSON
case jobDataEncodingError(_ message: String)
/// The given DatabaseID doesn't match any existing database configured in the Vapor app.
case databaseNotFound
}
25 changes: 25 additions & 0 deletions Sources/QueuesFluentDriver/SQLDatabase+query.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,39 @@ import Foundation
import Fluent
import SQLKit

struct sqlCache{
static var cache: [String: [Range<String.Index>]] = [:]
}

extension SQLDatabase {
func query(db: SQLDatabase, sql: String, binds: [Encodable] = []) -> SQLRawBuilder {
var sql = sql
if(sqlCache.cache.keys.contains(sql)) {
print("•••• Using SQL cache!")
}
else {
sqlCache.cache[sql] = prepareSql(sql: sql, binds: binds).1
}
var bindPos = 0
binds.forEach {
bindPos += 1
sql = sql.replacingOccurrences(of: "$\(bindPos)", with: "'\($0)'")
}
return db.raw(SQLQueryString(sql))
}

private func prepareSql(sql: String, binds: [Encodable]) -> (String, [Range<String.Index>]) {
var bindIndices: [Range<String.Index>] = []
var bindPos = 0
binds.forEach { bind in
bindPos += 1
let bindIndex = sql.range(of: "$\(bindPos)")
bindIndices.append(bindIndex!)
}
return (sql, bindIndices)
}

private func bindPrepared() {
// sql.replacingCharacters(in: bindIndex!, with: "'\($0)'")
}
}

0 comments on commit 0602485

Please sign in to comment.