Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add affinity spec configuration #37

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 13 additions & 40 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package nextflow.nomad

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.nomad.config.AffinitySpec
import nextflow.nomad.config.VolumeSpec

/**
* Nomad Config
Expand All @@ -30,14 +32,6 @@ import groovy.util.logging.Slf4j
class NomadConfig {
final static protected API_VERSION = "v1"

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

final NomadClientOpts clientOpts
final NomadJobOpts jobOpts

Expand Down Expand Up @@ -66,6 +60,7 @@ class NomadConfig {
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec
final AffinitySpec affinitySpec

NomadJobOpts(Map nomadJobOpts){
deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ?
Expand Down Expand Up @@ -93,39 +88,17 @@ class NomadConfig {
}else{
volumeSpec = null
}
}
}

class VolumeSpec{

private String type
private String name

String getType() {
return type
}

String getName() {
return name
}

VolumeSpec type(String type){
this.type = type
this
}

VolumeSpec name(String name){
this.name = name
this
}

protected validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
if( nomadJobOpts.affinity && nomadJobOpts.affinity instanceof Closure){
this.affinitySpec = new AffinitySpec()
def closure = (nomadJobOpts.affinity as Closure)
def clone = closure.rehydrate(this.affinitySpec, closure.owner, closure.thisObject)
clone.resolveStrategy = Closure.DELEGATE_FIRST
clone()
this.affinitySpec.validate()
}else{
affinitySpec = null
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package nextflow.nomad.config

class AffinitySpec{

private String attribute
private String operator
private String value
private Integer weight

String getOperator(){
return operator
}

String getAttribute() {
return attribute
}

String getValue() {
return value
}

Integer getWeight() {
return weight
}

AffinitySpec attribute(String attribute){
this.attribute=attribute
this
}

AffinitySpec operator(String operator){
this.operator = operator
this
}

AffinitySpec value(String value){
this.value = value
this
}

AffinitySpec weight(int weight){
this.weight = weight
this
}

void validate(){
}
}
44 changes: 44 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/config/VolumeSpec.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package nextflow.nomad.config

import nextflow.nomad.NomadConfig

class VolumeSpec {

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

private String type
private String name

String getType() {
return type
}

String getName() {
return name
}

VolumeSpec type(String type){
this.type = type
this
}

VolumeSpec name(String name){
this.name = name
this
}

void validate(){
if( !VOLUME_TYPES.contains(type) ) {
throw new IllegalArgumentException("Volume type $type is not supported")
}
if( !this.name ){
throw new IllegalArgumentException("Volume name is required")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.nomadproject.client.ApiClient
import io.nomadproject.client.api.JobsApi
import io.nomadproject.client.model.Affinity
import io.nomadproject.client.model.AllocationListStub
import io.nomadproject.client.model.Job
import io.nomadproject.client.model.JobRegisterRequest
import io.nomadproject.client.model.JobRegisterResponse
import io.nomadproject.client.model.JobSummary
import io.nomadproject.client.model.ReschedulePolicy
import io.nomadproject.client.model.Resources
import io.nomadproject.client.model.RestartPolicy
Expand All @@ -34,6 +34,7 @@ import io.nomadproject.client.model.TaskGroup
import io.nomadproject.client.model.VolumeMount
import io.nomadproject.client.model.VolumeRequest
import nextflow.nomad.NomadConfig
import nextflow.nomad.config.VolumeSpec
import nextflow.processor.TaskRun
import nextflow.util.MemoryUnit

Expand Down Expand Up @@ -120,7 +121,7 @@ class NomadService implements Closeable{
)


if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand All @@ -130,7 +131,7 @@ class NomadService implements Closeable{
)
}

if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE){
if( config.jobOpts.volumeSpec && config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
Expand Down Expand Up @@ -180,6 +181,23 @@ class NomadService implements Closeable{
volume: config.jobOpts.volumeSpec.name
)]
}

if( config.jobOpts.affinitySpec ){
def affinity = new Affinity()
if(config.jobOpts.affinitySpec.attribute){
affinity.ltarget(config.jobOpts.affinitySpec.attribute)
}

affinity.operand(config.jobOpts.affinitySpec.operator ?: "=")

if(config.jobOpts.affinitySpec.value){
affinity.rtarget(config.jobOpts.affinitySpec.value)
}
if(config.jobOpts.affinitySpec.weight != null){
affinity.weight(config.jobOpts.affinitySpec.weight)
}
taskDef.affinities([affinity])
}
taskDef
}

Expand Down
26 changes: 23 additions & 3 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package nextflow.nomad

import nextflow.nomad.config.VolumeSpec
import spock.lang.Specification

/**
Expand Down Expand Up @@ -130,7 +131,7 @@ class NomadConfigSpec extends Specification {

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -140,7 +141,7 @@ class NomadConfigSpec extends Specification {

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -150,7 +151,7 @@ class NomadConfigSpec extends Specification {

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.type == VolumeSpec.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"

when:
Expand All @@ -161,4 +162,23 @@ class NomadConfigSpec extends Specification {
then:
thrown(IllegalArgumentException)
}

void "should instantiate an affinity spec if specified"() {
when:
def config = new NomadConfig([
jobs: [affinity : {
attribute '${meta.my_custom_value}'
operator ">"
value "3"
weight 50
}]
])

then:
config.jobOpts.affinitySpec
config.jobOpts.affinitySpec.getAttribute() == '${meta.my_custom_value}'
config.jobOpts.affinitySpec.getOperator() == '>'
config.jobOpts.affinitySpec.getValue() == '3'
config.jobOpts.affinitySpec.getWeight() == 50
}
}
Loading