Skip to content

Commit

Permalink
🚀Realtime features (#1131)
Browse files Browse the repository at this point in the history
* 🚀Realtime presence
  • Loading branch information
awtkns authored Jul 24, 2023
1 parent f873f6e commit ec6ed1e
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 205 deletions.
86 changes: 71 additions & 15 deletions next/src/hooks/useSocket.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,89 @@
/* eslint-disable react-hooks/exhaustive-deps */
import Pusher from "pusher-js";
import { useEffect } from "react";
import type { z } from "zod";
import { useEffect, useState } from "react";
import { z } from "zod";

import { env } from "../env/client.mjs";

export default function useSocket<T extends z.Schema>(
const PresenceInfoSchema = z.object({
name: z.string().nullish(),
email: z.string().nullish(),
image: z.string().nullish(),
});

const PresenceSubscriptionSucceededSchema = z.object({
count: z.number(),
me: z.object({
id: z.string(),
info: PresenceInfoSchema,
}),
members: z.record(PresenceInfoSchema),
});

const PresenceMemberEventSchema = z.object({
id: z.string(),
info: PresenceInfoSchema,
});

type PresenceInfo = z.infer<typeof PresenceInfoSchema>;

export default function useSocket<T extends z.Schema, R extends z.Schema>(
channelName: string,
eventSchema: T,
callback: (data: z.infer<T>) => void
accessToken: string | undefined,
callbacks: {
event: string;
callback: (data: unknown) => Promise<void> | void;
}[]
) {
const [members, setMembers] = useState<Record<string, PresenceInfo>>({});

useEffect(() => {
const app_key = env.NEXT_PUBLIC_PUSHER_APP_KEY;
if (!app_key) return () => void 0;
if (!app_key || !accessToken) return () => void 0;

const pusher = new Pusher(app_key, { cluster: "mt1" });
const channel = pusher.subscribe(channelName);
const pusher = new Pusher(app_key, {
cluster: "mt1",
channelAuthorization: {
transport: "ajax",
endpoint: `${env.NEXT_PUBLIC_BACKEND_URL}/api/auth/pusher`,
headers: {
Authorization: `Bearer ${accessToken || ""}`,
},
},
});

console.log("subscribed to channel", channelName);
const channel = pusher.subscribe("presence-" + channelName);
callbacks.map(({ event, callback }) => {
channel.bind(event, async (data) => {
await callback(data);
});
});

channel.bind("my-event", async (data) => {
const obj = (await eventSchema.parse(data)) as z.infer<T>;
callback(obj);
channel.bind("pusher:subscription_succeeded", async (data) => {
const event = await PresenceSubscriptionSucceededSchema.parseAsync(data);
setMembers(event.members);
});

channel.bind("pusher:member_added", async (data) => {
const event = await PresenceMemberEventSchema.parseAsync(data);

setMembers((prev) => ({
...prev,
[event.id]: event.info,
}));
});

channel.bind("pusher:member_removed", async (data) => {
const event = await PresenceMemberEventSchema.parseAsync(data);
setMembers(({ [event.id]: _, ...rest }) => rest);
});

return () => {
pusher.unsubscribe(channelName);
pusher.unsubscribe(channel.name);
pusher.disconnect();
console.warn("unsubscribed from channel", channelName);
setMembers({});
};
}, []);
}, [accessToken]);

return members;
}
45 changes: 33 additions & 12 deletions next/src/hooks/useWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import { useWorkflowStore } from "../stores/workflowStore";
import type { NodeBlock, Workflow, WorkflowEdge, WorkflowNode } from "../types/workflow";
import { getNodeType, toReactFlowEdge, toReactFlowNode } from "../types/workflow";

const eventSchema = z.object({
const StatusEventSchema = z.object({
nodeId: z.string(),
status: z.enum(["running", "success", "failure"]),
remaining: z.number().optional(),
});

const SaveEventSchema = z.object({
user_id: z.string(),
});

const updateValue = <
DATA extends WorkflowEdge | WorkflowNode,
KEY extends keyof DATA,
Expand Down Expand Up @@ -82,17 +86,31 @@ export const useWorkflow = (workflowId: string, session: Session | null) => {
else setSelectedNode(selectedNodes[0]);
}, [nodes]);

useSocket(workflowId, eventSchema, ({ nodeId, status, remaining }) => {
updateValue(setNodes, "status", status, (n) => n?.id === nodeId);
updateValue(setEdges, "status", status, (e) => e?.target === nodeId);

if (remaining === 0) {
setTimeout(() => {
updateValue(setNodes, "status", undefined);
updateValue(setEdges, "status", undefined);
}, 1000);
}
});
const members = useSocket(workflowId, session?.accessToken, [
{
event: "workflow:node:status",
callback: async (data) => {
const { nodeId, status, remaining } = await StatusEventSchema.parseAsync(data);

updateValue(setNodes, "status", status, (n) => n?.id === nodeId);
updateValue(setEdges, "status", status, (e) => e?.target === nodeId);

if (remaining === 0) {
setTimeout(() => {
updateValue(setNodes, "status", undefined);
updateValue(setEdges, "status", undefined);
}, 1000);
}
},
},
{
event: "workflow:updated",
callback: async (data) => {
const { user_id } = await SaveEventSchema.parseAsync(data);
if (user_id !== session?.user?.id) await refetchWorkflow();
},
},
]);

const createNode: createNodeType = (block: NodeBlock) => {
const ref = nanoid(11);
Expand Down Expand Up @@ -146,6 +164,8 @@ export const useWorkflow = (workflowId: string, session: Session | null) => {
})),
file,
});

