Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): performance and caching poc #2954

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d541d90
feat(workweek): updated grafana dashboards.
koekiebox Aug 26, 2024
e63608b
feat(backend): ww performance create quote metrics (#2896)
BlairCurrey Aug 26, 2024
56de99c
Merge branch 'main' into bc/ww-performance-jason
koekiebox Aug 27, 2024
748e314
Merge branch 'bc/ww-performance' into bc/ww-performance-jason
koekiebox Aug 27, 2024
b2ac5eb
feat(backend): tel start/stopTimer
BlairCurrey Aug 27, 2024
f78de7c
Merge branch 'bc/ww-performance' into bc/ww-performance-jason
koekiebox Aug 27, 2024
34196f8
fix(backend): test tel mock
BlairCurrey Aug 27, 2024
4d6c933
Merge branch 'bc/ww-performance' into bc/ww-performance-jason
koekiebox Aug 27, 2024
23271ef
Merge branch 'main' into bc/ww-performance
BlairCurrey Aug 27, 2024
a470233
fix(backend): decrease workers to prevent locking, add traces
BlairCurrey Aug 27, 2024
9d2e24e
fix(backend): decrease op worker
BlairCurrey Aug 27, 2024
50babd1
feat(workweek): updated grafana dashboards.
koekiebox Aug 27, 2024
317dcf0
feat(backend): add tracing, metrics, get peer tweak (#2910)
BlairCurrey Aug 28, 2024
60d93f6
Merge branch 'refs/heads/bc/ww-performance' into bc/ww-performance-jason
koekiebox Aug 28, 2024
57362af
feat(workweek): merge.
koekiebox Aug 28, 2024
5b5bd10
feat(workweek): revert example.json
koekiebox Aug 28, 2024
3870bcc
fix(backend): tel dep injection
BlairCurrey Aug 28, 2024
a280ffc
Merge branch 'refs/heads/bc/ww-performance' into bc/ww-performance-jason
koekiebox Aug 28, 2024
02bbc52
Merge branch 'main' into bc/ww-performance-jason
koekiebox Aug 28, 2024
8f79fdc
feat(workweek): asset caching.
koekiebox Aug 29, 2024
106f95e
feat(backend): add tracing
BlairCurrey Aug 29, 2024
802e9ba
Merge branch 'main' into bc/ww-performance
BlairCurrey Aug 29, 2024
865cff3
chore: WIP dont block unit tests with integration
BlairCurrey Aug 29, 2024
a457421
chore: restore correct worker idle
BlairCurrey Aug 29, 2024
0ccb1c6
chore: revert integration test
BlairCurrey Aug 29, 2024
81c43f6
feat(workweek): outgoing payment asset cache.
koekiebox Aug 29, 2024
71c40f5
feat: add op service graph
BlairCurrey Aug 29, 2024
5b9197f
Merge branch 'bc/ww-performance' into bc/ww-performance-jason
BlairCurrey Aug 29, 2024
7a2a447
Merge remote-tracking branch 'refs/remotes/origin/bc/ww-performance-j…
BlairCurrey Aug 29, 2024
7be75f2
chore: remove unecessary if
BlairCurrey Aug 29, 2024
f7eb67c
chore: format
BlairCurrey Aug 29, 2024
0a2968d
chore: use prom var
BlairCurrey Aug 29, 2024
81a9dd7
Revert "chore: remove unecessary if"
BlairCurrey Aug 29, 2024
12ccd8d
fix(backend): missing quote join
BlairCurrey Aug 29, 2024
0fdbd1c
chore: format
BlairCurrey Aug 29, 2024
16b2ea8
fix(backend): missing quote join
BlairCurrey Aug 29, 2024
2ec8c61
fix: use asset cache
BlairCurrey Aug 29, 2024
d45c108
feat(workweek): share lock for quote fetch.
koekiebox Aug 29, 2024
a9d0a0f
feat(grafana): add tps* metric
BlairCurrey Aug 29, 2024
38197e3
fix(backend): telemetry timer
BlairCurrey Aug 30, 2024
e88b675
feat(workweek): share lock for quote fetch.
koekiebox Aug 30, 2024
a4bc3f2
feat(workweek): share lock for quote fetch.
koekiebox Aug 30, 2024
60cd14e
refactor: use new startTimer
BlairCurrey Sep 4, 2024
68a5e97
fix(grafana): timer query
BlairCurrey Sep 4, 2024
d5693eb
feat(workweek): fix caching issues on test cases.
koekiebox Sep 5, 2024
aacfdf6
feat(workweek): fix integration tests.
koekiebox Sep 6, 2024
cad7474
feat(workweek): formatting
koekiebox Sep 6, 2024
282a717
feat(workweek): fix test cases.
koekiebox Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,600 changes: 1,438 additions & 162 deletions localenv/telemetry/grafana/provisioning/dashboards/example.json

Large diffs are not rendered by default.

1,262 changes: 1,262 additions & 0 deletions localenv/telemetry/grafana/provisioning/dashboards/jason_example.json

Large diffs are not rendered by default.

1,894 changes: 1,894 additions & 0 deletions localenv/telemetry/grafana/provisioning/dashboards/local_example.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions localenv/telemetry/otel-collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ processors:
batch:

exporters:
logging:
loglevel: info
debug:
verbosity: detailed
prometheus:
Expand All @@ -18,6 +20,9 @@ exporters:
insecure: true

service:
telemetry:
logs:
level: warn
pipelines:
metrics:
receivers: [otlp]
Expand Down
5 changes: 5 additions & 0 deletions packages/backend/src/accounting/psql/balance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ export async function getAccountBalances(
account: LedgerAccount,
trx?: TransactionOrKnex
): Promise<AccountBalance> {
const stopTimer = deps.telemetry?.startTimer('psql_getAccountBalances', {
callName: 'psql_getAccountBalances'
})
try {
const queryResult = await (trx ?? deps.knex).raw(
`
Expand All @@ -38,6 +41,8 @@ export async function getAccountBalances(
const debitsPosted = BigInt(queryResult.rows[0].debitsPosted)
const debitsPending = BigInt(queryResult.rows[0].debitsPending)

stopTimer && stopTimer()

return {
creditsPosted,
creditsPending,
Expand Down
14 changes: 13 additions & 1 deletion packages/backend/src/accounting/psql/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ import {
getAccountTransfers
} from './ledger-transfer'
import { LedgerTransfer, LedgerTransferType } from './ledger-transfer/model'
import { TelemetryService } from '../../telemetry/service'
import { AssetService } from '../../asset/service'

export interface ServiceDependencies extends BaseService {
fetchAssetService?: () => Promise<AssetService>
knex: TransactionOrKnex
withdrawalThrottleDelay?: number
telemetry?: TelemetryService
}

export function createAccountingService(
Expand Down Expand Up @@ -145,13 +149,21 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
accountRef: string
): Promise<bigint | undefined> {
const stopTimer = deps.telemetry?.startTimer('psql_getAccountTotalSent', {
callName: 'psql_getAccountTotalSent'
})
const account = await getLiquidityAccount(deps, accountRef)

if (!account) {
stopTimer && stopTimer()
return
}

return (await getAccountBalances(deps, account)).debitsPosted
const totalsSent = (await getAccountBalances(deps, account)).debitsPosted

stopTimer && stopTimer()

return totalsSent
}

export async function getAccountsTotalSent(
Expand Down
9 changes: 7 additions & 2 deletions packages/backend/src/accounting/service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TransactionOrKnex } from 'objection'
import { BaseService } from '../shared/baseService'
import { TransferError, isTransferError } from './errors'
import { AssetService } from '../asset/service'

export enum LiquidityAccountType {
ASSET = 'ASSET',
Expand Down Expand Up @@ -40,6 +41,8 @@ export interface LiquidityAccount {
export interface OnCreditOptions {
totalReceived: bigint
withdrawalThrottleDelay?: number
// TODO: make this required. should only need updating the tests to do this.
fetchAssetService?: () => Promise<AssetService>
}

export interface OnDebitOptions {
Expand Down Expand Up @@ -133,6 +136,7 @@ export interface TransferToCreate {
}

export interface BaseAccountingServiceDependencies extends BaseService {
fetchAssetService?: () => Promise<AssetService>
withdrawalThrottleDelay?: number
}

Expand Down Expand Up @@ -160,7 +164,7 @@ export async function createAccountToAccountTransfer(
getAccountBalance
} = args

const { withdrawalThrottleDelay } = deps
const { withdrawalThrottleDelay, fetchAssetService } = deps

const { sourceAccount, destinationAccount, sourceAmount, destinationAmount } =
transferArgs
Expand Down Expand Up @@ -226,7 +230,8 @@ export async function createAccountToAccountTransfer(

await destinationAccount.onCredit({
totalReceived,
withdrawalThrottleDelay
withdrawalThrottleDelay,
fetchAssetService
})
}
},
Expand Down
9 changes: 9 additions & 0 deletions packages/backend/src/accounting/tigerbeetle/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import {
getAccountTransfers
} from './transfers'
import { toTigerBeetleId } from './utils'
import { TelemetryService } from '../../telemetry/service'
import { AssetService } from '../../asset/service'

export enum TigerBeetleAccountCode {
LIQUIDITY_WEB_MONETIZATION = 1,
Expand Down Expand Up @@ -67,8 +69,10 @@ export const convertToTigerBeetleTransferCode: {
}

export interface ServiceDependencies extends BaseService {
fetchAssetService?: () => Promise<AssetService>
tigerBeetle: Client
withdrawalThrottleDelay?: number
telemetry?: TelemetryService
}

export function createAccountingService(
Expand Down Expand Up @@ -218,10 +222,15 @@ export async function getAccountTotalSent(
deps: ServiceDependencies,
id: string
): Promise<bigint | undefined> {
const stopTimer = deps.telemetry?.startTimer('tb_getAccountTotalSent', {
callName: 'tb_getAccountTotalSent'
})
const account = (await getAccounts(deps, [id]))[0]
if (account) {
stopTimer && stopTimer()
return account.debits_posted
}
stopTimer && stopTimer()
}

export async function getAccountsTotalSent(
Expand Down
48 changes: 41 additions & 7 deletions packages/backend/src/asset/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { BaseService } from '../shared/baseService'
import { AccountingService, LiquidityAccountType } from '../accounting/service'
import { WalletAddress } from '../open_payments/wallet_address/model'
import { Peer } from '../payment-method/ilp/peer/model'
import { Quote } from '../open_payments/quote/model'
import { IncomingPayment } from '../open_payments/payment/incoming/model'
import { CacheDataStore } from '../cache'

export interface AssetOptions {
code: string
Expand All @@ -28,37 +31,46 @@ export interface DeleteOptions {
deletedAt: Date
}

export type ToSetOn = Quote | IncomingPayment | WalletAddress | Peer | undefined

export interface AssetService {
create(options: CreateOptions): Promise<Asset | AssetError>
update(options: UpdateOptions): Promise<Asset | AssetError>
delete(options: DeleteOptions): Promise<Asset | AssetError>
get(id: string): Promise<void | Asset>
setOn(obj: ToSetOn): Promise<void | Asset>
getPage(pagination?: Pagination, sortOrder?: SortOrder): Promise<Asset[]>
getAll(): Promise<Asset[]>
}

interface ServiceDependencies extends BaseService {
accountingService: AccountingService
cacheDataStore: CacheDataStore
}

export async function createAssetService({
logger,
knex,
accountingService
accountingService,
cacheDataStore
}: ServiceDependencies): Promise<AssetService> {
const log = logger.child({
service: 'AssetService'
})

const deps: ServiceDependencies = {
logger: log,
knex,
accountingService
accountingService,
cacheDataStore
}

return {
create: (options) => createAsset(deps, options),
update: (options) => updateAsset(deps, options),
delete: (options) => deleteAsset(deps, options),
get: (id) => getAsset(deps, id),
setOn: (toSetOn) => setAssetOn(deps, toSetOn),
getPage: (pagination?, sortOrder?) =>
getAssetsPage(deps, pagination, sortOrder),
getAll: () => getAll(deps)
Expand All @@ -78,6 +90,8 @@ async function createAsset(
.first()

if (deletedAsset) {
await deps.cacheDataStore.delete(deletedAsset.id)

// if found, enable
return await Asset.query(deps.knex)
.patchAndFetchById(deletedAsset.id, { deletedAt: null })
Expand Down Expand Up @@ -120,9 +134,12 @@ async function updateAsset(
throw new Error('Knex undefined')
}
try {
return await Asset.query(deps.knex)
const returnVal = await Asset.query(deps.knex)
.patchAndFetchById(id, { withdrawalThreshold, liquidityThreshold })
.throwIfNotFound()

await deps.cacheDataStore.delete(id)
return returnVal
} catch (err) {
if (err instanceof NotFoundError) {
return AssetError.UnknownAsset
Expand All @@ -139,6 +156,8 @@ async function deleteAsset(
if (!deps.knex) {
throw new Error('Knex undefined')
}

await deps.cacheDataStore.delete(id)
try {
// return error in case there is a peer or wallet address using the asset
const peer = await Peer.query(deps.knex).where('assetId', id).first()
Expand All @@ -152,7 +171,6 @@ async function deleteAsset(
if (walletAddress) {
return AssetError.CannotDeleteInUseAsset
}

return await Asset.query(deps.knex)
.patchAndFetchById(id, { deletedAt: deletedAt.toISOString() })
.throwIfNotFound()
Expand All @@ -168,19 +186,35 @@ async function getAsset(
deps: ServiceDependencies,
id: string
): Promise<void | Asset> {
return await Asset.query(deps.knex).whereNull('deletedAt').findById(id)
const inMem = (await deps.cacheDataStore.get(id)) as Asset
if (inMem) return inMem

const asset = await Asset.query(deps.knex).whereNull('deletedAt').findById(id)
if (asset) await deps.cacheDataStore.set(asset.id, asset)

return asset
}

async function getAssetsPage(
deps: ServiceDependencies,
pagination?: Pagination,
sortOrder?: SortOrder
): Promise<Asset[]> {
return await Asset.query(deps.knex)
return Asset.query(deps.knex)
.whereNull('deletedAt')
.getPage(pagination, sortOrder)
}

async function getAll(deps: ServiceDependencies): Promise<Asset[]> {
return await Asset.query(deps.knex).whereNull('deletedAt')
return Asset.query(deps.knex).whereNull('deletedAt')
}

async function setAssetOn(
deps: ServiceDependencies,
obj: ToSetOn
): Promise<void | Asset> {
if (!obj) return
const asset = await getAsset(deps, obj.assetId)
if (asset) obj.asset = asset
return asset
}
46 changes: 46 additions & 0 deletions packages/backend/src/cache/cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { CacheDataStore, CacheSupport } from './index'

export interface Cached {
expiry: number
data: CacheSupport
}

export function createInMemoryDataStore(keyTtlMs: number): CacheDataStore {
const map = new Map<string, Cached>()

const getAndCheckExpiry = (key: string): Cached | undefined => {
const cached = map.get(key)
if (cached?.expiry && Date.now() >= cached.expiry) {
deleteKey(key)
return undefined
}
return cached
}

const deleteKey = (key: string) => map.delete(key)

return {
async get(key: string): Promise<CacheSupport> {
if (keyTtlMs < 1) return undefined
const cached = getAndCheckExpiry(key)
return cached?.data
},
async getKeyExpiry(key: string): Promise<Date | undefined> {
const cached = getAndCheckExpiry(key)

return cached ? new Date(cached.expiry) : undefined
},
async delete(key: string): Promise<void> {
deleteKey(key)
},
async set(key: string, value: CacheSupport): Promise<boolean> {
if (keyTtlMs < 1) return true
map.set(key, { expiry: Date.now() + keyTtlMs, data: value })
return true
},
async deleteAll(): Promise<void> {
map.clear()
}
}
}
export { CacheDataStore }
19 changes: 19 additions & 0 deletions packages/backend/src/cache/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Quote } from '../open_payments/quote/model'
import { WalletAddress } from '../open_payments/wallet_address/model'
import { OutgoingPayment } from '../open_payments/payment/outgoing/model'
import { Asset } from '../asset/model'

export type CacheSupport =
| Quote
| WalletAddress
| OutgoingPayment
| Asset
| undefined

export interface CacheDataStore {
get(key: string): Promise<CacheSupport>
getKeyExpiry(key: string): Promise<Date | undefined>
set(key: string, value: CacheSupport): Promise<boolean>
delete(key: string): Promise<void>
deleteAll(): Promise<void>
}
5 changes: 3 additions & 2 deletions packages/backend/src/config/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export const Config = {
authServerGrantUrl: envString('AUTH_SERVER_GRANT_URL'),
authServerIntrospectionUrl: envString('AUTH_SERVER_INTROSPECTION_URL'),

outgoingPaymentWorkers: envInt('OUTGOING_PAYMENT_WORKERS', 4),
outgoingPaymentWorkers: envInt('OUTGOING_PAYMENT_WORKERS', 1),
outgoingPaymentWorkerIdle: envInt('OUTGOING_PAYMENT_WORKER_IDLE', 200), // milliseconds

incomingPaymentWorkers: envInt('INCOMING_PAYMENT_WORKERS', 1),
Expand Down Expand Up @@ -187,7 +187,8 @@ export const Config = {
'INCOMING_PAYMENT_EXPIRY_MAX_MS',
2592000000
), // 30 days
enableSpspPaymentPointers: envBool('ENABLE_SPSP_PAYMENT_POINTERS', true)
enableSpspPaymentPointers: envBool('ENABLE_SPSP_PAYMENT_POINTERS', true),
localCacheDuration: envInt('LOCAL_CACHE_DURATION', 5000)
}

function parseRedisTlsConfig(
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/fee/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async function getLatestFee(
type: FeeType
): Promise<Fee | undefined> {
return await Fee.query(deps.knex)
// TODO: index on assetId/type
.where({ assetId, type })
.orderBy('createdAt', 'desc')
.first()
Expand Down
5 changes: 4 additions & 1 deletion packages/backend/src/graphql/resolvers/liquidity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ describe('Liquidity Resolvers', (): void => {
const timeoutTwoPhase = 10

beforeAll(async (): Promise<void> => {
deps = await initIocContainer(Config)
deps = initIocContainer({
...Config,
localCacheDuration: 0
})
appContainer = await createTestApp(deps)
knex = appContainer.knex
accountingService = await deps.use('accountingService')
Expand Down
Loading
Loading