Workflows #88

Open
wants to merge 72 commits into master

Commits (72)

4349cf7  Catch gcp HttpError  (lbrndnr, Mar 3, 2022)
8e9162c  Add gcp workflow  (lbrndnr, Mar 7, 2022)
7896ddd  Basic workflow invocation  (lbrndnr, Mar 7, 2022)
e079d23  Correct workflow execution  (lbrndnr, Mar 7, 2022)
578b483  Implement workflow updating  (lbrndnr, Mar 7, 2022)
42a6c50  Add fixme comment  (lbrndnr, Mar 7, 2022)
7fe0ef2  Extend aws role policies  (lbrndnr, Mar 9, 2022)
0514130  Basic aws workflow execution  (lbrndnr, Mar 9, 2022)
98ca73f  Fix trigger deserialization  (lbrndnr, Mar 9, 2022)
fe4d473  Fix gcp trigger deserialization  (lbrndnr, Mar 9, 2022)
a15dff0  Make Trigger.deserialize a classmethod  (lbrndnr, Mar 9, 2022)
e7e2bfd  Run azure cmds as docker_user  (lbrndnr, Mar 11, 2022)
1fe2c84  Merge branch 'azure_fix' into workflows  (lbrndnr, Mar 11, 2022)
9b7323b  Add __init__ for aws only  (lbrndnr, Mar 14, 2022)
0b95a11  Minor bug fixes and clean ups  (lbrndnr, Mar 17, 2022)
a2d7a81  Basic azure workflows  (lbrndnr, Mar 17, 2022)
a336c55  Cleanup benchmark interface  (lbrndnr, Mar 17, 2022)
714bed7  Make sure to update aws functions  (lbrndnr, Mar 17, 2022)
8a361fd  Wait for aws function before triggering  (lbrndnr, Mar 17, 2022)
faaa2c2  Improve function code update  (lbrndnr, Mar 18, 2022)
1d4c346  New aws role/policies  (lbrndnr, Mar 22, 2022)
5cbe0af  Merge branch 'master' into workflows  (lbrndnr, Mar 22, 2022)
e11748c  Cleanup  (lbrndnr, Mar 22, 2022)
6738357  Merge branch 'master' into workflows  (lbrndnr, Mar 22, 2022)
c4683c0  Fix aws function waiting bug  (lbrndnr, Mar 24, 2022)
7123245  Separate aws sfm func handler  (lbrndnr, Mar 24, 2022)
48618e2  Save container id to s3  (lbrndnr, Mar 24, 2022)
bfa365a  Fix s3 storage extension  (lbrndnr, Mar 24, 2022)
0681836  Rename benchmark to code package  (lbrndnr, Mar 25, 2022)
83e5b39  Common benchmark class for function and workflow  (lbrndnr, Mar 25, 2022)
8d1a936  Cache workflows  (lbrndnr, Mar 25, 2022)
3fd886d  Delete faas.function  (lbrndnr, Mar 25, 2022)
0ee923c  Generate workflow definitions  (lbrndnr, Mar 29, 2022)
0b82a1a  Generate azure workflows  (lbrndnr, Mar 31, 2022)
a4b2fa5  Basic azure workflow generation  (lbrndnr, Apr 4, 2022)
62f5b5a  Basic workflow switch statements  (lbrndnr, Apr 5, 2022)
3259fc1  Fix workflow def generation  (lbrndnr, Apr 5, 2022)
75d2dbc  Fix azure blob storage  (lbrndnr, Apr 5, 2022)
d5dfc44  Fix azure credential script  (lbrndnr, Apr 6, 2022)
4961349  Send measurements to redis cache  (lbrndnr, Apr 18, 2022)
8a6c404  Fix minor workflow measurement issues  (lbrndnr, Apr 20, 2022)
60b9ddc  Add generic workflow benchmark  (lbrndnr, Apr 20, 2022)
2a253d5  Add aws map state  (lbrndnr, Apr 20, 2022)
c4db885  Add azure map support  (lbrndnr, Apr 20, 2022)
afcdf5d  Make sure keys are unique  (lbrndnr, Apr 20, 2022)
57d1fe8  Remove max_concurrency  (lbrndnr, Apr 21, 2022)
22d10ea  Fix workflow running variable  (lbrndnr, Apr 21, 2022)
af75dc3  Add benchmarks  (lbrndnr, Apr 21, 2022)
5635608  Return correct result  (lbrndnr, Apr 21, 2022)
98f38a9  Fix benchmarks  (lbrndnr, Apr 22, 2022)
25ddf37  Improve measurement download  (lbrndnr, Apr 22, 2022)
7efe689  Clean up azure platform  (lbrndnr, Apr 26, 2022)
2894b62  Linting  (lbrndnr, Apr 26, 2022)
9ea8d39  Linting 2  (lbrndnr, Apr 26, 2022)
89dafe4  Linting 3  (lbrndnr, Apr 26, 2022)
02cbce7  Linting 4  (lbrndnr, Apr 26, 2022)
5046403  Linting 5  (lbrndnr, Apr 26, 2022)
7ab52da  Fix azure main  (lbrndnr, Apr 26, 2022)
f22e41b  Linting 6  (lbrndnr, Apr 27, 2022)
33e1ee2  Linting 7  (lbrndnr, Apr 27, 2022)
ad62620  Relative soft link  (lbrndnr, Apr 27, 2022)
43a8a6e  Error message workflow local deployment  (lbrndnr, Apr 27, 2022)
4f06498  Merge branch 'master' into workflows  (lbrndnr, Apr 27, 2022)
6355ea5  Basic workflow docs  (lbrndnr, Apr 27, 2022)
f49290c  Write new line first  (lbrndnr, Apr 28, 2022)
2d90c54  Hash definition.json too  (lbrndnr, May 5, 2022)
dad2b4b  Remove exponential backoff  (lbrndnr, May 6, 2022)
fff3c11  Unique aws map func name  (lbrndnr, May 19, 2022)
e47dedb  Add loop state  (lbrndnr, May 20, 2022)
c92ceab  Rename loop to repeat  (lbrndnr, May 20, 2022)
d1b1baa  Add loop state  (lbrndnr, May 20, 2022)
cc5e1a0  Update benchmarks-data  (lbrndnr, Dec 11, 2023)

Files changed
8 changes: 4 additions & 4 deletions .mypy.ini
@@ -24,11 +24,11 @@ ignore_missing_imports = True
[mypy-minio]
ignore_missing_imports = True

-[mypy-google.cloud]
-ignore_missing_imports = True
-
-[mypy-google.api_core]
+[mypy-google.*]
ignore_missing_imports = True

[mypy-testtools]
ignore_missing_imports = True
+
+[mypy-redis]
+ignore_missing_imports = True
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/610.gen/config.json
@@ -0,0 +1,5 @@
{
    "timeout": 120,
    "memory": 128,
    "languages": ["python"]
}
48 changes: 48 additions & 0 deletions benchmarks/600.workflows/610.gen/definition.json
@@ -0,0 +1,48 @@
{
    "root": "get_astros",
    "states": {
        "get_astros": {
            "type": "task",
            "func_name": "get_astros",
            "next": "select_astros_number"
        },
        "select_astros_number": {
            "type": "switch",
            "cases": [
                {
                    "var": "astros.number",
                    "op": "<",
                    "val": 10,
                    "next": "few_people"
                },
                {
                    "var": "astros.number",
                    "op": ">=",
                    "val": 10,
                    "next": "many_people"
                }
            ],
            "default": "few_people"
        },
        "few_people": {
            "type": "task",
            "func_name": "few_people",
            "next": "map_astros"
        },
        "many_people": {
            "type": "task",
            "func_name": "many_people",
            "next": "map_astros"
        },
        "map_astros": {
            "type": "map",
            "array": "astros.people",
            "func_name": "map_astros",
            "next": "process_astros"
        },
        "process_astros": {
            "type": "task",
            "func_name": "process_astros"
        }
    }
}
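
The definition above is written in the PR's platform-neutral workflow DSL; per the commit history ("Generate workflow definitions", "Add aws map state"), each backend translates it into the provider's native format. As a hedged sketch of that idea, not the PR's actual generator, a "switch" state could map onto an Amazon States Language (ASL) Choice state roughly like this:

# Hypothetical sketch: translating the DSL "switch" state above into an
# ASL Choice state. The operator table and the function name switch_to_asl
# are assumptions; the real generator is not part of this diff.
import json

ASL_OPS = {"<": "NumericLessThan", ">=": "NumericGreaterThanEquals"}

def switch_to_asl(state):
    choices = [{
        "Variable": "$." + case["var"],      # e.g. "$.astros.number"
        ASL_OPS[case["op"]]: case["val"],
        "Next": case["next"],
    } for case in state["cases"]]
    return {"Type": "Choice", "Choices": choices, "Default": state["default"]}

select_astros_number = {
    "type": "switch",
    "cases": [
        {"var": "astros.number", "op": "<", "val": 10, "next": "few_people"},
        {"var": "astros.number", "op": ">=", "val": 10, "next": "many_people"},
    ],
    "default": "few_people",
}
print(json.dumps(switch_to_asl(select_astros_number), indent=2))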
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/610.gen/input.py
@@ -0,0 +1,5 @@
def buckets_count():
    return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
    return dict()
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/610.gen/python/few_people.py
@@ -0,0 +1,5 @@
def handler(event):
    return {
        "many_astros": False,
        **event
    }
8 changes: 8 additions & 0 deletions benchmarks/600.workflows/610.gen/python/get_astros.py
@@ -0,0 +1,8 @@
import requests

def handler(event):
    res = requests.get("http://api.open-notify.org/astros.json")

    return {
        "astros": res.json()
    }
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/610.gen/python/many_people.py
@@ -0,0 +1,5 @@
def handler(event):
    return {
        "many_astros": True,
        **event
    }
7 changes: 7 additions & 0 deletions benchmarks/600.workflows/610.gen/python/map_astros.py
@@ -0,0 +1,7 @@
def handler(elem):
    # Reverse the astronaut's name, e.g. "Jane Doe" -> "Doe Jane".
    # split(" ", 1) keeps names with more than two parts from raising
    # a ValueError on unpacking.
    name = elem["name"]
    fn, ln = name.split(" ", 1)
    elem["name_rev"] = " ".join([ln, fn])

    return elem
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/610.gen/python/process_astros.py
@@ -0,0 +1,5 @@
def handler(arr):
    return {
        "astros": arr,
        "done": True
    }
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/620.func_invo/config.json
@@ -0,0 +1,5 @@
{
    "timeout": 120,
    "memory": 128,
    "languages": ["python"]
}
10 changes: 10 additions & 0 deletions benchmarks/600.workflows/620.func_invo/definition.json
@@ -0,0 +1,10 @@
{
    "root": "process",
    "states": {
        "process": {
            "type": "loop",
            "func_name": "process",
            "count": 10
        }
    }
}
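
The "loop" state (added in commit e47dedb) runs a single function repeatedly. A minimal local sketch of the semantics this definition implies, under the hedged assumption that each iteration's output becomes the next iteration's input (the platform executors generated by the PR are not part of this diff):

# Hedged sketch of "loop" state semantics: invoke func_name `count` times,
# threading each result into the next call. run_loop is a hypothetical
# helper, not code from the PR.
import importlib

def run_loop(state, event):
    handler = importlib.import_module(state["func_name"]).handler
    for _ in range(state["count"]):
        event = handler(event)
    return event

# With 620.func_invo: run_loop({"func_name": "process", "count": 10},
#                              {"size": 10})
# The process handler below accepts both the initial input dict and its own
# string output, which is consistent with this reading.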
11 changes: 11 additions & 0 deletions benchmarks/600.workflows/620.func_invo/input.py
@@ -0,0 +1,11 @@
size_generators = {
    'test':  10,
    'small': 2**10,
    'large': 2**15
}

def buckets_count():
    return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
    return { 'size': size_generators[size] }
14 changes: 14 additions & 0 deletions benchmarks/600.workflows/620.func_invo/python/process.py
@@ -0,0 +1,14 @@
from random import shuffle

def handler(event):
    # The event is either the benchmark input dict or, on later loop
    # iterations, the previous iteration's string result, in which case the
    # target size is recovered from its length.
    size = int(event["size"]) if isinstance(event, dict) else len(event)
    elems = list(range(size))
    shuffle(elems)

    # Build a string of roughly `size` characters from the shuffled numbers.
    data = ""
    for i in elems:
        data += str(i % 255)
        if len(data) > size:
            break

    return data[:size]
5 changes: 5 additions & 0 deletions benchmarks/600.workflows/630.parallel/config.json
@@ -0,0 +1,5 @@
{
    "timeout": 120,
    "memory": 128,
    "languages": ["python"]
}
15 changes: 15 additions & 0 deletions benchmarks/600.workflows/630.parallel/definition.json
@@ -0,0 +1,15 @@
{
    "root": "generate",
    "states": {
        "generate": {
            "type": "task",
            "func_name": "generate",
            "next": "process"
        },
        "process": {
            "type": "map",
            "func_name": "process",
            "array": "buffer"
        }
    }
}
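
Here the "map" state fans `process` out over the `buffer` array produced by `generate`. A local simulation of that fan-out, under the same hedged reading of the DSL (real deployments fan out to parallel function invocations; a thread pool merely stands in for that here):

# Hedged local simulation of 630.parallel; run() is a hypothetical driver,
# while generate/process mirror the handlers in this benchmark.
from concurrent.futures import ThreadPoolExecutor

def generate(event):
    return {"buffer": int(event["size"]) * ["asdf"]}

def process(elem):
    return elem[::-1]

def run(event):
    state = generate(event)                # task state
    with ThreadPoolExecutor() as pool:     # map state over "buffer"
        return list(pool.map(process, state["buffer"]))

print(run({"size": 5}))  # -> ['fdsa', 'fdsa', 'fdsa', 'fdsa', 'fdsa']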
11 changes: 11 additions & 0 deletions benchmarks/600.workflows/630.parallel/input.py
@@ -0,0 +1,11 @@
size_generators = {
    'test':  5,
    'small': 100,
    'large': 1000
}

def buckets_count():
    return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
    return { 'size': size_generators[size] }
7 changes: 7 additions & 0 deletions benchmarks/600.workflows/630.parallel/python/generate.py
@@ -0,0 +1,7 @@
def handler(event):
    size = int(event["size"])
    buffer = size * ["asdf"]

    return {
        "buffer": buffer
    }
2 changes: 2 additions & 0 deletions benchmarks/600.workflows/630.parallel/python/process.py
@@ -0,0 +1,2 @@
def handler(elem):
    return elem[::-1]
56 changes: 56 additions & 0 deletions benchmarks/wrappers/aws/python/handler_workflow.py
@@ -0,0 +1,56 @@

import datetime
import io
import json
import os
import sys
import uuid
import importlib

# Add current directory to allow location of packages
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

from redis import Redis

def probe_cold_start():
    # A container is cold iff the marker file does not exist yet; the
    # container id persists in /tmp for the lifetime of the container.
    is_cold = False
    fname = os.path.join("/tmp", "cold_run")
    if not os.path.exists(fname):
        is_cold = True
        container_id = str(uuid.uuid4())[0:8]
        with open(fname, "a") as f:
            f.write(container_id)
    else:
        with open(fname, "r") as f:
            container_id = f.read()

    return is_cold, container_id


def handler(event, context):
    start = datetime.datetime.now().timestamp()

    # Deployed function names follow the pattern <workflow>___<function>.
    workflow_name, func_name = context.function_name.split("___")
    function = importlib.import_module(f"function.{func_name}")
    res = function.handler(event)

    end = datetime.datetime.now().timestamp()

    is_cold, container_id = probe_cold_start()
    payload = json.dumps({
        "func": func_name,
        "start": start,
        "end": end,
        "is_cold": is_cold,
        "container_id": container_id
    })

    # {{REDIS_HOST}} is a placeholder, presumably replaced with the actual
    # Redis endpoint at deployment time.
    redis = Redis(host={{REDIS_HOST}},
                  port=6379,
                  decode_responses=True,
                  socket_connect_timeout=10)

    # One measurement per invocation under <workflow>/<func>/<8-char-uuid>.
    key = os.path.join(workflow_name, func_name, str(uuid.uuid4())[0:8])
    redis.set(key, payload)

    return res
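
Each invocation leaves one JSON measurement under a `<workflow>/<func>/<uuid>` key, so the benchmark driver can collect results by prefix after a run. A hedged sketch of that collection step (the PR's actual download code, touched in commit 25ddf37, is not in this excerpt; collect_measurements is a hypothetical helper):

# Hypothetical collector matching the key schema used by the handler above.
import json
from redis import Redis

def collect_measurements(redis_host, workflow_name):
    redis = Redis(host=redis_host, port=6379, decode_responses=True)
    measurements = []
    for key in redis.scan_iter(match=f"{workflow_name}/*"):
        measurements.append(json.loads(redis.get(key)))
    return measurements

# e.g. collect_measurements("my-redis-host", "620_func_invo")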
8 changes: 4 additions & 4 deletions benchmarks/wrappers/aws/python/storage.py
@@ -15,17 +15,17 @@ def __init__(self):
    @staticmethod
    def unique_name(name):
        name, extension = os.path.splitext(name)
-       return '{name}.{random}.{extension}'.format(
+       return '{name}.{random}{extension}'.format(
            name=name,
            extension=extension,
            random=str(uuid.uuid4()).split('-')[0]
        )

    def upload(self, bucket, file, filepath):
        key_name = storage.unique_name(file)
        self.client.upload_file(filepath, bucket, key_name)
        return key_name

    def download(self, bucket, file, filepath):
        self.client.download_file(bucket, file, filepath)
@@ -46,7 +46,7 @@ def download_stream(self, bucket, file):
        data = io.BytesIO()
        self.client.download_fileobj(bucket, file, data)
        return data.getbuffer()

    def get_instance():
        if storage.instance is None:
            storage.instance = storage()
1 change: 1 addition & 0 deletions benchmarks/wrappers/azure/python/fsm.py
@@ -6,7 +6,7 @@

# TODO: usual trigger
# implement support for blob and others
-def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
+def main(req: func.HttpRequest, starter: str, context: func.Context) -> func.HttpResponse:
    income_timestamp = datetime.datetime.now().timestamp()
    req_json = req.get_json()
    if 'connection_string' in req_json:
59 changes: 59 additions & 0 deletions benchmarks/wrappers/azure/python/handler_workflow.py
@@ -0,0 +1,59 @@
import datetime
import json
import os
import uuid
import importlib.util

from azure.storage.blob import BlobServiceClient
import azure.functions as func
from redis import Redis

def probe_cold_start():
    # A container is cold iff the marker file does not exist yet; the
    # container id persists in /tmp for the lifetime of the container.
    is_cold = False
    fname = os.path.join("/tmp", "cold_run")
    if not os.path.exists(fname):
        is_cold = True
        container_id = str(uuid.uuid4())[0:8]
        with open(fname, "a") as f:
            f.write(container_id)
    else:
        with open(fname, "r") as f:
            container_id = f.read()

    return is_cold, container_id


def main(event):
    start = datetime.datetime.now().timestamp()

    workflow_name = os.getenv("APPSETTING_WEBSITE_SITE_NAME")
    func_name = os.path.basename(os.path.dirname(__file__))

    # Load the benchmark code shipped next to this wrapper. Note that
    # importlib.util (not just importlib) must be imported for
    # spec_from_file_location to be available.
    module_name = f"{func_name}.{func_name}"
    module_path = f"{func_name}/{func_name}.py"
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    function = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(function)

    res = function.handler(event)

    end = datetime.datetime.now().timestamp()

    is_cold, container_id = probe_cold_start()
    payload = json.dumps({
        "func": func_name,
        "start": start,
        "end": end,
        "is_cold": is_cold,
        "container_id": container_id,
    })

    # {{REDIS_HOST}} is a placeholder, presumably replaced with the actual
    # Redis endpoint at deployment time.
    redis = Redis(host={{REDIS_HOST}},
                  port=6379,
                  decode_responses=True,
                  socket_connect_timeout=10)

    key = os.path.join(workflow_name, func_name, str(uuid.uuid4())[0:8])
    redis.set(key, payload)

    return res