Skip to content

Commit

Permalink
fix: improved organization sync
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Oct 20, 2023
1 parent 7833846 commit ca66c9c
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 88 deletions.
3 changes: 3 additions & 0 deletions .jshintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"esversion": 11
}
18 changes: 9 additions & 9 deletions src/datalayer/persistance.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,17 @@ const getStoreData = async (storeId, rootHash) => {
}
return data;
}

logger.error(
`FAILED GETTING STORE DATA FOR ${storeId}: ${JSON.stringify(data)}`,
);
} catch (error) {
logger.info(
`Unable to find store data for ${storeId} at root ${
rootHash || 'latest'
}`,
);
logger.error(error.message);
return false;
}
}
Expand All @@ -330,20 +335,15 @@ const getRoot = async (storeId, ignoreEmptyStore = false) => {
.timeout(timeout)
.send({ id: storeId });

const data = response.body;
const { confirmed, hash } = response.body;

if (
(data.confirmed && !ignoreEmptyStore) ||
(data.confirmed &&
ignoreEmptyStore &&
!data.hash.includes('0x00000000000'))
) {
return data;
if (confirmed && (!ignoreEmptyStore || !hash.includes('0x00000000000'))) {
return response.body;
}

return false;
} catch (error) {
logger.error(error);
logger.error(error.message);
return false;
}
};
Expand Down
78 changes: 54 additions & 24 deletions src/datalayer/syncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,23 +287,39 @@ const getRootDiff = (storeId, root1, root2) => {
}
};

const getStoreData = async (storeId, callback, onFail, retry = 0) => {
/**
* Fetches store data and invokes either a callback or an error handler.
*
* @param {string} storeId - The ID of the store to fetch data for.
* @param {Function} callback - Function to call on successful data retrieval.
* @param {Function} onFail - Function to call when data retrieval fails.
* @param {number} retry - Number of retry attempts.
*/
const getStoreData = async (storeId, callback, onFail, rootHash, retry = 0) => {
const MAX_RETRIES = 50;
const RETRY_DELAY = 120000;

try {
logger.info(`Getting store data, retry: ${retry}`);
if (retry <= 10) {
const encodedData = await dataLayer.getStoreData(storeId);
if (_.isEmpty(encodedData?.keys_values)) {
await new Promise((resolve) => setTimeout(() => resolve(), 120000));
return getStoreData(storeId, callback, onFail, retry + 1);
} else {
callback(decodeDataLayerResponse(encodedData));
}
} else {
onFail();

if (retry > MAX_RETRIES) {
return onFail(`Max retries exceeded for store ${storeId}`);
}

const encodedData = await dataLayer.getStoreData(storeId, rootHash);

if (!encodedData || _.isEmpty(encodedData?.keys_values)) {
logger.debug(`No data found for store ${storeId}, retrying...`);
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY));
return getStoreData(storeId, callback, onFail, rootHash, retry + 1);
}

const decodedData = decodeDataLayerResponse(encodedData);

callback(decodedData);
} catch (error) {
logger.error(error.message);
onFail();
onFail(error.message);
}
};

Expand All @@ -320,18 +336,32 @@ const getCurrentStoreData = async (storeId) => {
}
};

const getStoreIfUpdated = async (
storeId,
lastRootHash,
onUpdate,
callback,
onFail,
) => {
const rootResponse = await dataLayer.getRoot(storeId);
if (rootResponse.confirmed && rootResponse.hash !== lastRootHash) {
logger.debug(`Updating orgUid ${storeId} with hash ${rootResponse.hash}`);
onUpdate(rootResponse.hash);
await getStoreData(storeId, callback, onFail);
/**
* Checks if the store data has been updated and triggers the appropriate callbacks.
*
* @param {string} storeId - The ID of the store to check.
* @param {string} lastRootHash - The last known root hash for comparison.
* @param {function} callback - Callback to invoke to process the store data.
* @param {function} onFail - Callback to invoke if an operation fails.
*/
const getStoreIfUpdated = async (storeId, lastRootHash, callback, onFail) => {
try {
const rootResponse = await dataLayer.getRoot(storeId);

if (rootResponse.confirmed && rootResponse.hash !== lastRootHash) {
const curriedCallback = (data) => callback(rootResponse.hash, data);

await getStoreData(
storeId,
curriedCallback,
onFail,
rootResponse.hash,
0,
);
}
} catch (error) {
logger.error(error.message);
onFail(error.message);
}
};

Expand Down
118 changes: 63 additions & 55 deletions src/models/organizations/organizations.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ class Organization extends Model {
});
}

