Skip to content

Commit

Permalink
Merge pull request #24 from Agoric/rs-refactor-fetching-logs
Browse files Browse the repository at this point in the history
refactor fetching logs
  • Loading branch information
rabi-siddique authored Nov 22, 2024
2 parents 0ad30a2 + 7872b6f commit d9f181d
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 203 deletions.
12 changes: 2 additions & 10 deletions controllers/dateRangeController.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,12 @@ export const handleDateRange = async (req, res) => {
console.log(`Namespace Name: ${networks[network].namespace_name}`);
console.log(`Pod Name: ${networks[network].pod_name}`);

const queryfilter = `
resource.labels.container_name="${networks[network].container_name}" AND
resource.labels.cluster_name="${networks[network].cluster_name}" AND
resource.labels.namespace_name="${networks[network].namespace_name}" AND
resource.labels.pod_name="${networks[network].pod_name}" AND
resource.type="k8s_container"
`;

console.log(`Fetching data from GCP for...`);
console.log(`Fetching data from GCP...`);
const isSuccessful = await fetchAndStoreLogsFromGCP({
startTime: formattedStartDate,
endTime: formattedEndDate,
inputFile,
queryfilter,
network,
});

if (!isSuccessful) {
Expand Down
10 changes: 10 additions & 0 deletions helpers/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,13 @@ export const networks = {
pod_name: 'validator-primary-0',
},
};

// Clause appended (ANDed) into GCP Cloud Logging query filters to restrict
// results to these jsonPayload event types (vat creation, block-start
// markers, and deliver/syscall traces). Runtime string — edit with care.
export const ADDITIONAL_QUERY_FILTERS = `
(
jsonPayload.type = "create-vat" OR
jsonPayload.type = "cosmic-swingset-end-block-start" OR
jsonPayload.type = "deliver" OR
jsonPayload.type = "deliver-result" OR
jsonPayload.type = "syscall"
)
`;
81 changes: 81 additions & 0 deletions helpers/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// @ts-check
import { fs } from 'zx';
import { getCredentials } from './getGCPCredentials.js';
import { Logging } from '@google-cloud/logging';
import { networks } from './constants.js';

export const checkFileExists = async ({ filePath, description = 'File' }) => {
try {
Expand Down Expand Up @@ -107,3 +110,81 @@ export const findEntryWithTimestamp = (logs) => {
}
return null;
};

/**
 * Queries GCP Cloud Logging for entries from the configured network's pod.
 *
 * @param {object} params
 * @param {string} params.network - key into `networks` (container/cluster/namespace/pod labels)
 * @param {string} params.searchQuery - extra filter clauses ANDed into the query
 * @param {string} params.startTime - inclusive lower timestamp bound (ISO string)
 * @param {string} params.endTime - inclusive upper timestamp bound (ISO string)
 * @returns {Promise<Array>} the matching log entries
 */
export const fetchLogs = async ({
  network,
  searchQuery,
  startTime,
  endTime,
}) => {
  const { project_id: projectId } = getCredentials();
  const loggingClient = new Logging({ projectId });
  const networkConfig = networks[network];

  // Scope the query to this network's k8s container, then apply the
  // caller-supplied clauses and the time window.
  const filter = `
  resource.labels.container_name="${networkConfig.container_name}" AND
  resource.labels.cluster_name="${networkConfig.cluster_name}" AND
  resource.labels.namespace_name="${networkConfig.namespace_name}" AND
  resource.labels.pod_name="${networkConfig.pod_name}" AND
  resource.type="k8s_container" AND
  ${searchQuery} AND
  timestamp >= "${startTime}" AND timestamp <= "${endTime}"
  `;

  const [entries] = await loggingClient.getEntries({ filter });
  return entries;
};

/**
 * Computes the number of days (rounded to nearest whole day) between a
 * given timestamp and the current time, logging each intermediate step.
 *
 * @param {string|number|Date} startTimestamp - anything `new Date()` accepts
 * @returns {number} rounded count of days elapsed since `startTimestamp`
 */
export const calculateDaysDifference = (startTimestamp) => {
  const MS_PER_DAY = 1000 * 60 * 60 * 24;
  const start = new Date(startTimestamp);
  const now = new Date();

  console.log(`Calculating days difference...`);
  console.log(`Start Time: ${start.toISOString()}`);
  console.log(`Current Time: ${now.toISOString()}`);

  const elapsedMs = now.getTime() - start.getTime();
  console.log(`Time Difference in Milliseconds: ${elapsedMs}`);

  const daysDifference = Math.round(elapsedMs / MS_PER_DAY);
  console.log(`Days since START_BLOCK_EVENT_TYPE: ${daysDifference} days`);

  return daysDifference;
};

/**
 * Fetches logs covering the last `totalDaysCoverage` days by splitting the
 * window into batches of `batchSize` days and querying them in parallel.
 *
 * @param {object} params
 * @param {string} params.network - key into `networks` (see ./constants.js)
 * @param {string} params.searchQuery - filter clauses forwarded to `fetchLogs`
 * @param {number} [params.batchSize=10] - days covered per batch request
 * @param {number} [params.totalDaysCoverage=90] - total days to look back
 * @returns {Promise<Array>} one entries-array per batch, in batch order;
 *   an empty array if any batch fails (the error is logged, not rethrown)
 */
export const fetchLogsInBatches = async ({
  network,
  searchQuery,
  batchSize = 10,
  totalDaysCoverage = 90,
}) => {
  try {
    const promises = [];

    for (
      let batchStartIndex = 0;
      batchStartIndex < totalDaysCoverage;
      batchStartIndex += batchSize
    ) {
      const { startTime, endTime } = getTimestampsForBatch(
        batchStartIndex,
        totalDaysCoverage
      );
      console.log(`Fetching logs for ${startTime} to ${endTime}`);

      // Kick off every batch without awaiting; Promise.all below preserves
      // batch order in the combined result.
      promises.push(
        fetchLogs({
          network,
          searchQuery,
          startTime,
          endTime,
        })
      );
    }

    return await Promise.all(promises);
  } catch (error) {
    // Best-effort: log the failure and return an empty result so callers
    // iterate over [] instead of crashing on `undefined` (the previous
    // behavior fell through with no return value after catching).
    console.error(error);
    return [];
  }
};
103 changes: 23 additions & 80 deletions services/fetchAndStoreHeightLogs.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,12 @@
// @ts-check
import { networks } from '../helpers/constants.js';
import { getCredentials } from '../helpers/getGCPCredentials.js';
import { fetchGCPLogs } from './fetchGCPLogs.js';
import {
findEntryWithTimestamp,
getTimestampsForBatch,
calculateDaysDifference,
fetchLogsInBatches,
fetchLogs,
} from '../helpers/utils.js';
import { Logging } from '@google-cloud/logging';
import { fs } from 'zx';

const BATCH_SIZE = 10; // 10 days

const calculateDaysDifference = (startTimestamp) => {
const startTime = new Date(startTimestamp);
const currentDate = new Date();

console.log(`Calculating days difference...`);
console.log(`Start Time: ${startTime.toISOString()}`);
console.log(`Current Time: ${currentDate.toISOString()}`);

const timeDifference = currentDate.getTime() - startTime.getTime();
console.log(`Time Difference in Milliseconds: ${timeDifference}`);

const daysDifference = Math.round(timeDifference / (1000 * 60 * 60 * 24));
console.log(`Days since START_BLOCK_EVENT_TYPE: ${daysDifference} days`);

return daysDifference;
};

const fetchLogsForBatch = async ({
type,
network,
blockHeight,
startTime,
endTime,
}) => {
const projectId = getCredentials().project_id;
const logging = new Logging({ projectId });

const queryfilter = `
resource.labels.container_name="${networks[network].container_name}" AND
resource.labels.cluster_name="${networks[network].cluster_name}" AND
resource.labels.namespace_name="${networks[network].namespace_name}" AND
resource.labels.pod_name="${networks[network].pod_name}" AND
resource.type="k8s_container" AND
jsonPayload.type="${type}" AND
jsonPayload.blockHeight="${blockHeight}" AND
timestamp >= "${startTime}" AND timestamp <= "${endTime}"
`;

const [entries] = await logging.getEntries({ filter: queryfilter });
return entries;
};
import { ADDITIONAL_QUERY_FILTERS } from '../helpers/constants.js';

const fetchLogsByBlockEvents = async ({
network,
Expand All @@ -62,31 +17,16 @@ const fetchLogsByBlockEvents = async ({
try {
console.log(`***** Fetching data for event: ${type} *****`);

let promises = [];

for (
let batchStartIndex = 0;
batchStartIndex < totalDaysCoverage;
batchStartIndex += BATCH_SIZE
) {
const { startTime, endTime } = getTimestampsForBatch(
batchStartIndex,
totalDaysCoverage
);
console.log(`Fetching logs for ${startTime} to ${endTime}`);

promises.push(
fetchLogsForBatch({
network,
blockHeight,
startTime,
endTime,
type,
})
);
}
const searchQuery = `
jsonPayload.type="${type}" AND
jsonPayload.blockHeight="${blockHeight}"`;

const logs = await Promise.all(promises);
const logs = await fetchLogsInBatches({
network,
searchQuery,
batchSize: 10,
totalDaysCoverage,
});

return findEntryWithTimestamp(logs);
} catch (error) {
Expand All @@ -97,7 +37,6 @@ const fetchLogsByBlockEvents = async ({
export const fetchAndStoreHeightLogs = async ({
blockHeight,
inputFile,
queryfilter = '',
network,
}) => {
try {
Expand Down Expand Up @@ -148,19 +87,23 @@ export const fetchAndStoreHeightLogs = async ({

let allEntries = [];

const { entries } = await fetchGCPLogs({
const searchQuery = `
${ADDITIONAL_QUERY_FILTERS}
`;

const entries = await fetchLogs({
startTime,
endTime,
filter: queryfilter,
pageSize: 1000,
searchQuery,
network,
});

console.log('Fetched page size: ' + entries.length);
allEntries = allEntries.concat(entries);

const logEntries = allEntries.map((entry) =>
JSON.stringify(entry.jsonPayload)
);
const logEntries = allEntries.map((entry) => {
return JSON.stringify(entry.data);
});

if (!logEntries) {
throw Error('No Entries found for the given Height');
Expand Down
21 changes: 14 additions & 7 deletions services/fetchAndStoreLogsFromGCP.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
// @ts-check
import { fs } from 'zx';
import { fetchGCPLogs } from './fetchGCPLogs.js';
import { fetchLogs } from '../helpers/utils.js';
import { ADDITIONAL_QUERY_FILTERS } from '../helpers/constants.js';

export const fetchAndStoreLogsFromGCP = async ({
startTime,
endTime,
inputFile,
network,
queryfilter = '',
}) => {
try {
let allEntries = [];

const { entries } = await fetchGCPLogs({
const searchQuery = `
${queryfilter}
${ADDITIONAL_QUERY_FILTERS}
`;

const entries = await fetchLogs({
startTime,
endTime,
filter: queryfilter,
pageSize: 1000,
searchQuery,
network,
});

if (!entries) {
Expand All @@ -25,9 +32,9 @@ export const fetchAndStoreLogsFromGCP = async ({
console.log('Fetched page size: ' + entries.length);
allEntries = allEntries.concat(entries);

const logEntries = allEntries.map((entry) =>
JSON.stringify(entry.jsonPayload)
);
const logEntries = allEntries.map((entry) => {
return JSON.stringify(entry.data);
});

fs.writeFile(inputFile, logEntries.join('\n'), (err) => {
if (err) {
Expand Down
Loading

0 comments on commit d9f181d

Please sign in to comment.