Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fea: doc #59

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions docs/gpu.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# GPU

### GPU任务提交流程:

1. **`kubectl` 通过配置文件创建GPU-job**

配置文件主要参数有:`job-name`, `ntasks-per-node` 等超算平台需要的参数,`sourcePath` 代码和编译文件存放路径。

2. **`apiServer` 存入GPU-job,并发送创建pod消息**

3. **`kubelet` 创建pod实例**

创建容器的时候,将配置文件中的**参数**,通过**环境变量**形式传入。

将所需要的cuda程序代码和编译文件,通过**volumn映射**传入容器,

创建container所使用的镜像中包含**`gpu_server.py`**,主要作用有:

- 通过`os.getenv` 获取相关参数(环境变量全大写、下划线)
- 通过参数编写作业脚本`.slurm`
- 连接超算平台,传送对应文件
- 加载运行环境,编译程序,提交作业
- 轮询查看是否完成(查看指定路径是否有结果文件输出)
- 完成后给apiServer发消息(job-name和status),结果文件通过volumn映射传回主机

4. apiServer收到job完成消息,修改job状态



### CUDA程序说明

示例程序位于`minik8s/testdata/Gpu`

程序主要有以下几个部分:

1. CUDA 核函数

利用当前线程的行和列索引,编写运算逻辑

```c
__global__ void matrixMulKernel(float *A, float *B, float *C, int m, int k, int n) {
int row = blockDim.y * blockIdx.y + threadIdx.y;
int col = blockDim.x * blockIdx.x + threadIdx.x;

if (row < m && col < n) {
float sum = 0.0f;
for (int i = 0; i < k; ++i) {
sum += A[IDX2C(row, i, m)] * B[IDX2C(i, col, k)];
}
C[IDX2C(row, col, m)] = sum;
}
}
```

2. 内存分配和初始化

3. 配置 CUDA 核函数的执行参数

这部分是使用GPU并发能力的关键,配置线程块、计算网格大小

```c
// 配置 CUDA 核函数的执行参数
dim3 threadsPerBlock(16, 16);
dim3 blocksPerGrid((N + threadsPerBlock.x - 1) / threadsPerBlock.x,
(M + threadsPerBlock.y - 1) / threadsPerBlock.y);

// 调用矩阵乘法核函数
matrixMulKernel<<<blocksPerGrid, threadsPerBlock>>>(devPtrA, devPtrB, devPtrC, M, K, N);
```

4. 结果传回主机和释放内存
32 changes: 32 additions & 0 deletions docs/pod.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Pod

### 功能使用

1. `kubectl get` 可以获取pod运行状态,可以指定namespace、name
2. `kubectl apply -f <file-path>`可以通过配置文件创建pod
3. `kubectl delete pod -n <namespace> name` 可以删除指定pod
4. pod内部容器之间支持localhost访问
5. 外部支持 pod ip+端口 访问



### 实现方式

1. pod创建
- `apiserver` 接收请求,存入etcd,并向kafka中发送创建消息
- `scheduler` 监听pod创建消息,进行调度,将调度节点写入pod配置文件
- `kubelet` 监听pod创建消息(过滤掉没有调度的pod),创建pod实例
- pull对应镜像
- 创建pause容器,cni插件(flannel)为pause容器分配ip地址
- 创建其他容器,配置env、command、volumn等对应参数
- `kubelet` 用心跳机制发回pod ip
2. pod内部通讯
- 通过flannel为pause容器分配ip地址(作为pod ip),pod ip适用于集群
- 其余容器通过linux namespace配置ipc、utc、network,共享pause容器网络(例如:/proc/<pause_pid>/ns/ipc)
- pod内部容器之间支持localhost访问



### CNI插件

选择flannel,通过kubenetes集群配置方式,配置了flannel插件,通过创建容器时指定 --network flannel。主要用于分配pod ip(pause容器ip)
172 changes: 129 additions & 43 deletions docs/serverless.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
# Severless实现
# Serverless

### 1. Function抽象
### 使用方式

**使用方式**
Serverless相关使用

- 用户可以通过单个python文件定义function,python文件名无要求,但是主函数名必须为`main`;
- 用户需要指定参数和返回值的数量、类型,也可指定名字(用于workflow之间传递指定参数);
- 用户可以指定函数invoke方式:http或event trigger;
- 用户需在对应文件夹下提供requirement.txt
1. 通过kubectl apply -f <filename>添加一个function,workflow或事件触发器
2. 通过kubectl get function查看集群中的function
3. 通过kubectl get workflow查看集群中的workflow
4. 通过kubectl get job查看function的调用结果/workflow的中间结果
5. 通过kubectl get trigger result查看workflow的最终结果
6. 通过kubectl delete job <name>删除一个job
7. 通过kubectl delete function <name>删除一个function
8. 通过kubectl delete workflow <name>删除一个workflow
9. 通过kubectl delete trigger <function_name>删除一个事件触发器
10. 通过kubectl http触发一个function/workflow
11. 通过给apiserver发送POST请求触发一个function/workflow,路径为/api/v1/namespaces/<function_namespace>/functions(workflow)/<function_name>/run
12. 通过往etcd中存储数据触发一个function,路径/trigger/<function_namespace>/<function_name>



