diff --git a/content/cn/blog/_posts/2018-06-28-Airflow-Kubernetes-Operator.md b/content/cn/blog/_posts/2018-06-28-Airflow-Kubernetes-Operator.md new file mode 100644 index 0000000000000..1d7eaf47ce406 --- /dev/null +++ b/content/cn/blog/_posts/2018-06-28-Airflow-Kubernetes-Operator.md @@ -0,0 +1,351 @@ +--- +title: 'Airflow在Kubernetes中的使用(第一部分):一种不同的Operator' +cn-approvers: +- congfairy +--- + + + + +**作者**: Daniel Imberman (Bloomberg LP) + + + +## 介绍 + +作为Bloomberg [继续致力于开发Kubernetes生态系统]的一部分(https://www.techatbloomberg.com/blog/bloomberg-awarded-first-cncf-end-user-award-contributions-kubernetes/),我们很高兴能够宣布Kubernetes Airflow Operator的发布; [Apache Airflow](https://airflow.apache.org/),的机制,一种流行的工作流程编排框架,使用Kubernetes API可以在本机启动任意的Kubernetes Pod。 + + + +## 什么是Airflow? + +Apache Airflow是DevOps“Configuration As Code”理念的一种实现。 Airflow允许用户使用简单的Python对象DAG(定向非循环图)启动多步骤流水线。 您可以在易于阅读的UI中定义依赖项,以编程方式构建复杂的工作流,并监视调度的作业。 + +Airflow DAGs +Airflow UI + + + +## 为什么在Kubernetes上使用Airflow? + +自成立以来,Airflow的最大优势在于其灵活性。 Airflow提供广泛的服务集成,包括Spark和HBase,以及各种云提供商的服务。 Airflow还通过其插件框架提供轻松的可扩展性。但是,该项目的一个限制是Airflow用户仅限于执行时Airflow站点上存在的框架和客户端。单个组织可以拥有各种Airflow工作流程,范围从数据科学流到应用程序部署。用例中的这种差异会在依赖关系管理中产生问题,因为两个团队可能会在其工作流程使用截然不同的库。 + +为了解决这个问题,我们使Kubernetes允许用户启动任意Kubernetes pod和配置。 Airflow用户现在可以在其运行时环境,资源和机密上拥有全部权限,基本上将Airflow转变为“您想要的任何工作”工作流程协调器。 + + + +## Kubernetes运营商 + +在我们进一步发展之前,我们应该澄清Airflow中的[Operator](https://airflow.apache.org/concepts.html#operators)是一个任务定义。 当用户创建DAG时,他们将使用像“SparkSubmitOperator”或“PythonOperator”这样的operator分别提交/监视Spark作业或Python函数。 Airflow附带了Apache Spark,BigQuery,Hive和EMR等框架的内置运算符。 它还提供了一个插件入口点,允许DevOps工程师开发自己的连接器。 + +Airflow用户一直在寻找更易于管理部署和ETL流的方法。 在增加监控的同时,任何解耦流程的机会都可以减少未来的停机等问题。 以下是Airflow Kubernetes Operator提供的性能: + + + +* **提高部署灵活性:** +Airflow的插件API一直为希望在其DAG中测试新功能的工程师提供了重要的福利。 不利的一面是,每当开发人员想要创建一个新的operator时,他们就必须开发一个全新的插件。 现在,任何可以在Docker容器中运行的任务都可以通过完全相同的运算符访问,而无需维护额外的Airflow代码。 + + + +* **配置和依赖的灵活性:** +对于在静态Airflow工作程序中运行的operator,依赖关系管理可能变得非常困难。 如果开发人员想要运行一个需要[SciPy](https://www.scipy.org) 的任务和另一个需要[NumPy](http://www.numpy.org) 的任务,开发人员必须维护所有Airflow节点中的依赖关系或将任务卸载到其他计算机(如果外部计算机以未跟踪的方式更改,则可能导致错误)。 自定义Docker镜像允许用户确保任务环境,配置和依赖关系完全是幂等的。 + + + +* **使用kubernetes秘密以增加安全性:** +处理敏感数据是任何开发工程师的核心职责。 Airflow用户总有机会在严格条款的基础上隔离任何API密钥,数据库密码和登录凭据。 使用Kubernetes运算符,用户可以利用Kubernetes Vault技术存储所有敏感数据。 这意味着Airflow工作人员将永远无法访问此信息,并且可以容易地请求仅使用他们需要的密码信息构建pod。 + + + +# 架构 + +Airflow Architecture + +Kubernetes Operator使用[Kubernetes Python客户端](https://github.com/kubernetes-client/Python)生成由APIServer处理的请求(1)。 然后,Kubernetes将使用您定义的需求启动您的pod(2)。 图像将所有必要的环境变量,秘密和依赖项进行加载,制定单个命令。 一旦启动作业,operator只需要监视跟踪日志的状况(3)。 用户可以选择将日志本地收集到调度程序或当前位于其Kubernetes集群中的任何分布式日志记录服务。 + + + +# 使用Kubernetes Operator + +## 一个基本的例子 + +以下DAG可能是我们可以编写的最简单的示例,以显示Kubernetes Operator的工作原理。 这个DAG在Kubernetes上创建了两个pod:一个带有Python的Linux发行版和一个没有它的基本Ubuntu发行版。 Python pod将正确运行Python请求,而没有Python的那个将向用户报告失败。 如果Operator正常工作,则应该完成“passing-task”pod,而“falling-task”pod则向Airflow网络服务器返回失败。 + + +```Python +from airflow import DAG +from datetime import datetime, timedelta +from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator +from airflow.operators.dummy_operator import DummyOperator + + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': datetime.utcnow(), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) +} + +dag = DAG( + 'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10)) + + +start = DummyOperator(task_id='run_this_first', dag=dag) + +passing = KubernetesPodOperator(namespace='default', + image="Python:3.6", + cmds=["Python","-c"], + arguments=["print('hello world')"], + labels={"foo": "bar"}, + name="passing-test", + task_id="passing-task", + get_logs=True, + dag=dag + ) + +failing = KubernetesPodOperator(namespace='default', + image="ubuntu:1604", + cmds=["Python","-c"], + arguments=["print('hello world')"], + labels={"foo": "bar"}, + name="fail", + task_id="failing-task", + get_logs=True, + dag=dag + ) + +passing.set_upstream(start) +failing.set_upstream(start) +``` +Basic DAG Run + +## 但这与我的工作流程有什么关系? + +虽然这个例子只使用基本图像,但Docker的神奇之处在于,这个相同的DAG可以用于您想要的任何图像/命令配对。 以下是推荐的CI / CD管道,用于在Airflow DAG上运行生产就绪代码。 + +### 1:github中的PR +使用Travis或Jenkins运行单元和集成测试,请您的朋友PR您的代码,并合并到主分支以触发自动CI构建。 + +### 2:CI / CD构建Jenkins - > Docker Image + +[在Jenkins构建中生成Docker镜像和缓冲版本](https://getintodevops.com/blog/building-your-first-Docker-image-with-jenkins-2-guide-for-developers)。 + +### 3:Airflow启动任务 + +最后,更新您的DAG以反映新版本,您应该准备好了! + +```Python +production_task = KubernetesPodOperator(namespace='default', + # image="my-production-job:release-1.0.1", <-- old release + image="my-production-job:release-1.0.2", + cmds=["Python","-c"], + arguments=["print('hello world')"], + name="fail", + task_id="failing-task", + get_logs=True, + dag=dag + ) +``` + + +# 启动测试部署 + +由于Kubernetes运营商尚未发布,我们尚未发布官方[helm](https://helm.sh/ )图表或operator(但两者目前都在进行中)。 但是,我们在下面列出了基本部署的说明,并且正在积极寻找测试人员来尝试这一新功能。 要试用此系统,请按以下步骤操作: + +## 步骤1:将kubeconfig设置为指向kubernetes集群 + +## 步骤2:clone Airflow 仓库: + +运行`git clone https:// github.com / apache / incubator-airflow.git`来clone官方Airflow仓库。 + +## 步骤3:运行 + +为了运行这个基本部署,我们正在选择我们目前用于Kubernetes Executor的集成测试脚本(将在本系列的下一篇文章中对此进行解释)。 要启动此部署,请运行以下三个命令: + +``` +sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml +./scripts/ci/kubernetes/Docker/build.sh +./scripts/ci/kubernetes/kube/deploy.sh +``` + + +在我们继续之前,让我们讨论这些命令正在做什么: + +### sed -ie“s / KubernetesExecutor / LocalExecutor / g”scripts / ci / kubernetes / kube / configmaps.yaml + +Kubernetes Executor是另一种Airflow功能,允许动态分配任务已解决幂等pod的问题。我们将其切换到LocalExecutor的原因只是一次引入一个功能。如果您想尝试Kubernetes Executor,欢迎您跳过此步骤,但我们将在以后的文章中详细介绍。 + +### ./scripts/ci/kubernetes/Docker/build.sh + +此脚本将对Airflow主分支代码进行压缩,以根据Airflow分布构建Docker容器 + +### ./scripts/ci/kubernetes/kube/deploy.sh + +最后,我们在您的群集上创建完整的Airflow部署。这包括Airflow配置,postgres后端,webserver +调度程序以及之间的所有必要服务。需要注意的一点是,提供的角色绑定是集群管理员,因此如果您没有该集群的权限级别,可以在scripts / ci / kubernetes / kube / airflow.yaml中进行修改。 + +## 步骤4:登录您的网络服务器 + +现在您的Airflow实例正在运行,让我们来看看UI!用户界面位于Airflow pod的8080端口,因此只需运行即可 + +``` +WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1) +kubectl port-forward $WEB 8080:8080 + ``` + + +现在,Airflow UI将存在于http:// localhost:8080上。 要登录,只需输入`airflow` /`airflow`,您就可以完全访问Airflow Web UI。 + +## 步骤5:上传测试文档 + +要修改/添加自己的DAG,可以使用`kubectl cp`将本地文件上传到Airflow调度程序的DAG文件夹中。 然后,Airflow将读取新的DAG并自动将其上传到其系统。 以下命令将任何本地文件上载到正确的目录中: + +`kubectl cp /:/root/airflow/dags -c scheduler` + + + +## 步骤6:使用它! + +# 那么我什么时候可以使用它? + + 虽然此功能仍处于早期阶段,但我们希望在未来几个月内发布该功能以进行广泛发布。 + +# 参与其中 + +此功能只是将Apache Airflow集成到Kubernetes中的多项主要工作的开始。 Kubernetes Operator已合并到[Airflow的1.10发布分支](https://github.com/apache/incubator-airflow/tree/v1-10-test)(实验模式中的执行模块),以及完整的k8s本地调度程序称为Kubernetes Executor(即将发布文章)。这些功能仍处于早期采用者/贡献者可能对这些功能的未来产生巨大影响的阶段。 + +对于有兴趣加入这些工作的人,我建议按照以下步骤: + + *加入airflow-dev邮件列表dev@airflow.apache.org。 + *在[Apache Airflow JIRA]中提出问题(https://issues.apache.org/jira/projects/AIRFLOW/issues/) + *周三上午10点太平洋标准时间加入我们的SIG-BigData会议。 + *在kubernetes.slack.com上的#sig-big-data找到我们。 + +特别感谢Apache Airflow和Kubernetes社区,特别是Grant Nicholas,Ben Goldberg,Anirudh Ramanathan,Fokko Dreisprong和Bolke de Bruin,感谢您对这些功能的巨大帮助以及我们未来的努力。