# Source code for msl.package_manager.utils

"""
Functions for the API.

.. _packages: https://pypi.org/search/?q=%22Measurement+Standards+Laboratory+of+New+Zealand%22
.. _git: https://git-scm.com
.. _repositories: https://github.com/MSLNZ
.. _JSON: https://www.json.org/
"""
import base64
import collections
import datetime
import fnmatch
import getpass
import json
import logging
import os
import platform
import re
import shlex
import struct
import subprocess
import sys
import textwrap
import threading
import time

try:
    from importlib import reload
    from urllib.request import urlopen, Request, HTTPError, URLError
except ImportError:  # then Python 2
    from imp import reload
    from urllib2 import urlopen, Request, HTTPError, URLError

import pkg_resources
from colorama import Back
from colorama import Fore
from colorama import Style
from colorama import init
from pkg_resources import packaging

_PKG_NAME = 'msl-package-manager'

# quietness counter (0-3) that set_log_level() updates
# NOTE(review): presumably forwarded to pip as the number of -q flags -- confirm against callers
_pip_quiet = 0

_IS_WINDOWS = sys.platform == 'win32'

# directory that holds the cached GitHub/PyPI json files and the
# GitHub authorisation file; it is created on import if it is missing
_HOME_DIR = os.path.join(os.path.expanduser('~'), '.msl', 'package-manager')
if not os.path.isdir(_HOME_DIR):
    os.makedirs(_HOME_DIR)

_GITHUB_AUTH_PATH = os.path.join(_HOME_DIR, 'github-auth')

# the HOME_DIR changed from ~/.msl to ~/.msl/package-manager
# move a previously-created github-auth file to the new HOME_DIR
_old_auth_path = os.path.join(os.path.expanduser('~'), '.msl', '.mslpm-github-auth')
if os.path.isfile(_old_auth_path):
    os.rename(_old_auth_path, _GITHUB_AUTH_PATH)

# splits a requested package into its name (which may contain the "*"
# wildcard), an optional [extras] part and an optional version specifier
_package_name_regex = re.compile(
    r'(?P<package_name>[*]?[\w-]*[*]?[\w-]*)(?P<extras_require>\[.*\])?(?P<version_requested>[<!=>~].*)?'
)

# check whether the git executable can be run
try:
    subprocess.check_output(['git', '--version'])
except Exception:
    # git is not installed, is not on PATH or returned a non-zero exit
    # status; KeyboardInterrupt and SystemExit are allowed to propagate
    has_git = False
else:
    has_git = True


def get_email():
    """Try to determine the user's email address.

    The value of the ``user.email`` parameter from the user's git_
    configuration is used as the email address.

    Returns
    -------
    :class:`str` or :data:`None`
        The user's email address, or :data:`None` if git_ is not installed
        or if the ``git config user.email`` command fails.
    """
    if not has_git:
        return None

    try:
        raw = subprocess.check_output(['git', 'config', 'user.email'])
    except subprocess.CalledProcessError:
        # git exited with a non-zero status
        return None

    return raw.strip().decode('utf-8')
def get_username():
    """Determine the name of the user.

    The value of the ``user.name`` parameter from the user's git_
    configuration is preferred. If git_ is not installed, or if the
    ``git config user.name`` command fails, then :func:`getpass.getuser`
    is used instead.

    Returns
    -------
    :class:`str`
        The user's name.
    """
    if has_git:
        try:
            raw = subprocess.check_output(['git', 'config', 'user.name'])
        except subprocess.CalledProcessError:
            raw = None  # fall through to getpass below

        if raw is not None:
            return raw.strip().decode('utf-8')

    return getpass.getuser()
