Skip to content

Commit

Permalink
Merge branch 'main' into harshbhat/roadmap
Browse files Browse the repository at this point in the history
  • Loading branch information
mcstepp authored Oct 10, 2024
2 parents 9cd122e + 4991094 commit 46207a5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 20 deletions.
40 changes: 30 additions & 10 deletions apps/workflows/jobs/refill-daily.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ client.defineJob({
const db = connectDatabase();
const t = new Date();
t.setUTCHours(t.getUTCHours() - 24);
const BUCKET_NAME = "unkey_mutations";

type Key = `${string}::${string}`;
type BucketId = string;
const bucketCache = new Map<Key, BucketId>();
const keys = await io.runTask("list keys", () =>
db.query.keys.findMany({
where: (table, { isNotNull, isNull, eq, and, gt, or }) =>
Expand All @@ -35,17 +39,33 @@ client.defineJob({
io.logger.info(`found ${keys.length} keys with daily refill set`);

for (const key of keys) {
const bucket = await io.runTask(`get bucket for ${key.workspaceId}`, async () => {
return await db.query.auditLogBucket.findFirst({
const cacheKey: Key = `${key.workspaceId}::${BUCKET_NAME}`;
let bucketId = "";
const cachedBucketId = bucketCache.get(cacheKey);
if (cachedBucketId) {
bucketId = cachedBucketId;
} else {
const bucket = await db.query.auditLogBucket.findFirst({
where: (table, { eq, and }) =>
and(eq(table.workspaceId, key.workspaceId), eq(table.name, "unkey_mutations")),
and(eq(table.workspaceId, key.workspaceId), eq(table.name, BUCKET_NAME)),
columns: {
id: true,
},
});
});
if (!bucket) {
io.logger.error(`bucket for ${key.workspaceId} does not exist`);
continue;

if (bucket) {
bucketId = bucket.id;
} else {
bucketId = newId("auditLogBucket");
await db.insert(schema.auditLogBucket).values({
id: bucketId,
workspaceId: key.workspaceId,
name: BUCKET_NAME,
});
}
}

bucketCache.set(cacheKey, bucketId);
await io.runTask(`refill for ${key.id}`, async () => {
await db.transaction(async (tx) => {
await tx
Expand All @@ -60,7 +80,7 @@ client.defineJob({
await tx.insert(schema.auditLog).values({
id: auditLogId,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
time: Date.now(),
event: "key.update",
actorId: "trigger",
Expand All @@ -72,15 +92,15 @@ client.defineJob({
type: "workspace",
id: key.workspaceId,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
auditLogId,
displayName: `workspace ${key.workspaceId}`,
},
{
type: "key",
id: key.id,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
auditLogId,
displayName: `key ${key.id}`,
},
Expand Down
40 changes: 30 additions & 10 deletions apps/workflows/jobs/refill-monthly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ client.defineJob({
const db = connectDatabase();
const t = new Date();
t.setUTCMonth(t.getUTCMonth() - 1);
const BUCKET_NAME = "unkey_mutations";

type Key = `${string}::${string}`;
type BucketId = string;
const bucketCache = new Map<Key, BucketId>();
const keys = await io.runTask("list keys", () =>
db.query.keys.findMany({
where: (table, { isNotNull, isNull, eq, and, or }) =>
Expand All @@ -33,17 +37,33 @@ client.defineJob({
);
io.logger.info(`found ${keys.length} keys with monthly refill set`);
for (const key of keys) {
const bucket = await io.runTask(`get bucket for ${key.workspaceId}`, async () => {
return await db.query.auditLogBucket.findFirst({
const cacheKey: Key = `${key.workspaceId}::${BUCKET_NAME}`;
let bucketId = "";
const cachedBucketId = bucketCache.get(cacheKey);
if (cachedBucketId) {
bucketId = cachedBucketId;
} else {
const bucket = await db.query.auditLogBucket.findFirst({
where: (table, { eq, and }) =>
and(eq(table.workspaceId, key.workspaceId), eq(table.name, "unkey_mutations")),
and(eq(table.workspaceId, key.workspaceId), eq(table.name, BUCKET_NAME)),
columns: {
id: true,
},
});
});
if (!bucket) {
io.logger.error(`bucket for ${key.workspaceId} does not exist`);
continue;

if (bucket) {
bucketId = bucket.id;
} else {
bucketId = newId("auditLogBucket");
await db.insert(schema.auditLogBucket).values({
id: bucketId,
workspaceId: key.workspaceId,
name: BUCKET_NAME,
});
}
}

bucketCache.set(cacheKey, bucketId);
await io.runTask(`refill for ${key.id}`, async () => {
await db.transaction(async (tx) => {
await tx
Expand All @@ -58,7 +78,7 @@ client.defineJob({
await tx.insert(schema.auditLog).values({
id: auditLogId,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
time: Date.now(),
event: "key.update",
actorId: "trigger",
Expand All @@ -70,15 +90,15 @@ client.defineJob({
type: "workspace",
id: key.workspaceId,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
auditLogId,
displayName: `workspace ${key.workspaceId}`,
},
{
type: "key",
id: key.id,
workspaceId: key.workspaceId,
bucketId: bucket.id,
bucketId: bucketId,
auditLogId,
displayName: `key ${key.id}`,
},
Expand Down

0 comments on commit 46207a5

Please sign in to comment.