Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filtering #6

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
11 changes: 8 additions & 3 deletions ipf/glue2/abstractservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
from .service import *
from .endpoint import *
from ipf.sysinfo import ResourceName
from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt


#######################################################################################################################
class AbstractService(Data):
def __init__(self, id):
def __init__(self, id, ipfinfo):
Data.__init__(self,id)
self.services = []
self.handles = []
self.ipfinfo = ipfinfo

def add(self, serv):
self.services.append(serv)
Expand All @@ -55,13 +58,14 @@ class AbstractServiceStep(Step):

def __init__(self):
Step.__init__(self)
self.requires = [ResourceName]
self.requires = [IPFInformation,ResourceName]
self.produces = [AbstractService]
self.services = []

def run(self):
self.resource_name = self._getInput(ResourceName).resource_name
servlist = AbstractService(self.resource_name)
self.ipfinfo = [self._getInput(IPFInformation)]
servlist = AbstractService(self.resource_name,self.ipfinfo)
service_paths = []
try:
paths = os.environ["SERVICEPATH"]
Expand Down Expand Up @@ -248,6 +252,7 @@ def get(self):

def toJson(self):
doc = {}
doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo)
doc["StorageService"] = []
doc["ComputingService"] = []
doc["LoginService"] = []
Expand Down
82 changes: 47 additions & 35 deletions ipf/glue2/accelerator_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def run(self):
host_group.ID = "urn:glue2:AcceleratorEnvironment:%s.%s" % (host_group.Name,self.resource_name)
host_group.ManagerID = "urn:glue2:ComputingManager:%s" % (self.resource_name)
self.debug("host_group.id "+host_group.id)
self.debug("host_group.uas "+str(host_group.UsedAcceleratorSlots))

self._output(AcceleratorEnvironments(self.resource_name,host_groups))

Expand Down Expand Up @@ -111,6 +112,10 @@ def _groupHosts(self, hosts):
host_group.TotalInstances += host.TotalInstances
host_group.UsedInstances += host.UsedInstances
host_group.UnavailableInstances += host.UnavailableInstances
#if host_group.UsedAcceleratorSlots is None:
# host_group.UsedAcceleratorSlots = 0
#if host.UsedAcceleratorSlots is None:
# host.UsedAcceleratorSlots = 0
host_group.UsedAcceleratorSlots += host.UsedAcceleratorSlots
if host_group.TotalAcceleratorSlots is None:
host_group.TotalAcceleratorSlots = 0
Expand Down Expand Up @@ -139,6 +144,13 @@ def _goodHost(self, host):
m = re.search("urn:glue2:ComputingShare:(\S+).%s" % self.resource_name,share)
if self._includeQueue(m.group(1)):
return True
# if the host is associated with a partition, check that it is a good one
if len(host.Partitions) == 0:
return True
partition_list = host.Partitions.split(',')
for share in partition_list:
if self._includeQueue(share):
return True
return False

#######################################################################################################################
Expand Down Expand Up @@ -456,37 +468,37 @@ def toJson(self):

#######################################################################################################################

class AcceleratorEnvironmentOgfJson(ResourceOgfJson):
data_cls = AcceleratorEnvironment

def __init__(self, data):
ResourceOgfJson.__init__(self,data)

def get(self):
return json.dumps(self.toJson(),sort_keys=True,indent=4)

def toJson(self):
doc = ResourceOgfJson.toJson(self)

doc["Platform"] = self.data.Platform
if self.data.PhysicalAccelerators is not None:
doc["PhysicalAccelerators"] = self.data.PhysicalAccelerators
if self.data.LogicalAccelerators is not None:
doc["LogicalAccelerators"] = self.data.LogicalAccelerators
if self.data.Vendor is not None:
doc["Vendor"] = self.data.Vendor
if self.data.Model is not None:
doc["Model"] = self.data.Model
if self.data.Version is not None:
doc["Version"] = self.data.Version
if self.data.ClockSpeed is not None:
doc["ClockSpeed"] = self.data.ClockSpeed
if self.data.Memory is not None:
doc["Memory"] = self.data.Memory
if self.data.ComputeCapability is not None:
doc["ComputeCapability"] = self.data.ComputeCapability

