1919import java .time .ZoneOffset ;
2020import java .time .ZonedDateTime ;
2121import java .util .List ;
22+ import java .util .Map ;
2223import java .util .Optional ;
2324import java .util .Properties ;
2425import java .util .stream .Collectors ;
@@ -69,6 +70,7 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer, Configuration config
6970 final List <Integer > ports = getPorts (offer , configuration );
7071 final List <Protos .Resource > resources = getResources (configuration , ports );
7172 final Protos .DiscoveryInfo discovery = getDiscovery (ports , configuration );
73+ final Protos .Labels labels = getLabels (configuration );
7274
7375 final String hostAddress = resolveHostAddress (offer , ports );
7476
@@ -83,6 +85,7 @@ private Protos.TaskInfo buildNativeTask(Protos.Offer offer, Configuration config
8385 .setSlaveId (offer .getSlaveId ())
8486 .addAllResources (resources )
8587 .setDiscovery (discovery )
88+ .setLabels (labels )
8689 .setCommand (nativeCommand (configuration , args , elasticSearchNodeId ))
8790 .build ();
8891 }
@@ -91,6 +94,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
9194 final List <Integer > ports = getPorts (offer , configuration );
9295 final List <Protos .Resource > resources = getResources (configuration , ports );
9396 final Protos .DiscoveryInfo discovery = getDiscovery (ports , configuration );
97+ final Protos .Labels labels = getLabels (configuration );
9498
9599 final String hostAddress = resolveHostAddress (offer , ports );
96100
@@ -107,6 +111,7 @@ private Protos.TaskInfo buildDockerTask(Protos.Offer offer, Configuration config
107111 .setSlaveId (offer .getSlaveId ())
108112 .addAllResources (resources )
109113 .setDiscovery (discovery )
114+ .setLabels (labels )
110115 .setCommand (dockerCommand (configuration , args , elasticSearchNodeId ))
111116 .setContainer (containerInfo )
112117 .build ();
@@ -125,8 +130,7 @@ private List<Integer> getPorts(Protos.Offer offer, Configuration configuration)
125130 if (elasticsearchPorts .isEmpty () || elasticsearchPorts .stream ().allMatch (port -> port == 0 )) {
126131 //No ports requested by user or two random ports requested
127132 ports = Resources .selectTwoPortsFromRange (offer .getResourcesList ());
128- }
129- else {
133+ } else {
130134 //Replace a user requested port 0 with a random port
131135 ports = elasticsearchPorts .stream ().map (port -> port != 0 ? port : Resources .selectOnePortFromRange (offer .getResourcesList ())).collect (Collectors .toList ());
132136 }
@@ -143,14 +147,29 @@ private List<Protos.Resource> getResources(Configuration configuration, List<Int
143147 private Protos .DiscoveryInfo getDiscovery (List <Integer > ports , Configuration configuration ) {
144148 Protos .DiscoveryInfo .Builder discovery = Protos .DiscoveryInfo .newBuilder ();
145149 Protos .Ports .Builder discoveryPorts = Protos .Ports .newBuilder ();
146- discoveryPorts .addPorts (Discovery .CLIENT_PORT_INDEX , Protos .Port .newBuilder ().setNumber (ports .get (0 )).setName (Discovery .CLIENT_PORT_NAME ).setProtocol ("TCP " ));
147- discoveryPorts .addPorts (Discovery .TRANSPORT_PORT_INDEX , Protos .Port .newBuilder ().setNumber (ports .get (1 )).setName (Discovery .TRANSPORT_PORT_NAME ).setProtocol ("TCP " ));
150+ discoveryPorts .addPorts (Discovery .CLIENT_PORT_INDEX , Protos .Port .newBuilder ().setNumber (ports .get (0 )).setName (Discovery .CLIENT_PORT_NAME ).setProtocol ("tcp " ));
151+ discoveryPorts .addPorts (Discovery .TRANSPORT_PORT_INDEX , Protos .Port .newBuilder ().setNumber (ports .get (1 )).setName (Discovery .TRANSPORT_PORT_NAME ).setProtocol ("tcp " ));
148152 discovery .setPorts (discoveryPorts );
149153 discovery .setVisibility (Protos .DiscoveryInfo .Visibility .EXTERNAL );
150154 discovery .setName (configuration .getTaskName ());
151155 return discovery .build ();
152156 }
153157
158+ private Protos .Labels getLabels (Configuration configuration ) {
159+ Protos .Labels .Builder builder = Protos .Labels .newBuilder ();
160+ Map <String , String > labels = configuration .getTaskLabels ();
161+ for (Map .Entry <String , String > kvp : labels .entrySet ()) {
162+ Protos .Label label = Protos .Label .newBuilder ()
163+ .setKey (kvp .getKey ())
164+ .setValue (kvp .getValue ())
165+ .build ();
166+
167+ builder .addLabels (label );
168+ }
169+
170+ return builder .build ();
171+ }
172+
154173 private Protos .ContainerInfo getContainer (Configuration configuration , Protos .TaskID taskID , Long elasticSearchNodeId , Protos .SlaveID slaveID ) {
155174 final Protos .Environment environment = Protos .Environment .newBuilder ().addAllVariables (new ExecutorEnvironmentalVariables (configuration , elasticSearchNodeId ).getList ()).build ();
156175 final Protos .ContainerInfo .DockerInfo .Builder dockerInfo = Protos .ContainerInfo .DockerInfo .newBuilder ()
0 commit comments