// #TODO: WHY IS THIS NEEDED?
await refetchWorkflow();
};

Expand All @@ -160,6 +180,7 @@ export const useWorkflow = (workflowId: string, session: Session | null) => {
executeWorkflow: onExecute,
createNode,
updateNode,
members,
};
};

Expand Down
13 changes: 12 additions & 1 deletion next/src/pages/workflow/[workflow].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import { useAuth } from "../../hooks/useAuth";
import { useWorkflow } from "../../hooks/useWorkflow";
import DashboardLayout from "../../layout/dashboard";
import { languages } from "../../utils/languages";
import { get_avatar } from "../../utils/user";

const WorkflowPage: NextPage = () => {
const { session } = useAuth({ protectedRoute: true });
const { query } = useRouter();

const [file, setFile] = useState<File>();

const { nodesModel, edgesModel, selectedNode, saveWorkflow, createNode, updateNode } =
const { nodesModel, edgesModel, selectedNode, saveWorkflow, createNode, updateNode, members } =
useWorkflow(query.workflow as string, session);

return (
Expand Down Expand Up @@ -49,6 +50,16 @@ const WorkflowPage: NextPage = () => {
className="min-h-screen flex-1"
/>
<div className="relative h-full w-full">
<div className="absolute bottom-4 left-12 flex flex-row">
{Object.entries(members).map(([id, user]) => (
<img
className="h-6 w-6 rounded-full bg-neutral-800 ring-2 ring-gray-200/20"
key={id}
src={get_avatar(user)}
alt="user avatar"
/>
))}
</div>
<div className="absolute bottom-4 right-4 flex flex-row items-center justify-center gap-2">
<PrimaryButton
icon={<FaSave size="15" />}
Expand Down
8 changes: 5 additions & 3 deletions next/src/utils/user.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { User } from "next-auth";

export const get_avatar = (user?: User) =>
export const get_avatar = (user?: {
name?: string | null;
email?: string | null;
image?: string | null;
}) =>
user?.image ||
"https://avatar.vercel.sh/" +
(user?.email || "") +
Expand Down
Loading

0 comments on commit ec6ed1e

Please sign in to comment.