### 架构设计

1. Serverless架构图:

![img](./img/serverless.png)

### Function抽象:

1. 用户可以通过单个python文件定义function,python文件名无要求,但是主函数名必须为`main`;
2. 用户需要指定参数和返回值的数量、类型,也可指定名字(用于workflow之间传递指定参数);
3. 用户可以指定函数invoke方式:http或event trigger;
4. 用户需在对应文件夹下提供requirement.txt

~~~yaml
yaml文件示例如下:

```yaml
Expand All @@ -29,54 +53,116 @@ result:
- name: greeting
type: string
```
~~~

函数的执行:

1. 通过http、event trigger触发,根据函数配置文件中的参数(数量、类型)定义,组装参数
2. 通过json格式传入容器8080端口,并触发函数运行
3. 结果直接发给`apiserver`

实现方式:

#### 1.1 创建function

通过 `apply -f <filepath>` 指令,通过yaml文件创建function:
结果的返回:

1. 存入etcd;
1. apiserver收到容器返回的结果后,取出对应的Job并将结果存入其中,将Job标记为Ended,代表一次函数调用的结束

2. build函数镜像
### Trigger实现

1. 将所有需要的文件复制到工作路径,安装运行环境
2. 在容器中写入一个server.py,在容器运行时监听8080端口,并且执行用户函数,将结果发送至集群的apiserver

3. 将镜像push到minik8s所维护的镜像仓库

> 该镜像仓库是运行在master IP 下的 5050 端口上的一个 Docker registry
1. Trigger抽象:无论通过http触发还是事件触发器触发,apiserver都会统一生成一条Trigger消息,Trigger消息包含本次调用的uuid,以及被调用的函数和参数,这条消息通过kafka发给Serverless Controller
2. Job抽象:Serverless Controller接收到Trigger消息后,生成一个Job,Job沿用Trigger消息的uuid,函数名称和参数,但是除此之外,Job还记录了当前调用处于的状态(Created,Running和Ended),调用的结果以及该次调用在哪个Pod上执行
3. Job对象的生成:Serverless Controller接收到Trigger消息后,检查被调用函数对应的Serverless实例(Pod):
1. 若当前有空闲的Pod,Serverless Controller直接将该Pod分配给该Job
2. 若没有空闲的Pod,Serverless Controller创建一个新的Pod分配给该Job,这个Pod会在Job被发回给apiserver的时候被真正创建
3. Serverless Controller创建的Job处于Created状态
4. Serverless实例的创建:apiserver接收到从Serverless Controller创建的Job后,检查其中的Pod是否已被创建,若Pod仍未创建,表明这是一个新的Pod,按照Pod创建流程创建
5. Job对象的执行:Job Controller监听Job的变化,当一个Job被创建后,检查其中Pod是否已经被创建:
1. 若Pod没被创建,这个Job会进入等待队列当中,直到Job Controller监听到对应Pod被创建的消息,拿到Pod的IP,才会给Pod IP发http请求,从而调用对应函数
2. 若Pod已经被创建,直接给Pod发http请求调用函数
3. 调用函数的http请求会立刻返回,此时Job Controller把Job标记为Running,并发往apiserver





### Workflow:

1. 每个节点的函数用namespace + name指定
2. 规则匹配采用switch-case模式,匹配满足express的第一个,否则执行default
3. expression需要参数名variable、判断关系opt(>、=等)、值Value、数据类型type,支持多个expression的&&逻辑
4. workflow具体运行触发管理,由workflow_controller控制

workflow 配置文件示例:

```yaml
apiVersion: v1
kind: Workflow
metadata:
name: my-workflow
namespace: default
triggerType:
http: true
graph:
function:
name: BuyTrainTicket
namespace: default
rule:
case:
- expression:
- variable: status
opt: "="
value: "Succeeded"
type: string
successor:
function:
name: ReserveFlight
namespace: default
rule:
case:
- expression:
- variable: status
opt: "="
value: "Succeeded"
type: string
successor:
function:
name: ReserveHotel
namespace: default
rule:
case:
- expression:
- variable: status
opt: "="
value: "Failed"
type: string
successor:
function:
name: CancelFlight
namespace: default
rule:
default:
function:
name: CancelTrainTicket
namespace: default
rule:
default:
function:
name: OrderFailed
namespace: default
default:
function:
name: OrderSucceeded
namespace: default
default:
function:
name: CancelTrainTicket
namespace: default
rule:
default:
function:
name: OrderFailed
namespace: default
default:
function:
name: OrderFailed
namespace: default
```

