Skip to content

Commit

Permalink
Merge pull request #339 from wavesplatform/release/v0.31.4
Browse files Browse the repository at this point in the history
Release/v0.31.4
  • Loading branch information
stuffy-idler authored Apr 8, 2021
2 parents fc22593 + 3d756d4 commit 5903c62
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 151 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "data-service",
"version": "0.31.2",
"version": "0.31.4",
"description": "Waves data service",
"main": "src/index.js",
"repository": "git@github.com:wavesplatform/data-service.git",
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import * as notFoundHandler from './middleware/notFoundHandler';
import { loadConfig } from './loadConfig';
import router from './endpoints';

export const WavesId: string = 'WAVES';

const app = unsafeKoaQs(new Koa());

const options = loadConfig();
Expand Down
31 changes: 28 additions & 3 deletions src/loadConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ export type MatcherConfig = {
};
};

export type RatesConfig = {
pairAcceptanceVolumeThreshold: number;
thresholdAssetId: string;
};

export type DefaultConfig = PostgresConfig & ServerConfig & LoggerConfig;

export type DataServiceConfig = PostgresConfig &
ServerConfig &
LoggerConfig &
MatcherConfig;
MatcherConfig &
RatesConfig;

const commonEnvVariables = ['PGHOST', 'PGDATABASE', 'PGUSER', 'PGPASSWORD'];

Expand All @@ -49,14 +55,22 @@ export const loadDefaultConfig = (): DefaultConfig => {
postgresPoolSize: process.env.PGPOOLSIZE ? parseInt(process.env.PGPOOLSIZE) : 20,
postgresStatementTimeout:
isNil(process.env.PGSTATEMENTTIMEOUT) ||
isNaN(parseInt(process.env.PGSTATEMENTTIMEOUT))
isNaN(parseInt(process.env.PGSTATEMENTTIMEOUT))
? false
: parseInt(process.env.PGSTATEMENTTIMEOUT),
logLevel: process.env.LOG_LEVEL || 'info',
};
};

const envVariables = ['DEFAULT_MATCHER'];
const envVariables = ['DEFAULT_MATCHER', 'RATE_PAIR_ACCEPTANCE_VOLUME_THRESHOLD', 'RATE_THRESHOLD_ASSET_ID'];

const ensurePositiveNumber = (x: number, msg: string) => {
if (x > 0) {
return x;
}

throw new Error(msg);
};

const load = (): DataServiceConfig => {
// assert all necessary env vars are set
Expand All @@ -75,9 +89,20 @@ const load = (): DataServiceConfig => {
matcher.matcher.settingsURL = process.env.MATCHER_SETTINGS_URL;
}

const volumeThreshold = ensurePositiveNumber(
parseInt(process.env.RATE_PAIR_ACCEPTANCE_VOLUME_THRESHOLD as string),
'RATE_PAIR_ACCEPTANCE_VOLUME_THRESHOLD environment variable should be a positive integer'
);

const rate: RatesConfig = {
pairAcceptanceVolumeThreshold: volumeThreshold,
thresholdAssetId: process.env.RATE_THRESHOLD_ASSET_ID as string
};

return {
...loadDefaultConfig(),
...matcher,
...rate,
};
};

Expand Down
31 changes: 20 additions & 11 deletions src/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import createTransferTxsService, { TransferTxsService } from './transactions/tra
import createUpdateAssetInfoTxsService, { UpdateAssetInfoTxsService } from './transactions/updateAssetInfo';
import { DataServiceConfig } from '../loadConfig';
import createRateService, { RateCacheImpl } from './rates';
import { IThresholdAssetRateService, ThresholdAssetRateService } from './rates/ThresholdAssetRateService';

import { PairOrderingServiceImpl } from './PairOrderingService';

