Source code for repobuddy.utils

#
#   Copyright (C) 2013 Ash (Tuxdude) <tuxdude.github@gmail.com>
#
#   This file is part of repobuddy.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#

import errno as _errno
import os as _os
import pkg_resources as _pkg_resources
import sys as _sys
import time as _time


[docs]class RepoBuddyBaseException(Exception):
[docs] def __init__(self, error_str): super(RepoBuddyBaseException, self).__init__(error_str) self._error_str = str(error_str) return
def __str__(self): return self._error_str def __repr__(self): return self._error_str
[docs]class FileLockError(RepoBuddyBaseException):
[docs] def __init__(self, error_str, is_time_out=False): super(FileLockError, self).__init__(error_str) self.is_time_out = is_time_out return # Credits to Evan for the FileLock code # noqa http://www.evanfosmark.com/2009/01/cross-platform-file-locking-support-in-python/ # pylint: disable=C0301 # Have made minor changes to the original version
[docs]class FileLock(object):
[docs] def __init__(self, file_name, timeout=1, delay=.1): self._is_locked = False self._lock_file = _os.path.join(file_name) self._timeout = timeout self._delay = delay self._fd = None return
def __del__(self): self.release() return def __enter__(self): if not self._is_locked: self.acquire() return self def __exit__(self, exception_type, exception_value, traceback): if self._is_locked: self.release() return
[docs] def acquire(self): begin = _time.time() while True: try: self._fd = _os.open( self._lock_file, _os.O_CREAT | _os.O_EXCL | _os.O_RDWR) break except OSError as err: if err.errno != _errno.EEXIST: raise FileLockError( 'Error: Unable to create the lock file: ' + self._lock_file) if (_time.time() - begin) >= self._timeout: raise FileLockError( 'Timeout', is_time_out=True) _time.sleep(self._delay) self._is_locked = True return
[docs] def release(self): if self._is_locked: _os.close(self._fd) try: _os.unlink(self._lock_file) except OSError as err: # Lock file could be deleted, ignoring if err.errno != _errno.ENOENT: raise FileLockError('Error: ' + str(err)) self._is_locked = False return
[docs]class ResourceHelperError(RepoBuddyBaseException):
[docs] def __init__(self, error_str): super(ResourceHelperError, self).__init__(error_str) return
[docs]class ResourceHelper: # pylint: disable=W0232 @classmethod
[docs] def open_data_file(cls, package_name, file_name): if _pkg_resources.resource_exists(package_name, file_name): return _pkg_resources.resource_stream(package_name, file_name) else: raise ResourceHelperError( 'Unable to locate the resource: %s in package %s' % (file_name, package_name)) return
[docs]class EqualityBase(object): def __eq__(self, other): return (type(other) is type(self)) and self.__dict__ == other.__dict__ def __ne__(self, other): return not self.__eq__(other)
[docs]class LoggerError(Exception):
[docs] def __init__(self, error_str): super(LoggerError, self).__init__(error_str) self._error_str = error_str return
def __str__(self): return str(self._error_str) def __repr__(self): return str(self._error_str)
[docs]class Logger: # pylint: disable=W0232 disable_debug = True debug_stream = _sys.stdout msg_stream = _sys.stdout error_stream = _sys.stdout def __new__(cls): raise LoggerError('This class should not be instantiated') @classmethod
[docs] def msg(cls, msg, append_new_line=True): if append_new_line: cls.msg_stream.write(msg + '\n') else: cls.msg_stream.write(msg) return
@classmethod
[docs] def debug(cls, msg, append_new_line=True): if not cls.disable_debug: if append_new_line: cls.debug_stream.write(msg + '\n') else: cls.debug_stream.write(msg) return
@classmethod
[docs] def error(cls, msg, append_new_line=True): if append_new_line: cls.error_stream.write(msg + '\n') else: cls.error_stream.write(msg) return

This Page