-
Notifications
You must be signed in to change notification settings - Fork 71
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a simple postgresql storage provider (#350)
* Add a simple postgresql storage provider * Fix queries * Add unit test * Run postgres containers concurrently * Log postgres for debugging workflow * Adjust timeouts * Upgrade dependencies? * Adjust timeouts again
- Loading branch information
Showing
6 changed files
with
1,455 additions
and
845 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
import * as postgres from "postgres"; | ||
|
||
import { IStorageProvider } from "./IStorageProvider"; | ||
import { IAppserviceStorageProvider } from "./IAppserviceStorageProvider"; | ||
import { IFilterInfo } from "../IFilter"; | ||
|
||
/** | ||
* A barebones postgresql storage provider. It is not efficient, but it does work. | ||
* @category Storage providers | ||
*/ | ||
export class SimplePostgresStorageProvider implements IStorageProvider, IAppserviceStorageProvider { | ||
private readonly db: postgres.Sql; | ||
private readonly waitPromise: Promise<void>; | ||
private completedTransactions = []; | ||
|
||
/** | ||
* Creates a new simple postgresql storage provider. | ||
* @param connectionString The `postgres://` connection string to use. | ||
* @param trackTransactionsInMemory True (default) to track all received appservice transactions rather than on disk. | ||
* @param maxInMemoryTransactions The maximum number of transactions to hold in memory before rotating the oldest out. Defaults to 20. | ||
*/ | ||
constructor(connectionString: string, private trackTransactionsInMemory = true, private maxInMemoryTransactions = 20) { | ||
this.db = postgres(connectionString); | ||
|
||
this.waitPromise = Promise.all([ | ||
this.db` | ||
CREATE TABLE IF NOT EXISTS bot_metadata (key TEXT NOT NULL PRIMARY KEY, value TEXT); | ||
`, | ||
this.db` | ||
CREATE TABLE IF NOT EXISTS bot_kv (key TEXT NOT NULL PRIMARY KEY, value TEXT); | ||
`, | ||
this.db` | ||
CREATE TABLE IF NOT EXISTS appservice_users (user_id TEXT NOT NULL PRIMARY KEY, registered BOOLEAN NOT NULL); | ||
`, | ||
this.db` | ||
CREATE TABLE IF NOT EXISTS appservice_transactions (txn_id TEXT NOT NULL PRIMARY KEY, completed BOOLEAN NOT NULL); | ||
`, | ||
]).then(); | ||
} | ||
|
||
public async setSyncToken(token: string | null): Promise<any> { | ||
await this.waitPromise; | ||
return this.db` | ||
INSERT INTO bot_metadata (key, value) VALUES ('syncToken', ${token}) | ||
ON CONFLICT (key) DO UPDATE SET value = ${token}; | ||
`; | ||
} | ||
|
||
public async getSyncToken(): Promise<string | null> { | ||
await this.waitPromise; | ||
return (await this.db` | ||
SELECT value FROM bot_metadata WHERE key = 'syncToken'; | ||
`)[0]?.value; | ||
} | ||
|
||
public async setFilter(filter: IFilterInfo): Promise<any> { | ||
await this.waitPromise; | ||
const filterStr = filter ? JSON.stringify(filter) : null; | ||
return this.db` | ||
INSERT INTO bot_metadata (key, value) VALUES ('filter', ${filterStr}) | ||
ON CONFLICT (key) DO UPDATE SET value = ${filterStr}; | ||
`; | ||
} | ||
|
||
public async getFilter(): Promise<IFilterInfo> { | ||
await this.waitPromise; | ||
const value = (await this.db` | ||
SELECT value FROM bot_metadata WHERE key = 'filter'; | ||
`)[0]?.value; | ||
return typeof value === "string" ? JSON.parse(value) : value; | ||
} | ||
|
||
public async addRegisteredUser(userId: string): Promise<any> { | ||
await this.waitPromise; | ||
return this.db` | ||
INSERT INTO appservice_users (user_id, registered) VALUES (${userId}, TRUE) | ||
ON CONFLICT (user_id) DO UPDATE SET registered = TRUE; | ||
`; | ||
} | ||
|
||
public async isUserRegistered(userId: string): Promise<boolean> { | ||
await this.waitPromise; | ||
return !!(await this.db` | ||
SELECT registered FROM appservice_users WHERE user_id = ${userId}; | ||
`)[0]?.registered; | ||
} | ||
|
||
public async setTransactionCompleted(transactionId: string): Promise<any> { | ||
await this.waitPromise; | ||
if (this.trackTransactionsInMemory) { | ||
if (this.completedTransactions.indexOf(transactionId) === -1) { | ||
this.completedTransactions.push(transactionId); | ||
} | ||
if (this.completedTransactions.length > this.maxInMemoryTransactions) { | ||
this.completedTransactions = this.completedTransactions.reverse().slice(0, this.maxInMemoryTransactions).reverse(); | ||
} | ||
return; | ||
} | ||
|
||
return this.db` | ||
INSERT INTO appservice_transactions (txn_id, completed) VALUES (${transactionId}, TRUE) | ||
ON CONFLICT (txn_id) DO UPDATE SET completed = TRUE; | ||
`; | ||
} | ||
|
||
public async isTransactionCompleted(transactionId: string): Promise<boolean> { | ||
await this.waitPromise; | ||
if (this.trackTransactionsInMemory) { | ||
return this.completedTransactions.includes(transactionId); | ||
} | ||
|
||
return (await this.db` | ||
SELECT completed FROM appservice_transactions WHERE txn_id = ${transactionId}; | ||
`)[0]?.completed; | ||
} | ||
|
||
public async readValue(key: string): Promise<string | null | undefined> { | ||
await this.waitPromise; | ||
return (await this.db` | ||
SELECT value FROM bot_kv WHERE key = ${key}; | ||
`)[0]?.value; | ||
} | ||
|
||
public async storeValue(key: string, value: string): Promise<void> { | ||
await this.waitPromise; | ||
return this.db` | ||
INSERT INTO bot_kv (key, value) VALUES (${key}, ${value}) | ||
ON CONFLICT (key) DO UPDATE SET value = ${value}; | ||
`.then(); | ||
} | ||
|
||
public storageForUser(userId: string): IStorageProvider { | ||
return new NamespacedPostgresProvider(userId, this); | ||
} | ||
} | ||
|
||
/** | ||
* A namespaced storage provider that uses postgres to store information. | ||
* @category Storage providers | ||
*/ | ||
class NamespacedPostgresProvider implements IStorageProvider { | ||
constructor(private prefix: string, private parent: SimplePostgresStorageProvider) { | ||
} | ||
|
||
public setFilter(filter: IFilterInfo): Promise<any> | void { | ||
return this.parent.storeValue(`${this.prefix}_internal_filter`, JSON.stringify(filter)); | ||
} | ||
|
||
public async getFilter(): Promise<IFilterInfo> { | ||
return this.parent.readValue(`${this.prefix}_internal_filter`).then(r => r ? JSON.parse(r) : r); | ||
} | ||
|
||
public setSyncToken(token: string | null): Promise<any> | void { | ||
return this.parent.storeValue(`${this.prefix}_internal_syncToken`, token ?? ""); | ||
} | ||
|
||
public async getSyncToken(): Promise<string> { | ||
return this.parent.readValue(`${this.prefix}_internal_syncToken`).then(r => r === "" ? null : r); | ||
} | ||
|
||
public storeValue(key: string, value: string): Promise<any> | void { | ||
return this.parent.storeValue(`${this.prefix}_internal_kv_${key}`, value); | ||
} | ||
|
||
public readValue(key: string): string | Promise<string | null | undefined> | null | undefined { | ||
return this.parent.readValue(`${this.prefix}_internal_kv_${key}`); | ||
} | ||
} |
Oops, something went wrong.