# Source code for repobuddy.utils
#
# Copyright (C) 2013 Ash (Tuxdude) <tuxdude.github@gmail.com>
#
# This file is part of repobuddy.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import errno as _errno
import os as _os
import pkg_resources as _pkg_resources
import sys as _sys
import time as _time
class RepoBuddyBaseException(Exception):
    """Common base for the repobuddy exception classes in this module.

    The message is converted to text once, at construction time, and that
    same text is what both ``str()`` and ``repr()`` return.
    """

    def __init__(self, error_str):
        """Record the error message.

        :param error_str: Description of the error. It is passed unchanged
            to ``Exception.__init__`` and stored as ``str(error_str)``.
        """
        super(RepoBuddyBaseException, self).__init__(error_str)
        self._error_str = str(error_str)

    def __str__(self):
        """Return the message text captured by ``__init__``."""
        return self._error_str

    # repr() yields exactly the same text as str().
    __repr__ = __str__
class FileLockError(RepoBuddyBaseException):
    """Error raised by ``FileLock`` when a lock operation fails.

    The ``is_time_out`` attribute lets callers tell a timeout apart from
    the other failures.
    """

    def __init__(self, error_str, is_time_out=False):
        """Record the message and the timeout flag.

        :param error_str: Description of the failure.
        :param is_time_out: Stored as-is on ``self.is_time_out``; defaults
            to ``False``.
        """
        self.is_time_out = is_time_out
        super(FileLockError, self).__init__(error_str)
