Skip to content

Commit 4f3446d

Browse files
author
tangzhankun
authored
Merge pull request #2 from tangzhankun/zhankun-add-scheduler
Modify fair scheduler and DS
2 parents 4e24605 + 34f5486 commit 4f3446d

File tree

26 files changed

+583
-407
lines changed

26 files changed

+583
-407
lines changed
Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,36 @@
11
package org.apache.hadoop.yarn.api.records;
22

3-
public class FPGASlot {
3+
public class FPGASlot implements Comparable<FPGASlot>{
4+
5+
private FPGAType fpgaType;
6+
private String slotId;
7+
private String afuId;
8+
9+
public static FPGASlot newInstance(FPGAType type, String slotId, String afuId) {
10+
FPGASlot fpgaSlot = new FPGASlot();
11+
fpgaSlot.setFpgaType(type);
12+
fpgaSlot.setSlotId(slotId);
13+
fpgaSlot.setAfuId(afuId);
14+
return fpgaSlot;
15+
}
416

5-
private final FPGAType fpgaType;
6-
private final String socketId;
7-
private final String slotId;
8-
private final String afuId;
17+
public static FPGASlot newInstance(FPGASlot a) {
18+
FPGASlot fpgaSlot = new FPGASlot();
19+
fpgaSlot.setSlotId(a.getSlotId());
20+
fpgaSlot.setFpgaType(a.getFpgaType());
21+
fpgaSlot.setAfuId(a.getAfuId());
22+
return fpgaSlot;
23+
}
924

10-
private FPGASlot(Builder builder) {
11-
this.fpgaType = builder.fpgaType;
12-
this.socketId = builder.socketId;
13-
this.slotId = builder.slotId;
14-
this.afuId = builder.afuId;
25+
public static FPGASlot newInstance() {
26+
FPGASlot fpgaSlot = new FPGASlot();
27+
return fpgaSlot;
1528
}
1629

1730
public FPGAType getFpgaType() {
1831
return fpgaType;
1932
}
2033

21-
public String getSocketId() {
22-
return socketId;
23-
}
24-
2534
public String getSlotId() {
2635
return slotId;
2736
}
@@ -31,41 +40,54 @@ public String getAfuId() {
3140
}
3241

3342
@Override
34-
public String toString() {
35-
return "fpga type: " + fpgaType + " socket id: " + socketId + " slot id: " + slotId + " afu id: " + afuId + ".";
43+
public boolean equals(Object obj) {
44+
if (this == obj)
45+
return true;
46+
if (obj == null)
47+
return false;
48+
if (!(obj instanceof FPGASlot))
49+
return false;
50+
FPGASlot other = (FPGASlot) obj;
51+
return !(getFpgaType() != other.getFpgaType()
52+
|| !getAfuId().equals(other.getAfuId())
53+
|| !getSlotId().equals(other.getSlotId()));
3654
}
3755

38-
public static class Builder {
39-
40-
private FPGAType fpgaType;
41-
private String socketId;
42-
private String slotId;
43-
private String afuId;
44-
45-
public Builder fpgaType(FPGAType fpgaType) {
46-
this.fpgaType = fpgaType;
47-
return this;
48-
}
49-
50-
public Builder socketId(String socketId) {
51-
this.socketId = socketId;
52-
return this;
53-
}
56+
@Override
57+
public int hashCode() {
58+
final int prime = 263167;
59+
int result = 939769357 + fpgaType.hashCode();
60+
result = prime * result + slotId.hashCode();
61+
result = prime * result + afuId.hashCode();
62+
return result;
63+
}
5464

55-
public Builder slotId(String slotId) {
56-
this.slotId = slotId;
57-
return this;
58-
}
65+
@Override
66+
public String toString() {
67+
return "fpga type: " + fpgaType + " slot id: " + slotId + " afu id: " + afuId + ".";
68+
}
5969

60-
public Builder afuId(String afuId) {
61-
this.afuId = afuId;
62-
return this;
70+
@Override
71+
public int compareTo(FPGASlot o) {
72+
int diff = this.fpgaType.compareTo(o.fpgaType);
73+
if (diff == 0) {
74+
diff = this.slotId.compareTo(o.slotId);
75+
if (diff == 0) {
76+
diff = this.afuId.compareTo(o.afuId);
77+
}
6378
}
79+
return diff;
80+
}
6481

65-
synchronized public FPGASlot build() {
66-
return new FPGASlot(this);
67-
}
82+
public void setFpgaType(FPGAType fpgaType) {
83+
this.fpgaType = fpgaType;
84+
}
6885

86+
public void setSlotId(String slotId) {
87+
this.slotId = slotId;
6988
}
7089

90+
public void setAfuId(String afuId) {
91+
this.afuId = afuId;
92+
}
7193
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/FPGAType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ public enum FPGAType {
88

99
RSD,
1010

11-
PHI
11+
PHI,
12+
13+
UNKNOWN,
1214
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,28 +24,30 @@
2424
import org.apache.hadoop.classification.InterfaceStability.Stable;
2525
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
2626
import org.apache.hadoop.yarn.util.Records;
27-
import java.util.List;
27+
28+
import java.util.Iterator;
29+
import java.util.Set;
2830

2931
/**
3032
* <p><code>Resource</code> models a set of computer resources in the
3133
* cluster.</p>
32-
*
34+
*
3335
* <p>Currently it models both <em>memory</em> and <em>CPU</em>.</p>
34-
*
36+
*
3537
* <p>The unit for memory is megabytes. CPU is modeled with virtual cores
3638
* (vcores), a unit for expressing parallelism. A node's capacity should
3739
* be configured with virtual cores equal to its number of physical cores. A
3840
* container should be requested with the number of cores it can saturate, i.e.
3941
* the average number of threads it expects to have runnable at a time.</p>
40-
*
42+
*
4143
* <p>Virtual cores take integer values and thus currently CPU-scheduling is
4244
* very coarse. A complementary axis for CPU requests that represents processing
4345
* power will likely be added in the future to enable finer-grained resource
4446
* configuration.</p>
45-
*
47+
*
4648
* <p>Typically, applications request <code>Resource</code> of suitable
4749
* capability to run their component tasks.</p>
48-
*
50+
*
4951
* @see ResourceRequest
5052
* @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest)
5153
*/
@@ -73,7 +75,7 @@ public static Resource newInstance(long memory, int vCores) {
7375

7476
@Public
7577
@Stable
76-
public static Resource newInstance(int memory, int vCores, List<FPGASlot> fpgaSlots) {
78+
public static Resource newInstance(int memory, int vCores, Set<FPGASlot> fpgaSlots) {
7779
Resource resource = Records.newRecord(Resource.class);
7880
resource.setMemorySize(memory);
7981
resource.setVirtualCores(vCores);
@@ -83,7 +85,7 @@ public static Resource newInstance(int memory, int vCores, List<FPGASlot> fpgaSl
8385

8486
@Public
8587
@Stable
86-
public static Resource newInstance(long memory, int vCores, List<FPGASlot> fpgaSlots) {
88+
public static Resource newInstance(long memory, int vCores, Set<FPGASlot> fpgaSlots) {
8789
Resource resource = Records.newRecord(Resource.class);
8890
resource.setMemorySize(memory);
8991
resource.setVirtualCores(vCores);
@@ -110,7 +112,7 @@ public static Resource newInstance(long memory, int vCores, List<FPGASlot> fpgaS
110112
@Stable
111113
public long getMemorySize() {
112114
throw new NotImplementedException(
113-
"This method is implemented by ResourcePBImpl");
115+
"This method is implemented by ResourcePBImpl");
114116
}
115117

116118
/**
@@ -129,32 +131,32 @@ public long getMemorySize() {
129131
@Stable
130132
public void setMemorySize(long memory) {
131133
throw new NotImplementedException(
132-
"This method is implemented by ResourcePBImpl");
134+
"This method is implemented by ResourcePBImpl");
133135
}
134136

135137

136138
/**
137139
* Get <em>number of virtual cpu cores</em> of the resource.
138-
*
140+
*
139141
* Virtual cores are a unit for expressing CPU parallelism. A node's capacity
140142
* should be configured with virtual cores equal to its number of physical cores.
141143
* A container should be requested with the number of cores it can saturate, i.e.
142144
* the average number of threads it expects to have runnable at a time.
143-
*
145+
*
144146
* @return <em>num of virtual cpu cores</em> of the resource
145147
*/
146148
@Public
147149
@Evolving
148150
public abstract int getVirtualCores();
149-
151+
150152
/**
151153
* Set <em>number of virtual cpu cores</em> of the resource.
152-
*
154+
*
153155
* Virtual cores are a unit for expressing CPU parallelism. A node's capacity
154156
* should be configured with virtual cores equal to its number of physical cores.
155157
* A container should be requested with the number of cores it can saturate, i.e.
156158
* the average number of threads it expects to have runnable at a time.
157-
*
159+
*
158160
* @param vCores <em>number of virtual cpu cores</em> of the resource
159161
*/
160162

@@ -164,19 +166,22 @@ public void setMemorySize(long memory) {
164166

165167
@Public
166168
@Evolving
167-
public abstract List<FPGASlot> getFPGASlots();
169+
public abstract Set<FPGASlot> getFPGASlots();
168170

169171
@Public
170172
@Evolving
171-
public abstract void setFPGASlots(List<FPGASlot> fpgaSlots);
173+
public abstract void setFPGASlots(Set<FPGASlot> fpgaSlots);
172174

173175
@Override
174176
public int hashCode() {
175177
final int prime = 263167;
176178

177179
int result = (int) (939769357
178-
+ getMemorySize()); // prime * result = 939769357 initially
180+
+ getMemorySize()); // prime * result = 939769357 initially
179181
result = prime * result + getVirtualCores();
182+
for (FPGASlot fpgaSlot : getFPGASlots()) {
183+
result = prime * result + fpgaSlot.hashCode();
184+
}
180185
return result;
181186
}
182187

@@ -190,23 +195,48 @@ public boolean equals(Object obj) {
190195
return false;
191196
Resource other = (Resource) obj;
192197
if (getMemorySize() != other.getMemorySize() ||
193-
getVirtualCores() != other.getVirtualCores()) {
198+
getVirtualCores() != other.getVirtualCores()) {
194199
return false;
195200
}
196-
return true;
201+
return this.getFPGASlots().equals(other.getFPGASlots());
197202
}
198203

199204
@Override
200205
public String toString() {
201206
StringBuilder fpgaInfo = new StringBuilder();
202207

203-
List<FPGASlot> fpgaSlots = getFPGASlots();
208+
Set<FPGASlot> fpgaSlots = getFPGASlots();
209+
if (fpgaSlots == null) {
210+
return "<memory:" + getMemorySize() + ", vCores:" + getVirtualCores() + ">";
211+
}
204212
fpgaInfo.append("\t\t\t\tFPGA accelerator number:" + fpgaSlots.size() + "\n");
213+
if (fpgaSlots.size() > 0) {
214+
fpgaInfo.append("\t\t\t\tFPGA accelerator details: \n");
215+
for (FPGASlot fpgaSlot : fpgaSlots) {
216+
fpgaInfo.append("\t\t\t\t fpga type:" + fpgaSlot.getFpgaType() + ", slot id:" + fpgaSlot.getSlotId() + ", afu id:" + fpgaSlot.getAfuId() + "\n");
217+
}
218+
}
219+
return "<memory:" + getMemorySize() + ", vCores:" + getVirtualCores() + ">\n" + fpgaInfo;
220+
}
205221

206-
fpgaInfo.append("\t\t\t\tFPGA accelerator details: \n");
207-
for(FPGASlot fpgaSlot : fpgaSlots) {
208-
fpgaInfo.append("\t\t\t\t fpga type:" + fpgaSlot.getFpgaType() + ", socket id:" + fpgaSlot.getSocketId() + ", slot id:" + fpgaSlot.getSlotId() + ", afu id:" + fpgaSlot.getAfuId() + "\n");
222+
@Override
223+
public int compareTo(Resource o) {
224+
Set<FPGASlot> thisFPGA = this.getFPGASlots();
225+
Set<FPGASlot> otherFPGA = o.getFPGASlots();
226+
int diff = thisFPGA.size() - otherFPGA.size();
227+
if (diff == 0) {
228+
diff = this.getVirtualCores() - o.getVirtualCores();
229+
if (diff == 0) {
230+
diff = this.getMemory() - o.getMemory();
231+
if (diff == 0) {
232+
Iterator<FPGASlot> it1 = thisFPGA.iterator();
233+
Iterator<FPGASlot> it2 = otherFPGA.iterator();
234+
for (; it1.hasNext();) {
235+
diff = it1.next().compareTo(it2.next());
236+
}
237+
}
238+
}
209239
}
210-
return "<memory:" + getMemorySize() + ", vCores:" + getVirtualCores() + ">\n" + fpgaInfo ;
240+
return diff;
211241
}
212242
}

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1123,7 +1123,7 @@ public static boolean isAclEnabled(Configuration conf) {
11231123
public static final int DEFAULT_NM_VCORES = 8;
11241124

11251125
/** Configuration key built from NM_PREFIX and "resource.fpga". */
public static final String NM_FPGA = NM_PREFIX + "resource.fpga";
1126-
public static final String DEFAULT_NM_FPGA = "MCP:0:1:88f2b0e6-b3d0-4b18-a7e1-0b53eaaf74e1,DSC:0:1:50d64a0f-da0d-4057-ba94-67290d25eeee";
1126+
// NOTE(review): presumably the default for NM_FPGA; entries look like
// comma-separated "type:slotId:afuId" triples (the old value had a fourth
// field between type and slot) - confirm against the code that parses it.
public static final String DEFAULT_NM_FPGA = "MCP:0:88f2b0e6-b3d0-4b18-a7e1-0b53eaaf74e1,DSC:0:50d64a0f-da0d-4057-ba94-67290d25eeee,MCP:1:00000000-0000-0000-0000-000000000000";
11271127

11281128
/** Count logical processors(like hyperthreads) as cores. */
11291129
public static final String NM_COUNT_LOGICAL_PROCESSORS_AS_CORES = NM_PREFIX

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ enum FPGATypeProto {
6464
MCP = 2;
6565
RSD = 3;
6666
PHI = 4;
67+
UNKNOWN = 5;
6768
}
6869

6970
// One FPGA option: its type, slot id and AFU id.
// Tag 2 previously carried socket_id and is removed by this change; do not
// reuse tag 2 for a different field, or data written by older code could be
// misread.
message FPGAOptionProto {
7071
optional FPGATypeProto fpga_type = 1;
71-
optional string socket_id = 2;
7272
optional string slot_id = 3;
7373
optional string afu_id = 4;
7474
}
@@ -79,10 +79,6 @@ message ResourceUtilizationProto {
7979
optional float cpu = 3;
8080
}
8181

82-
message FPGAResourceProto {
83-
optional string type = 1;
84-
optional string accelerator = 2;
85-
}
8682

8783
message ResourceOptionProto {
8884
optional ResourceProto resource = 1;

0 commit comments

Comments
 (0)