def github(update_cache=False):
    """Get the information about the MSL repositories_ that are available on GitHub.

    Parameters
    ----------
    update_cache : :class:`bool`, optional
        The information about the repositories_ that are available on GitHub are
        cached to use for subsequent calls to this function. After 24 hours the
        cache is automatically updated. Set `update_cache` to be :data:`True`
        to force the cache to be updated when you call this function.

    Returns
    -------
    :class:`dict`
        The information about the MSL repositories_ that are available on GitHub.
    """
    packages, path = _inspect_github_pypi('github', update_cache)
    if packages:
        return packages

    def fetch(url_suffix):
        # Returns the decoded JSON reply, an empty dict for a 404 reply or
        # None if the request failed (the reason is stored in fetch_errors).
        url = 'https://api.github.com' + url_suffix
        try:
            response = urlopen(Request(url, headers=headers))
        except HTTPError as err:
            if err.code == 401 or (err.code == 403 and 'Forbidden' in err.msg):
                msg = ('You have provided invalid authorisation credentials',)
            elif err.code == 404:
                # Page not found error.
                # For example, fetching .../releases/latest might not exist
                # if no releases for the repo are available yet. This is not
                # (necessarily) an error; however, if the API routes change
                # then this error would silently occur but must be fixed.
                return dict()
            elif err.code == 403:
                # Do not ask for the rate limit again if the /rate_limit request
                # itself was rejected (this would recurse without end) and
                # tolerate a failed request, for which fetch() returns None.
                reply = None
                if url_suffix != '/rate_limit':
                    reply = fetch('/rate_limit')
                core = (reply or {}).get('resources', {}).get('core', {})
                if core.get('remaining') == 0:
                    reset = core.get('reset', -1)
                    if reset > 0:
                        hm = datetime.datetime.fromtimestamp(reset+60).strftime('%I:%M%p')
                        msg = ('GitHub rate limit exceeded. Not all of the information could be retrieved.\n'
                               'Retry after %s or create a file with your GitHub authorisation credentials to\n'
                               'increase your rate limit. Run "msl authorise --help" for more details.', hm)
                    else:
                        msg = ("%s (no 'reset' key)", err)
                else:
                    msg = ("%s (no 'remaining' key)", err)
            else:
                msg = ('Unhandled %s', err)
            fetch_errors[err.code] = msg
        except URLError:
            # Possible reasons for this error are:
            # 1. Computer has no internet access
            # 2. GitHub server is offline
            # 3. The hostname for the GitHub API has changed
            fetch_errors[0] = ('Cannot access %s', url)
        else:
            try:
                return json.loads(response.read().decode('utf-8'))
            finally:
                response.close()

    def fetch_latest_release(repo_name):
        reply = fetch('/repos/MSLNZ/{}/releases/latest'.format(repo_name))
        if reply:
            def verify(ver):
                # ensure that the version is valid according to PEP 440
                if not ver:
                    # the "name" (or "tag_name") of a release can be missing or null
                    return ''
                try:
                    ver = ver.lstrip('v')
                    packaging.version.Version(ver)
                    return ver
                except packaging.version.InvalidVersion:
                    return ''
            version = verify(reply.get('tag_name')) or verify(reply.get('name'))
        else:
            version = ''
        pkgs[repo_name]['version'] = version

    def fetch_tags(repo_name):
        reply = fetch('/repos/MSLNZ/{}/tags'.format(repo_name))
        pkgs[repo_name]['tags'] = [tag['name'] for tag in reply] if reply else []

    def fetch_branches(repo_name):
        reply = fetch('/repos/MSLNZ/{}/branches'.format(repo_name))
        pkgs[repo_name]['branches'] = [branch['name'] for branch in reply] if reply else []

    # Check if the user specified their GitHub authorisation credentials.
    # The os.environ option is used for CI testing, it is not the
    # recommended way for a user to store their credentials.
    auth = os.environ.get('MSL_PM_GITHUB_AUTH')
    if not auth and os.path.isfile(_GITHUB_AUTH_PATH):
        with open(_GITHUB_AUTH_PATH, mode='rb') as fp:
            line = fp.readline().strip()
        auth = base64.b64encode(line).decode('utf-8')

    headers = {
        'User-Agent': _PKG_NAME + '/Python',
        'Accept': 'application/vnd.github+json',
    }
    if auth:
        headers['Authorization'] = 'Basic ' + auth

    log.debug('Getting the repositories from GitHub')

    # maps an error code (0 for a URLError) to the arguments for log.warning
    fetch_errors = {}

    # when more than 100 repos exist we'll need to also fetch "?per_page=100&page=2"
    repos = fetch('/orgs/MSLNZ/repos?per_page=100&page=1')
    if not repos:
        # even though updating the cache was requested just reload the cached data
        # because GitHub cannot be connected to right now
        for error in fetch_errors.values():
            log.warning(*error)
        cache, _ = _inspect_github_pypi('github', False)
        return cache

    pkgs = dict()
    for repo in repos:
        if repo['language'] == 'Python':
            if repo['description'] is None:
                repo['description'] = ''
            pkgs[repo['name']] = {'description': repo['description']}

    # fetch the latest release, the tags and the branches of every repository
    # concurrently, each worker fills in its own key of pkgs[repo_name]
    threads = [
        threading.Thread(target=fcn, args=(repo_name,))
        for fcn in [fetch_latest_release, fetch_tags, fetch_branches]
        for repo_name in pkgs
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    for error in fetch_errors.values():
        log.warning(*error)

    # replace the failed requests with the cached information
    if fetch_errors and os.path.isfile(path):
        with open(path, mode='rt') as fp:
            cached = json.load(fp)
        for name, value in pkgs.items():
            for item in ('version', 'tags', 'branches'):
                if not value[item] and name in cached:
                    pkgs[name][item] = cached[name][item]

    with open(path, mode='wt') as fp:
        json.dump(pkgs, fp, indent=2)

    return _sort_packages(pkgs)
def info(from_github=False, from_pypi=False, update_cache=False, as_json=False):
    """Show information about MSL packages.

    The information about the packages can be either those that are installed or
    those that are available as repositories_ on GitHub or as packages_ on PyPI.

    The default action is to show the information about the MSL packages that are installed.

    Parameters
    ----------
    from_github : :class:`bool`, optional
        Whether to show the information about the MSL repositories_ that are
        available on GitHub.
    from_pypi : :class:`bool`, optional
        Whether to show the information about the MSL packages_ that are
        available on PyPI.
    update_cache : :class:`bool`, optional
        The information about the MSL packages_ that are available on PyPI and about
        the repositories_ that are available on GitHub are cached to use for subsequent
        calls to this function. After 24 hours the cache is automatically updated. Set
        `update_cache` to be :data:`True` to force the cache to be updated when you call
        this function. If `from_github` is :data:`True` then the cache for the
        repositories_ is updated. If `from_pypi` is :data:`True` then the cache for the
        packages_ is updated.
    as_json : :class:`bool`, optional
        Whether to show the information in JSON_ format. If enabled then the information
        about the MSL repositories_ includes additional information about the branches
        and tags.
    """
    if from_github:
        typ, pkgs = 'Repository', github(update_cache=update_cache)
        if not pkgs:
            return
    elif from_pypi:
        typ, pkgs = 'PyPI Package', pypi(update_cache=update_cache)
        if not pkgs:
            return
    else:
        typ, pkgs = 'Package', installed()

    if as_json:
        if not (from_github or from_pypi):
            # the requirements of an installed package are not
            # JSON serializable, use their string representation
            for name, value in pkgs.items():
                pkgs[name]['requires'] = [str(r) for r in value['requires']]
        log.debug(Fore.RESET)
        log.info(json.dumps(pkgs, indent=2))
        return

    # determine the maximum width of each column
    header = ['MSL ' + typ, 'Version', 'Description']
    w = [len(h) for h in header]
    for p in pkgs:
        w = [
            max(w[0], len(p)),
            max(w[1], len(pkgs[p]['version'])),
            max(w[2], len(pkgs[p]['description']))
        ]

    if 'PYCHARM_HOSTED' not in os.environ:
        # if not executing within the PyCharm IDE then the description
        # should wrap to the next line and begin where the description
        # on the previous line started
        term_w = _get_terminal_size()[0]
        max_description_width = max(len(header[2]), term_w - (w[0] + w[1]) - 2)
        if w[2] > max_description_width:
            w[2] = max_description_width

    # log the results
    msg = [Fore.RESET]
    msg.append(' '.join(header[i].center(w[i]) for i in range(len(header))))
    msg.append(' '.join('-' * width for width in w))
    for p in sorted(pkgs):
        description = pkgs[p]['description']
        name_version = p.rjust(w[0]) + ' ' + pkgs[p]['version'].ljust(w[1]) + ' '
        # textwrap.wrap returns an empty list if the text only contains
        # whitespace, so an empty result is treated like an empty description
        wrapped_lines = textwrap.wrap(description, width=w[2]) if description else []
        if not wrapped_lines:
            msg.append(name_version)
        else:
            padding = ' ' * len(name_version)
            msg.append(name_version + wrapped_lines[0])
            for line in wrapped_lines[1:]:
                msg.append(padding + line)

    log.info('\n'.join(msg))
def installed():
    """Get the information about the MSL packages that are installed.

    A distribution is considered to be an MSL package if its project name
    is the name of an MSL repository on GitHub or if the ``Home-page``
    field of its metadata refers to ``github.com/MSLNZ``.

    Returns
    -------
    :class:`dict`
        The information about the MSL packages that are installed.
    """
    log.debug('Getting the packages from %s', os.path.dirname(sys.executable))

    gh = github(update_cache=False)

    # refresh the working_set
    reload(pkg_resources)

    pkgs = {}
    for dist in pkg_resources.working_set:
        repo_name = None
        if dist.project_name in gh:  # the installed name might be different from the repo name
            repo_name = dist.project_name

        description = ''
        requires = []
        if dist.has_metadata(dist.PKG_INFO):
            for line in dist.get_metadata_lines(dist.PKG_INFO):
                if line.startswith('Summary:'):
                    description = line[8:].strip()  # can be UNKNOWN
                elif line.startswith('Home-page:') and 'github.com/MSLNZ' in line:
                    repo_name = line.split('/')[-1]
                    requires = dist.requires()
                    if description:
                        break

        if repo_name is None:
            continue

        if description == 'UNKNOWN':
            # The repository name that was parsed from the Home-page field is
            # not guaranteed to be in the GitHub information (which is empty,
            # for example, if GitHub cannot be accessed and there is no cache).
            # Keep 'UNKNOWN' in that case instead of raising a KeyError.
            description = gh.get(repo_name, {}).get('description', description)

        pkgs[dist.project_name] = {
            'version': dist.version,
            'description': description,
            'repo_name': repo_name,
            'requires': requires,
        }

    return _sort_packages(pkgs)
def outdated_pypi_packages(msl_installed=None):
    """Check PyPI for all non-MSL packages that are outdated.

    .. versionadded:: 2.5.0

    Parameters
    ----------
    msl_installed : :class:`dict`, optional
        The MSL packages that are installed. If not specified then
        calls :func:`installed` to determine the installed packages.

    Returns
    -------
    :class:`dict`
        The information about the PyPI packages that are outdated.
    """
    to_update = dict()

    log.debug('Checking PyPI for all non-MSL packages that are outdated')

    command = [sys.executable, '-m', 'pip', 'list', '--outdated']
    try:
        raw = subprocess.check_output(command)
    except subprocess.CalledProcessError as e:
        log.error('ERROR: processing "pip list --outdated" raised -> %s', e)
        return to_update

    rows = raw.decode().splitlines()
    if not rows:
        return to_update

    columns = [column.lower() for column in rows[0].split()]
    if columns[:3] != ['package', 'version', 'latest']:
        log.error('ERROR: pip has changed the structure of the outdated packages')
        return to_update

    # rows[1] is skipped since it is a bunch of '-'
    candidates = [dict(zip(columns, row.split())) for row in rows[2:]]

    if not msl_installed:
        msl_installed = installed()

    def entry(installed_version, version):
        # the information that is stored for a package that will be updated
        return {
            'installed_version': installed_version,
            'using_pypi': True,
            'extras_require': '',
            'version': version,
            'repo_name': '',
        }

    for msl_package in msl_installed.values():
        for candidate in candidates:
            package = candidate['package']
            if package == 'pip':
                continue

            for requirement in msl_package['requires']:
                if not requirement.project_name.startswith(package):
                    continue

                # cannot update a package to the latest version
                # on PyPI if the installed MSL package specifies
                # that it only supports a specific version
                if not requirement.specifier:
                    continue

                specifier = str(requirement.specifier)
                if package not in to_update:
                    to_update[package] = entry(candidate['version'], specifier)
                    continue

                constraints = to_update[package]['version']
                if specifier in constraints:
                    continue

                # multiple version constraints
                if constraints[0] in '<!=>~':
                    to_update[package]['version'] = specifier + ',' + constraints
                else:
                    # then `constraints` corresponds to an exact version
                    # and is missing the leading "=="
                    to_update[package]['version'] = specifier

            if package not in to_update and package not in msl_installed:
                to_update[package] = entry(candidate['version'], candidate['latest'])

    return _sort_packages(to_update)
def pypi(update_cache=False):
    """Get the information about the MSL packages_ that are available on PyPI.

    Parameters
    ----------
    update_cache : :class:`bool`, optional
        The information about the MSL packages_ that are available on PyPI are
        cached to use for subsequent calls to this function. After 24 hours the
        cache is automatically updated. Set `update_cache` to be :data:`True`
        to force the cache to be updated when you call this function.

    Returns
    -------
    :class:`dict`
        The information about the MSL packages_ that are available on PyPI.
    """
    packages, path = _inspect_github_pypi('pypi', update_cache)
    if packages:
        return packages

    def request(endpoint):
        # Returns the decoded text of the reply or None if PyPI cannot be accessed.
        url = 'https://pypi.org' + endpoint
        try:
            response = urlopen(Request(url, headers=headers))
        except URLError:
            log.error('Cannot access %s', url)
        else:
            try:
                return response.read().decode('utf-8')
            finally:
                response.close()

    def use_json_api():
        # use the JSON API as a backup way to get the package information from PyPI
        projects = ('msl-package-manager', 'msl-network', 'msl-loadlib',
                    'msl-io', 'GTC', 'Quantity-Value')
        for project in projects:
            response = request('/pypi/{}/json'.format(project))
            if response:
                _info = json.loads(response).get('info')
                if _info:
                    pkgs[project] = {
                        'version': _info.get('version', 'UNKNOWN'),
                        'description': _info.get('summary', 'UNKNOWN')
                    }

    pkgs = dict()

    log.debug('Getting the packages from PyPI')

    headers = {'User-Agent': _PKG_NAME + '/Python'}

    # use the /search endpoint before the /json endpoint since searching does not
    # depend on knowing in advance what MSL packages are available on PyPI
    reply = request('/search/?q=%22Measurement+Standards+Laboratory+of+New+Zealand%22&o=')
    if reply:
        items = re.findall(
            r'<span class="package-snippet__name">(?P<name>.+)</span>\s+'
            r'<span class="package-snippet__version">(?P<version>.+)</span>\s+'
            r'<span class="package-snippet__created">.+\s+(?P<created>.+)\s+.+\s+.+\s+'
            r'<p class="package-snippet__description">(?P<description>.+)</p>',
            reply,
        )
        for item in items:
            name, version, created, description = item
            pkgs[name] = {
                'version': version,
                'description': description,
            }
        if not pkgs:
            log.warning('The regex pattern is invalid for the /search route, using /json instead')
            use_json_api()
    else:
        use_json_api()

    if not pkgs:
        # nothing could be retrieved from PyPI, fall back to the cached information
        return _inspect_github_pypi('pypi', False)[0]

    with open(path, mode='wt') as fp:
        json.dump(pkgs, fp, indent=2)

    return _sort_packages(pkgs)
def set_log_level(level):
    """Set the logging :py:ref:`level <levels>`.

    The module-level ``_pip_quiet`` counter is updated as well: 0 for a
    level of INFO or lower, 1 for WARNING, 2 for ERROR and 3 for any
    other level.

    Parameters
    ----------
    level : :class:`int`
        A value from one of the :py:ref:`levels`.
    """
    global _pip_quiet

    if level <= logging.INFO:
        quiet = 0
    else:
        quiet = {logging.WARNING: 1, logging.ERROR: 2}.get(level, 3)

    _pip_quiet = quiet
    log.setLevel(level)
def _ask_proceed():
    """Ask whether to proceed with the command.

    The question is repeated until the response starts with ``y`` or
    ``n`` (case insensitive). An empty response is treated as a yes.

    Returns
    -------
    :class:`bool`
        Whether to proceed.
    """
    ask = '\nProceed (Y/n)? '
    response = _get_input(ask).lower()
    while True:
        if response.startswith('y') or not response:
            return True
        elif response.startswith('n'):
            return False
        else:
            log.info('Invalid response')
            response = _get_input(ask).lower()


def _check_kwargs(kwargs, allowed):
    """Log a warning for every item in `kwargs` that is not in `allowed`."""
    for item in kwargs:
        if item not in allowed:
            log.warning('Invalid kwarg %r', item)


def _check_wildcards_and_prefix(names, pkgs):
    """Check if a Unix shell-style wildcard was used or if the package
    name starts with the msl- prefix.

    Parameters
    ----------
    names : :class:`tuple` of :class:`str`
        The package names.
    pkgs : :class:`dict`
        The GitHub repositories or the installed packages.

    Returns
    -------
    :class:`dict`
        The keys are the package names and the values are the version info/extra requires.
    """
    _packages = dict()
    # the matching is case insensitive, map the lower-case name to the real name
    pkgs_map = dict((p.lower(), p) for p in pkgs)
    for name in names:
        found = re.search(_package_name_regex, name.lower())
        if not found:
            continue

        result = found.groupdict()
        if not result['package_name']:
            log.error('Invalid package name %r', name)
            continue

        found_it = False
        for prefix in ['', 'msl-']:
            for match in fnmatch.filter(pkgs_map, prefix+result['package_name']):
                found_it = True
                _packages[pkgs_map[match]] = {
                    'extras_require': result['extras_require'],
                    'version_requested': result['version_requested']
                }

        if not found_it:
            log.warning('No MSL packages match %r', name)

    return _packages


def _create_install_list(names, branch, commit, tag, update_cache):
    """Create a list of package names to ``install`` that are GitHub repositories_.

    Parameters
    ----------
    names : :class:`tuple` of :class:`str`
        The name of a single GitHub repository or multiple repository names.
    branch : :class:`str` or :data:`None`
        The name of a git branch.
    commit : :class:`str` or :data:`None`
        The hash value of a git commit.
    tag : :class:`str` or :data:`None`
        The name of a git tag.
    update_cache : :class:`bool`
        Whether to force the GitHub cache to be updated when you call this function.

    Returns
    -------
    :class:`dict` or :data:`None`
        The MSL packages to ``install``, or :data:`None` if more than one
        of `branch`, `commit` or `tag` was specified.
    """
    github_suffix = _get_github_url_suffix(branch, commit, tag)
    if github_suffix is None:
        return

    # keep the order of the log messages consistent: pypi -> github -> local
    pkgs_github = github(update_cache=update_cache)
    pkgs_installed = installed()

    if not names:  # e.g., the --all flag
        packages = dict((pkg, {'extras_require': None, 'version_requested': None})
                        for pkg in pkgs_github if pkg != _PKG_NAME and pkg.startswith('msl-'))
    else:
        packages = _check_wildcards_and_prefix(names, pkgs_github)

    # the name of an installed package can be different from the repo name
    repo_names = [p['repo_name'] for p in pkgs_installed.values()]

    pkgs = {}
    for name, value in packages.items():
        if name in pkgs_installed or name in repo_names:
            log.warning('The %r package is already installed -- use the update command', name)
        elif name not in pkgs_github:
            log.error('Cannot install %r -- the package does not exist', name)
        elif branch is not None and branch not in pkgs_github[name]['branches']:
            log.error('Cannot install %r -- a %r branch does not exist', name, branch)
        elif tag is not None and tag not in pkgs_github[name]['tags']:
            log.error('Cannot install %r -- a %r tag does not exist', name, tag)
        else:
            pkgs[name] = pkgs_github[name]
            pkgs[name]['version_requested'] = value['version_requested']
            pkgs[name]['extras_require'] = value['extras_require']
    return pkgs


def _create_uninstall_list(names):
    """Create a list of package names to ``uninstall``.

    Parameters
    ----------
    names : :class:`tuple` of :class:`str`
        The name(s) of the package(s) to ``uninstall``. If empty then all
        installed MSL packages, except for the MSL Package Manager, are used.

    Returns
    -------
    :class:`dict`
        The MSL packages to ``uninstall``.
    """
    pkgs_installed = installed()

    if not names:
        names = [pkg for pkg in pkgs_installed if pkg != _PKG_NAME]
    else:
        names = _check_wildcards_and_prefix(names, pkgs_installed)

    pkgs = {}
    for name in names:
        if name == _PKG_NAME:
            log.warning('The MSL Package Manager cannot uninstall itself, '
                        'run "pip uninstall %s"', _PKG_NAME)
        elif name not in pkgs_installed:
            log.error('Cannot uninstall %r -- the package is not installed.', name)
        else:
            pkgs[name] = pkgs_installed[name]
    return pkgs


def _get_input(msg):
    """Get input from the user.

    Parameters
    ----------
    msg : :class:`str`
        The message to display.

    Returns
    -------
    :class:`str`
        The user's input from :attr:`sys.stdin`.
    """
    try:
        return raw_input(msg)  # Python 2
    except NameError:
        return input(msg)


def _get_github_url_suffix(branch=None, commit=None, tag=None):
    """Returns the URL suffix.

    Parameters
    ----------
    branch : :class:`str` or :data:`None`
        The name of a git branch.
    commit : :class:`str` or :data:`None`
        The hash value of a git commit.
    tag : :class:`str` or :data:`None`
        The name of a git tag.

    Returns
    -------
    :class:`str` or :data:`None`
        The suffix (``'main'`` if nothing was specified) or :data:`None` if
        more than 1 of `branch`, `tag` or `commit` were enabled.
    """
    count = [bool(branch), bool(commit), bool(tag)].count(True)
    if count == 0:
        return 'main'  # default branch
    if count > 1:
        log.error('Can only specify a branch, a commit or a tag (not multiple options simultaneously)')
        return
    if branch:
        return branch
    if commit:
        return commit
    return tag


def _inspect_github_pypi(where, update_cache):
    """Inspects the HOME directory for the cached json file.

    Parameters
    ----------
    where : :class:`str`
        Either 'github' or 'pypi'.
    update_cache : :class:`bool`
        Whether to update the cache.

    Returns
    -------
    :class:`dict`
        The packages. An empty :class:`dict` is returned if the cache must
        be updated, i.e., `update_cache` is :data:`True`, the cached file
        does not exist (or cannot be parsed) or it is older than 24 hours.
    :class:`str`
        The path to the cached file.
    """
    if where == 'github':
        suffix = 'GitHub repositories'
    elif where == 'pypi':
        suffix = 'PyPI packages'
    else:
        assert False, '{!r} != github or pypi'.format(where)

    path = os.path.join(_HOME_DIR, where+'.json')

    cached_pgks = None
    if os.path.isfile(path):
        try:
            with open(path, mode='rt') as f:
                cached_pgks = _sort_packages(json.load(f))
        except ValueError:
            # The cached file is not valid JSON (e.g., a previous write was
            # interrupted). Behave as if the file does not exist so that the
            # caller fetches the information again and rewrites the file.
            cached_pgks = None

    one_day = 60 * 60 * 24
    if (not update_cache) and (cached_pgks is not None) and (time.time() < os.path.getmtime(path) + one_day):
        # The installed() function also calls github() so this log message could be displayed twice.
        # Avoid seeing the following log message when the installed() function was previously called.
        if where == 'pypi' or not _ColourStreamHandler.previous_message.endswith(os.path.dirname(sys.executable)):
            log.debug('Loaded the cached information about the %s', suffix)
        return _sort_packages(cached_pgks), path

    return dict(), path


def _log_install_uninstall_message(packages, action, branch=None, commit=None, tag=None, pkgs_pypi=None):
    """Print the ``install`` or ``uninstall`` summary for what is going to happen.

    Parameters
    ----------
    packages : :class:`dict`
        The packages that are affected.
    action : :class:`str`
        The text to show in color and in upper case about what's happening.
    branch : :class:`str` or :data:`None`
        The name of a git branch to use. Only used when installing packages.
    commit : :class:`str` or :data:`None`
        The hash value of a git commit to use. Only used when installing packages.
    tag : :class:`str` or :data:`None`
        The name of a git tag to use. Only used when installing packages.
    pkgs_pypi : :class:`dict` or :data:`None`
        Packages that are on PyPI. Only used when installing packages.
        :data:`None` is equivalent to no packages being on PyPI.
    """
    if pkgs_pypi is None:
        # the default value does not support the "in" operator
        pkgs_pypi = {}

    pkgs = _sort_packages(packages)

    # determine the width of the name and of the version columns
    w = [0, 0]
    for pkg, values in pkgs.items():
        if values.get('extras_require'):
            w[0] = max(w[0], len(pkg + values['extras_require']))
        else:
            w[0] = max(w[0], len(pkg))
        if values.get('version_requested'):
            w[1] = max(w[1], len(values['version_requested']))
        else:
            w[1] = max(w[1], len(values['version']))

    parts = ['\n{}The following MSL packages will be {}{}{}:\n'.format(
        Fore.RESET, Fore.CYAN, action, Fore.RESET)]
    for pkg, values in pkgs.items():
        name = pkg
        if values.get('extras_require'):
            name += values['extras_require']

        if values.get('version_requested'):
            version = values['version_requested'].replace('==', '')
        else:
            version = '' if (branch or commit or tag) else values['version']

        parts.append('\n {} {} '.format(name.ljust(w[0]), version.ljust(w[1])))
        if action == 'REMOVED':
            continue  # where the package comes from is irrelevant

        if branch is not None:
            parts.append(' [branch:{}]'.format(branch))
        elif commit is not None:
            parts.append(' [commit:{}]'.format(commit[:7]))
        elif tag is not None:
            parts.append(' [tag:{}]'.format(tag))
        elif pkg in pkgs_pypi:
            parts.append(' [PyPI]')
        else:
            parts.append(' [GitHub]')

    log.info(''.join(parts))


def _sort_packages(pkgs):
    """Sort the MSL packages by the name of the package.

    Parameters
    ----------
    pkgs : :class:`dict`
        The MSL packages.

    Returns
    -------
    :class:`collections.OrderedDict`
        The packages sorted by name.
    """
    return collections.OrderedDict([(u'{}'.format(k), pkgs[k]) for k in sorted(pkgs)])


class _ColourStreamHandler(logging.StreamHandler):
    """A StreamHandler that is compatible with colorama."""

    # the colour to use for each logging level name
    COLOURS = {
        'DEBUG': Fore.CYAN,
        'INFO': '',
        'WARN': Fore.YELLOW,
        'WARNING': Fore.YELLOW,
        'ERROR': Style.BRIGHT + Fore.RED,
        'CRIT': Back.RED + Fore.WHITE,
        'CRITICAL': Back.RED + Fore.WHITE
    }

    # the most-recent message that was emitted by any instance of this class
    previous_message = ''

    def emit(self, record):
        """Write `record` to stdout (below WARNING) or to stderr (WARNING and above)."""
        _ColourStreamHandler.previous_message = record.getMessage()
        stream = sys.stdout if record.levelno < logging.WARNING else sys.stderr
        try:
            # a level name that does not have a colour is written without a colour
            colour = self.COLOURS.get(record.levelname, '')
            stream.write(colour + self.format(record) + '\n')
            stream.flush()
        except Exception:
            self.handleError(record)


def _getLogger(name=None, fmt='%(message)s'):
    """Create the default stream logger"""
    init(autoreset=True)  # initialize colorama
    logger = logging.getLogger(name)
    handler = _ColourStreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter(fmt))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger


