Source code for pulsar.apps.test.plugins.profile

'''
:class:`Profile` is a :class:`.TestPlugin` for profiling
test cases and generating Html reports.
It uses the :mod:`cProfile` module from the standard library.

To use the plugin follow these two steps:

* Included it in the test Suite::

    from pulsar.apps.test import TestSuite
    from pulsar.apps.test.plugins import profile

    def suite():
        TestSuite(..., plugins=(..., profile.Profile()))

* Run the test suite with the ``--profile`` command line option.

.. autoclass:: Profile
   :members:
   :member-order: bysource

'''
import os
import re
import time
import shutil
import tempfile
import cProfile as profiler
import pstats
from datetime import datetime
from io import StringIO as Stream

import pulsar

from .base import TestPlugin


other_filename = 'unknown'
line_func = re.compile(r'(?P<line>\d+)\((?P<func>\w+)\)')
template_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             'htmlfiles', 'profile')
headers = (
    ('ncalls',
     'total number of calls'),
    ('primitive calls',
     'Number primitive calls (calls not induced via recursion)'),
    ('tottime',
     'Total time spent in the given function (excluding time spent '
     'in calls to sub-functions'),
    ('percall',
     'tottime over ncalls, the time spent by each call'),
    ('cumtime',
     'Total time spent in the given function including all subfunctions'),
    ('percall',
     'cumtime over primitive calls'),
    ('function', ''),
    ('lineno', ''),
    ('filename', '')
    )


def absolute_file(val):
    dir = os.getcwd()
    return os.path.join(dir, val)


def make_stat_table(data):
    yield "<thead>\n<tr>\n"
    for head, description in headers:
        yield '<th title="{1}">{0}</th>'.format(head, description)
    yield '\n</tr>\n</thead>\n<tbody>\n'
    for row in data:
        yield '<tr>\n'
        for col in row:
            yield '<td>{0}</td>'.format(col)
        yield '\n</tr>\n'
    yield '</tbody>'


def data_stream(lines, num=None):
    if num:
        lines = lines[:num]
    for line in lines:
        if not line:
            continue
        fields = [field for field in line.split() if field is not '']
        if len(fields) == 6:
            valid = True
            new_fields = fields[0].split('/')
            if len(new_fields) == 1:
                new_fields.append(new_fields[0])
            for f in fields[1:-1]:
                try:
                    float(f)
                except Exception:
                    valid = False
                    break
                new_fields.append(f)
            if not valid:
                continue
            filenames = fields[-1].split(':')
            linefunc = filenames.pop()
            match = line_func.match(linefunc)
            if match:
                lineno, func = match.groups()
                filename = ':'.join(filenames)
                filename = filename.replace('\\', '/')
                new_fields.extend((func, lineno, filename))
            else:
                new_fields.extend(('', '', other_filename))
            yield new_fields


def copy_file(filename, target, context=None):
    with open(os.path.join(template_path, filename), 'r') as file:
        stream = file.read()
    if context:
        stream = stream.format(context)
    with open(os.path.join(target, filename), 'w') as file:
        file.write(stream)


[docs]class Profile(TestPlugin): """TestPlugin for profiling test cases. """ desc = '''Profile tests using the cProfile module''' profile_stats_path = pulsar.Setting( flags=['--profile-stats-path'], default='htmlprof', desc='location of profile directory.', validator=absolute_file) def configure(self, cfg): self.config = cfg self.profile_stats_path = cfg.profile_stats_path dir, name = os.path.split(self.profile_stats_path) fname = '.'+name self.profile_temp_path = os.path.join(dir, fname) def before_test_function_run(self, test, local): # If active return a TestProfile instance wrapping the real test case. if self.config.profile: local.prof = profiler.Profile() local.tmp = tempfile.mktemp(dir=self.profile_temp_path) local.prof.enable() def after_test_function_run(self, test, local): if self.config.profile: local.prof.disable() local.prof.dump_stats(local.tmp) def on_start(self): if self.config.profile: self.remove_dir(self.profile_temp_path, build=True) def remove_dir(self, dir, build=False): sleep = 0 if os.path.exists(dir): shutil.rmtree(dir) sleep = 0.2 if build: time.sleep(sleep) os.mkdir(dir) def on_end(self): if self.config.profile: files = [os.path.join(self.profile_temp_path, file) for file in os.listdir(self.profile_temp_path)] if not files: return stats = pstats.Stats(*files, **{'stream': Stream()}) stats.sort_stats('time', 'calls') stats.print_stats() stats_str = stats.stream.getvalue() self.remove_dir(self.profile_temp_path) stats_str = stats_str.split('\n') run_info = 'Executed %s.' % datetime.now().isoformat() for n, line in enumerate(stats_str): b = 0 while b < len(line) and line[b] == ' ': b += 1 line = line[b:] if line: if line.startswith('ncalls'): break bits = line.split(' ') try: int(bits[0]) except Exception: continue else: run_info += ' ' + line data = ''.join(make_stat_table(data_stream(stats_str[n+1:], 100))) self.remove_dir(self.profile_stats_path, build=True) for file in os.listdir(template_path): if file == 'index.html': copy_file(file, self.profile_stats_path, {'table': data, 'run_info': run_info, 'version': pulsar.__version__}) else: copy_file(file, self.profile_stats_path)