Skip to content

feat: update metadata rework #436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
24 changes: 24 additions & 0 deletions rfcs/inactive_users/user_and_organization_meta_update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# User/Organization metadata update
## Overview and Motivation
When user or organization metadata is updated, the Service should track audiences with assigned metadata.
For each assigned meta hash always exists a single `audience`, but there is no list of `audiences` assigned to the user or organization.

To achieve this ability, I advise these updates:

## Audience lists
Audiences stored in sets with names created from `USERS_AUDIENCE` or `ORGANISATION_AUDIENCE` constants and `Id`
(e.g.: `{ms-users}10110110111!audiences`). Both keys contain `audience` names that are currently have assigned values.

The `audience` list will be updated on each update of the metadata.

## Metadata Handling classes
Service logic is updated to use 2 specific classes that will perform all CRUD operations on User or Organization metadata.

* Classes located in: `utils/metadata/{user|organization}.js`.
* Both classes use same [Redis backend](#redis-metadata-backend-class).

## Redis Metadata Backend class
The class performs all work on metadata using Redis DB as a backend.

## Notice
* All User or Organization metadata operations should be performed using Provided classes otherwise, audiences won't be tracked.
22 changes: 7 additions & 15 deletions src/actions/activate.js
Original file line number Diff line number Diff line change
@@ -6,7 +6,8 @@ const jwt = require('../utils/jwt.js');
const { getInternalData } = require('../utils/userData');
const getMetadata = require('../utils/get-metadata');
const handlePipeline = require('../utils/pipeline-error');
const setMetadata = require('../utils/update-metadata');
const UserMetadata = require('../utils/metadata/user');

const {
USERS_INDEX,
USERS_DATA,
@@ -19,7 +20,7 @@ const {
USERS_USERNAME_FIELD,
USERS_ACTION_ACTIVATE,
USERS_ACTIVATED_FIELD,
} = require('../constants.js');
} = require('../constants');

// cache error
const Forbidden = new HttpStatusError(403, 'invalid token');
@@ -121,19 +122,6 @@ async function activateAccount(data, metadata) {
const userKey = redisKey(userId, USERS_DATA);
const { defaultAudience, service } = this;
const { redis } = service;

// if this goes through, but other async calls fail its ok to repeat that
// adds activation field
await setMetadata.call(service, {
userId,
audience: defaultAudience,
metadata: {
$set: {
[USERS_ACTIVATED_FIELD]: Date.now(),
},
},
});

// WARNING: `persist` is very important, otherwise we will lose user's information in 30 days
// set to active & persist
const pipeline = redis
@@ -143,6 +131,10 @@ async function activateAccount(data, metadata) {
.persist(userKey)
.sadd(USERS_INDEX, userId);

UserMetadata
.using(userId, defaultAudience, pipeline)
.update(USERS_ACTIVATED_FIELD, Date.now());

if (alias) {
pipeline.sadd(USERS_PUBLIC_INDEX, userId);
}
13 changes: 8 additions & 5 deletions src/actions/alias.js
Original file line number Diff line number Diff line change
@@ -7,9 +7,10 @@ const isBanned = require('../utils/is-banned');
const DetailedHttpStatusError = require('../utils/detailed-error');
const key = require('../utils/key');
const handlePipeline = require('../utils/pipeline-error');
const UserMetadata = require('../utils/metadata/user');

const {
USERS_DATA,
USERS_METADATA,
USERS_ALIAS_TO_ID,
USERS_ID_FIELD,
USERS_ALIAS_FIELD,
@@ -71,10 +72,12 @@ async function assignAlias({ params }) {
return Promise.reject(err);
}

const pipeline = redis.pipeline([
['hset', key(userId, USERS_DATA), USERS_ALIAS_FIELD, alias],
['hset', key(userId, USERS_METADATA, defaultAudience), USERS_ALIAS_FIELD, JSON.stringify(alias)],
]);
const pipeline = redis.pipeline();

pipeline.hset(key(userId, USERS_DATA), USERS_ALIAS_FIELD, alias);
UserMetadata
.using(userId, defaultAudience, pipeline)
.update(USERS_ALIAS_FIELD, JSON.stringify(alias));

if (activeUser) {
pipeline.sadd(USERS_PUBLIC_INDEX, username);
37 changes: 22 additions & 15 deletions src/actions/ban.js
Original file line number Diff line number Diff line change
@@ -4,9 +4,10 @@ const mapValues = require('lodash/mapValues');
const redisKey = require('../utils/key.js');
const { getInternalData } = require('../utils/userData');
const handlePipeline = require('../utils/pipeline-error');
const UserMetadata = require('../utils/metadata/user');

const {
USERS_DATA, USERS_METADATA,
USERS_BANNED_FLAG, USERS_TOKENS, USERS_BANNED_DATA,
USERS_DATA, USERS_BANNED_FLAG, USERS_TOKENS, USERS_BANNED_DATA,
} = require('../constants.js');

// helper
@@ -25,26 +26,32 @@ function lockUser({
remoteip: remoteip || '',
},
};
const pipeline = redis.pipeline();

pipeline.hset(redisKey(id, USERS_DATA), USERS_BANNED_FLAG, 'true');
// set .banned on metadata for filtering & sorting users by that field
UserMetadata
.using(id, defaultAudience, pipeline)
.updateMulti(mapValues(data, stringify));
pipeline.del(redisKey(id, USERS_TOKENS));

return redis
.pipeline()
.hset(redisKey(id, USERS_DATA), USERS_BANNED_FLAG, 'true')
// set .banned on metadata for filtering & sorting users by that field
.hmset(redisKey(id, USERS_METADATA, defaultAudience), mapValues(data, stringify))
.del(redisKey(id, USERS_TOKENS))
.exec();
return pipeline.exec();
}

function unlockUser({ id }) {
const { redis, config } = this;
const { jwt: { defaultAudience } } = config;
const pipeline = redis.pipeline();

return redis
.pipeline()
.hdel(redisKey(id, USERS_DATA), USERS_BANNED_FLAG)
// remove .banned on metadata for filtering & sorting users by that field
.hdel(redisKey(id, USERS_METADATA, defaultAudience), 'banned', USERS_BANNED_DATA)
.exec();
pipeline.hdel(redisKey(id, USERS_DATA), USERS_BANNED_FLAG);
// remove .banned on metadata for filtering & sorting users by that field
UserMetadata
.using(id, defaultAudience, pipeline)
.delete([
'banned',
USERS_BANNED_DATA,
]);
return pipeline.exec();
}

/**
5 changes: 5 additions & 0 deletions src/actions/organization/delete.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ const snakeCase = require('lodash/snakeCase');
const redisKey = require('../../utils/key');
const handlePipeline = require('../../utils/pipeline-error');
const { checkOrganizationExists, getInternalData } = require('../../utils/organization');
const OrganizationMetadata = require('../../utils/metadata/organization');
const {
ORGANIZATIONS_DATA,
ORGANIZATIONS_METADATA,
@@ -32,11 +33,15 @@ async function deleteOrganization({ params }) {
const organizationMembersListKey = redisKey(organizationId, ORGANIZATIONS_MEMBERS);
const organizationMembersIds = await redis.zrange(organizationMembersListKey, 0, -1);
const organization = await getInternalData.call(this, organizationId);
const organizationMetadata = new OrganizationMetadata(redis);

const pipeline = redis.pipeline();

pipeline.del(redisKey(organizationId, ORGANIZATIONS_DATA));
pipeline.del(redisKey(organizationId, ORGANIZATIONS_METADATA, audience));
// delete organization audiences index
pipeline.del(organizationMetadata.audience.getAudienceKey(organizationId));

pipeline.srem(ORGANIZATIONS_INDEX, organizationId);
if (organizationMembersIds) {
organizationMembersIds.forEach((memberId) => {
6 changes: 5 additions & 1 deletion src/actions/organization/members/permission.js
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ const { checkOrganizationExists } = require('../../../utils/organization');
const redisKey = require('../../../utils/key');
const handlePipeline = require('../../../utils/pipeline-error');
const getUserId = require('../../../utils/userData/get-user-id');
const UserMetadata = require('../../../utils/metadata/user');
const { ErrorUserNotMember, USERS_METADATA, ORGANIZATIONS_MEMBERS } = require('../../../constants');

/**
@@ -41,7 +42,10 @@ async function setOrganizationMemberPermission({ params }) {
permissions = JSON.stringify(permissions);

const pipeline = redis.pipeline();
pipeline.hset(memberMetadataKey, organizationId, permissions);

UserMetadata
.using(userId, audience, pipeline)
.update(organizationId, permissions);
pipeline.hset(redisKey(organizationId, ORGANIZATIONS_MEMBERS, userId), 'permissions', permissions);

return pipeline.exec().then(handlePipeline);
5 changes: 4 additions & 1 deletion src/actions/organization/members/remove.js
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ const redisKey = require('../../../utils/key');
const getUserId = require('../../../utils/userData/get-user-id');
const handlePipeline = require('../../../utils/pipeline-error');
const { checkOrganizationExists } = require('../../../utils/organization');
const UserMetadata = require('../../../utils/metadata/user');
const {
ORGANIZATIONS_MEMBERS,
USERS_METADATA,
@@ -36,7 +37,9 @@ async function removeMember({ params }) {
const pipeline = redis.pipeline();
pipeline.del(memberKey);
pipeline.zrem(redisKey(organizationId, ORGANIZATIONS_MEMBERS), memberKey);
pipeline.hdel(memberMetadataKey, organizationId);
UserMetadata
.using(userId, audience, pipeline)
.delete(organizationId);

return pipeline.exec().then(handlePipeline);
}
16 changes: 8 additions & 8 deletions src/actions/register.js
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ const reduce = require('lodash/reduce');
const last = require('lodash/last');

// internal deps
const setMetadata = require('../utils/update-metadata');
const UserMetadata = require('../utils/metadata/user');
const redisKey = require('../utils/key');
const jwt = require('../utils/jwt');
const isDisposable = require('../utils/is-disposable');
@@ -231,13 +231,13 @@ async function performRegistration({ service, params }) {
commonMeta[USERS_ACTIVATED_FIELD] = Date.now();
}

await setMetadata.call(service, {
userId,
audience,
metadata: audience.map((metaAudience) => ({
$set: Object.assign(metadata[metaAudience] || {}, metaAudience === defaultAudience && commonMeta),
})),
});
await UserMetadata
.using(userId, audience, service.redis)
.batchUpdate({
metadata: audience.map((metaAudience) => ({
$set: Object.assign(metadata[metaAudience] || {}, metaAudience === defaultAudience && commonMeta),
})),
});

// assign alias
if (alias) {
7 changes: 6 additions & 1 deletion src/actions/remove.js
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ const key = require('../utils/key');
const { getInternalData } = require('../utils/userData');
const getMetadata = require('../utils/get-metadata');
const handlePipeline = require('../utils/pipeline-error');
const UserMetadata = require('../utils/metadata/user');
const {
USERS_INDEX,
USERS_PUBLIC_INDEX,
@@ -92,6 +93,8 @@ async function removeUser({ params }) {
const alias = internal[USERS_ALIAS_FIELD];
const userId = internal[USERS_ID_FIELD];
const resolvedUsername = internal[USERS_USERNAME_FIELD];
const metaAudiences = await UserMetadata.using(userId, null, redis).getAudience();
const userMetadata = UserMetadata.using(userId, null, transaction);

if (alias) {
transaction.hdel(USERS_ALIAS_TO_ID, alias.toLowerCase(), alias);
@@ -114,7 +117,9 @@ async function removeUser({ params }) {

// remove metadata & internal data
transaction.del(key(userId, USERS_DATA));
transaction.del(key(userId, USERS_METADATA, audience));
for (const metaAudience of metaAudiences) {
userMetadata.deleteMetadata(metaAudience);
}

// remove auth tokens
transaction.del(key(userId, USERS_TOKENS));
16 changes: 9 additions & 7 deletions src/actions/updateMetadata.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const omit = require('lodash/omit');
const Promise = require('bluebird');
const updateMetadata = require('../utils/update-metadata');
const UserMetadata = require('../utils/metadata/user');
const { getUserId } = require('../utils/userData');

/**
@@ -19,12 +18,15 @@ const { getUserId } = require('../utils/userData');
* @apiParam (Payload) {Object} [script] - if present will be called with passed metadata keys & username, provides direct scripting access.
* Be careful with granting access to this function.
*/
module.exports = function updateMetadataAction(request) {
return Promise
module.exports = async function updateMetadataAction(request) {
const { username: _, audience, ...updateParams } = request.params;
const userId = await Promise
.bind(this, request.params.username)
.then(getUserId)
.then((userId) => ({ ...omit(request.params, 'username'), userId }))
.then(updateMetadata);
.then(getUserId);

return UserMetadata
.using(userId, audience, this.redis)
.batchUpdate(updateParams);
};

module.exports.transports = [require('@microfleet/core').ActionTransport.amqp];
29 changes: 15 additions & 14 deletions src/auth/oauth/utils/attach.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
const get = require('lodash/get');
const redisKey = require('../../../utils/key');
const updateMetadata = require('../../../utils/update-metadata');
const UserMetadata = require('../../../utils/metadata/user');
const handlePipeline = require('../../../utils/pipeline-error');
const {
USERS_SSO_TO_ID,
USERS_DATA,
} = require('../../../constants');

module.exports = function attach(account, user) {
module.exports = async function attach(account, user) {
const { redis, config } = this;
const { id: userId } = user;
const {
@@ -23,17 +23,18 @@ module.exports = function attach(account, user) {
// link uid to user id
pipeline.hset(USERS_SSO_TO_ID, uid, userId);

return pipeline.exec().then(handlePipeline)
.bind(this)
.return({
userId,
audience,
metadata: {
$set: {
[provider]: profile,
},
handlePipeline(await pipeline.exec());

const updateParams = {
metadata: {
$set: {
[provider]: profile,
},
})
.then(updateMetadata)
.return(profile);
},
};
await UserMetadata
.using(userId, audience, redis)
.batchUpdate(updateParams);

return profile;
};
12 changes: 7 additions & 5 deletions src/auth/oauth/utils/detach.js
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ const Errors = require('common-errors');

const get = require('../../../utils/get-value');
const redisKey = require('../../../utils/key');
const updateMetadata = require('../../../utils/update-metadata');
const UserMetadata = require('../../../utils/metadata/user');
const handlePipeline = require('../../../utils/pipeline-error');

const {
@@ -30,13 +30,15 @@ module.exports = async function detach(provider, userData) {

handlePipeline(await pipeline.exec());

return updateMetadata.call(this, {
userId,
audience,
const updateParams = {
metadata: {
$remove: [
provider,
],
},
});
};

return UserMetadata
.using(userId, audience, redis)
.batchUpdate(updateParams);
};
2 changes: 2 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ module.exports = exports = {
USERS_DATA: 'data',
USERS_METADATA: 'metadata',
USERS_CONTACTS: 'contacts',
USERS_AUDIENCE: 'users-audiences',
USERS_TOKENS: 'tokens',
USERS_API_TOKENS: 'api-tokens',
USERS_API_TOKENS_ZSET: 'api-tokens-set',
@@ -30,6 +31,7 @@ module.exports = exports = {
USERS_ORGANIZATIONS: 'user-organizations',
ORGANIZATIONS_DATA: 'data',
ORGANIZATIONS_METADATA: 'metadata',
ORGANIZATIONS_AUDIENCE: 'organization-audiences',
ORGANIZATIONS_MEMBERS: 'members',

// standard JWT with TTL
10 changes: 5 additions & 5 deletions src/custom/cappasity-users-activate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const find = require('lodash/find');
const moment = require('moment');
const setMetadata = require('../utils/update-metadata');
const UserMetadata = require('../utils/metadata/user');

/**
* Adds metadata from billing into usermix
@@ -20,9 +20,7 @@ module.exports = function mixPlan(userId, params) {
.then(function mix(plan) {
const subscription = find(plan.subs, ['name', 'month']);
const nextCycle = moment().add(1, 'month').valueOf();
const update = {
userId,
audience,
const updateParams = {
metadata: {
$set: {
plan: id,
@@ -36,6 +34,8 @@ module.exports = function mixPlan(userId, params) {
},
};

return setMetadata.call(this, update);
return UserMetadata
.using(userId, audience, this.redis)
.batchUpdate(updateParams);
});
};
8 changes: 5 additions & 3 deletions src/custom/rfx-create-room-on-activate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const is = require('is');
const Promise = require('bluebird');
const setMetadata = require('../utils/update-metadata');
const UserMetadata = require('../utils/metadata/user');

/**
* @param {String} username
@@ -25,7 +25,7 @@ function createRoom(userId, params, metadata) {
return amqp.publishAndWait(route, roomParams, { timeout: 5000 })
.bind(this)
.then((room) => {
const update = {
const updateParams = {
userId,
audience,
metadata: {
@@ -35,7 +35,9 @@ function createRoom(userId, params, metadata) {
},
};

return setMetadata.call(this, update);
return UserMetadata
.using(userId, audience, this.redis)
.batchUpdate(updateParams);
});
}

19 changes: 1 addition & 18 deletions src/migrations/05-referrals-users-ids/index.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,7 @@
const Promise = require('bluebird');
const calcSlot = require('cluster-key-slot');
const { USERS_REFERRAL_INDEX } = require('../../constants.js');
const { getUserId } = require('../../utils/userData');

/**
* Return master node in case of redisCluster to be able to use
* specific commands like `keys`. We can use usual redis instance in
* other cases.
*/
function getRedisMasterNode(redis, config) {
if (!config.plugins.includes('redisCluster')) {
return redis;
}
const { keyPrefix } = config.redis.options;
const slot = calcSlot(keyPrefix);
const nodeKeys = redis.slots[slot];
const masters = redis.connectionPool.nodes.master;

return nodeKeys.reduce((node, key) => node || masters[key], null);
}
const getRedisMasterNode = require('../utils/get-redis-master-node');

/**
*
41 changes: 41 additions & 0 deletions src/migrations/09-audience/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const fs = require('fs');
const {
USERS_INDEX,
ORGANIZATIONS_INDEX,
USERS_METADATA,
ORGANIZATIONS_METADATA,
USERS_AUDIENCE,
ORGANIZATIONS_AUDIENCE,
} = require('../../constants');
const getRedisMasterNode = require('../utils/get-redis-master-node');

const SCRIPT = fs.readFileSync(`${__dirname}/migrate.lua`, 'utf8');
const USERS_KEYS = [
USERS_INDEX,
`id!${USERS_METADATA}!`,
`id!${USERS_AUDIENCE}`,
];
const USERS_ARGS = [
`.*id!${USERS_METADATA}!`,
];
const ORGANIZATIONS_KEYS = [
ORGANIZATIONS_INDEX,
`id!${ORGANIZATIONS_METADATA}!`,
`id!${ORGANIZATIONS_AUDIENCE}`,
];
const ORGANIZATIONS_ARGS = [
`.*id!${USERS_METADATA}!`,
];

const script = async ({ config, redis }) => {
const masterNode = getRedisMasterNode(redis, config);

await masterNode.eval(SCRIPT, USERS_KEYS.length, USERS_KEYS, USERS_ARGS);
await masterNode.eval(SCRIPT, ORGANIZATIONS_KEYS.length, ORGANIZATIONS_KEYS, ORGANIZATIONS_ARGS);
};

module.exports = {
script,
min: 8,
final: 9,
};
17 changes: 17 additions & 0 deletions src/migrations/09-audience/migrate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
local index = KEYS[1]
local metaData = KEYS[2]
local audience = KEYS[3]

local patternTemplate = ARGV[1]

local ids = redis.call('smembers', index)
for _, id in ipairs(ids) do
local metaDataKey = metaData:gsub('(id)', id, 1)
local audienceKey = audience:gsub('(id)', id, 1)
local pattern = patternTemplate:gsub('(id)', id, 1)
local keys = redis.call('keys', '*' .. metaDataKey .. '*')
for _, key in ipairs(keys) do
local newAudience = key:gsub(pattern, '', 1)
redis.call('sadd', audienceKey, newAudience)
end
end
19 changes: 19 additions & 0 deletions src/migrations/utils/get-redis-master-node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const calcSlot = require('cluster-key-slot');

/**
* Return master node in case of redisCluster to be able to use
* specific commands like `keys`. We can use usual redis instance in
* other cases.
*/

module.exports = function getRedisMasterNode(redis, config) {
if (!config.plugins.includes('redisCluster')) {
return redis;
}
const { keyPrefix } = config.redis.options;
const slot = calcSlot(keyPrefix);
const nodeKeys = redis.slots[slot];
const masters = redis.connectionPool.nodes.master;

return nodeKeys.reduce((node, key) => node || masters[key], null);
};
3 changes: 3 additions & 0 deletions src/utils/asserts/id.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module.exports = function isId(value) {
return Number.isInteger(value) || (typeof value === 'string' && value.length > 0);
};
14 changes: 14 additions & 0 deletions src/utils/asserts/redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const Redis = require('ioredis');

function isRedisPipeline(value) {
return value instanceof Redis.Pipeline;
}

function isRedis(value) {
return value instanceof Redis || value instanceof Redis.Cluster || value instanceof Redis.Pipeline;
}

module.exports = {
isRedisPipeline,
isRedis,
};
5 changes: 5 additions & 0 deletions src/utils/asserts/string-or-array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const { strictEqual } = require('assert');

module.exports = function notEmptyStringOrArray(value, error) {
strictEqual((typeof value === 'string' && value.length !== 0) || Array.isArray(value), true, error);
};
41 changes: 41 additions & 0 deletions src/utils/metadata/organization.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const Audience = require('./redis/audience');
const Metadata = require('./redis/metadata');
const { ORGANIZATIONS_METADATA, ORGANIZATIONS_AUDIENCE } = require('../../constants');

/**
* Class handling Organization Metadata operations
*/
class OrganizationMetadata {
/**
* @param {ioredis|Pipeline} redis
*/
constructor(redis) {
this.redis = redis;
this.metadata = new Metadata(this.redis, ORGANIZATIONS_METADATA);
this.audience = new Audience(this.redis, ORGANIZATIONS_AUDIENCE);
}

async syncAudience(organizationId) {
const metaKeyTemplate = this.metadata.getMetadataKey('{{ID}}', '{{AUDIENCE}}');
return this.audience.resyncSet(organizationId, metaKeyTemplate);
}

/**
* Updates metadata on a organization object
* @param {Object} opts
* @return {Promise}
*/
async batchUpdate(opts) {
const { organizationId, ...restOpts } = opts;
await this.audience.add(organizationId, restOpts.audience);
const updateResult = await this.metadata
.batchUpdate({
id: organizationId,
...opts,
});
await this.syncAudience(organizationId);
return updateResult;
}
}

module.exports = OrganizationMetadata;
93 changes: 93 additions & 0 deletions src/utils/metadata/redis/audience.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
const assert = require('assert');
const { isRedis } = require('../../asserts/redis');
const isNotEmptyString = require('../../asserts/string-not-empty');
const notEmptyStringOrArray = require('../../asserts/string-or-array');
const isValidId = require('../../asserts/id');

/**
* Class handling Audience tracking using Redis backend
*/
class Audience {
/**
* @param {ioredis|Pipeline} redis
* @param {string} audienceKeyBase
*/
constructor(redis, audienceKeyBase) {
assert(isRedis(redis), 'must be ioredis instance');
isNotEmptyString(audienceKeyBase, 'must be not empty string');
this.redis = redis;
this.audienceKeyBase = audienceKeyBase;
}

/**
* Generates Redis key
* Template `{id}!{metadataKeyBase}`
* @param {String|Number} id
* @returns {string}
*/
getAudienceKey(id) {
assert(isValidId(id), 'must be valid Id');
return `${id}!${this.audienceKeyBase}`;
}

/**
* Adds audience
* @param {String|Number} id
* @param {String|Array} audience
* @param {ioredis|Pipeline} [redis]
* @returns {Promise|Pipeline}
*/
add(id, audience, redis = this.redis) {
assert(isRedis(redis), 'must be ioredis instance');
notEmptyStringOrArray(audience, 'must be not empty string or Array');
return redis.sadd(this.getAudienceKey(id), audience);
}

/**
* Deletes audience
* @param {String|Number} id
* @param {String} audience
* @param {ioredis|Pipeline} [redis]
* @returns {Promise|Pipeline}
*/
delete(id, audience, redis = this.redis) {
assert(isRedis(redis), 'must be ioredis instance');
notEmptyStringOrArray(audience, 'must be not empty string or Array');
return redis.srem(this.getAudienceKey(id), audience);
}

/**
* Get list of assigned audiences
* @param {String|Number} id
* @param {ioredis|Pipeline}redis
* @returns {Promise|Pipeline}
*/
get(id, redis = this.redis) {
return redis.smembers(this.getAudienceKey(id));
}

/**
* Synchronizes audience list with currently available metadata
* @param id
* @param metadataKeyTemplate - format '{{ID}}!yourMetadataClass!{{AUDIENCE}}'
* @param redis
* @returns {*}
*/
resyncSet(id, metadataKeyTemplate, redis = this.redis) {
assert(isRedis(this.redis), 'must be ioredis instance');
isNotEmptyString(metadataKeyTemplate, 'must be not empty string');
const luaScript = `
local audiences = redis.call("SMEMBERS", KEYS[1])
for _, audience in pairs(audiences) do
local metaKey = string.gsub(KEYS[2], '{{AUDIENCE}}', audience)
local keyLen = redis.call("HLEN", metaKey)
if (keyLen < 1) then
redis.call('SREM', KEYS[1], audience)
end
end
`;
return redis.eval(luaScript, 2, this.getAudienceKey(id), metadataKeyTemplate);
}
}

module.exports = Audience;
235 changes: 235 additions & 0 deletions src/utils/metadata/redis/metadata.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
const Promise = require('bluebird');
const mapValues = require('lodash/mapValues');
const assert = require('assert');
const { HttpStatusError } = require('common-errors');
const handlePipeline = require('../../pipeline-error');
const sha256 = require('../../sha256');
const { isRedis, isRedisPipeline } = require('../../asserts/redis');
const isNotEmptyString = require('../../asserts/string-not-empty');
const notEmptyStringOrArray = require('../../asserts/string-or-array');
const isValidId = require('../../asserts/id');

const JSONStringify = (data) => JSON.stringify(data);

/**
* Class handling metadata operations using Redis backend
*/
class Metadata {
/**
* @param {ioredis} redis or pipeline instance
* @param metadataKeyBase template base for Metadata key
*/
constructor(redis, metadataKeyBase) {
assert(isRedis(redis), 'must be ioredis instance');
isNotEmptyString(metadataKeyBase, 'must be not empty string');
this.redis = redis;
this.metadataKeyBase = metadataKeyBase;
}

/**
* Generates Redis key
* Template `{id}!{metadataKeyBase}!{audience}`
* @param {String|integer} id
* @param {String} audience
* @returns {String}
*/
getMetadataKey(id, audience) {
assert(isValidId(id), 'must be valid Id');
isNotEmptyString(audience, 'must be not empty string');
return `${id}!${this.metadataKeyBase}!${audience}`;
}

/**
* Updates metadata hash key
* @param {String} id
* @param {String} audience
* @param {String} key - Hash key
* @param {*} value
* @param {ioredis} [redis]
* @returns {Promise|Pipeline}
*/
update(id, audience, key, value, redis = this.redis) {
isNotEmptyString(key, 'must be not empty string');
assert(isRedis(redis), 'must be ioredis instance');
assert(value, 'must not be empty');

return redis.hset(this.getMetadataKey(id, audience), key, value);
}

/**
* Updates metadata hash keys
* @param {String} id
* @param {String} audience
* @param {Object} values - Object with keys and values
* @param {ioredis} [redis]
* @returns {Promise|Pipeline}
*/
updateMulti(id, audience, values, redis = this.redis) {
assert(values !== null && typeof values === 'object', 'must be an object');
return redis.hmset(this.getMetadataKey(id, audience), values);
}

/**
* Deletes metadata hash keys
* @param {String} id
* @param {String} audience
* @param {String} key - Hash key
* @param {ioredis} [redis]
* @returns {Promise|Pipeline}
*/
delete(id, audience, key, redis = this.redis) {
notEmptyStringOrArray(key, 'must be not empty string or Array');
assert(isRedis(redis), 'must be ioredis instance');
return redis.hdel(this.getMetadataKey(id, audience), key);
}

/**
* Deletes metadata key
* @param id
* @param audience
* @param redis
* @returns {void|request.Request}
*/
deleteKey(id, audience, redis = this.redis) {
assert(isValidId(id), 'must be valid id');
assert(isRedis(redis), 'must be ioredis instance');
return redis.del(this.getMetadataKey(id, audience));
}

/**
* Updates metadata hash on provided Id
* @param {Object} opts
* @return {*}
*/
async batchUpdate(opts) {
const { redis } = this;
const {
id, metadata, audience, script,
} = opts;

// we use own pipeline or Promise here
assert(!isRedisPipeline(redis), 'impossible to use with pipeline');
assert(isValidId(id), 'must be valid id');

const audiences = Array.isArray(audience) ? audience : [audience];
const keys = audiences.map((aud) => this.getMetadataKey(id, aud));

// if we have meta, then we can
if (metadata) {
const pipe = redis.pipeline();
const metaOps = Array.isArray(metadata) ? metadata : [metadata];

if (metaOps.length !== audiences.length) {
throw new HttpStatusError(400, 'audiences must match metadata entries');
}

const operations = metaOps.map((meta, idx) => Metadata.handleAudience(pipe, keys[idx], meta));
const result = handlePipeline(await pipe.exec());

return Metadata.mapMetaResponse(operations, result);
}

// dynamic scripts
const $scriptKeys = Object.keys(script);
const scripts = $scriptKeys.map((scriptName) => {
const { lua, argv = [] } = script[scriptName];
const sha = sha256(lua);
const name = `ms_users_${sha}`;
if (typeof redis[name] !== 'function') {
redis.defineCommand(name, { lua });
}
return redis[name](keys.length, keys, argv);
});

const result = await Promise.all(scripts);

return Metadata.mapScriptResponse($scriptKeys, result);
}

/**
* Process metadata update operation for a passed audience
* @param {Object} pipeline
* @param {String} key
* @param {Object} metadata
*/
static handleAudience(pipeline, key, metadata) {
const { $remove } = metadata;
const $removeOps = $remove ? $remove.length : 0;
if ($removeOps > 0) {
pipeline.hdel(key, $remove);
}

const { $set } = metadata;
const $setKeys = $set && Object.keys($set);
const $setLength = $setKeys ? $setKeys.length : 0;
if ($setLength > 0) {
pipeline.hmset(key, mapValues($set, JSONStringify));
}

const { $incr } = metadata;
const $incrFields = $incr && Object.keys($incr);
const $incrLength = $incrFields ? $incrFields.length : 0;
if ($incrLength > 0) {
$incrFields.forEach((fieldName) => {
pipeline.hincrby(key, fieldName, $incr[fieldName]);
});
}

return {
$removeOps, $setLength, $incrLength, $incrFields,
};
}

/**
* Maps updateMetadata ops
* @param {Array} responses
* @param {Array} operations
* @return {Object|Array}
*/
static mapMetaResponse(operations, responses) {
let cursor = 0;
return Promise
.map(operations, (props) => {
const {
$removeOps, $setLength, $incrLength, $incrFields,
} = props;
const output = {};

if ($removeOps > 0) {
output.$remove = responses[cursor];
cursor += 1;
}

if ($setLength > 0) {
output.$set = responses[cursor];
cursor += 1;
}

if ($incrLength > 0) {
const $incrResponse = output.$incr = {};
$incrFields.forEach((fieldName) => {
$incrResponse[fieldName] = responses[cursor];
cursor += 1;
});
}

return output;
})
.then((ops) => (ops.length > 1 ? ops : ops[0]));
}

/**
* Handle script, mutually exclusive with metadata
* @param {Array} scriptKeys
* @param {Array} responses
*/
static mapScriptResponse(scriptKeys, responses) {
const output = {};
scriptKeys.forEach((fieldName, idx) => {
output[fieldName] = responses[idx];
});
return output;
}
}

module.exports = Metadata;
123 changes: 123 additions & 0 deletions src/utils/metadata/user.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
const Promise = require('bluebird');
const { Pipeline } = require('ioredis');
const Metadata = require('./redis/metadata');
const Audience = require('./redis/audience');

const { USERS_METADATA, USERS_AUDIENCE } = require('../../constants');

/**
* Class handles User metadata operations
*/
class UserMetadata {
/**
* @param {ioredis|Pipeline} redis
* @param {String|Number} userId
* @param {Staing|Array} audience
*/
constructor(redis, userId, audience) {
this.pipeline = redis instanceof Pipeline;
this.redis = redis;
this.userAudience = audience;
this.userId = userId;
this.metadata = new Metadata(this.redis, USERS_METADATA);
this.audience = new Audience(this.redis, USERS_AUDIENCE);
}

/**
* Updates metadata field on a user object
* @param {Object} values
* @param {String} [audience]
* @returns {Promise|void}
*/
update(hashKey, value, audience = this.userAudience) {
const work = [
this.audience.add(this.userId, audience),
this.metadata.update(this.userId, audience, hashKey, value),
];
return this.pipeline ? work : Promise.all(work);
}

/**
* Updates metadata on a user object using fields and values from provided Object
* @param {Object} values
* @param {String} [audience]
* @returns {Promise|void}
*/
updateMulti(values, audience = this.userAudience) {
const work = [
this.audience.add(this.userId, audience),
this.metadata.updateMulti(this.userId, audience, values),
];
return this.pipeline ? work : Promise.all(work);
}

/**
* Deletes key from user metadata object
* @param {String|Number} id
* @param {String} hashKey
* @param {String} [audience]
* @returns {Promise|void}
*/
delete(hashKey, audience = this.userAudience) {
return this.pipeline ? this.deletePipeline(hashKey, audience) : this.deleteAsync(hashKey, audience);
}

deletePipeline(hashKey, audience) {
this.metadata.delete(this.userId, audience, hashKey);
return this.syncAudience();
}

async deleteAsync(hashKey, audience) {
const result = await this.metadata.delete(this.userId, audience, hashKey);
await this.syncAudience();
return result;
}

/**
* Deletes user metadata for passed audience
* @param {String} Audience
*/
deleteMetadata(audience) {
const work = [
this.audience.delete(this.userId, audience),
this.metadata.deleteKey(this.userId, audience),
];
return this.pipeline ? work : Promise.all(work);
}

/**
* Gets all audiences assigned
* @returns {*}
*/
getAudience() {
return this.audience.get(this.userId);
}

async syncAudience() {
const metaKeyTemplate = this.metadata.getMetadataKey(this.userId, '{{AUDIENCE}}');
return this.audience.resyncSet(this.userId, metaKeyTemplate);
}

/**
* Updates metadata on a user object using batch operations
* @param {Object} opts
* @return {Promise}
*/
async batchUpdate(opts) {
await this.audience.add(this.userId, this.userAudience);
const updateResult = await this.metadata
.batchUpdate({
id: this.userId,
audience: this.userAudience,
...opts,
});
await this.syncAudience();
return updateResult;
}

static using(userId, audience, redis) {
return new UserMetadata(redis, userId, audience);
}
}

module.exports = UserMetadata;
9 changes: 6 additions & 3 deletions src/utils/organization/add-organization-members.js
Original file line number Diff line number Diff line change
@@ -7,9 +7,10 @@ const sendInviteMail = require('./send-invite-email');
const getInternalData = require('./get-internal-data');
const registerOrganizationMembers = require('./register-organization-members');
const handlePipeline = require('../pipeline-error');
const UserMetadata = require('../metadata/user');

const {
ORGANIZATIONS_MEMBERS,
USERS_METADATA,
ORGANIZATIONS_NAME_FIELD,
ORGANIZATIONS_ID_FIELD,
USERS_ACTION_ORGANIZATION_REGISTER,
@@ -32,7 +33,6 @@ async function addMember({ password, ...member }) {
const { organizationId, audience, pipe, membersKey } = this;

const memberKey = redisKey(organizationId, ORGANIZATIONS_MEMBERS, member.id);
const memberOrganizations = redisKey(member.id, USERS_METADATA, audience);

member.username = member.email;
member.invited = Date.now();
@@ -42,7 +42,9 @@ async function addMember({ password, ...member }) {
const stringifyMember = mapValues(member, JSONStringify);

pipe.hmset(memberKey, stringifyMember);
pipe.hset(memberOrganizations, organizationId, stringifyMember.permissions);
UserMetadata
.using(member.id, audience, pipe)
.update(organizationId, stringifyMember.permissions);
pipe.zadd(membersKey, stringifyMember.invited, memberKey);
}

@@ -74,6 +76,7 @@ async function sendInvite(member) {
/**
* Updates metadata on a organization object
* @param {Object} opts
* @param {Boolean} sendInviteFlag
* @return {Promise}
*/
async function addOrganizationMembers({ organizationId, members, audience }, sendInviteFlag = false) {
29 changes: 15 additions & 14 deletions src/utils/organization/register-organization-members.js
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ const {
USERS_ID_FIELD,
} = require('../../constants.js');
const scrypt = require('../scrypt');
const setMetadata = require('../update-metadata');
const UserMetadata = require('../metadata/user');

async function registerOrganizationMember(member) {
const { redis, config } = this;
@@ -36,20 +36,21 @@ async function registerOrganizationMember(member) {
const userDataKey = redisKey(userId, USERS_DATA);
pipeline.hmset(userDataKey, basicInfo);
pipeline.hset(USERS_USERNAME_TO_ID, email, userId);
await pipeline.exec().then(handlePipeline);
handlePipeline(await pipeline.exec());

await UserMetadata
.using(userId, audience, redis)
.batchUpdate({
metadata: [{
$set: {
[USERS_ID_FIELD]: userId,
[USERS_USERNAME_FIELD]: email,
[USERS_CREATED_FIELD]: basicInfo[USERS_CREATED_FIELD],
[USERS_ACTIVATED_FIELD]: createdAt,
},
}],
});

await setMetadata.call(this, {
userId,
audience,
metadata: [{
$set: {
[USERS_ID_FIELD]: userId,
[USERS_USERNAME_FIELD]: email,
[USERS_CREATED_FIELD]: basicInfo[USERS_CREATED_FIELD],
[USERS_ACTIVATED_FIELD]: createdAt,
},
}],
});
// perform instant activation
// internal username index
const regPipeline = redis.pipeline().sadd(USERS_INDEX, userId);
32 changes: 2 additions & 30 deletions src/utils/set-organization-metadata.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,13 @@
/* eslint-disable no-mixed-operators */
const Promise = require('bluebird');
const is = require('is');
const { HttpStatusError } = require('common-errors');
const redisKey = require('./key');
const handlePipeline = require('./pipeline-error');
const { handleAudience } = require('./update-metadata');
const { ORGANIZATIONS_METADATA } = require('../constants');
const OrganizationMetadata = require('./metadata/organization');

/**
* Updates metadata on a organization object
* @param {Object} opts
* @return {Promise}
*/
async function setOrganizationMetadata(opts) {
const { redis } = this;
const {
organizationId, audience, metadata,
} = opts;
const audiences = is.array(audience) ? audience : [audience];

// keys
const keys = audiences.map((aud) => redisKey(organizationId, ORGANIZATIONS_METADATA, aud));

// if we have meta, then we can
if (metadata) {
const pipe = redis.pipeline();
const metaOps = is.array(metadata) ? metadata : [metadata];

if (metaOps.length !== audiences.length) {
return Promise.reject(new HttpStatusError(400, 'audiences must match metadata entries'));
}

metaOps.forEach((meta, idx) => handleAudience(pipe, keys[idx], meta));
return pipe.exec().then(handlePipeline);
}

return true;
return new OrganizationMetadata(this.redis).batchUpdate(opts);
}

module.exports = setOrganizationMetadata;
144 changes: 0 additions & 144 deletions src/utils/update-metadata.js
Original file line number Diff line number Diff line change
@@ -1,144 +0,0 @@
/* eslint-disable no-mixed-operators */
const Promise = require('bluebird');
const mapValues = require('lodash/mapValues');
const is = require('is');
const { HttpStatusError } = require('common-errors');
const redisKey = require('./key');
const sha256 = require('./sha256');
const handlePipeline = require('./pipeline-error');
const { USERS_METADATA } = require('../constants');

const JSONStringify = (data) => JSON.stringify(data);

/**
* Process metadata update operation for a passed audience
* @param {Object} pipeline
* @param {String} audience
* @param {Object} metadata
*/
function handleAudience(pipeline, key, metadata) {
const { $remove } = metadata;
const $removeOps = $remove && $remove.length || 0;
if ($removeOps > 0) {
pipeline.hdel(key, $remove);
}

const { $set } = metadata;
const $setKeys = $set && Object.keys($set);
const $setLength = $setKeys && $setKeys.length || 0;
if ($setLength > 0) {
pipeline.hmset(key, mapValues($set, JSONStringify));
}

const { $incr } = metadata;
const $incrFields = $incr && Object.keys($incr);
const $incrLength = $incrFields && $incrFields.length || 0;
if ($incrLength > 0) {
$incrFields.forEach((fieldName) => {
pipeline.hincrby(key, fieldName, $incr[fieldName]);
});
}

return {
$removeOps, $setLength, $incrLength, $incrFields,
};
}

/**
* Maps updateMetadata ops
* @param {Array} responses
* @param {Array} operations
* @return {Object|Array}
*/
function mapMetaResponse(operations, responses) {
let cursor = 0;
return Promise
.map(operations, (props) => {
const {
$removeOps, $setLength, $incrLength, $incrFields,
} = props;
const output = {};

if ($removeOps > 0) {
output.$remove = responses[cursor];
cursor += 1;
}

if ($setLength > 0) {
output.$set = responses[cursor];
cursor += 1;
}

if ($incrLength > 0) {
const $incrResponse = output.$incr = {};
$incrFields.forEach((fieldName) => {
$incrResponse[fieldName] = responses[cursor];
cursor += 1;
});
}

return output;
})
.then((ops) => (ops.length > 1 ? ops : ops[0]));
}

/**
* Handle script, mutually exclusive with metadata
* @param {Array} scriptKeys
* @param {Array} responses
*/
function mapScriptResponse(scriptKeys, responses) {
const output = {};
scriptKeys.forEach((fieldName, idx) => {
output[fieldName] = responses[idx];
});
return output;
}

/**
* Updates metadata on a user object
* @param {Object} opts
* @return {Promise}
*/
function updateMetadata(opts) {
const { redis } = this;
const {
userId, audience, metadata, script,
} = opts;
const audiences = is.array(audience) ? audience : [audience];

// keys
const keys = audiences.map((aud) => redisKey(userId, USERS_METADATA, aud));

// if we have meta, then we can
if (metadata) {
const pipe = redis.pipeline();
const metaOps = is.array(metadata) ? metadata : [metadata];

if (metaOps.length !== audiences.length) {
return Promise.reject(new HttpStatusError(400, 'audiences must match metadata entries'));
}

const operations = metaOps.map((meta, idx) => handleAudience(pipe, keys[idx], meta));
return pipe.exec()
.then(handlePipeline)
.then((res) => mapMetaResponse(operations, res));
}

// dynamic scripts
const $scriptKeys = Object.keys(script);
const scripts = $scriptKeys.map((scriptName) => {
const { lua, argv = [] } = script[scriptName];
const sha = sha256(lua);
const name = `ms_users_${sha}`;
if (!is.fn(redis[name])) {
redis.defineCommand(name, { lua });
}
return redis[name](keys.length, keys, argv);
});

return Promise.all(scripts).then((res) => mapScriptResponse($scriptKeys, res));
}

updateMetadata.handleAudience = handleAudience;
module.exports = updateMetadata;
3 changes: 1 addition & 2 deletions test/suites/actions/ban.js
Original file line number Diff line number Diff line change
@@ -34,9 +34,8 @@ describe('#ban', function banSuite() {

it('must be able to ban an existing user', async function test() {
const response = await this.dispatch('users.ban', { username, ban: true });

assert.equal(response[0], 1);
assert.equal(response[1], 'OK');
assert.equal(response[2], 'OK');
});

it('requesting metadata with a special flag verifies ban state and throws', async function test() {
70 changes: 57 additions & 13 deletions test/suites/actions/update-metadata.js
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@ describe('#updateMetadata', function getMetadataSuite() {
$incr: {
b: 2,
},
$remove: ['c'],
},
{
$incr: {
@@ -75,15 +76,13 @@ describe('#updateMetadata', function getMetadataSuite() {
.then(inspectPromise())
.then((data) => {
const [mainData, extraData] = data;

expect(mainData.$set).to.be.eq('OK');
expect(mainData.$incr.b).to.be.eq(2);
expect(extraData.$incr.b).to.be.eq(3);
});
});

it('must be able to run dynamic scripts', function test() {
const dispatch = simpleDispatcher(this.users.router);
it('must be able to run dynamic scripts', async function test() {
const params = {
username,
audience: [audience, extra],
@@ -95,15 +94,60 @@ describe('#updateMetadata', function getMetadataSuite() {
},
};

return dispatch('users.updateMetadata', params)
.reflect()
.then(inspectPromise())
.then((data) => {
expect(data.balance).to.be.deep.eq([
`{ms-users}${this.userId}!metadata!${audience}`,
`{ms-users}${this.userId}!metadata!${extra}`,
'nom-nom',
]);
});
const updated = await this.dispatch('users.updateMetadata', params);

expect(updated.balance).to.be.deep.eq([
`{ms-users}${this.userId}!metadata!${audience}`,
`{ms-users}${this.userId}!metadata!${extra}`,
'nom-nom',
]);
});

describe('tracks audienceList', function audienceTrackSuite() {
beforeEach(async function updateUser() {
const params = {
username,
audience: [
audience,
'*.extra',
],
metadata: [
{
$set: {
x: 10,
b: 12,
c: 'cval',
},
}, {
$set: {
x: 20,
b: 22,
c: 'xval',
},
},
],
};

await this.dispatch('users.updateMetadata', params);
});
it('adds audience', async function test() {
const audiencesList = await this.users.redis.smembers(`${this.userId}!users-audiences`);
expect(audiencesList).to.include.members(['*.localhost', '*.extra']);
});

it('deletes audience when no metadata left', async function test() {
const deleteParams = {
username,
audience: ['*.extra'],
metadata: [
{
$rem: ['x', 'b', 'c'],
},
],
};
await this.dispatch('users.updateMetadata', deleteParams);
const audiencesList = await this.users.redis.smembers(`${this.userId}!users-audiences`);
expect(audiencesList).to.include.members(['*.localhost']);
});
});
});