log = _getLogger(_PKG_NAME)


def _get_terminal_size():
    """
    Taken from https://gist.github.com/jtriley/1108174

    getTerminalSize()
     - get width and height of console
     - works on linux,os x,windows,cygwin(windows)
     originally retrieved from:
     https://stackoverflow.com/questions/566746/how-to-get-console-window-width-in-python

    Returns a (width, height) tuple, (80, 25) is used if the size
    cannot be determined.
    """
    current_os = platform.system()
    tuple_xy = None
    if current_os == 'Windows':
        tuple_xy = _get_terminal_size_windows()
        if tuple_xy is None:
            tuple_xy = _get_terminal_size_tput()
            # needed for window's python in cygwin's xterm!
    if current_os in ['Linux', 'Darwin'] or current_os.startswith('CYGWIN'):
        tuple_xy = _get_terminal_size_linux()
    if tuple_xy is None or tuple_xy == (0, 0):
        tuple_xy = (80, 25)  # default value
    return tuple_xy


def _get_terminal_size_windows():
    """Get the size from the console screen buffer (None if unsuccessful)."""
    try:
        from ctypes import windll, create_string_buffer
        # stdin handle is -10
        # stdout handle is -11
        # stderr handle is -12
        h = windll.kernel32.GetStdHandle(-12)
        csbi = create_string_buffer(22)
        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
        if res:
            (bufx, bufy, curx, cury, wattr,
             left, top, right, bottom,
             maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
            return right - left, bottom - top
    except Exception:
        # best effort, e.g., windll cannot be imported on a non-Windows platform
        pass


def _get_terminal_size_tput():
    """Get the size by running the ``tput`` command (None if unsuccessful)."""
    # https://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window
    try:
        # check_output returns what tput wrote to stdout whereas
        # check_call would only return the exit status (which is 0)
        cols = int(subprocess.check_output(shlex.split('tput cols')).decode().strip())
        rows = int(subprocess.check_output(shlex.split('tput lines')).decode().strip())
        return cols, rows
    except Exception:
        # best effort, e.g., tput is not available
        pass


def _get_terminal_size_linux():
    """Get the size using ioctl or the LINES and COLUMNS environment
    variables (None if unsuccessful)."""
    def ioctl_GWINSZ(fd):
        # returns (rows, columns) for the file descriptor or None
        try:
            import fcntl
            import termios
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
            return cr
        except Exception:
            pass

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        # try the controlling terminal of the process
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except Exception:
            pass
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except KeyError:
            return None
    return int(cr[1]), int(cr[0])