return doc
#class AcceleratorEnvironmentOgfJson(ResourceOgfJson):
# data_cls = AcceleratorEnvironment
#
# def __init__(self, data):
# ResourceOgfJson.__init__(self,data)
#
# def get(self):
# return json.dumps(self.toJson(),sort_keys=True,indent=4)
#
# def toJson(self):
# doc = ResourceOgfJson.toJson(self)
#
# doc["Platform"] = self.data.Platform
# if self.data.PhysicalAccelerators is not None:
# doc["PhysicalAccelerators"] = self.data.PhysicalAccelerators
# if self.data.LogicalAccelerators is not None:
# doc["LogicalAccelerators"] = self.data.LogicalAccelerators
# if self.data.Vendor is not None:
# doc["Vendor"] = self.data.Vendor
# if self.data.Model is not None:
# doc["Model"] = self.data.Model
# if self.data.Version is not None:
# doc["Version"] = self.data.Version
# if self.data.ClockSpeed is not None:
# doc["ClockSpeed"] = self.data.ClockSpeed
# if self.data.Memory is not None:
# doc["Memory"] = self.data.Memory
# if self.data.ComputeCapability is not None:
# doc["ComputeCapability"] = self.data.ComputeCapability
#
# return doc

#######################################################################################################################

Expand Down Expand Up @@ -514,10 +526,10 @@ def toJson(self):

#######################################################################################################################

class AcceleratorEnvironments(Data):
def __init__(self, id, accel_envs=[]):
Data.__init__(self,id)
self.accel_envs = accel_envs
#class AcceleratorEnvironments(Data):
# def __init__(self, id, accel_envs=[]):
# Data.__init__(self,id)
# self.accel_envs = accel_envs



Expand Down
8 changes: 6 additions & 2 deletions ipf/glue2/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ipf.error import StepError
from ipf.step import Step
from ipf.sysinfo import ResourceName
from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt

from .entity import *

Expand Down Expand Up @@ -159,12 +160,13 @@ def toJson(self):
#######################################################################################################################

class Applications(Data):
def __init__(self, resource_name):
def __init__(self, resource_name, ipfinfo):
Data.__init__(self)
self.id = resource_name
self.environments = []
self.handles = []
self.resource_name = resource_name
self.ipfinfo = ipfinfo

def add(self, env, handles):
if env.AppVersion is None:
Expand Down Expand Up @@ -207,6 +209,7 @@ def toJson(self):
doc["ApplicationHandle"] = []
for handle in self.data.handles:
doc["ApplicationHandle"].append(ApplicationHandleOgfJson(handle).toJson())
doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo)
return doc

#######################################################################################################################
Expand All @@ -217,13 +220,14 @@ def __init__(self):

self.description = "produces a document containing GLUE 2 ApplicationEnvironment and ApplicationHandle"
self.time_out = 30
self.requires = [ResourceName]
self.requires = [IPFInformation,ResourceName]
self.produces = [Applications]

self.resource_name = None

def run(self):
self.resource_name = self._getInput(ResourceName).resource_name
self.ipfinfo = [self._getInput(IPFInformation)]
self._output(self._run())

def _run(self):
Expand Down
16 changes: 14 additions & 2 deletions ipf/glue2/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from ipf.sysinfo import ResourceName
from ipf.step import Step

from ipf.ipfinfo import IPFInformation, IPFInformationJson, IPFInformationTxt
from computing_activity import ComputingActivities, ComputingActivityTeraGridXml, ComputingActivityOgfJson
from computing_manager import ComputingManager, ComputingManagerTeraGridXml, ComputingManagerOgfJson
from computing_manager_accel_info import ComputingManagerAcceleratorInfo, ComputingManagerAcceleratorInfoOgfJson
Expand All @@ -35,6 +36,8 @@
from execution_environment import ExecutionEnvironmentTeraGridXml
from execution_environment import ExecutionEnvironmentOgfJson
from accelerator_environment import AcceleratorEnvironments
from accelerator_environment import AcceleratorEnvironmentsOgfJson
from accelerator_environment import AcceleratorEnvironment
from accelerator_environment import AcceleratorEnvironmentOgfJson
from location import Location, LocationOgfJson, LocationTeraGridXml
#######################################################################################################################
Expand All @@ -45,13 +48,14 @@ def __init__(self):

