fix: K8SWorkerScheduler can not work (#109)
y1yang0 committed Jan 19, 2022
1 parent 4d3d201 commit 1cb11e3
Showing 13 changed files with 163 additions and 51 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -29,4 +29,7 @@ mat-deps

k8s/*.tgz
cloud/
*.tgz
*.tgz

bin
deploy/k8s_pattern/*.log

@@ -71,6 +71,8 @@ public enum ErrorCode {

FILE_TOO_BIG,

RETRY,

RELEASE_PENDING_JOB;

public boolean isFatal() {

@@ -12,6 +12,7 @@
********************************************************************************/
package org.eclipse.jifa.common.vo;

import io.vertx.serviceproxy.ServiceException;
import lombok.Data;
import org.eclipse.jifa.common.ErrorCode;
import org.eclipse.jifa.common.JifaException;
@@ -27,6 +28,12 @@ public ErrorResult(Throwable t) {
if (t instanceof JifaException) {
errorCode = ((JifaException) t).getCode();
}
if (t instanceof ServiceException) {
ServiceException se = (ServiceException) t;
if (ErrorCode.values().length > se.failureCode() && se.failureCode() >= 0) {
errorCode = ErrorCode.values()[se.failureCode()];
}
}

if (t instanceof IllegalArgumentException) {
errorCode = ErrorCode.ILLEGAL_ARGUMENT;
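
The new ServiceException branch above maps a vert.x service failure back onto a Jifa ErrorCode by treating the failure code as the enum ordinal, and the bounds check keeps codes that do not correspond to any ErrorCode constant from breaking the lookup. A minimal, self-contained sketch of that round trip, with a simplified enum standing in for the real ErrorCode:

import io.vertx.serviceproxy.ServiceException;

public class ErrorCodeRoundTrip {

    // Simplified stand-in for org.eclipse.jifa.common.ErrorCode.
    enum Code { SUCCESS, ILLEGAL_ARGUMENT, RETRY }

    // Raising side: encode the enum constant as the failure code, mirroring
    // new ServiceException(ErrorCode.RETRY.ordinal(), job.getTarget()) in the scheduler.
    static ServiceException raiseRetry(String target) {
        return new ServiceException(Code.RETRY.ordinal(), target);
    }

    // Decoding side: mirror of the ErrorResult branch above. Without the bounds
    // check, a failure code that matches no enum ordinal would throw
    // ArrayIndexOutOfBoundsException here.
    static Code decode(ServiceException se) {
        int code = se.failureCode();
        if (code >= 0 && code < Code.values().length) {
            return Code.values()[code];
        }
        return null; // unknown code: leave the default error code in place
    }

    public static void main(String[] args) {
        ServiceException se = raiseRetry("heap-dump-1.hprof"); // illustrative target name
        System.out.println(decode(se)); // RETRY
    }
}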

@@ -184,7 +184,12 @@ private void transfer(RoutingContext context, TransferWay way) {
String origin = extractOriginalName(sb.toString());
FileType type = FileType.valueOf(context.request().getParam("type"));

String name = buildFileName(userId, origin);
String name;
if (context.request().getParam("retry") != null) {
name = context.request().getParam("retry");
} else {
name = buildFileName(userId, origin);
}
request.params().add("fileName", name);

fileService.rxTransfer(userId, type, origin, name, way, convert(request.params()))
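
The effect of the new retry parameter: a first attempt gets a freshly generated per-user file name, while a retried attempt sends back the name issued by the failed attempt, so the transfer keeps targeting the same file on the worker. The decision in isolation (this helper and the generated-name format are illustrative, not the actual Jifa code; only the "retry" parameter comes from the diff above):

class TransferNameRule {
    static String resolve(String retryParam, String userId, String originalName) {
        if (retryParam != null) {
            return retryParam; // retried request: reuse the name from the failed attempt
        }
        // First attempt: derive a fresh name for this user (format is illustrative).
        return userId + "-" + System.currentTimeMillis() + "-" + originalName;
    }
}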

@@ -51,10 +51,7 @@
import org.eclipse.jifa.master.service.sql.JobSQL;
import org.eclipse.jifa.master.service.sql.MasterSQL;
import org.eclipse.jifa.master.service.sql.WorkerSQL;
import org.eclipse.jifa.master.support.Factory;
import org.eclipse.jifa.master.support.Pattern;
import org.eclipse.jifa.master.support.WorkerClient;
import org.eclipse.jifa.master.support.WorkerScheduler;
import org.eclipse.jifa.master.support.*;
import org.eclipse.jifa.master.task.SchedulingTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,6 +104,8 @@ public class Pivot {

private WorkerScheduler scheduler;

private boolean isDefaultPattern;

private Pivot() {
}

@@ -121,6 +120,7 @@ public static synchronized Pivot instance(Vertx vertx, JDBCClient dbClient, Json
jm.vertx = vertx;

Pattern pattern = Pattern.valueOf(ConfigHelper.getString(jm.config(SCHEDULER_PATTERN)));
jm.isDefaultPattern = pattern == Pattern.DEFAULT;
jm.scheduler = Factory.create(pattern);

jm.pendingJobMaxCount = ConfigHelper.getInt(jm.config(PENDING_JOB_MAX_COUNT_KEY));
@@ -152,6 +152,11 @@ public boolean isLeader() {
return currentMaster.isLeader();
}

public boolean isDefaultPattern() {
SERVICE_ASSERT.isTrue(scheduler instanceof DefaultWorkerScheduler, "must be");
return isDefaultPattern;
}

public void setSchedulingTask(SchedulingTask task) {
this.schedulingTask = task;
}
@@ -277,7 +282,7 @@ private HashMap<String, String> buildParams(File... files) {
@SuppressWarnings("rawtypes")
Map[] maps = new Map[files.length];
for (int i = 0; i < files.length; i++) {
Map<String, String> map= new HashMap<>();
Map<String, String> map = new HashMap<>();
map.put("name", files[i].getName());
map.put("type", files[i].getType().name());
maps[i] = map;
@@ -344,9 +349,14 @@ public Completable finish(Job job) {

private Completable finish(Job job, Function<SQLConnection, Completable> post) {
return inTransactionAndLock(
conn -> selectWorker(conn, job.getHostIP())
.flatMapCompletable(worker -> updateWorkerLoad(conn, worker.getHostIP(),
worker.getCurrentLoad() - job.getEstimatedLoad()))
conn -> scheduler.decide(job, conn)
.flatMapCompletable(worker -> {
if (isDefaultPattern()) {
return updateWorkerLoad(conn, worker.getHostIP(), worker.getCurrentLoad() - job.getEstimatedLoad());
} else {
return Completable.complete();
}
})
.andThen(insertHistoricalJob(conn, job))
// delete old job
.andThen(deleteActiveJob(conn, job))

@@ -17,25 +17,18 @@
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodBuilder;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Volume;
import io.kubernetes.client.openapi.models.V1VolumeMount;
import io.kubernetes.client.openapi.models.*;
import io.kubernetes.client.util.Config;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import org.eclipse.jifa.master.Constant;
import io.vertx.serviceproxy.ServiceException;
import org.eclipse.jifa.common.ErrorCode;
import org.eclipse.jifa.common.JifaException;
import org.eclipse.jifa.master.entity.Job;
import org.eclipse.jifa.master.entity.Worker;
import org.eclipse.jifa.master.entity.enums.JobType;
import org.eclipse.jifa.master.model.WorkerInfo;
import org.eclipse.jifa.master.service.impl.Pivot;
import org.eclipse.jifa.master.task.PVCCleanupTask;
@@ -45,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -55,6 +49,8 @@ public class K8SWorkerScheduler implements WorkerScheduler {

private static final Logger LOGGER = LoggerFactory.getLogger(K8SWorkerScheduler.class);

private static final String WORKER_PREFIX = "jifa-worker";

private static String NAMESPACE;

private static String WORKER_IMAGE;
@@ -142,7 +138,12 @@ public void initialize(Pivot pivot, Vertx vertx, JsonObject config) {

@Override
public Single<Worker> decide(Job job, SQLConnection conn) {
return null;
String name = buildWorkerName(job);
String workerIp = getWorkerInfo(name).getIp();
Worker handmake = new Worker();
handmake.setHostIP(workerIp);
handmake.setHostName(name);
return Single.just(handmake);
}

@Override
@@ -151,34 +152,46 @@ public boolean supportPendingJob() {
}

private String buildWorkerName(Job job) {
return "my-worker" + job.getTarget().hashCode();
return WORKER_PREFIX + job.getTarget().hashCode();
}

@Override
public Completable start(Job job) {
String name = buildWorkerName(job);
Map<String, String> config = new HashMap<>();
// FIXME
config.put("requestMemSize", Long.toString(512 * 1024 * 1024));

return Completable.fromAction(() -> {
String name = buildWorkerName(job);
Map<String, String> config = new HashMap<>();
if (job.getType() != JobType.FILE_TRANSFER) {
// see AnalyzerRoute.calculateLoad
long size = job.getEstimatedLoad() / 10 * 1024 * 1024 * 1024;
config.put("requestMemSize", Long.toString(size));
}
schedule(name, config);

// FIXME
while (true) {
HttpResponse<Buffer> response =
WorkerClient.get(getWorkerInfo(name).getIp(), Constant.uri(Constant.PING)).blockingGet();
if (response.statusCode() == Constant.HTTP_GET_OK_STATUS_CODE) {
return;
}
}
});
schedule(name, config);

String workerIp = getWorkerInfo(name).getIp();

if (workerIp == null) {
// Front-end would retry original request until worker pod has been started or
// timeout threshold reached.
return Completable.error(new ServiceException(ErrorCode.RETRY.ordinal(), job.getTarget()));
}
return WorkerClient.get(workerIp, uri(PING))
.flatMap(resp -> Single.just("OK"))
.onErrorReturn(err -> {
if (err instanceof ConnectException) {
// ConnectionException is tolerable because it simply indicates worker is still
// starting
return "RETRY";
}
return err.getMessage();
}).flatMapCompletable(msg -> {
if (msg.equals("OK")) {
return Completable.complete();
} else if (msg.equals("RETRY")) {
return Completable.error(new ServiceException(ErrorCode.RETRY.ordinal(), job.getTarget()));
} else {
return Completable.error(new JifaException("Can not start worker due to internal error: " + msg));
}
});
}

public void schedule(String id, Map<String, String> config) {
private void schedule(String id, Map<String, String> config) {
long requestMemSize = 0L;

String tmp = config.get("requestMemSize");
Expand All @@ -187,9 +200,9 @@ public void schedule(String id, Map<String, String> config) {
}

if (getWorkerInfo(id) != null) {
LOGGER.debug("Start worker " + id + " but it already exists");
LOGGER.debug("Create worker {} but it already exists", id);
} else {
LOGGER.debug("Start worker " + id + "[MemRequest:" + requestMemSize + "bytes]");
LOGGER.debug("Create worker {} [MemRequest: {}bytes]", id, requestMemSize);
createWorker(id, requestMemSize);
}
}
Expand All @@ -207,7 +220,7 @@ public Completable stop(Job job) {
});
}

public WorkerInfo getWorkerInfo(String id) {
private WorkerInfo getWorkerInfo(String id) {
V1Pod npod = null;
try {
npod = api.readNamespacedPod(id, NAMESPACE, null, null, null);
@@ -222,4 +235,4 @@ public WorkerInfo getWorkerInfo(String id) {
return null;
}
}
}
}
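
A note on the arithmetic in start() above: job.getEstimatedLoad() / 10 * 1024 * 1024 * 1024 does the integer division first, so the pod requests whole GiB, and any estimated load below 10 yields a request of 0 bytes. This also relies on the load being a long; if getEstimatedLoad() happened to return an int, the right-hand side would be evaluated in int arithmetic and wrap at the 2 GiB case. A small standalone check with made-up load values:

public class MemRequestCheck {
    public static void main(String[] args) {
        long[] sampleLoads = {5, 10, 25, 40}; // illustrative values only
        for (long estimatedLoad : sampleLoads) {
            // Same expression as in K8SWorkerScheduler.start(): divide first, then scale to bytes.
            long bytes = estimatedLoad / 10 * 1024 * 1024 * 1024;
            System.out.println("load " + estimatedLoad + " -> " + bytes + " bytes ("
                    + bytes / (1024L * 1024 * 1024) + " GiB)");
        }
        // load 5  -> 0 bytes (0 GiB)
        // load 10 -> 1073741824 bytes (1 GiB)
        // load 25 -> 2147483648 bytes (2 GiB)
        // load 40 -> 4294967296 bytes (4 GiB)
    }
}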

@@ -32,4 +32,9 @@ void diskUsage(Promise<DiskUsage> promise) {

promise.complete(new DiskUsage(totalSpaceInMb, usedSpaceInMb));
}

@RouteMeta(path = "/system/ping")
void ping(Promise<Void> promise) {
promise.complete();
}
}
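
The new ping route gives the master a cheap liveness probe for a freshly scheduled worker pod: the promise completes immediately, so any 200 response means the worker's HTTP layer is up, which is what the scheduler's start() checks before letting the job proceed. A minimal standalone polling client in the same spirit (the host address, the port 8102 from worker.yaml, and the /jifa-api path prefix are assumptions; only /system/ping comes from the route above):

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PingUntilUp {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        URI ping = URI.create("http://10.0.0.12:8102/jifa-api/system/ping"); // assumed address
        for (int attempt = 1; attempt <= 30; attempt++) {
            try {
                HttpResponse<Void> resp = client.send(
                        HttpRequest.newBuilder(ping).GET().build(),
                        HttpResponse.BodyHandlers.discarding());
                if (resp.statusCode() == 200) {
                    System.out.println("worker is up after " + attempt + " attempt(s)");
                    return;
                }
            } catch (IOException e) {
                // Connection refused while the pod is still starting: tolerable, try again.
            }
            Thread.sleep(1000);
        }
        System.out.println("worker did not come up within 30 seconds");
    }
}
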
2 changes: 1 addition & 1 deletion deploy/k8s_pattern/jifa.yaml
@@ -75,7 +75,7 @@ spec:
apiVersion: v1
kind: Pod
metadata:
name: my-jifa-master
name: jifa-master
labels:
app: jifa-master
spec:
2 changes: 1 addition & 1 deletion deploy/k8s_pattern/worker.yaml
@@ -17,7 +17,7 @@ metadata:
name: jifa-worker
spec:
containers:
- name: my-jifa-worker
- name: jifa-worker-test
image: docker.io/jifadocker/jifa-worker-open
ports:
- containerPort: 8102
29 changes: 29 additions & 0 deletions frontend/package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions frontend/package.json
@@ -11,6 +11,7 @@
},
"dependencies": {
"axios": "^0.18.1",
"axios-retry": "^3.2.4",
"bootstrap-vue": "^2.0.0-rc.19",
"chart.js": "^2.8.0",
"echarts": "^4.6.0",