#
# Copyright (C) 2013 Ash (Tuxdude) <tuxdude.github@gmail.com>
#
# This file is part of repobuddy.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
.. module: repobuddy.tests.common
:platform: Unix, Windows
:synopsis: Provides the test framework used by the tests for ``repobuddy``.
.. moduleauthor: Ash <tuxdude.github@gmail.com>
"""
import os as _os
import shlex as _shlex
import shutil as _shutil
import subprocess as _subprocess
import sys as _sys
import traceback as _traceback
if _sys.version_info < (2, 7):
# pylint: disable=F0401
import cStringIO as _io
import ordereddict as _collections
import unittest2 as _unittest
else:
# pylint: disable=F0401
if _sys.version_info < (3, 0):
import cStringIO as _io
else:
import io as _io
import collections as _collections
import unittest as _unittest
from repobuddy.utils import RepoBuddyBaseException, Logger
[docs]class ShellError(RepoBuddyBaseException):
"""Exception raised by :class:`ShellHelper`."""
[docs] def __init__(self, error_str):
"""Initializer.
:param error_str: The error string to store in the exception.
:type error_str: str
"""
super(ShellError, self).__init__(error_str)
return
[docs]class ShellHelper: # pylint: disable=W0232
"""Helper for performing shell operations."""
@classmethod
[docs] def exec_command(cls, command, base_dir, debug_output=True):
"""Execute a command.
:param command: The command to execute.
:type command: str
:param base_dir: The base directory path.
:type base_dir: str
:param debug_output: If set to ``False``, the output of the command is
**NOT** printed on ``stdout`` and ``stderr``.
:returns: None
:raises: :exc:`ShellError` if the program's execution returned a
non-zero return or any other errors.
"""
Logger.msg('>> ' + ' '.join(command))
try:
kwargs = {}
return_code = None
if not debug_output:
kwargs['stdout'] = open(_os.devnull, 'w')
kwargs['stderr'] = _subprocess.STDOUT
proc = _subprocess.Popen(command, # pylint: disable=W0142
cwd=base_dir,
**kwargs)
try:
proc.communicate()
except:
proc.kill()
proc.wait()
raise
return_code = proc.poll()
if return_code != 0:
raise ShellError('Command \'%s\' failed!' % command)
except (OSError, IOError) as err:
raise ShellError(str(err))
return
@classmethod
[docs] def read_file_as_string(cls, filename):
"""Read the contents of the file as a string and return it.
:param filename: The name of the file to read.
:type filename: str
:returns: Contents of the file.
:rtype: str
:raises: :exc:`ShellError` on errors.
"""
try:
with open(filename, 'r') as file_handle:
data = file_handle.read()
except (OSError, IOError) as err:
raise ShellError(str(err))
return data
@classmethod
[docs] def append_text_to_file(cls, text, filename, base_dir):
try:
with open(_os.path.join(base_dir, filename), 'a') as file_handle:
file_handle.write(text)
except (OSError, IOError) as err:
raise ShellError(str(err))
return
@classmethod
[docs] def remove_file(cls, filename):
try:
_os.unlink(filename)
except (OSError, IOError) as err:
raise ShellError(str(err))
return
@classmethod
[docs] def make_dir(cls,
dirname,
create_parent_dirs=False,
only_if_not_exists=False):
if not create_parent_dirs:
try:
if not only_if_not_exists or not _os.path.exists(dirname):
_os.mkdir(dirname)
except (OSError, IOError) as err:
raise ShellError(str(err))
else:
try:
if not only_if_not_exists or not _os.path.exists(dirname):
_os.makedirs(dirname)
except (OSError, IOError) as err:
raise ShellError(str(err))
return
@classmethod
[docs] def remove_dir(cls, dirname):
if _os.path.isdir(dirname):
try:
_shutil.rmtree(dirname, ignore_errors=False)
except (OSError, IOError) as err:
raise ShellError(str(err))
return
[docs]class TestCommon: # pylint: disable=W0232
@classmethod
def _git_append_add_commit(cls, text, filename, commit_log, exec_dir):
ShellHelper.append_text_to_file(text, filename, exec_dir)
ShellHelper.exec_command(
_shlex.split('git add %s' % filename),
exec_dir)
ShellHelper.exec_command(
_shlex.split('git commit -m "%s"' % commit_log),
exec_dir)
return
@classmethod
[docs] def get_string_stream(cls):
return _io.StringIO()
@classmethod
[docs] def setup_test_repos(cls, base_dir):
# Cleanup and create an empty directory
ShellHelper.remove_dir(base_dir)
ShellHelper.make_dir(base_dir)
# Set up the origin and clone repo paths
origin_repo_url = _os.path.join(base_dir, 'repo-origin')
clone_repo1 = _os.path.join(base_dir, 'clone1')
clone_repo2 = _os.path.join(base_dir, 'clone2')
# Set up the origin as a bare repo
ShellHelper.make_dir(origin_repo_url)
ShellHelper.exec_command(
_shlex.split('git init --bare'),
origin_repo_url)
# Create Clone1 from the origin
ShellHelper.exec_command(
_shlex.split('git clone %s %s' % (origin_repo_url, clone_repo1)),
base_dir)
# Create some content in clone1
cls._git_append_add_commit(
'First content...\n',
'README',
'First commit.',
clone_repo1)
cls._git_append_add_commit(
'Hardly useful...\n',
'dummy',
'Here we go.',
clone_repo1)
cls._git_append_add_commit(
'More content...\n',
'README',
'Appending to README.',
clone_repo1)
# Push the changes to origin
ShellHelper.exec_command(
_shlex.split('git push origin master'),
clone_repo1)
# Make some more changes, but do not push yet
cls._git_append_add_commit(
'Another line...\n',
'README',
'One more to README.',
clone_repo1)
cls._git_append_add_commit(
'Dummy2 in place...\n',
'dummy2',
'Creating dummy2.',
clone_repo1)
# Create clone2 from the origin
ShellHelper.exec_command(
_shlex.split('git clone %s %s' % (origin_repo_url, clone_repo2)),
base_dir)
# Add and commit changes in clone2
cls._git_append_add_commit(
'Another line...\n',
'dummy',
'One more to dummy.',
clone_repo2)
cls._git_append_add_commit(
'More dummy...\n',
'dummy',
'More dummy.',
clone_repo2)
# Create a new branch in clone2
ShellHelper.exec_command(
_shlex.split('git branch new-branch'),
clone_repo2)
ShellHelper.exec_command(
_shlex.split('git checkout new-branch'),
clone_repo2)
# Add some more changes in clone2's new-branch
cls._git_append_add_commit(
'More lines...\n',
'dummy',
'Another line to dummy.',
clone_repo2)
cls._git_append_add_commit(
'Just keep it coming...\n',
'dummy',
'Again :D',
clone_repo2)
# Switch back to master in clone2
ShellHelper.exec_command(
_shlex.split('git checkout master'),
clone_repo2)
# Push all branches to origin
ShellHelper.exec_command(
_shlex.split('git push origin --all'),
clone_repo2)
# Pull changes from origin into clone1
ShellHelper.exec_command(
_shlex.split('git fetch origin'),
clone_repo1)
ShellHelper.exec_command(
_shlex.split(
'git merge --commit -m "Merge origin into clone1" ' +
'origin/master'),
clone_repo1)
# Now push the merges back to origin
ShellHelper.exec_command(
_shlex.split('git push origin master'),
clone_repo1)
# Get the changes from origin into clone2
ShellHelper.exec_command(
_shlex.split('git fetch origin'),
clone_repo2)
ShellHelper.exec_command(
_shlex.split(
'git merge --commit -m "Merge origin into clone2" ' +
'origin/master'),
clone_repo2)
# Now push the merges back to origin
ShellHelper.exec_command(
_shlex.split('git push origin --all'),
clone_repo2)
# Now get rid of the clone repos, we only need the origin
ShellHelper.remove_dir(clone_repo1)
ShellHelper.remove_dir(clone_repo2)
return
[docs]class TestCaseBase(_unittest.TestCase):
def _set_tear_down_cb(self, method, *args, **kwargs):
self._tear_down_cb = method
self._tear_down_cb_args = args
self._tear_down_cb_kwargs = kwargs
return
def _clear_tear_down_cb(self):
self._tear_down_cb = None
self._tear_down_cb_args = None
self._tear_down_cb_kwargs = None
return
[docs] def __init__(self, methodName='runTest'):
super(TestCaseBase, self).__init__(methodName)
self._tear_down_cb = None
self._tear_down_cb_args = None
self._tear_down_cb_kwargs = None
if _sys.version_info >= (3, 2):
# pylint: disable=E1101
self._assert_count_equal = self.assertCountEqual
else:
# pylint: disable=E1101
self._assert_count_equal = self.assertItemsEqual
return
[docs] def setUp(self):
return
[docs] def tearDown(self):
if not self._tear_down_cb is None:
self._tear_down_cb(*self._tear_down_cb_args,
**self._tear_down_cb_kwargs)
self._clear_tear_down_cb()
return
[docs]class TestResult(_unittest.TestResult):
PASSED = 0
ERROR = 1
FAILED = 2
SKIPPED = 3
EXPECTED_FAILURE = 4
UNEXPECTED_SUCCESS = 5
_result_str = {PASSED: 'PASSED',
ERROR: 'ERROR',
FAILED: 'FAILED',
SKIPPED: 'SKIPPED',
EXPECTED_FAILURE: 'EXPECTED FAILURE',
UNEXPECTED_SUCCESS: 'UNEXPECTED_SUCCESS'}
def _update_result(self, test, err, result_type):
module_test_results = []
test_id = test.id().split('.')
if test_id[-2] not in self.test_results:
self.test_results[test_id[-2]] = module_test_results
else:
module_test_results = self.test_results[test_id[-2]]
result = {}
result['test_case'] = test_id[-1]
result['description'] = str(test.shortDescription())
result['result'] = result_type
if not err is None:
result['formated_traceback'] = \
''.join(_traceback.format_exception(err[0], err[1], err[2]))
module_test_results.append(result)
return
[docs] def __init__(self):
super(TestResult, self).__init__()
self.test_results = _collections.OrderedDict()
self.has_errors = False
self.has_failures = False
self.has_unexpected_success = False
return
@classmethod
[docs] def get_result_str(cls, result):
return cls._result_str[result]
[docs] def addError(self, test, err):
self._update_result(test, err, type(self).ERROR)
self.has_errors = True
return
[docs] def addFailure(self, test, err):
self._update_result(test, err, type(self).FAILED)
self.has_failures = True
return
[docs] def addSuccess(self, test):
self._update_result(test, None, type(self).PASSED)
return
[docs] def addSkip(self, test, reason):
self._update_result(test, None, type(self).SKIPPED)
return
[docs] def addExpectedFailure(self, test, err):
self._update_result(test, err, type(self).EXPECTED_FAILURE)
return
[docs] def addUnexpectedSuccess(self, test):
self._update_result(test, None, type(self).UNEXPECTED_SUCCESS)
self.has_unexpected_success = True
return
[docs]class TestRunner(_unittest.TextTestRunner):
[docs] def __init__(self, stream, descriptions=True, verbosity=1):
super(TestRunner, self).__init__(stream, descriptions, verbosity)
self._test_result = None
return
def _makeResult(self):
self._test_result = TestResult()
return self._test_result
[docs] def get_test_result(self):
return self._test_result
[docs]class TestSuiteManager(object):
_base_dir = None
@classmethod
[docs] def get_base_dir(cls):
return cls._base_dir
[docs] def __init__(self, base_dir):
if not _os.path.isdir(base_dir):
ShellHelper.make_dir(base_dir)
type(self)._base_dir = base_dir
self._test_suite = None
self._output = _io.StringIO()
self._test_result = None
return
[docs] def add_test_suite(self, test_suite):
if self._test_suite is None:
self._test_suite = test_suite
else:
self._test_suite.addTest(test_suite)
return
[docs] def run(self):
runner = TestRunner(
stream=self._output,
verbosity=0)
runner.run(self._test_suite)
self._test_result = runner.get_test_result()
return
[docs] def show_results(self):
Logger.msg('\n')
Logger.msg('*' * 72)
Logger.msg('{0:^72}'.format('Test Summary'))
Logger.msg('*' * 72)
Logger.msg(self._output.getvalue())
Logger.msg('-' * 72 + '\n\n')
error_traces_str = ''
failure_traces_str = ''
for test_suite, results in self._test_result.test_results.items():
Logger.msg('TestSuite: %s' % test_suite)
Logger.msg('#' * 72)
Logger.msg('{0:<8} {1:48} {2:16}'.format(
'No.',
'TestCase',
'Result'))
Logger.msg('#' * 72)
test_count = 0
for result in results:
test_count += 1
Logger.msg('{0:<8} {1:48} {2:16}'.format(
test_count,
result['test_case'],
TestResult.get_result_str(result['result'])))
if result['result'] == TestResult.ERROR:
error_traces_str += \
'%s::%s\n%s\n' % (test_suite,
result['test_case'],
result['formated_traceback'])
elif result['result'] == TestResult.FAILED:
failure_traces_str += \
'%s::%s\n%s\n' % (test_suite,
result['test_case'],
result['formated_traceback'])
Logger.msg('-' * 72 + '\n\n')
if self._test_result.has_errors:
Logger.msg('#' * 72)
Logger.msg('Errors')
Logger.msg('#' * 72 + '\n')
Logger.msg(error_traces_str + '-' * 72 + '\n')
if self._test_result.has_failures:
Logger.msg('#' * 72)
Logger.msg('Failures')
Logger.msg('#' * 72 + '\n')
Logger.msg(failure_traces_str + '-' * 72 + '\n')
Logger.msg('Tests Run: ' + str(self._test_result.testsRun))
Logger.msg('\n')
return
[docs] def was_successful(self):
return (not self._test_result.has_errors) and \
(not self._test_result.has_failures) and \
(not self._test_result.has_unexpected_success)