Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Enable string templating in the backend #1057

Merged
merged 17 commits into from
Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions next/src/components/drawer/WorkflowSidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ const InspectSection = ({ selectedNode, updateNode, nodes, edges }: InspectSecti
const outputFields = definition.output_fields;
return outputFields.map((outputField) => {
return {
key: `${ancestorNode.id}-${outputField.name}`,
value: `${definition.type}-${outputField.name}`,
key: `{{${ancestorNode.id}.${outputField.name}}}`,
value: `${definition.type}.${outputField.name}`,
};
});
});
Expand Down
4 changes: 2 additions & 2 deletions next/src/components/workflow/BasicNode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import clsx from "clsx";
import type { WorkflowNode } from "../../types/workflow";
import { getNodeBlockDefinitions } from "../../services/workflow/node-block-definitions";

function BasicNode({ data }: NodeProps<WorkflowNode>) {
function BasicNode({ data, selected }: NodeProps<WorkflowNode>) {
const definition = getNodeBlockDefinitions().find((d) => d.type === data.block.type);

return (
Expand All @@ -13,7 +13,7 @@ function BasicNode({ data }: NodeProps<WorkflowNode>) {
"border-translucent rounded-md p-3 shadow-2xl shadow-black",
"bg-stone-900 text-white shadow-stone-800",
"transition-colors duration-300",
"hover:border-white",
selected ? "border-white" : "hover:border-gray-400",
data.status === "running" && "border border-amber-500",
data.status === "success" && "border border-green-500",
!data.status && "border border-gray-500"
Expand Down
25 changes: 13 additions & 12 deletions next/src/pages/workflow/[workflow].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import { useWorkflow } from "../../hooks/useWorkflow";

import { useRouter } from "next/router";
import DashboardLayout from "../../layout/dashboard";
import Button from "../../ui/button";
import { languages } from "../../utils/languages";
import { serverSideTranslations } from "next-i18next/serverSideTranslations";
import nextI18NextConfig from "../../../next-i18next.config";
import { getWorkflowSidebar } from "../../components/drawer/WorkflowSidebar";
import { useAuth } from "../../hooks/useAuth";
import PrimaryButton from "../../components/PrimaryButton";
import { FaPlay, FaSave } from "react-icons/fa";

const WorkflowPage: NextPage = () => {
const { session } = useAuth({ protectedRoute: true });
const router = useRouter();

const {
nodesModel,
edgesModel,
Expand Down Expand Up @@ -44,22 +45,22 @@ const WorkflowPage: NextPage = () => {
/>
<div className="relative h-full w-full">
<div className="absolute bottom-4 right-4 flex flex-row items-center justify-center gap-2">
<Button
className="rounded-md bg-purple-600 px-4 py-2 font-medium text-white transition-colors duration-150 hover:bg-purple-700"
onClick={async () => {
await saveWorkflow();
<PrimaryButton
icon={<FaSave size="15" />}
onClick={() => {
saveWorkflow().catch(console.error);
}}
>
Save
</Button>
<Button
className="rounded-md bg-purple-600 px-4 py-2 font-medium text-white transition-colors duration-150 hover:bg-purple-700"
onClick={async () => {
await executeWorkflow();
</PrimaryButton>
<PrimaryButton
icon={<FaPlay size="15" />}
onClick={() => {
executeWorkflow().catch(console.error);
}}
>
Execute
</Button>
</PrimaryButton>
</div>
</div>
</DashboardLayout>
Expand Down
2 changes: 1 addition & 1 deletion next/src/ui/InputWithSuggestions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const InputWithSuggestions = (props: Props) => {
onClick={() => {
const eventMock = {
target: {
value: `${props.value}{{${field.key}}}`,
value: `${props.value}${field.key}`,
},
};
// @ts-ignore
Expand Down
2 changes: 1 addition & 1 deletion next/src/ui/button.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const Button = forwardRef((props: ButtonProps, ref: ForwardedRef<HTMLButtonEleme
onClick={onClick}
>
{props.ping && <Ping color="white" />}
<div className="flex items-center justify-center gap-x-2.5 px-3 py-2 font-inter text-sm font-normal leading-6">
<div className="flex items-center justify-center gap-x-2.5 px-4 py-2 font-inter text-sm font-semibold leading-6">
{loading ? <Loader /> : props.children}
</div>
</button>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ class SlackWebhookInput(BlockIOBase):
message: str


class SlackWebhookOutput(SlackWebhookInput):
url: str
message: str


class SlackWebhook(Block):
type = "SlackWebhook"
description = "Sends a message to a slack webhook"
Expand All @@ -26,4 +31,4 @@ async def run(self) -> BlockIOBase:
logger.error(f"Failed to send message to webhook: {err}")
raise

return BlockIOBase()
return SlackWebhookOutput(**self.input.dict())
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

import requests
import aiohttp
from loguru import logger
from requests import RequestException

Expand All @@ -11,7 +11,7 @@ class UrlStatusCheckBlockInput(BlockIOBase):
url: str


class UrlStatusCheckBlockOutput(BlockIOBase):
class UrlStatusCheckBlockOutput(UrlStatusCheckBlockInput):
code: Optional[int]


Expand All @@ -22,13 +22,14 @@ class UrlStatusCheckBlock(Block):
input: UrlStatusCheckBlockInput

async def run(self) -> BlockIOBase:
logger.info("Starting UrlStatusCheckBlock")
logger.info(f"Starting UrlStatusCheckBlock with url: {self.input.url}")
try:
response = requests.get(self.input.url)
code = response.status_code
async with aiohttp.ClientSession() as session:
async with session.get(self.input.url) as response:
code = response.status
except RequestException:
code = None
logger.info(f"UrlStatusCheckBlock errored: {RequestException}")

logger.info("UrlStatusCheckBlock Code:", code)
output = UrlStatusCheckBlockOutput(code=code)
logger.info(f"UrlStatusCheckBlock Code: {code}")
output = UrlStatusCheckBlockOutput(code=code, **self.input.dict())
return output
1 change: 1 addition & 0 deletions platform/reworkd_platform/services/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

48 changes: 45 additions & 3 deletions platform/reworkd_platform/services/worker/exec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import re
from typing import Any, Dict

from loguru import logger
from networkx import topological_sort

from reworkd_platform.schemas.workflow.base import WorkflowFull
from reworkd_platform.schemas.workflow.base import Block, BlockIOBase, WorkflowFull
from reworkd_platform.services.kafka.event_schemas import WorkflowTaskEvent
from reworkd_platform.services.kafka.producers.task_producer import WorkflowTaskProducer
from reworkd_platform.services.sockets import websockets
Expand All @@ -18,7 +21,7 @@ async def start(self) -> None:

async def loop(self) -> None:
curr = self.workflow.queue.pop(0)
logger.info(f"Running task: {curr.ref}")
logger.info(f"Running task: {curr}")

websockets.emit(
self.workflow.workflow_id,
Expand All @@ -29,8 +32,16 @@ async def loop(self) -> None:
},
)

curr.block = replace_templates(curr.block, self.workflow.outputs)

runner = get_block_runner(curr.block)
await runner.run()
outputs = await runner.run()

# Place outputs in workflow
outputs_with_key = {
get_template(curr.id, key): value for key, value in outputs.dict().items()
}
self.workflow.outputs.update(outputs_with_key)

websockets.emit(
self.workflow.workflow_id,
Expand Down Expand Up @@ -62,3 +73,34 @@ def create_execution_plan(
work_queue=sorted_nodes,
),
)


TEMPLATE_PATTERN = r"\{\{(?P<id>[\w\d\-]+)\.(?P<key>[\w\d\-]+)\}\}"


def get_template(key: str, value: str) -> str:
return f"{{{{{key}.{value}}}}}"


def replace_templates(block: Block, outputs: Dict[str, Any]) -> Block:
block_input = block.input.dict()

for key, value in block.input.dict().items():
matches = re.finditer(TEMPLATE_PATTERN, value)

for match in matches:
full_match = match.group(0)
block_id = match.group("id")
block_key = match.group("key")

matched_key = get_template(block_id, block_key)

if matched_key in outputs.keys():
value = value.replace(full_match, str(outputs[matched_key]))
else:
raise RuntimeError(f"Unable to replace template: {full_match}")

block_input[key] = value

block.input = BlockIOBase(**block_input)
return block
73 changes: 73 additions & 0 deletions platform/reworkd_platform/tests/workflow/test_replace_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import re

import pytest

from reworkd_platform.schemas.workflow.base import Block
from reworkd_platform.services.worker.exec import get_template, replace_templates

TEMPLATE_PATTERN = r"\{\{(?P<id>[\w\d\-]+)\.(?P<key>[\w\d\-]+)\}\}"


def test_get_template() -> None:
assert get_template("123", "curr") == "{{123.curr}}"


@pytest.mark.parametrize(
"test_input,expected_output",
[
# Success cases
("{{1231-asdfds-12312.curr}}", {"id": "1231-asdfds-12312", "key": "curr"}),
("{{12-34-56.current}}", {"id": "12-34-56", "key": "current"}),
("{{abcd1234.test_key}}", {"id": "abcd1234", "key": "test_key"}),
# Fail cases (return None)
("1231-asdfds-12312.curr", None), # no curly braces
("{{1231-asdfds-12312}}", None), # missing key
("{{.curr}}", None), # missing id
("{{1231-asdfds-12312.}}", None), # missing key after dot
("{{.}}", None), # missing id and key
("{{1231 asdfds 12312.curr}}", None), # id with spaces
("", None), # empty string
],
)
def test_template_pattern(test_input: str, expected_output: dict) -> None:
match = re.match(TEMPLATE_PATTERN, test_input)
if match:
assert match.groupdict() == expected_output
else:
assert match is expected_output # should be None for failed matches


def test_replace_string() -> None:
block = Block(
id="12-34-56",
type="test_type",
input={"curr": "{{12-34-56.curr}}"},
)
outputs = {"{{12-34-56.curr}}": "test_value"}

block = replace_templates(block, outputs)
assert block.input.dict() == {"curr": "test_value"}


def test_replace_single_template() -> None:
block = Block(
id="12-34-56",
type="test_type",
input={"message": "The status code is: {{12-34-56.code}}"},
)
outputs = {"{{12-34-56.code}}": "200"}

block = replace_templates(block, outputs)
assert block.input.dict() == {"message": "The status code is: 200"}


def test_error_if_non_existent_template() -> None:
block = Block(
id="1231-asdfds-12312",
type="test_type",
input={"curr": "{{1231-asdfds-12312.curr}}"},
)
outputs = {}

with pytest.raises(RuntimeError):
replace_templates(block, outputs)