Expand All @@ -60,6 +61,9 @@ export type CommonServiceDependencies = {

export type RateSerivceCreatorDependencies = CommonServiceDependencies & {
cache: RateCache;
pairs: PairsService;
pairAcceptanceVolumeThreshold: number,
thresholdAssetRateService: IThresholdAssetRateService
};

export type ServiceMesh = {
Expand Down Expand Up @@ -130,6 +134,19 @@ export default ({
cache: assetsCache,
});

const pairsNoAsyncValidation = createPairsService({
...commonDeps,
cache: pairsCache,
validatePairs: () => taskOf(undefined),
});
const pairsWithAsyncValidation = createPairsService({
...commonDeps,
cache: pairsCache,
validatePairs: validatePairs(assets, pairOrderingService),
});

const thresholdAssetRateService = new ThresholdAssetRateService(options.thresholdAssetId, options.matcher.defaultMatcherAddress, pairsNoAsyncValidation);

const aliasTxs = createAliasTxsService(commonDeps);
const burnTxs = createBurnTxsService(commonDeps);
const dataTxs = createDataTxsService(commonDeps);
Expand All @@ -150,17 +167,9 @@ export default ({
const rates = createRateService({
...commonDeps,
cache: ratesCache,
});

const pairsNoAsyncValidation = createPairsService({
...commonDeps,
cache: pairsCache,
validatePairs: () => taskOf(undefined),
});
const pairsWithAsyncValidation = createPairsService({
...commonDeps,
cache: pairsCache,
validatePairs: validatePairs(assets, pairOrderingService),
pairs: pairsNoAsyncValidation,
pairAcceptanceVolumeThreshold: options.pairAcceptanceVolumeThreshold,
thresholdAssetRateService: thresholdAssetRateService,
});

const candlesNoAsyncValidation = createCandlesService({
Expand Down
114 changes: 76 additions & 38 deletions src/services/rates/RateEstimator.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
import { BigNumber } from '@waves/data-entities';
import { Task } from 'folktale/concurrency/task';
import { Maybe } from 'folktale/maybe';
import { isNil } from 'ramda';

import { tap } from '../../utils/tap';
import { AssetIdsPair, RateMgetParams } from '../../types';
import { AppError, DbError, Timeout } from '../../errorHandling';
import { AssetIdsPair, Pair, RateMgetParams } from '../../types';
import { tap } from '../../utils/tap';
import { isEmpty } from '../../utils/fp/maybeOps';

import { partitionByPreCount, AsyncMget, RateCache } from './repo';
import { PairsService } from '../pairs';
import { RateWithPairIds } from '../rates';
import { IThresholdAssetRateService } from './ThresholdAssetRateService';
import { partitionByPreComputed, AsyncMget, RateCache } from './repo';
import { RateCacheKey } from './repo/impl/RateCache';
import RateInfoLookup from './repo/impl/RateInfoLookup';
import { isEmpty } from '../../utils/fp/maybeOps';
import { RateWithPairIds } from '../rates';

type ReqAndRes<TReq, TRes> = {
req: TReq;
res: Maybe<TRes>;
};

export type VolumeAwareRateInfo = RateWithPairIds & { volumeWaves: BigNumber | null };

export default class RateEstimator
implements
AsyncMget<
RateMgetParams,
ReqAndRes<AssetIdsPair, RateWithPairIds>,
AppError
> {
AsyncMget<RateMgetParams, ReqAndRes<AssetIdsPair, RateWithPairIds>, AppError> {
constructor(
private readonly cache: RateCache,
private readonly remoteGet: AsyncMget<
RateMgetParams,
RateWithPairIds,
DbError | Timeout
>
) {}
>,
private readonly pairs: PairsService,
private readonly pairAcceptanceVolumeThreshold: number,
private readonly thresholdAssetRateService: IThresholdAssetRateService
) { }

mget(
request: RateMgetParams
Expand All @@ -45,18 +49,18 @@ export default class RateEstimator
matcher,
});

const cacheUnlessCached = (item: AssetIdsPair, rate: BigNumber) => {
const cacheUnlessCached = (item: VolumeAwareRateInfo) => {
const key = getCacheKey(item);

if (!this.cache.has(key)) {
this.cache.set(key, rate);
this.cache.set(key, item);
}
};

const cacheAll = (items: Array<RateWithPairIds>) =>
items.forEach(it => cacheUnlessCached(it, it.rate));
const cacheAll = (items: Array<VolumeAwareRateInfo>) =>
items.forEach((it) => cacheUnlessCached(it));

const { preCount, toBeRequested } = partitionByPreCount(
const { preComputed, toBeRequested } = partitionByPreComputed(
this.cache,
pairs,
getCacheKey,
Expand All @@ -65,29 +69,63 @@ export default class RateEstimator

return this.remoteGet
.mget({ pairs: toBeRequested, matcher, timestamp })
.map(results => {
if (shouldCache) cacheAll(results);
return results;
})
.map(data => new RateInfoLookup(data.concat(preCount)))
.map(lookup =>
pairs.map(idsPair => ({
req: idsPair,
res: lookup.get(idsPair),
}))
)
.map(
tap(data =>
data.forEach(reqAndRes =>
reqAndRes.res.map(
tap(res => {
if (shouldCache) {
cacheUnlessCached(reqAndRes.req, res.rate);
}
})
.chain((pairsWithRates) =>
this.pairs
.mget({
pairs: pairsWithRates.map((pairWithRate) => ({
amountAsset: pairWithRate.amountAsset,
priceAsset: pairWithRate.priceAsset,
})),
matcher: request.matcher,
})
.map((foundPairs) =>
foundPairs.data.map((pair: Pair, idx: number) => {
if (isNil(pair.data)) {
return {
...pairsWithRates[idx],
volumeWaves: new BigNumber(0),
};
} else {
return {
amountAsset: pair.amountAsset as string,
priceAsset: pair.priceAsset as string,
volumeWaves: pair.data.volumeWaves,
rate: pairsWithRates[idx].rate,
};
}
})
)
.map(
tap((results) => {
if (shouldCache) cacheAll(results);
})
)
.chain(
(data: Array<{ amountAsset: string, priceAsset: string, volumeWaves: BigNumber | null, rate: Maybe<BigNumber> }>) =>
this.thresholdAssetRateService.get().map(thresholdAssetRate => new RateInfoLookup(
data.concat(preComputed),
new BigNumber(this.pairAcceptanceVolumeThreshold).dividedBy(thresholdAssetRate),
))
)
.map((lookup) =>
pairs.map((idsPair) => ({
req: idsPair,
res: lookup.get(idsPair),
}))
)
.map(
tap((data) =>
data.forEach((reqAndRes) =>
reqAndRes.res.map(
tap((res) => {
if (shouldCache) {
cacheUnlessCached(res);
}
})
)
)
)
)
)
);
}
}
47 changes: 47 additions & 0 deletions src/services/rates/ThresholdAssetRateService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as LRU from 'lru-cache';
import { BigNumber } from "@waves/data-entities";
import { Task, of as taskOf, rejected } from 'folktale/concurrency/task';

import { AppError } from "../../errorHandling";
import { WavesId } from "../..";
import { PairsService } from "../pairs";

export interface IThresholdAssetRateService {
get(): Task<AppError, BigNumber>
};

export class ThresholdAssetRateService implements IThresholdAssetRateService {
private cache: LRU<string, BigNumber>;

constructor(private readonly thresholdAssetId: string, private readonly matcherAddress: string, private readonly pairsService: PairsService) {
this.cache = new LRU({ maxAge: 60000 });
}

get(): Task<AppError, BigNumber> {
let rate = this.cache.get(this.thresholdAssetId);
if (rate === undefined) {
// rate was not set or is stale
return this.pairsService.get({
pair: {
amountAsset: WavesId,
priceAsset: this.thresholdAssetId,
}, matcher: this.matcherAddress
}).chain(m => {
return m.matchWith({
Just: ({ value }) => {
if (value.data === null) {
return rejected(AppError.Resolver(`Rate for pair WAVES/${this.thresholdAssetId} not found`));
}
this.cache.set(this.thresholdAssetId, value.data.weightedAveragePrice);
return taskOf(value.data.weightedAveragePrice);
},
Nothing: () => {
return rejected(AppError.Resolver(`Pair WAVES/${this.thresholdAssetId} not found`));
}
})
});
} else {
return taskOf(rate);
}
}
}
2 changes: 1 addition & 1 deletion src/services/rates/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ export function generatePossibleRequestItems(
priceAsset: WavesId,
};

return [wavesL, flip(wavesL), wavesR, flip(wavesR)];
return [wavesL, flip(wavesL), wavesR, flip(wavesR), pair, flip(pair)];
}
Loading

0 comments on commit 5903c62

Please sign in to comment.