# Credits to Evan for the FileLock code
# noqa http://www.evanfosmark.com/2009/01/cross-platform-file-locking-support-in-python/ # pylint: disable=C0301
# Have made minor changes to the original version
class FileLock(object):
    """Lock implemented by exclusively creating a lock file.

    The lock is taken by opening the lock file with
    ``O_CREAT | O_EXCL``, which fails with ``EEXIST`` while the file is
    present, and is given up by closing the descriptor and deleting the
    file.  Instances can be used as context managers::

        with FileLock('/some/dir/.lock'):
            ...
    """

    def __init__(self, file_name, timeout=1, delay=.1):
        """Prepare a lock object; the lock itself is not taken here.

        :param file_name: Path of the lock file to create.
        :param timeout: Seconds (measured with ``time.time()``) after which
            ``acquire`` gives up waiting for an existing lock file.
        :param delay: Seconds to sleep between two creation attempts.
        """
        self._is_locked = False
        # A single-argument join hands the path back unchanged.
        self._lock_file = _os.path.join(file_name)
        self._timeout = timeout
        self._delay = delay
        self._fd = None

    def __del__(self):
        """Release the lock, if still held, when the object is collected."""
        # __del__ also runs for objects whose __init__ never completed
        # (for example a subclass that raised before calling it), so do
        # not assume the attributes exist.
        if getattr(self, '_is_locked', False):
            self.release()

    def __enter__(self):
        """Acquire the lock unless this instance already holds it."""
        if not self._is_locked:
            self.acquire()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Release the lock if held; exceptions from the body propagate."""
        if self._is_locked:
            self.release()

    def acquire(self):
        """Create the lock file, retrying until the timeout elapses.

        At least one creation attempt is always made; the timeout is only
        checked after an attempt failed because the file already exists.

        :raises FileLockError: with ``is_time_out=True`` when the lock file
            still exists after ``timeout`` seconds, or with the default
            ``is_time_out=False`` when creation fails for any reason other
            than the file already existing.
        """
        begin = _time.time()
        while True:
            try:
                lock_fd = _os.open(
                    self._lock_file,
                    _os.O_CREAT | _os.O_EXCL | _os.O_RDWR)
            except OSError as err:
                if err.errno != _errno.EEXIST:
                    raise FileLockError(
                        'Error: Unable to create the lock file: ' +
                        self._lock_file)
                if (_time.time() - begin) >= self._timeout:
                    raise FileLockError(
                        'Timeout',
                        is_time_out=True)
                _time.sleep(self._delay)
            else:
                self._fd = lock_fd
                self._is_locked = True
                return

    def release(self):
        """Close the descriptor and delete the lock file.

        Does nothing when the lock is not held.  A lock file that has
        already been deleted is ignored.

        :raises FileLockError: when deleting the lock file fails for a
            reason other than the file being absent.  The instance is
            still marked as released in that case, because its descriptor
            has already been closed.
        """
        if not self._is_locked:
            return
        _os.close(self._fd)
        # The descriptor is gone from here on.  Forget it before touching
        # the file so that a failing unlink cannot leave the object
        # claiming a lock whose descriptor would then be closed a second
        # time by a later release() or by __del__.
        self._fd = None
        self._is_locked = False
        try:
            _os.unlink(self._lock_file)
        except OSError as err:
            # Lock file could be deleted, ignoring
            if err.errno != _errno.ENOENT:
                raise FileLockError('Error: ' + str(err))
class ResourceHelperError(RepoBuddyBaseException):
    """Error raised by ``ResourceHelper`` when a resource is not found."""

    def __init__(self, error_str):
        """Record the message.

        :param error_str: Description of the failure, forwarded to the
            base class.
        """
        super(ResourceHelperError, self).__init__(error_str)
class ResourceHelper:  # pylint: disable=W0232
    """Helpers for opening data files that ship inside a package."""

    @classmethod
    def open_data_file(cls, package_name, file_name):
        """Open a resource bundled with a package.

        :param package_name: Package that contains the resource.
        :param file_name: Resource name within that package.
        :returns: The object returned by ``pkg_resources.resource_stream``
            for the resource.
        :raises ResourceHelperError: when ``pkg_resources.resource_exists``
            reports that the resource is absent.
        """
        found = _pkg_resources.resource_exists(package_name, file_name)
        if not found:
            raise ResourceHelperError(
                'Unable to locate the resource: %s in package %s' %
                (file_name, package_name))
        return _pkg_resources.resource_stream(package_name, file_name)
class EqualityBase(object):
    """Mixin providing attribute-based ``==`` and ``!=``.

    Two objects compare equal only when they are of exactly the same type
    and their instance ``__dict__`` contents are equal.
    """

    def __eq__(self, other):
        """Return whether *other* has the same exact type and attributes."""
        # Exact type match is required; a subclass instance is never equal
        # to an instance of its parent class.
        if type(other) is not type(self):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        """Return the negation of ``__eq__``."""
        is_equal = self.__eq__(other)
        return not is_equal
class LoggerError(Exception):
    """Error raised by ``Logger``.

    The message object is stored unmodified; it is converted to text each
    time ``str()`` or ``repr()`` is requested.
    """

    def __init__(self, error_str):
        """Record the error message.

        :param error_str: Description of the error; any object accepted by
            ``str()``.
        """
        super(LoggerError, self).__init__(error_str)
        self._error_str = error_str

    def __str__(self):
        """Return the stored message converted with ``str()``."""
        text = str(self._error_str)
        return text

    # repr() yields exactly the same text as str().
    __repr__ = __str__
class Logger(object):  # pylint: disable=W0232
    """Class-level logger writing to three configurable streams.

    The class is used directly through its class methods and is not meant
    to be instantiated.  It derives from ``object`` so that the ``__new__``
    guard below is honoured under Python 2 as well, where classic classes
    never invoke ``__new__``.

    Output is controlled by these class attributes, which callers may
    reassign:

    * ``disable_debug`` - when true (the default), ``debug`` writes nothing.
    * ``debug_stream``, ``msg_stream``, ``error_stream`` - objects with a
      ``write`` method; all three default to the ``sys.stdout`` that was
      current when this module was imported.
    """

    disable_debug = True
    debug_stream = _sys.stdout
    msg_stream = _sys.stdout
    # NOTE(review): errors default to stdout rather than stderr - confirm
    # this is intended before changing it.
    error_stream = _sys.stdout

    def __new__(cls):
        """Refuse instantiation.

        :raises LoggerError: always.
        """
        raise LoggerError('This class should not be instantiated')

    @classmethod
    def _write(cls, stream, msg, append_new_line):
        """Write *msg* to *stream* in a single ``write`` call.

        A newline is appended to the text when *append_new_line* is true.
        """
        stream.write(msg + '\n' if append_new_line else msg)

    @classmethod
    def msg(cls, msg, append_new_line=True):
        """Write a regular message to ``msg_stream``.

        :param msg: Text to write.
        :param append_new_line: Append ``'\\n'`` to the text when true.
        """
        cls._write(cls.msg_stream, msg, append_new_line)

    @classmethod
    def debug(cls, msg, append_new_line=True):
        """Write a debug message to ``debug_stream``.

        Nothing is written while ``disable_debug`` is true.

        :param msg: Text to write.
        :param append_new_line: Append ``'\\n'`` to the text when true.
        """
        if cls.disable_debug:
            return
        cls._write(cls.debug_stream, msg, append_new_line)

    @classmethod
    def error(cls, msg, append_new_line=True):
        """Write an error message to ``error_stream``.

        :param msg: Text to write.
        :param append_new_line: Append ``'\\n'`` to the text when true.
        """
        cls._write(cls.error_stream, msg, append_new_line)