Version update #1

Open · wants to merge 3 commits into base: main
30 changes: 20 additions & 10 deletions README.md
@@ -22,12 +22,14 @@ PLUGIN_SFTOCF=True
SFUSER='starfish_username'
SFPASS='starfish_password'
```

2. In `coldfront/config/plugins/`, create a file `sftocf.py` with the following contents:

```py
from coldfront.config.base import INSTALLED_APPS
from coldfront.config.env import ENV

INSTALLED_APPS += [ 'coldfront.plugins.sftocf' ]

SFUSER = ENV.str('SFUSER')
SFPASS = ENV.str('SFPASS')
@@ -54,13 +56,21 @@ data using the `clean` parameter.

### DjangoQ Integration

You can schedule your Starfish data pull using DjangoQ by adding the following code to
`coldfront/core/utils/management/commands/add_scheduled_tasks.py`:
You can schedule your Starfish data pull using DjangoQ in one of two ways:

A. Going to "scheduled tasks" in the DjangoQ section of the Django admin site and adding
a task with the func value of `coldfront.plugins.sftocf.tasks.pull_sf_push_cf` (and then
filling in any further scheduling options, such as the schedule type and repeats), or

B. Adding the following code to
`coldfront/core/utils/management/commands/add_scheduled_tasks.py` and then running
the `add_scheduled_tasks` command:

```py
if 'coldfront.plugins.sftocf' in settings.INSTALLED_APPS:
    schedule('coldfront.plugins.sftocf.tasks.pull_sf_push_cf',
        schedule_type=Schedule.WEEKLY,
        repeats=-1,
        next_run=timezone.now() + datetime.timedelta(days=1))
```
```py
# adds a task scheduled to run weekly
if 'coldfront.plugins.sftocf' in settings.INSTALLED_APPS:
    schedule('coldfront.plugins.sftocf.tasks.pull_sf_push_cf',
        schedule_type=Schedule.WEEKLY,
        repeats=-1,
        next_run=timezone.now() + datetime.timedelta(days=1))
```
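
Either approach registers a row in DjangoQ's `Schedule` table. To confirm the task was
registered, a minimal check from `python manage.py shell` (assuming django-q is
installed and migrated):

```py
from django_q.models import Schedule

# look up the schedule created by either approach above
task = Schedule.objects.filter(
    func='coldfront.plugins.sftocf.tasks.pull_sf_push_cf'
).first()
if task:
    print(task.schedule_type, task.next_run)
```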
6 changes: 3 additions & 3 deletions management/commands/pull_sf_push_cf.py
@@ -1,14 +1,14 @@
from coldfront.plugins.sftocf.utils import ColdFrontDB
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import BaseCommand
import logging


logger = logging.getLogger(__name__)

class Command(BaseCommand):
    '''
    """
    Collect usage data from Starfish and insert it into the Coldfront database.
    '''
    """

    def add_arguments(self, parser):
        parser.add_argument(
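
Since this file lives under `management/commands/`, the pull can also be run on demand
rather than waiting for the DjangoQ schedule; a minimal sketch using plain Django (no
plugin-specific assumptions):

```py
# equivalent to `python manage.py pull_sf_push_cf`; any flags defined in
# add_arguments above can be passed as keyword arguments
from django.core.management import call_command

call_command('pull_sf_push_cf')
```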
8 changes: 4 additions & 4 deletions servers.json
@@ -1,10 +1,10 @@

{
    "servername": {
        "url":"https://my_url.edu",
    "servername_here": {
        "url":"https://my_url_here",
        "volumes":{
            "volumename1":["pathname"],
            "volumename2":["pathname"]
            "volumename1_here":["pathname_here"],
            "volumename2_here":["pathname_here"]
        }
    }
}
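
For orientation, a minimal sketch of how a script might walk this structure once the
placeholders are filled in (the file path below is an assumption about where
`servers.json` lives; the plugin loads it internally):

```py
import json

# illustrative path; adjust to your deployment
with open('coldfront/plugins/sftocf/servers.json') as fobj:
    servers = json.load(fobj)

for server_name, details in servers.items():
    for volume, paths in details['volumes'].items():
        print(server_name, details['url'], volume, paths)
```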
3 changes: 1 addition & 2 deletions tasks.py
@@ -1,7 +1,6 @@
import logging
import os

from coldfront.plugins.sftocf.pipeline import ColdFrontDB
from coldfront.plugins.sftocf.utils import ColdFrontDB

logger = logging.getLogger(__name__)

171 changes: 111 additions & 60 deletions utils.py
@@ -7,14 +7,16 @@
from datetime import datetime, timedelta

import requests
import pandas as pd
from django.utils import timezone
from django.contrib.auth import get_user_model

from coldfront.core.utils.common import import_from_settings
from coldfront.core.project.models import Project
from coldfront.core.allocation.models import (Allocation,
    AllocationUser,
    AllocationUserStatusChoice)
from coldfront.core.allocation.models import Allocation, AllocationUserStatusChoice

MISSING_DATA_DIR = './local_data/missing/'


datestr = datetime.today().strftime("%Y%m%d")
logger = logging.getLogger(__name__)
@@ -146,7 +148,6 @@ def post_async_query(self, query, group_by, volpath):
"""Post an asynchronous query through the Starfish API.
"""
query_url = self.api_url + "async/query/"

params = {
"volumes_and_paths": volpath,
"queries": query,
@@ -200,7 +201,7 @@ class ColdFrontDB:
    check_volume_collection(self, lr, homepath="./coldfront/plugins/sftocf/data/")
    pull_sf(self, volume=None)
    push_cf(self, filepaths, clean)
    update_usage(self, user, userdict, allocation)
    update_user_usage(self, user, userdict, allocation)
    """

    @record_process
@@ -217,22 +218,20 @@ def produce_lab_dict(self, vol):
"lab_name": [("volume", "tier"),("volume", "tier")]
"""
pr_objs = Allocation.objects.only("id", "project")
pr_dict = {}
for alloc in pr_objs:
proj_name = alloc.project.title
resource_list = alloc.get_resources_as_string.split(', ')
if proj_name not in pr_dict:
pr_dict[proj_name] = resource_list
else:
pr_dict[proj_name].extend(resource_list)
lab_res = pr_dict if not vol else {p:[i for i in r if vol in i] for p, r in pr_dict.items()}
labs_resources = {p:[tuple(rs.split("/")) for rs in r] for p, r in lab_res.items()}
pr_dict = {allocation.project.title: [] for allocation in pr_objs}
for allocation in pr_objs:
proj_name = allocation.project.title
resource_list = allocation.get_resources_as_string.split(", ")
pr_dict[proj_name].extend(resource_list)
labs_resources = pr_dict if not vol else {
p:[i for i in r if vol in i] for p, r in pr_dict.items()
}
logger.debug("labs_resources:\n%s", labs_resources)
return labs_resources


    def check_volume_collection(self, lr, homepath="./coldfront/plugins/sftocf/data/"):
        '''
        """
        For each lab-resource combination in parameter lr, check the existence of a
        corresponding file in the data path. If a file less than two days old exists
        for that lab-resource combination, mark it as collected; if not, slate the
        combination for collection.
@@ -248,10 +247,10 @@ def check_volume_collection(self, lr, homepath="./coldfront/plugins/sftocf/data/
            List of lab usage files that have already been created.
        to_collect : list
            list of tuples - (labname, resource, filename)
        '''
        """
        filepaths = []
        to_collect = []
        labs_resources = [(l, res[0], res[1]) for l, r in lr.items() for res in r]
        labs_resources = [(l, res) for l, r in lr.items() for res in r]
        logger.debug("labs_resources:%s", labs_resources)

        yesterdaystr = (datetime.today()-timedelta(1)).strftime("%Y%m%d")
@@ -260,14 +259,13 @@ def check_volume_collection(self, lr, homepath="./coldfront/plugins/sftocf/data/
        for lr_pair in labs_resources:
            lab = lr_pair[0]
            resource = lr_pair[1]
            tier = lr_pair[2]
            fpaths = [f"{homepath}{lab}_{resource}_{n}.json" for n in dates]
            if any(Path(fpath).exists() for fpath in fpaths):
                for fpath in fpaths:
                    if Path(fpath).exists():
                        filepaths.append(fpath)
            else:
                to_collect.append((lab, resource, tier, fpaths[-1],))
                to_collect.append((lab, resource, fpaths[-1],))

        return filepaths, to_collect

@@ -282,7 +280,9 @@ def pull_sf(self, volume=None):
        filepaths, to_collect = self.check_volume_collection(lab_res)
        # 3. produce set of all volumes to be queried
        vol_set = {i[1] for i in to_collect}
        servers_vols = [(k, vol) for k, v in svp.items() for vol in vol_set if vol in v['volumes']]
        servers_vols = [
            (k, vol) for k, v in svp.items() for vol in vol_set if vol in v['volumes']
        ]
        for server_vol in servers_vols:
            srv = server_vol[0]
            vol = server_vol[1]
@@ -303,54 +303,52 @@ def push_cf(self, filepaths, clean):
        for file in filepaths:
            content = read_json(file)
            usernames = [d['username'] for d in content['contents']]
            resource = content['volume'] + "/" + content['tier']
            resource = content['volume']

            user_models = get_user_model().objects.only("id","username")\
                .filter(username__in=usernames)
            log_missing_user_models(content["project"], user_models, usernames)
            user_models, missing_usernames = id_present_missing_users(usernames)
            log_missing('user', missing_usernames)

            project = Project.objects.get(title=content["project"])
            project = Project.objects.get(title=content['project'])
            # find project allocation
            allocations = Allocation.objects.filter(project=project, resources__name=resource)
            allocations = Allocation.objects.filter(
                project=project, resources__name=resource
            )
            if allocations.count() > 1:
                logger.warning('multiple allocations found '
                    'for project id %s, resource %s; updating the first',
                    project.id, resource)
            allocation = allocations.first()
            logger.debug("%s\nusernames: %s\nuser_models: %s",
                project.title, usernames, [u.username for u in user_models])
            logger.debug(
                "%s\nusernames: %s\nuser_models: %s",
                project.title, usernames, [u.username for u in user_models]
            )

            for user in user_models:
                userdict = [d for d in content['contents'] if d["username"] == user.username][0]
                model = user_models.get(username=userdict["username"])
                self.update_usage(model, userdict, allocation)
                userdict = [d for d in content['contents'] if d['username'] == user.username][0]
                model = user_models.get(username=userdict['username'])
                self.update_user_usage(model, userdict, allocation)
            if clean:
                os.remove(file)
        logger.debug("push_cf complete")

    def update_usage(self, user, userdict, allocation):
    def update_user_usage(self, user, userdict, allocation):
        """Update usage, unit, and usage_bytes values for the designated user.
        """
        usage, unit = split_num_string(userdict["size_sum_hum"])
        logger.debug("entering for user: %s", user.username)
        try:
            allocationuser = AllocationUser.objects.get(
                allocation=allocation, user=user
            )
        except AllocationUser.DoesNotExist:
            logger.info("creating allocation user:")
            AllocationUser.objects.create(
                allocation=allocation,
                created=timezone.now(),
                status=AllocationUserStatusChoice.objects.get(name='Active'),
                user=user
            )
            allocationuser = AllocationUser.objects.get(
                allocation=allocation, user=user
            )

        allocationuser.usage_bytes = userdict["size_sum"]
        allocationuser, created = allocation.allocationuser_set.get_or_create(
            user=user,
            defaults={
                'created': timezone.now(),
                'status': AllocationUserStatusChoice.objects.get(name='Active')
            }
        )
        if created:
            logger.info('allocation user %s created', allocationuser)
        allocationuser.usage_bytes = userdict['size_sum']
        allocationuser.usage = usage
        allocationuser.unit = unit
        # automatically updates "modified" field & adds old record to history
@@ -430,8 +428,8 @@ def collect_starfish_usage(server, volume, volumepath, projects):
    logger.debug("projects: %s", projects)
    for project in projects:
        p = project[0]
        tier = project[2]
        filepath = project[3]
        tier = project[1]
        filepath = project[2]
        lab_volpath = volumepath[1] if "_l3" in p else volumepath[0]
        logger.debug("filepath: %s lab: %s volpath: %s", filepath, p, lab_volpath)
        usage_query = server.create_query(
@@ -460,15 +458,68 @@ def collect_starfish_usage(server, volume, volumepath, projects):
    return filepaths


def log_missing_user_models(groupname, user_models, usernames):
    """Identify and record any usernames that lack a matching user_models entry.

def id_present_missing_users(username_list):
    """
    Collect all User entries with usernames in param username_list; return tuple
    of all matching User entries and all usernames with no User entries.
    """
    present_users = get_user_model().objects.filter(username__in=username_list)
    present_usernames = list(present_users.values_list('username', flat=True))
    missing_usernames = [
        {'username': name} for name in username_list if name not in present_usernames
    ]
    return (present_users, missing_usernames)


def update_csv(new_entries, dirpath, csv_name, date_update='date'):
    """Add or update entries in CSV, order CSV by descending date and save.

    Parameters
    ----------
    new_entries : list of dicts
        identifying information to record for missing entries:
            for users, "username".
            for projects, "title".
            for allocations, "resource_name", "project_title", and "path".
    dirpath : str
        path to the directory containing the CSV; concatenated directly with
        csv_name, so it should end with a separator.
    csv_name : str
        name of the CSV file to create or update.
    date_update : str
        name of the date column used to order and de-duplicate entries.
    """
    if new_entries:
        locate_or_create_dirpath(dirpath)
        fpath = f'{dirpath}{csv_name}'
        try:
            df = pd.read_csv(fpath, parse_dates=[date_update])
        except FileNotFoundError:
            df = pd.DataFrame()
        except ValueError:
            df = pd.read_csv(fpath)
        new_records = pd.DataFrame(new_entries)
        col_checks = new_records.columns.values.tolist()
        new_records[date_update] = datetime.today()
        updated_df = (pd.concat([df, new_records])
                      .drop_duplicates(col_checks, keep='last')
                      .sort_values(date_update, ascending=False)
                      .reset_index(drop=True))
        updated_df.to_csv(fpath, index=False)


def log_missing(modelname, missing):
    """Log missing entries for a given Coldfront model.
    Add or update entries in CSV, order CSV by descending date and save.

    Parameters
    ----------
    modelname : string
        lowercase name of the Coldfront model for "missing"
    missing : list of dicts
        identifying information to record for missing entries:
            for users, "username".
            for projects, "title".
            for allocations, "resource_name", "project_title", and "path".
    """
    missing_unames = [u for u in usernames if u not in [m.username for m in user_models]]
    if missing_unames:
        fpath = './coldfront/plugins/sftocf/data/missing_users.csv'
        patterns = [f"{groupname},{uname},{datestr}" for uname in missing_unames]
        write_update_file_line(fpath, patterns)
        logger.warning("no User entry found for users: %s", missing_unames)
    update_csv(missing, MISSING_DATA_DIR, f'missing_{modelname}s.csv')


def generate_headers(token):