"""
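Miscellaneous utilities: recursive dict merging, XDG config/data paths, a
``check_output`` backport, boolean parsing, and a uniform tar/zip archive wrapper.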
"""
from __future__ import absolute_import
import os
import tarfile
import zipfile
from os.path import join, abspath, expanduser
from subprocess import Popen, CalledProcessError, PIPE
try:
import lzma
except ImportError:
try:
import backports.lzma as lzma
except ImportError:
lzma = None
try:
from configparser import ConfigParser, NoSectionError, NoOptionError
except ImportError:
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from six import iteritems, string_types
# Template for the error raised when an optional archive backend is missing;
# formatted with ``arctype`` (archive kind) and ``extra`` (pip extra to install).
UNSUPPORTED_ARCHIVE_MESSAGE = (
    "Missing support for '{arctype}' archives, "
    "use `pip install starforge[{extra}]` to install"
)
def dict_merge(old, new):
    """Recursively merge ``new`` into ``old`` in place.

    Values in ``new`` replace the values of conflicting keys in ``old``. When
    both sides hold a dictionary under the same key, the two dictionaries are
    merged recursively instead of the old one being replaced wholesale.

    :param old: dictionary to update; it is mutated in place
    :param new: dictionary whose items take precedence
    :returns: ``None``
    """
    for key, value in new.items():
        # Only recurse when *both* sides are dicts; if the existing value is a
        # scalar (or anything else), the new value simply replaces it.
        if isinstance(value, dict) and isinstance(old.get(key), dict):
            dict_merge(old[key], value)
        else:
            old[key] = value
def xdg_config_file(name='config.yml'):
    """Return the absolute path of a ``galaxy-starforge`` config file.

    The file is located under ``$XDG_CONFIG_HOME/galaxy-starforge``. When
    ``XDG_CONFIG_HOME`` is unset *or empty*, ``~/.config`` is used instead.

    :param name: file name inside the ``galaxy-starforge`` config directory
    :returns: absolute path to the config file (the file need not exist)
    """
    # ``or`` (rather than a .get() default) so that an empty variable also
    # falls back to the default instead of resolving against the cwd.
    config_home = expanduser(os.environ.get('XDG_CONFIG_HOME') or '~/.config')
    return abspath(join(config_home, 'galaxy-starforge', name))
def xdg_data_dir():
    """Return the absolute path of the ``galaxy-starforge`` data directory.

    The directory is located under ``$XDG_DATA_HOME``. When ``XDG_DATA_HOME``
    is unset *or empty*, ``~/.local/share/`` is used instead.

    :returns: absolute path to the data directory (it need not exist)
    """
    # ``or`` (rather than a .get() default) so that an empty variable also
    # falls back to the default instead of resolving against the cwd.
    data_home = expanduser(os.environ.get('XDG_DATA_HOME') or '~/.local/share/')
    return abspath(join(data_home, 'galaxy-starforge'))
def check_output(*popenargs, **kwargs):
    """Run a command and return whatever it wrote to stdout.

    Backport of ``subprocess.check_output`` from Python 2.7. Positional and
    keyword arguments are passed straight to ``Popen``, except that ``stdout``
    is always a pipe and therefore may not be supplied by the caller.

    :returns: the captured standard output of the command
    :raises ValueError: if a ``stdout`` keyword argument is given
    :raises CalledProcessError: if the command exits with a non-zero status;
        the captured output is attached as ``output``
    """
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')

    child = Popen(*popenargs, stdout=PIPE, **kwargs)
    captured = child.communicate()[0]
    status = child.poll()
    if not status:
        return captured

    # Report the command the same way Popen received it: the ``args`` keyword
    # if present, otherwise the first positional argument.
    command = kwargs.get("args")
    if command is None:
        command = popenargs[0]
    raise CalledProcessError(status, command, output=captured)
# asbool implementation pulled from PasteDeploy.
# These are the (lower-cased, stripped) string spellings it accepts.
truthy = frozenset(('true', 'yes', 'on', 'y', 't', '1'))
falsy = frozenset(('false', 'no', 'off', 'n', 'f', '0'))
def asbool(obj):
    """Interpret ``obj`` as a boolean.

    Strings are stripped and lower-cased, then looked up in the module-level
    ``truthy`` and ``falsy`` sets. Anything that is not a string is converted
    with ``bool()``.

    :param obj: value to interpret
    :returns: ``True`` or ``False``
    :raises ValueError: if ``obj`` is a string that is in neither set
    """
    if not isinstance(obj, string_types):
        return bool(obj)

    normalized = obj.strip().lower()
    if normalized in truthy:
        return True
    if normalized in falsy:
        return False
    raise ValueError("String is not true/false: %r" % normalized)
class Archive(object):
    """Uniform read-only wrapper around tar and zip archives.

    The archive type is detected from the file itself (``tarfile.is_tarfile``
    / ``zipfile.is_zipfile``); a ``.tar.xz`` file that ``tarfile`` could not
    recognise on its own is opened through the ``lzma`` module instead.

    Instances can be used as context managers; leaving the ``with`` block
    closes the underlying archive.
    """

    def __init__(self, arcfile):
        """Open ``arcfile``.

        :param arcfile: path to a tar, zip or ``.tar.xz`` archive
        :raises Exception: if the archive type cannot be determined
        """
        self.__roots = set()
        # Set only in the explicit xz branch: tarfile does not close a
        # caller-supplied ``fileobj``, so close() has to do it.
        self._fileobj = None
        if tarfile.is_tarfile(arcfile):
            self.arctype = 'tar'
            self.arc = tarfile.open(arcfile)
        elif zipfile.is_zipfile(arcfile):
            self.arctype = 'zip'
            self.arc = zipfile.ZipFile(arcfile)
        elif arcfile.endswith('.tar.xz'):
            self.arctype = 'tar'
            self._fileobj = lzma.open(arcfile)
            try:
                self.arc = tarfile.open(fileobj=self._fileobj)
            except Exception:
                self._fileobj.close()
                raise
        else:
            raise Exception('Unknown archive type: %s' % arcfile)

    @classmethod
    def open(cls, arcfile):
        """Alternate constructor mirroring ``tarfile.open``; returns ``cls(arcfile)``."""
        return cls(arcfile)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying archive and any decompression stream opened for it."""
        self.arc.close()
        if self._fileobj is not None:
            self._fileobj.close()
            self._fileobj = None

    @property
    def roots(self):
        """Set of the first path component of every member name (computed once)."""
        if not self.__roots:
            # Member names in tar and zip archives are '/'-separated on every
            # platform, so split on '/' rather than os.sep.
            self.__roots.update(
                name.split('/', 1)[0] for name in self.getnames())
        return self.__roots

    def getnames(self):
        """Return the list of member names in the archive."""
        if self.arctype == 'tar':
            return self.arc.getnames()
        elif self.arctype == 'zip':
            return self.arc.namelist()

    def extractfile(self, name):
        """Return a binary file-like object for the member ``name``.

        :raises KeyError: if the archive has no member called ``name``
        """
        if self.arctype == 'tar':
            return self.arc.extractfile(name)
        elif self.arctype == 'zip':
            return self.arc.open(name)

    def _read_config(self, name):
        """Parse the member ``name`` as an INI file and return the ConfigParser."""
        import io

        fh = self.extractfile(name)
        if fh is None:
            # tarfile returns None for members that are not regular files;
            # treat that the same as a missing file.
            raise KeyError(name)
        try:
            data = fh.read()
        finally:
            fh.close()
        # Archive members are read as bytes, but the parser needs text.
        if isinstance(data, bytes):
            data = data.decode('utf-8', 'replace')
        cp = ConfigParser()
        # readfp() no longer exists on newer Pythons; read_file() does not
        # exist on Python 2. Use whichever is available.
        reader = getattr(cp, 'read_file', None) or cp.readfp
        reader(io.StringIO(data))
        return cp

    @property
    def universal(self):
        """ Return true if this archive contains an sdist for a universal wheel

        The flag is read from ``setup.cfg`` in the archive's single root
        directory. A missing ``setup.cfg``, section or option yields ``False``.

        :raises AssertionError: if the archive does not have exactly one root
        """
        # Raised explicitly (not via ``assert``) so the check survives ``-O``.
        if len(self.roots) != 1:
            raise AssertionError('Cannot check archives with != 1 root')
        root = next(iter(self.roots))
        # Build the member name with '/', matching the archive's own separator.
        setup_cfg = root + '/setup.cfg' if root else 'setup.cfg'
        try:
            cp = self._read_config(setup_cfg)
            try:
                universal = cp.get('bdist_wheel', 'universal')
            except NoSectionError:
                # this isn't documented but works, and PasteDeploy uses it
                universal = cp.get('wheel', 'universal')
            return asbool(universal)
        except (KeyError, NoSectionError, NoOptionError):
            return False
class UnsupportedArchiveModule(object):
    """Placeholder for an archive module that is not available.

    It exposes an ``open`` method like the module it stands in for, but
    calling it always raises ``UnsupportedArchiveType`` with installation
    instructions.
    """

    def __init__(self, arctype, extra):
        """Remember the archive kind and the pip extra that would provide it."""
        self.arctype, self.extra = arctype, extra

    def open(self, *args, **kwargs):
        """Always raise ``UnsupportedArchiveType``; all arguments are ignored."""
        message = UNSUPPORTED_ARCHIVE_MESSAGE.format(
            arctype=self.arctype,
            extra=self.extra,
        )
        raise UnsupportedArchiveType(message)
class UnsupportedArchiveType(Exception):
    """Raised when an archive needs a backend module that is not installed."""
# When no lzma implementation could be imported, swap in a placeholder whose
# open() raises UnsupportedArchiveType, so the failure only surfaces on use.
lzma = lzma if lzma is not None else UnsupportedArchiveModule('xz', 'lzma')