1. ⽀持Function抽象。⽤⼾可以通过单个⽂件(zip包或代码⽂件)定义函数内容,通过指令上传给
minik8s,并且通过http trigger和event trigger调⽤函数。
◦ 函数需要⾄少⽀持Python语⾔
◦ 函数的格式,return的格式,update、invoke指令的格式可以⾃定义
◦ 函数调⽤的⽅式:⽀持⽤⼾通过http请求、和绑定事件触发两种⽅式调⽤函数。函数可以通过
指令指定要绑定的事件源,事件的类型可以是时间计划、⽂件修改或其他⾃定义内容
2. ⽀持Serverless Workflow抽象:⽤⼾可以定义Serverless DAG,包括以下⼏个组成成分:
◦ 函数调⽤链:在调⽤函数时传参给第⼀个函数,之后依次调⽤各个函数,前⼀个函数的输出作
为后⼀个函数的输⼊,最终输出结果。
◦ 分⽀:根据上⼀个函数的输出,控制⾯决定接下来运⾏哪⼀个分⽀的函数。
◦ Serverless Workflow可以通过配置⽂件来定义,参考AWS StepFunction或Knative的做法。除
此之外,同学们也可以⾃⾏定义编程模型来构建Serverless Workflow,只要workflow能达到
上述要求即可。
3. Serverless的⾃动扩容(Scale-to-0)
◦ Serverless的实例应当在函数请求⾸次到来时被创建(冷启动),并且在⻓时间没有函数请求再
次到来时被删除(scale-to-0)。同时,Serverless能够监控请求数变化,当请求数量增多时能
够⾃动扩容⾄>1实例。
◦ Serverless应当能够正确处理⼤量的并发请求(数⼗并发),演⽰时使⽤wrk2或者Jmeter进⾏压
⼒测试。
4. 找⼀个较为复杂的开源的Serverless应⽤或者⾃⾏实现⼀个较为复杂的Serverless应⽤,该应⽤必
须有现实的应⽤场景(⽐如下图所⽰的 Image Processing),不能只是简单的加减乘除或者hello
world。将该应⽤部署在minik8s上,基于这个应⽤来展⽰Serverless相关功能,并在验收报告中结
合该应⽤的需求和特点详细分析该应⽤使⽤Serverless架构的必要性和优势。 Serverless样例可参考Serverlessbench中的Alexa:ServerlessBench/Testcase4-Applicationbreakdown/alexa at master · SJTU-IPADS/ServerlessBench
42 changes: 42 additions & 0 deletions docs/service&kubeproxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Service & kubeproxy
### service结构定义

通过yaml文件创建

定义selector规则,端口映射,ClusterIp(用户定义的如果被占用会重新分配),对外访问的nodePort

### ClusterIp生成

ip范围:10.96.0.0/16,etcd中存储ClusterIp使用情况(map ClusterIp --> ServiceName)。

预先设置dummy网卡,并将service Cluster ip加入网络

```shell
ip L a minik8s0 type dummy

ip addr add 10.96.0.2/32 dev

echo 1 > /proc/sys/net/ipv4/vs/conntrack
```



### 流量控制

使用 IPVS 控制流量转发。负载均衡选择RoundRobin。

1. 创建service :添加service(ClusterIp:port)到 IPVS 中。在添加之前,会检查service是否已存在于 IPVS 中,如果已存在则跳过。
等效指令:`ipvsadm -A -t <ClusterIP>:<Port> -s rr`
2. 根据service对应的endpoints,创建路由转发规则
`ipvsadm -a -t <ClusterIP>:<Port> -r <PodIP>:<PodPort> -m`
3. 删除service
`ipvsadm -D -t <ClusterIP>:<Port>`
4. 删除路由转发规则
`ipvsadm -d -t <ClusterIP>:<Port> -r <PodIP>:<PodPort>`
5. NodePort配置:
1. 将主机端口访问转发到service ClusterIP:Port
2. 实现指令:`iptables -t nat -A PREROUTING -p tcp --dport <NodePort> -j DNAT --to-destination <ClusterIP>:<Port>`
6. 相关组件:

- `endpoint_controller`:维护endpoints的动态更新
- `kubeproxy`:监听service创建、删除等请求
Loading