self.description = "creates a single data containing all nonsensitive compute-related information"
self.time_out = 5
self.requires = [ResourceName,Location,
self.requires = [IPFInformation,ResourceName,Location,
ComputingService,ComputingShares,ComputingManager,ExecutionEnvironments,AcceleratorEnvironments,ComputingManagerAcceleratorInfo,ComputingShareAcceleratorInfo]
self.produces = [Public]

def run(self):
public = Public()
public.resource_name = self._getInput(ResourceName).resource_name
public.ipfinfo = [self._getInput(IPFInformation)]
# the old TeraGridXML wants a site_name, so just derive it
public.site_name = public.resource_name[public.resource_name.find(".")+1:]
public.location = [self._getInput(Location)]
Expand All @@ -72,6 +76,7 @@ class Public(Data):
def __init__(self):
Data.__init__(self)

self.ipfinfo = []
self.location = []
self.service = []
self.share = []
Expand All @@ -80,6 +85,9 @@ def __init__(self):
self.accelenvironment = []

def fromJson(self, doc):
self.ipfinfo = []
for idoc in doc.get("Ipfinfo",[]):
self.ipfinfo.append(ipfinfo().fromJson(idoc))
self.location = []
for ldoc in doc.get("Location",[]):
self.location.append(Location().fromJson(ldoc))
Expand Down Expand Up @@ -159,6 +167,8 @@ def get(self):
def toJson(self):
doc = {}

if self.data.ipfinfo is not None:
doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo)
if len(self.data.location) > 0:
doc["Location"] = map(lambda location: LocationOgfJson(location).toJson(),self.data.location)
if self.data.service is not None:
Expand Down Expand Up @@ -197,11 +207,12 @@ def __init__(self):

self.description = "creates a single data containing all sensitive compute-related information"
self.time_out = 5
self.requires = [ResourceName,ComputingActivities]
self.requires = [IPFInformation,ResourceName,ComputingActivities]
self.produces = [Private]

def run(self):
private = Private()
private.ipfinfo = [self._getInput(IPFInformation)]
private.resource_name = self._getInput(ResourceName).resource_name
# the old TeraGridXML wants a site_name, so just derive it
private.site_name = private.resource_name[private.resource_name.find(".")+1:]
Expand Down Expand Up @@ -280,6 +291,7 @@ def toJson(self):
if len(self.data.activity) > 0:
doc["ComputingActivity"] = map(lambda activity: ComputingActivityOgfJson(activity).toJson(),
self.data.activity)
doc["PublisherInfo"] = map(lambda ipfinfo: IPFInformationJson(ipfinfo).toJson(), self.data.ipfinfo)
return doc

#######################################################################################################################
15 changes: 0 additions & 15 deletions ipf/glue2/computing_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,6 @@ def _addExecutionEnvironment(self, exec_env):

def _addAcceleratorEnvironment(self, exec_env):
self.ResourceID.append(exec_env.ID)
#self.ComputingManagerAcceleratorInfoID.append(exec_env.ID)
#if exec_env.PhysicalAccelerators is not None:
# if self.TotalPhysicalAccelerators == None:
# self.TotalPhysicalAccelerators = 0
# self.TotalPhysicalAccelerators = self.TotalPhysicalAccelerators + exec_env.TotalInstances * exec_env.PhysicalAccelerators
#if exec_env.LogicalAccelerators is not None:
# if self.TotalLogicalAccelerators == None:
# self.TotalLogicalAccelerators = 0
# self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + exec_env.TotalInstances * exec_env.LogicalAccelerators
# self.TotalSlots = self.TotalLogicalAccelerators

