# Forked from 23andMe/yhaplo -- utils.py (118 lines, 88 loc, 3.07 KB)
# David Poznik
# 2016.01.05
# utils.py
#
# Defines utility functions and non-application-specific global constants.
#----------------------------------------------------------------------
from __future__ import absolute_import, print_function
import csv
import errno
import gzip
import logging
import os
import re
import sys
#----------------------------------------------------------------------
# constants
# horizontal rule for text output: 72 dashes followed by a newline
DASHES = '-' * 72 + '\n'

# maps a python type to the printf-style conversion used to format it;
# bools are formatted as integers
type2fmtDict = {
    bool: '%i',
    int: '%i',
    str: '%s',
    float: '%f',
}
#----------------------------------------------------------------------
# utility functions
def basenameNoEnding(fn, ending):
    '''returns the basename of a file with the supplied ending removed

    fn     : path to a file; only its final path component is used
    ending : suffix to strip from the basename (e.g., '.txt')

    The basename is returned unchanged if ending is empty
    or if the basename does not end with it.
    '''

    baseName = os.path.basename(fn)

    # An empty ending must be special-cased: baseName[:-0] is the empty string.
    if ending and baseName.endswith(ending):
        return baseName[:-len(ending)]

    return baseName
def checkFileExistence(fn, fileDescription=None):
    '''terminates the program with an error message if fn is not an existing file

    fileDescription, when supplied, is used as a prefix in the error message.
    '''

    problem = ('%s file not found' % fileDescription) if fileDescription else 'File not found'

    if os.path.isfile(fn):
        return

    sys.exit('\nERROR. %s: %s\n' % (problem, fn))
def closeFiles(fileList):
    'closes each file object in fileList, skipping None (or otherwise falsy) entries'

    for handle in fileList:
        if not handle:
            continue
        handle.close()
def compressWhitespace(myString):
    'collapses every run of whitespace characters into one space'

    whitespaceRun = re.compile(r'\s+')
    return whitespaceRun.sub(' ', myString)
def getCSVreader(inFN, delimiter='\t'):
    '''opens a text file for reading and wraps it in a csv reader

    A file name whose extension is '.gz' is opened with gzip in text mode;
    any other file name is opened as a regular file.
    The program exits with an error message if opening raises IOError.

    returns : (file object, csv reader); the file object is returned open
    '''

    isGzipped = os.path.splitext(inFN)[1] == '.gz'
    try:
        inFile = gzip.open(inFN, 'rt') if isGzipped else open(inFN, 'r')
    except IOError:
        sys.exit('\nERROR. Could not open: %s\n' % inFN)

    return inFile, csv.reader(inFile, delimiter=delimiter)
def mkdirP(dirName):
    '''creates a directory, along with any missing parent directories

    An already-existing directory is not an error;
    any other OSError is re-raised.
    '''

    try:
        os.makedirs(dirName)
    except OSError as error:
        alreadyExists = error.errno == errno.EEXIST and os.path.isdir(dirName)
        if not alreadyExists:
            raise
def object2fmt(x):
    '''looks up the printf-style format string for the exact type of x

    The lookup is keyed on type(x) in type2fmtDict,
    so a type with no entry there raises KeyError.
    '''

    objectType = type(x)
    return type2fmtDict[objectType]
def printAndLogger(message):
    'writes a message to stdout and then records it with the logging module at INFO level'

    # order matters: stdout first, then the logger
    for emit in (print, logging.info):
        emit(message)
def printIterable(myIterable):
    'prints the items of an iterable to stdout, one per line'

    for element in myIterable:
        print(element)
def readPositionsSet(inFN, column = 0, logFunction = None):
    '''reads integer positions from one column of a whitespace-delimited file

    inFN        : path to the positions file; the program exits if it does not exist
    column      : index of the whitespace-delimited field holding the position
    logFunction : optional callable that receives the summary message;
                  if not supplied, the message is written to stderr

    Blank lines are skipped.

    returns : set of the unique positions (ints)
    raises  : IndexError if a non-blank line has no field at the given column,
              ValueError if the selected field is not an integer
    '''

    checkFileExistence(inFN, 'SNP positions')

    positionsSet = set()
    with open(inFN, 'r') as inFile:
        for line in inFile:
            fields = line.split()
            if not fields:
                # tolerate blank lines (e.g., an empty line at the end of the file)
                # rather than failing with an IndexError
                continue
            positionsSet.add(int(fields[column]))

    message = '%5d unique positions read: %s\n' % (len(positionsSet), inFN)
    if logFunction:
        logFunction(message)
    else:
        sys.stderr.write(message)

    return positionsSet
def unimplementedMessage(methodName):
    'exits the program, reporting methodName as an unimplemented method'

    exitMessage = '\n\n! Unimplemented method: %s\nExiting.\n' % methodName
    sys.exit(exitMessage)