
Support running on AWS #19

Open
yuvipanda opened this issue Aug 29, 2022 · 21 comments

Comments

@yuvipanda
Collaborator

We wanna run on AWS! Specifically, I'm going to try to demo this running on AWS at a NASA meeting on September 25, so that's the use case :)

On GCP, we have a managed runner with Dataflow, and this solves our problem. nbd.

On AWS, I investigated possible managed Beam runner options. The two candidates were:

  1. Spark Runner with AWS EMR
  2. Kinesis Data Analytics with their managed Apache Flink

(1) is only kinda semi-managed - we'll still need to run some additional unmanaged infrastructure to run the Beam jobserver, and that sucks.

(2) is a bit of a mystery, from the inscrutable name (is it only meant for use with Kinesis?) to the complete lack of information on the internet about running beam pipelines with Python on top of this. From what I can gather, it can only run Java pipelines, and there isn't space for the portable runner that'll allow us to run Python.

The other option is to run Apache Flink on Kubernetes, with one of these two Flink operators (https://github.com/apache/flink-kubernetes-operator or https://github.com/GoogleCloudPlatform/flink-on-k8s-operator).

A few questions we need to answer before choosing:

  1. How mature is the Spark Runner? Everyone I see seems to be mostly running on Flink if they aren't using Dataflow. I don't want us to use a runner others aren't using, especially one involving the hadoop / JVM / Spark ecosystem if I can avoid it. This is a huge -1 for the EMR + Spark based runner (along with needing to run our job server somewhere)
  2. What's the deal with Kinesis Data Analytics? How tied is it to Kinesis (which we don't use at all)? Can it actually be configured to run Beam with Python? The docs seem to suggest otherwise.
  3. If using k8s, why are there two operators? Which one do we use?
@yuvipanda
Collaborator Author

yuvipanda commented Aug 29, 2022

My current thinking is to disqualify the Spark Runner, try to validate my assumptions about Kinesis Data Analytics, and if not, run this on Flink on k8s (which seems to have decent support) on EKS.

@cisaacstern
Member

👋 @charlesa101 @lerms we met at the Beam Summit in Austin this past July after I attended your talk Implementing Cloud Agnostic Machine Learning Workflows With Apache Beam on Kubernetes. As I mentioned then, we are currently running Beam on Dataflow, but would really like to support AWS as well. You two seemed to have this infra really dialed, so I thought I'd reach out in hopes that our team can learn from your experience.

Any insights you could offer would be greatly appreciated! 🙏 As one question to start, are there any particular open repos in https://github.com/MavenCode which would be helpful for us to take a look at?

@sharkinsspatial

@yuvipanda I did some initial research on options 2 and 3 when we began the Beam refactoring efforts. To simplify development in the near term, @cisaacstern wanted to initially focus efforts on ensuring that our Beam-based recipes would work correctly with Dataflow, so we paused our work on Beam executor "bakeries" for other cloud providers, but now seems like a good time to start revisiting this.

I agree that the documentation around Kinesis Data Analytics (KDA) as a Flink runner with respect to Python-based Beam pipelines is quite sparse. This presentation seems to claim that KDA is a fully featured managed Flink platform which should be capable of running our portable pipelines, but I have not seen any tangible examples of this in the public space. @batpad is planning to investigate this option this week, and the AWS folks we work with on the ASDI initiative are going to try to put us in contact with some of the KDA team. With @batpad, I proposed that we:

  1. Verify if we can run the basic wordcount pipeline on KDA (a minimal sketch of such a pipeline is included below for reference).
  2. Then research if we can use our custom image to run a recipe compiled to Beam.
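
As a reference point, a minimal wordcount pipeline in the Beam Python SDK looks roughly like this; it's a self-contained sketch (not KDA-specific code), useful only as the smallest possible thing to try submitting:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Trivial wordcount: count words from two in-memory lines and print the counts.
# Runner- and endpoint-specific options would go into PipelineOptions when targeting KDA/Flink.
with beam.Pipeline(options=PipelineOptions()) as p:
    (
        p
        | beam.Create(["the quick brown fox", "jumps over the lazy dog"])
        | beam.FlatMap(str.split)
        | beam.combiners.Count.PerElement()
        | beam.Map(print)
    )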

The Flink on k8s landscape seems to be a bit fragmented as well. To avoid complexity (as I'm not a deep k8s person 😄 ) I was hoping that I might find a straightforward helm chart, but the operators you listed above seemed to be the only publicly available repos and the architecture for flink-kubernetes-operator seemed somewhat intimidating for our usage.

If it functions for our use case, I like the simplicity and lack of infrastructure management of a fully managed service like KDA. The one benefit of Flink on k8s is that we would also have a consistent Beam executor model for use on Azure as well.

@yuvipanda
Collaborator Author

@sharkinsspatial I think investigating KDA to see if it works, and falling back to a flink operator on EKS seems the right way to approach this.

Doing this with Flink + k8s is definitely more complex than a fully managed service, but I want to reassure you that that architecture diagram isn't so bad - JupyterHub is probably just about as complex :)

@yuvipanda
Collaborator Author

@sharkinsspatial I was thinking about what we would need to confirm to make sure it works on KDA:

  1. Supports running Python with our custom container image (very important here!)
  2. Logs are retrievable (Cloudtrail / cloudwatch ok)

So if we can get these two to work, then we can just use KDA!
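
For context on item 1, this is roughly how a Beam Python pipeline is pointed at a custom SDK container image when going through the portable runner; the endpoint and image name below are placeholders, and whether KDA exposes anything equivalent is exactly the open question:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values throughout: an illustrative portable-runner config, not a KDA one.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # a Beam job server, e.g. the Flink job server
    "--environment_type=DOCKER",      # run user code inside a container...
    "--environment_config=quay.io/example/pangeo-forge-custom:latest",  # ...built from our custom image
])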

@batpad

batpad commented Aug 31, 2022

Supports running Python with our custom container image (very important here!)

Spent a bit of time trying to figure out how this might work. I might be a bit naive here, just coming into this situation, and may be missing something, but I don't see any way we can run a custom container using KDA.

The way I'm understanding KDA:

You create a Kinesis stream and then set up a Kinesis Data Analytics Application on AWS. When setting this up, you pick a version of Apache Flink it will run. You configure this application by giving it a Kinesis stream to read events / data from, and a path to S3 that contains your application code in either Java or Python. Here is the example for a Python application: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python . It looks like, for example, you need to specify the Python packages your app needs with something like https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/python/PythonPackages .

With this configured, one could then send messages to the Kinesis stream, and the AWS managed service will run your code against data coming through the stream. I see absolutely no way to specify a custom container image, or where exactly one would submit jobs the way we do with Dataflow.

I feel like I don't completely understand our setup and may be missing something. I could go ahead and create a KDA application on AWS and try doing things, but it does seem like a bit of work, and currently I do not see any pathway at all to getting it to do what we need it to do.

@sharkinsspatial let me know if it helps to chat.

@yuvipanda
Collaborator Author

@batpad That matches my understanding from reading the docs too. But I think you and @sharkinsspatial have more AWS experience. If we can confirm to both your satisfaction that this isn't possible, we can tick that box and move on to EKS.

@charlesa101

(quoting @cisaacstern's comment above about the Beam Summit talk and the MavenCode repos)

Hi @cisaacstern
We ran our setup on Kubernetes with Spark Master/Worker containers created as StatefulSets, and we had a Jobservice pod that connects to the Spark Master to submit our Beam jobs.

The SparkOperator-on-Kubernetes option worked as well, but the version of Spark currently supported by SparkOperator won't work with the Jobservice, because we wanted to use the PortableRunner.

@lerms and I will open source what we have on our git repo https://github.com/MavenCode in the next few days. Sorry, it's been a bit busy here.

@cisaacstern
Member

@charlesa101 thanks so much for these insights! I believe we will want to use PortableRunner as well, because we want to use our own Docker images (and I think that's how you do that in Beam).

Thank you so much for being willing to open source your work! If it's not too much to ask, could you post a link to the repo here once it's up? Again, can't thank you enough for sharing this experience. 🙏

@charlesa101

Sure! You're welcome! We need more contributors and people to kick it around to make it better.

@sharkinsspatial

Looping in @briannapagan.

yuvipanda added a commit that referenced this issue Sep 1, 2022
This needs to:

- Create a Flink cluster by setting up a CRD to a kubernetes
  cluster, as that is where the container image used for
  execution is specified
- Use `kubectl port-forward` to proxy to the flink cluster,
  so we can actually talk to it

This uses https://github.com/apache/flink-kubernetes-operator,
as that is the most actively developed, community governed
operator.

Ref #19
yuvipanda mentioned this issue Sep 1, 2022
yuvipanda added a commit that referenced this issue Sep 17, 2022 (same commit message as above)

yuvipanda added a commit that referenced this issue Sep 22, 2022 (same commit message as above)
@yuvipanda
Collaborator Author

Merged #21 that runs on AWS! Here's a tutorial on how to use it: https://pangeo-forge-runner.readthedocs.io/en/latest/tutorial/flink.html.

Need folks to try it out though.

@alxmrs

alxmrs commented Sep 23, 2022

Hey @kevingg! @pabloem recommended that I reach out to you for questions on Beam's Flink runner for Python. @yuvipanda can probably recreate the problems we're facing better than I can, but to summarize:

  • When running a Beam pipeline on Flink (for this project), we notice that only one worker is ever used (parallelism=1)
  • We've noticed that the Apache Beam documentation for Flink says there is a parallelism argument we can set, but after investigating for a while, we've found that this isn't supported / respected in the Python Flink runner (which uses the Portable Beam Runner).
  • Our Beam pipeline runs in parallel on Dataflow. Here, it makes extensive use of Reshuffle() to specify parallel stages of execution (and to prevent operator fusion). Does Flink treat reshuffles in the same way? We recognize that the problem at hand could be that all operations are fused into one, and thus only one thread of execution is needed. If this is the case, what's the right way to restructure our pipeline?

@nika-qubit

  • When running a Beam pipeline on Flink (for this project), we notice that only one worker is ever used (parallelism=1)

This is true. To be more precise, one task per step.

  • We've noticed that the Apache Beam documentation for Flink says there is a parallelism argument we can set, but after investigating for a while, we've found that this isn't supported / respected in the Python Flink runner (which uses the Portable Beam Runner).

If the pipeline option is added (I'm doing it in apache/beam#23271), it will be recognized by the Java job server (flink runner). But one caveat is that the parallelism is applied to the whole pipeline, so all the steps will share the same parallelism. Relatedly, there is also a max_parallelism option.

  • Our Beam pipeline runs in parallel on Dataflow. Here, it makes extensive use of Reshuffle() to specify parallel stages of execution (and to prevent operator fusion). Does Flink treat reshuffles in the same way? We recognize that the problem at hand could be that all operations are fused into one, and thus only one thread of execution is needed. If this is the case, what's the right way to restructure our pipeline?

Yes, and you have to explicitly apply Reshuffle when using Flink; otherwise, elements fanned out will always stay on one Task Manager (worker). In Dataflow, by contrast, the service sometimes knows how to rebalance data to more VMs without an explicit Reshuffle.
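
To make both points concrete, here's a rough sketch; the option names are the standard Beam Flink-runner ones, the endpoint and fan-out step are illustrative, and --parallelism only takes effect once the option is actually plumbed through (the apache/beam#23271 work mentioned above):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",  # illustrative Flink REST endpoint
    "--parallelism=10",               # applies to the whole pipeline
    "--max_parallelism=10",
])

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.Create(range(100))  # fan out from a tiny in-memory source
        | beam.Reshuffle()         # break fusion so elements can move to other Task Managers
        | beam.Map(lambda x: x * x)
    )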

@rsignell-usgs

rsignell-usgs commented Aug 3, 2023

Looks like Beam 2.42.0 was released in October, including the merged PR to support parallelism apache/beam#23271
So maybe we just need a passthrough here in the runner?
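
Not the actual pangeo-forge-runner code, just a rough sketch of what such a passthrough could look like, assuming the bakery builds up a list of Beam pipeline arguments somewhere (the helper name and shape here are purely illustrative):

# Hypothetical helper: append the Flink parallelism flag to whatever Beam args the bakery builds.
def add_flink_parallelism(beam_args, parallelism=None):
    if parallelism is not None:
        beam_args.append(f"--parallelism={parallelism}")
    return beam_args

# e.g. add_flink_parallelism(["--runner=PortableRunner"], 10)
#   -> ["--runner=PortableRunner", "--parallelism=10"]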

@rsignell-usgs

rsignell-usgs commented Aug 3, 2023

Woohoo, finally success running Beam pipelines at scale on AWS!

We first deployed the Kubernetes cluster running Apache Flink on AWS using the @yuvipanda Terraform deployment method. The environment we used to run the terraform stuff is described here.

Then, to execute pangeo-forge Beam recipes from the command line, we created this environment:

name: runner
channels:
  - conda-forge
dependencies:
  - python=3.9.13
  - pangeo-forge-recipes=0.10.0
  - apache-beam=2.42.0
  - pandas<2.0
  - s3fs
  - pip:
      - git+https://github.com/pangeo-forge/pangeo-forge-runner.git@main

with this aws_config.py:

c.TargetStorage.root_path = "s3://esip-qhub/testing/pangeo_forge/{job_name}"

c.TargetStorage.fsspec_class = "s3fs.S3FileSystem"
c.TargetStorage.fsspec_args = { "key":"xxxxxxxxxxxxx", "secret":"xxxxxxxxxxxx", "client_kwargs":{"region_name":"us-west-2"}}

c.InputCacheStorage.fsspec_class = c.TargetStorage.fsspec_class
c.InputCacheStorage.fsspec_args = c.TargetStorage.fsspec_args

c.InputCacheStorage.root_path = "s3://esip-qhub/testing/pangeo-forge/pangeo_forge/inputcache-data/"

c.Bake.bakery_class = "pangeo_forge_runner.bakery.flink.FlinkOperatorBakery"

Example call from the command line:

pangeo-forge-runner bake --repo https://github.com/pforgetest/gpcp-from-gcs-feedstock/ --ref beam-refactor --config aws_config.py --prune --FlinkOperatorBakery.parallelism=10 --Bake.job_name=awstest02

and here you can see the requested parallelism:
[screenshot]
including the pods spinning up to meet the request:
[screenshot]

@yuvipanda
Collaborator Author

OMG THIS IS FUCKING AMAZING @rsignell-usgs!!!!!!

@rsignell-usgs

Credit where credit is due: it's you and @cisaacstern !!

@ranchodeluxe
Collaborator

ranchodeluxe commented Oct 18, 2023

@rsignell-usgs: I know it's been a couple months. I'm over on this side of town trying to reproduce your success and getting sorta/maybe close with the same recipe but never quite there.

A couple questions:

  1. I see your pangeo-forge-runner command above is using --prune. Did you try it without --prune?
  2. Did you validate that your zarr output in the s3 bucket was full and complete?

For me the task pods are never completing successfully (meaning I expect the Python subprocess to exit with a status code of 0 and then quietly be killed without any traceback or error). Sometimes a job will create zarr output but my hunch is that StoreToZarr isn't completing successfully b/c tasks running the subcommand python -m apache_beam.runners.worker.sdk_worker_main give me a potentially dreaded Python exited: <nil>

@ranchodeluxe
Collaborator

ranchodeluxe commented Oct 18, 2023

Sometimes a job will create zarr output but my hunch is that StoreToZarr isn't completing successfully b/c tasks running the subcommand python -m apache_beam.runners.worker.sdk_worker_main give me a potentially dreaded Python exited: <nil>

I realized after I wrote this that, from the perspective of the parent Golang process, <nil> might be the success status 🤔 So let me compare outputs.

@ranchodeluxe
Collaborator

ranchodeluxe commented Oct 18, 2023

Update: zarr outputs match what the GCP Dataflow integration tests have. Here are some notes about getting this recipe working on EKS and Flink from Oct 11th-17th that might be helpful for future humanoids ✨ 🤖

What's Worked and Hasn't Worked

Most of my time has been spent trying to understand if jobs/tasks are actually succeeding. The lack of clear job status updates on the kind: flinkdeployment has to be a bug. Regarding my last comment, Python exited: <nil> should convey success from the perspective of the Golang parent process. But even if that's returned, there's no guarantee that it produced zarr output, depending on which branch and version we're using. Here's a breakdown:

produces zarr output | k8s flinkdeployment status | pangeo-forge-runner branch | flink operator version | flink version | apache-beam version
no                   | JOB STATUS: "empty"        | main                       | 1.5.0                  | 1.15          | 2.42.0
yes                  | JOB STATUS: "empty"        | unpin-beam                 | 1.5.0                  | 1.16          | 2.[47-51].0

(all apache-beam versions listed at https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16/)

Caching and Delays -- Oh My

When working with the pangeo-forge-runner@unpin-beam branch and experimenting with custom pangeo-forge-recipes branches and different apache-beam versions, I've noticed the following things about caches and delays (delays in terms of the JobManager being delayed in kicking off the TaskManager):

Caching:

  • you don't want to have multiple versions of apache-beam pip-installed, or the apache-beam client (which pangeo-forge-runner delegates to) might choose the wrong jar (~/.apache_beam/cache/jars/*.jar) and SDK image (a small cache-cleanup sketch follows below)

  • if you're playing with different dependencies and versions of pangeo-forge-recipes, you also want to make sure the cache where the apache-beam client temporarily downloads packages is clean; otherwise EVERYTHING has to get uploaded to the Flink server, then EVERYTHING gets staged on the workers, and it's not clear what's actually running in the workers. In the pangeo-forge-runner CLI output (toward the end) you should see a line similar to the one below showing where the cache dir is located on the host, as the value of the --dest flag:

Executing command: ['/Users/ranchodeluxe/apps/venv_py39/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/99/ts9mxkwx1n73mbwqvbjztfbh0000gr/T/dataflow-requirements-cache', '-r', '/var/folders/99/ts9mxkwx1n73mbwqvbjztfbh0000gr/T/tmps3_6zbwx/tmp_requirements.txt', '--exists-action', 'i', '--no-deps', '--implementation', 'cp', '--abi', 'cp39', '--platform', 'manylinux2014_x86_64']
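
In case it's useful, a small sketch of clearing both caches between experiments; the jar cache path is the one mentioned above, and the requirements-cache path should be whatever --dest value your own CLI output shows (the one below is illustrative):

import pathlib
import shutil

# Stale job-server jars cached by the Beam client; removing them forces a re-download
# of the jar matching the currently installed apache-beam version.
jar_cache = pathlib.Path.home() / ".apache_beam" / "cache" / "jars"
if jar_cache.exists():
    shutil.rmtree(jar_cache)

# The requirements cache is the --dest directory from the pip download command above.
requirements_cache = pathlib.Path("/tmp/dataflow-requirements-cache")  # illustrative path
if requirements_cache.exists():
    shutil.rmtree(requirements_cache)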

Delays:

  • some of the delay between JobManager and TaskManager is b/c the JobManager needs a little more resource juice. Been running with this override: --FlinkOperatorBakery.job_manager_resources='{"cpu": 1.0}'

  • other aspects of the delay seem to be related to the fact that the apache-beam client has to upload the job-server jar and all our requirements to the Flink server. Not sure if that can be mitigated.