// eslint-disable-next-line
static importOrganization = async (orgUid) => {
static async importOrganization(orgUid) {
try {
console.log('Importing organization ' + orgUid);
const orgData = await datalayer.getSubscribedStoreData(orgUid);
Expand Down Expand Up @@ -310,10 +309,9 @@ class Organization extends Model {
} catch (error) {
logger.info(error.message);
}
};
}

// eslint-disable-next-line
static subscribeToOrganization = async (orgUid) => {
static async subscribeToOrganization(orgUid) {
const exists = await Organization.findOne({ where: { orgUid } });
if (exists) {
await Organization.update({ subscribed: true }, { where: { orgUid } });
Expand All @@ -322,65 +320,75 @@ class Organization extends Model {
'Can not subscribe, please import this organization first',
);
}
};
}

// eslint-disable-next-line
static unsubscribeToOrganization = async (orgUid) => {
static async unsubscribeToOrganization(orgUid) {
await Organization.update({ subscribed: false }, { orgUid });
};
}

static syncOrganizationMeta = async () => {
/**
* Synchronizes metadata for all subscribed organizations.
*/
static async syncOrganizationMeta() {
try {
const allSubscribedOrganizations = await Organization.findAll({
subscribed: true,
});

await Promise.all(
allSubscribedOrganizations.map((organization) => {
const onResult = (data) => {
const updateData = data
.filter((pair) => !pair.key.includes('meta_'))
.reduce((update, current) => {
update[current.key] = current.value;
return update;
}, {});

// will return metadata fields. i.e.: { meta_key1: 'value1', meta_key2: 'value2' }
const metadata = data
.filter((pair) => pair.key.includes('meta_'))
.reduce((update, current) => {
update[current.key] = current.value;
return update;
}, {});

Organization.update(
{
..._.omit(updateData, ['registryId']),
metadata: JSON.stringify(metadata),
},
{
where: { orgUid: organization.orgUid },
},
);
};

const onUpdate = (updateHash) => {
allSubscribedOrganizations.map(async (organization) => {
const processData = (data, keyFilter) =>
data
.filter(({ key }) => keyFilter(key))
.reduce(
(update, { key, value }) => ({ ...update, [key]: value }),
{},
);

const onFail = (message) => {
logger.info(`Unable to sync metadata from ${organization.orgUid}`);
logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`);
Organization.update(
{ orgHash: updateHash },
{
where: { orgUid: organization.orgUid },
},
{ orgHash: '0' },
{ where: { orgUid: organization.orgUid } },
);
};

const onFail = () => {
logger.info(`Unable to sync metadata from ${organization.orgUid}`);
const onResult = async (updateHash, data) => {
try {
const updateData = processData(
data,
(key) => !key.includes('meta_'),
);
const metadata = processData(data, (key) =>
key.includes('meta_'),
);

await Organization.update(
{
..._.omit(updateData, ['registryId']),
prefix: updateData.prefix || '0',
metadata: JSON.stringify(metadata),
},
{ where: { orgUid: organization.orgUid } },
);

logger.debug(
`Updating orgUid ${organization.orgUid} with hash ${updateHash}`,
);
await Organization.update(
{ orgHash: updateHash },
{ where: { orgUid: organization.orgUid } },
);
} catch (error) {
logger.info(error.message);
onFail(error.message);
}
};

datalayer.getStoreIfUpdated(
organization.orgUid,
organization.orgHash,
onUpdate,
onResult,
onFail,
);
Expand All @@ -389,9 +397,9 @@ class Organization extends Model {
} catch (error) {
logger.info(error.message);
}
};
}

static subscribeToDefaultOrganizations = async () => {
static async subscribeToDefaultOrganizations() {
try {
const defaultOrgs = await getDefaultOrganizationList();
if (!Array.isArray(defaultOrgs)) {
Expand All @@ -414,9 +422,9 @@ class Organization extends Model {
} catch (error) {
logger.info(error);
}
};
}

static editOrgMeta = async ({ name, icon }) => {
static async editOrgMeta({ name, icon }) {
const myOrganization = await Organization.getHomeOrg();

const payload = {};
Expand All @@ -430,20 +438,20 @@ class Organization extends Model {
}

await datalayer.upsertDataLayer(myOrganization.orgUid, payload);
};
}

static addMetadata = async (payload) => {
static async addMetadata(payload) {
const myOrganization = await Organization.getHomeOrg();

// Prefix keys with "meta_"
const metadata = _.mapKeys(payload, (_value, key) => `meta_${key}`);

await datalayer.upsertDataLayer(myOrganization.orgUid, metadata);
};
}

static removeMirror = async (storeId, coinId) => {
static async removeMirror(storeId, coinId) {
datalayer.removeMirror(storeId, coinId);
};
}
}

Organization.init(ModelTypes, {
Expand Down

0 comments on commit ca66c9c

Please sign in to comment.