#if len(self.ResourceID) == 1:
# self.Homogeneous = True
#else:
# self.Homogeneous = False

def _addComputingShare(self, share):
if self.SlotsUsedByLocalJobs == None:
Expand Down
2 changes: 1 addition & 1 deletion ipf/glue2/computing_manager_accel_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _addAcceleratorEnvironment(self, accel_env):
if accel_env.LogicalAccelerators is not None:
if self.TotalLogicalAccelerators == None:
self.TotalLogicalAccelerators = 0
self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators
self.TotalLogicalAccelerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators
self.TotalSlots = self.TotalLogicalAccelerators
if accel_env.UsedAcceleratorSlots is not None:
if self.UsedAcceleratorSlots == None:
Expand Down
5 changes: 0 additions & 5 deletions ipf/glue2/computing_share_accel_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ def _addAcceleratorEnvironment(self, accel_env):
if self.TotalAcceleratorSlots == None:
self.TotalAcceleratorSlots = 0
self.TotalAcceleratorSlots = self.TotalAcceleratorSlots + accel_env.TotalAcceleratorSlots
#self.TotalPhysicalAccelerators = self.TotalPhysicalAccelerators + accel_env.TotalInstances * accel_env.PhysicalAccelerators
#if self.TotalLogicalAccelerators == None:
# self.TotalLogicalAccelerators = 0
#self.TotalLogicalAcclerators = self.TotalLogicalAccelerators + accel_env.TotalInstances * accel_env.LogicalAccelerators
#self.TotalSlots = self.TotalLogicalAccelerators
if accel_env.UsedAcceleratorSlots is not None:
if self.UsedAcceleratorSlots == None:
self.UsedAcceleratorSlots = 0
Expand Down
14 changes: 14 additions & 0 deletions ipf/glue2/execution_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def __init__(self):
self._acceptParameter("queues",
"An expression describing the queues to include (optional). The syntax is a series of +<queue> and -<queue> where <queue> is either a queue name or a '*'. '+' means include '-' means exclude. The expression is processed in order and the value for a queue at the end determines if it is shown.",
False)
self._acceptParameter("partitions",
"An expression describing the partitions to include (optional). The syntax is a series of +<partition> and -<partition> where <partition> is either a partition name or a '*'. '+' means include '-' means exclude. The expression is processed in order and the value for a partition at the end determines if it is shown.",
False)

self.resource_name = None

Expand Down Expand Up @@ -138,6 +141,14 @@ def _goodHost(self, host):
m = re.search("urn:glue2:ComputingShare:(\S+).%s" % self.resource_name,share)
if self._includeQueue(m.group(1)):
return True

# if the host is associated with a partition, check that it is a good one
if len(host.Partitions) == 0:
return True
partition_list = host.Partitions.split(',')
for share in partition_list:
if self._includeQueue(share):
return True
return False

#######################################################################################################################
Expand Down Expand Up @@ -190,6 +201,9 @@ def __init__(self):
self.OSName = sysName.lower()
self.OSVersion = release

#for filtering nodes by partition:
self.Partitions = None # string

def __str__(self):
return json.dumps(ExecutionEnvironmentOgfJson(self).toJson(),sort_keys=True,indent=4)

Expand Down
2 changes: 1 addition & 1 deletion ipf/glue2/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def _run(self):

self.support_contact = self.params.get("default_support_contact",False)

apps = application.Applications(self.resource_name)
apps = application.Applications(self.resource_name, self.ipfinfo)

module_paths = []
try:
Expand Down
2 changes: 1 addition & 1 deletion ipf/glue2/openstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def _run(self):
auth_url=auth_url)

# restrict this to public images only?
apps = glue2.application.Applications(self.resource_name)
apps = glue2.application.Applications(self.resource_name,self.ipfinfo)
for image in nova.images.list():
#print(" metadata: %s" % image.metadata)
#print(dir(image))
Expand Down
Loading