PKT;4 funkload/FunkLoadDocTest.py# (C) Copyright 2006 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """FunkLoad doc test $Id: FunkLoadDocTest.py 32254 2006-01-26 10:58:02Z bdelbosc $ """ import os from tempfile import gettempdir from FunkLoadTestCase import FunkLoadTestCase import PatchWebunit class FunkLoadDocTest(FunkLoadTestCase): """Class to use in doctest. >>> from FunkLoadDocTest import FunkLoadDocTest >>> fl = FunkLoadDocTest() >>> ret = fl.get('http://localhost') >>> ret.code 200 >>> 'HTML' in ret.body True """ def __init__(self, debug=False, debug_level=1): """Initialise the test case.""" class Dummy: pass option = Dummy() option.ftest_sleep_time_max = .001 option.ftest_sleep_time_min = .001 if debug: option.ftest_log_to = 'console file' if debug_level: option.debug_level = debug_level else: option.ftest_log_to = 'file' tmp_path = gettempdir() option.ftest_log_path = os.path.join(tmp_path, 'fl-doc-test.log') option.ftest_result_path = os.path.join(tmp_path, 'fl-doc-test.xml') FunkLoadTestCase.__init__(self, 'runTest', option) def runTest(self): """FL doctest""" return def _test(): import doctest, FunkLoadDocTest return doctest.testmod(FunkLoadDocTest) if __name__ == "__main__": _test() PKT;4Q$Q$funkload/ReportStats.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """Classes that collect statistics submitted by the result parser. $Id: ReportStats.py 24737 2005-08-31 09:00:16Z bdelbosc $ """ class MonitorStat: """Collect system monitor info.""" def __init__(self, attrs): for key, value in attrs.items(): setattr(self, key, value) class ErrorStat: """Collect Error or Failure stats.""" def __init__(self, cycle, step, number, code, header, body, traceback): self.cycle = cycle self.step = step self.number = number self.code = code self.header = header and header.copy() or {} self.body = body or None self.traceback = traceback class AllResponseStat: """Collect stat for all response in a cycle.""" def __init__(self, cycle, cycle_duration, cvus): self.cycle = cycle self.cycle_duration = cycle_duration self.cvus = int(cvus) self.per_second = {} self.max = 0 self.min = 999999999 self.avg = 0 self.total = 0 self.count = 0 self.success = 0 self.error = 0 self.error_percent = 0 self.rps = 0 self.rps_min = 0 self.rps_max = 0 self.finalized = False def add(self, date, result, duration): """Add a new response to stat.""" date_s = int(float(date)) self.per_second [date_s] = self.per_second.setdefault( int(date_s), 0) + 1 self.count += 1 if result == 'Successful': self.success += 1 else: self.error += 1 self.max = max(self.max, float(duration)) self.min = min(self.min, float(duration)) self.total += float(duration) self.finalized = False def finalize(self): """Compute avg times.""" if self.finalized: return if self.count: self.avg = self.total / float(self.count) self.min = min(self.max, self.min) if self.error: self.error_percent = 100.0 * self.error / float(self.count) rps_min = rps_max = 0 for date in self.per_second.keys(): rps_max = max(rps_max, self.per_second[date]) rps_min = min(rps_min, self.per_second[date]) if self.cycle_duration: rps = self.count / float(self.cycle_duration) if rps < 1: # average is lower than 1 this means that sometime there was # no request during one second rps_min = 0 self.rps = rps self.rps_max = rps_max self.rps_min = rps_min self.finalized = True class SinglePageStat: """Collect stat for a single page.""" def __init__(self, step): self.step = step self.count = 0 self.date_s = None self.duration = 0.0 self.result = 'Successful' def addResponse(self, date, result, duration): """Add a response to a page.""" self.count += 1 if self.date_s is None: self.date_s = int(float(date)) self.duration += float(duration) if result != 'Successful': self.result = result def __repr__(self): """Representation.""" return 'page %s %s %ss' % (self.step, self.result, self.duration) class PageStat(AllResponseStat): """Collect stat for asked pages in a cycle.""" def __init__(self, cycle, cycle_duration, cvus): AllResponseStat.__init__(self, cycle, cycle_duration, cvus) self.threads = {} def add(self, thread, step, date, result, duration, rtype): """Add a new response to stat.""" thread = self.threads.setdefault(thread, {'count': 0, 'pages': {}}) if str(rtype) in ('post', 'get', 'xmlrpc'): new_page = True else: new_page = False if new_page: thread['count'] += 1 self.count += 1 if not thread['count']: # don't take into account request that belongs to a staging up page return stat = thread['pages'].setdefault(thread['count'], SinglePageStat(step)) stat.addResponse(date, result, duration) self.finalized = False def finalize(self): """Compute avg times.""" if self.finalized: return for thread in self.threads.keys(): for page in self.threads[thread]['pages'].values(): if str(page.result) == 'Successful': if page.date_s: count = self.per_second.setdefault(page.date_s, 0) + 1 self.per_second[page.date_s] = count self.success += 1 self.total += page.duration else: self.error += 1 continue duration = page.duration self.max = max(self.max, duration) self.min = min(self.min, duration) AllResponseStat.finalize(self) if self.cycle_duration: # override rps to srps self.rps = self.success / float(self.cycle_duration) self.finalized = True class ResponseStat: """Collect stat a specific response in a cycle.""" def __init__(self, step, number, cvus): self.step = step self.number = number self.cvus = int(cvus) self.max = 0 self.min = 999999999 self.avg = 0 self.total = 0 self.count = 0 self.success = 0 self.error = 0 self.error_percent = 0 self.url = '?' self.description = '' self.type = '?' self.finalized = False def add(self, rtype, result, url, duration, description=None): """Add a new response to stat.""" self.count += 1 if result == 'Successful': self.success += 1 else: self.error += 1 self.max = max(self.max, float(duration)) self.min = min(self.min, float(duration)) self.total += float(duration) self.url = url self.type = rtype if description is not None: self.description = description self.finalized = False def finalize(self): """Compute avg times.""" if self.finalized: return if self.total: self.avg = self.total / float(self.count) self.min = min(self.max, self.min) if self.error: self.error_percent = 100.0 * self.error / float(self.count) self.finalized = True class TestStat: """Collect test stat for a cycle. Stat on successful test case. """ def __init__(self, cycle, cycle_duration, cvus): self.cycle = cycle self.cycle_duration = float(cycle_duration) self.cvus = int(cvus) self.max = 0 self.min = 999999999 self.avg = 0 self.total = 0 self.count = 0 self.success = 0 self.error = 0 self.error_percent = 0 self.traceback = [] self.pages = self.images = self.redirects = self.links = 0 self.xmlrpc = 0 self.tps = 0 self.finalized = False def add(self, result, pages, xmlrpc, redirects, images, links, duration, traceback=None): """Add a new response to stat.""" self.finalized = False self.count += 1 if traceback is not None: self.traceback.append(traceback) if result == 'Successful': self.success += 1 else: self.error += 1 return self.max = max(self.max, float(duration)) self.min = min(self.min, float(duration)) self.total += float(duration) self.pages = max(self.pages, int(pages)) self.xmlrpc = max(self.xmlrpc, int(xmlrpc)) self.redirects = max(self.redirects, int(redirects)) self.images = max(self.images, int(images)) self.links = max(self.links, int(links)) def finalize(self): """Compute avg times.""" if self.finalized: return if self.success: self.avg = self.total / float(self.success) self.min = min(self.max, self.min) if self.error: self.error_percent = 100.0 * self.error / float(self.count) if self.cycle_duration: self.tps = self.success / float(self.cycle_duration) self.finalized = True PKH\b4?#**funkload/PatchWebunit.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """Patching Richard Jones' webunit for FunkLoad. * Add cache for links (css, js) * store a browser history * add headers * log response * remove webunit log * fix HTTPResponse __repr__ $Id: PatchWebunit.py 24649 2005-08-29 14:20:19Z bdelbosc $ """ import sys import time import urlparse import httplib import cStringIO from webunit import cookie from webunit.utility import mimeEncode, boundary from webunit.IMGSucker import IMGSucker from webunit.webunittest import WebTestCase, WebFetcher from webunit.webunittest import HTTPResponse, HTTPError, VERBOSE from utils import thread_sleep class FKLIMGSucker(IMGSucker): """Image and links loader, patched to log response stats.""" def __init__(self, url, session, ftestcase=None): IMGSucker.__init__(self, url, session) self.ftestcase = ftestcase def do_img(self, attributes): newattributes = [] for name, value in attributes: if name == 'src': url = urlparse.urljoin(self.base, value) # TODO: figure the re-write path # newattributes.append((name, path)) if not self.session.images.has_key(url): self.ftestcase.logdd(' img: %s ...' % url) t_start = time.time() self.session.images[url] = self.session.fetch(url) t_stop = time.time() self.ftestcase.logdd(' Done in %.3fs' % (t_stop - t_start)) self.session.history.append(('image', url)) self.ftestcase.total_time += (t_stop - t_start) self.ftestcase.total_images += 1 self.ftestcase._log_response(self.session.images[url], 'image', None, t_start, t_stop) thread_sleep() # give a chance to other threads else: newattributes.append((name, value)) # Write the img tag to file (with revised paths) self.unknown_starttag('img', newattributes) def do_link(self, attributes): newattributes = [('rel', 'stylesheet'), ('type', 'text/css')] for name, value in attributes: if name == 'href': url = urlparse.urljoin(self.base, value) # TODO: figure the re-write path # newattributes.append((name, path)) if not self.session.css.has_key(url): self.ftestcase.logdd(' link: %s ...' % url) t_start = time.time() self.session.css[url] = self.session.fetch(url) t_stop = time.time() self.ftestcase.logdd(' Done in %.3fs' % (t_stop - t_start)) self.session.history.append(('link', url)) self.ftestcase.total_time += (t_stop - t_start) self.ftestcase.total_links += 1 self.ftestcase._log_response(self.session.css[url], 'link', None, t_start, t_stop) thread_sleep() # give a chance to other threads else: newattributes.append((name, value)) # Write the link tag to file (with revised paths) self.unknown_starttag('link', newattributes) # remove webunit logging def WTC_log(self, message, content): """Remove webunit logging.""" pass WebTestCase.log = WTC_log # use fl img sucker def WTC_pageImages(self, url, page, testcase=None): '''Given the HTML page that was loaded from url, grab all the images. ''' sucker = FKLIMGSucker(url, self, testcase) sucker.feed(page) sucker.close() WebTestCase.pageImages = WTC_pageImages # WebFetcher fetch def WF_fetch(self, url, postdata=None, server=None, port=None, protocol=None, ok_codes=None): '''Run a single test request to the indicated url. Use the POST data if supplied. Raises failureException if the returned data contains any of the strings indicated to be Error Content. Returns a HTTPReponse object wrapping the response from the server. ''' # see if the url is fully-qualified (not just a path) t_protocol, t_server, t_url, x, t_args, x = urlparse.urlparse(url) if t_server: protocol = t_protocol if ':' in t_server: server, port = t_server.split(':') else: server = t_server if protocol == 'http': port = '80' else: port = '443' url = t_url if t_args: url = url + '?' + t_args # ignore the machine name if the URL is for localhost if t_server == 'localhost': server = None elif not server: # no server was specified with this fetch, or in the URL, so # see if there's a base URL to use. base = self.get_base_url() if base: t_protocol, t_server, t_url, x, x, x = urlparse.urlparse(base) if t_protocol: protocol = t_protocol if t_server: server = t_server if t_url: url = urlparse.urljoin(t_url, url) # TODO: allow override of the server and port from the URL! if server is None: server = self.server if port is None: port = self.port if protocol is None: protocol = self.protocol if ok_codes is None: ok_codes = self.expect_codes if protocol == 'http': h = httplib.HTTP(server, int(port)) if int(port) == 80: host_header = server else: host_header = '%s:%s' % (server, port) elif protocol == 'https': #if httpslib is None: #raise ValueError, "Can't fetch HTTPS: M2Crypto not installed" h = httplib.HTTPS(server, int(port)) if int(port) == 443: host_header = server else: host_header = '%s:%s' % (server, port) else: raise ValueError, protocol params = None if postdata: for field, value in postdata.items(): if type(value) == type({}): postdata[field] = [] for k, selected in value.items(): if selected: postdata[field].append(k) # Do a post with the data file params = mimeEncode(postdata) h.putrequest('POST', url) h.putheader('Content-type', 'multipart/form-data; boundary=%s'% boundary) h.putheader('Content-length', str(len(params))) else: # Normal GET h.putrequest('GET', url) # Other Full Request headers if self.authinfo: h.putheader('Authorization', "Basic %s"%self.authinfo) h.putheader('Host', host_header) # FL Patch ------------------------- for key, value in self.extra_headers: h.putheader(key, value) # FL Patch end --------------------- # Send cookies # - check the domain, max-age (seconds), path and secure # (http://www.ietf.org/rfc/rfc2109.txt) cookies_used = [] cookie_list = [] for domain, cookies in self.cookies.items(): # check cookie domain if not server.endswith(domain): continue for path, cookies in cookies.items(): # check that the path matches urlpath = urlparse.urlparse(url)[2] if not urlpath.startswith(path) and not (path == '/' and urlpath == ''): continue for sendcookie in cookies.values(): # and that the cookie is or isn't secure if sendcookie['secure'] and protocol != 'https': continue # TODO: check max-age cookie_list.append("%s=%s;"%(sendcookie.key, sendcookie.coded_value)) cookies_used.append(sendcookie.key) if cookie_list: h.putheader('Cookie', ' '.join(cookie_list)) # check that we sent the cookies we expected to if self.expect_cookies is not None: assert cookies_used == self.expect_cookies, \ "Didn't use all cookies (%s expected, %s used)"%( self.expect_cookies, cookies_used) # finish the headers h.endheaders() if params is not None: h.send(params) # handle the reply errcode, errmsg, headers = h.getreply() # get the body and save it f = h.getfile() g = cStringIO.StringIO() d = f.read() while d: g.write(d) d = f.read() response = HTTPResponse(self.cookies, protocol, server, port, url, errcode, errmsg, headers, g.getvalue(), self.error_content) f.close() if errcode not in ok_codes: if VERBOSE: sys.stdout.write('e') sys.stdout.flush() raise HTTPError(response) # decode the cookies if self.accept_cookies: try: # decode the cookies and update the cookies store cookie.decodeCookies(url, server, headers, self.cookies) except: if VERBOSE: sys.stdout.write('c') sys.stdout.flush() raise # Check errors if self.error_content: data = response.body for content in self.error_content: if data.find(content) != -1: msg = "Matched error: %s" % content if hasattr(self, 'results') and self.results: self.writeError(url, msg) self.log('Matched error'+`(url, content)`, data) if VERBOSE: sys.stdout.write('c') sys.stdout.flush() raise self.failureException, msg if VERBOSE: sys.stdout.write('_') sys.stdout.flush() return response WebFetcher.fetch = WF_fetch def HR___repr__(self): """fix HTTPResponse rendering.""" return """""" % ( self.protocol, self.server, self.port, self.url, self.code, self.message) HTTPResponse.__repr__ = HR___repr__ PKH\b4߂߂funkload/FunkLoadTestCase.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """FunkLoad test case using Richard Jones' webunit. $Id: FunkLoadTestCase.py 24757 2005-08-31 12:22:19Z bdelbosc $ """ import os import sys import time import re from warnings import warn from socket import error as SocketError from types import ListType from datetime import datetime import unittest import traceback from random import random from urllib import urlencode from tempfile import mkdtemp from xml.sax.saxutils import quoteattr from urlparse import urljoin from ConfigParser import ConfigParser, NoSectionError, NoOptionError from webunit.webunittest import WebTestCase, HTTPError import PatchWebunit from utils import get_default_logger, mmn_is_bench, mmn_decode from utils import recording, thread_sleep, is_html, get_version, trace from xmlrpclib import ServerProxy _marker = [] # ------------------------------------------------------------ # Classes # class FunkLoadTestCase(unittest.TestCase): """Unit test with browser and configuration capabilties.""" # ------------------------------------------------------------ # Initialisation # def __init__(self, methodName='runTest', options=None): """Initialise the test case. Note that methodName is encoded in bench mode to provide additional information like thread_id, concurrent virtual users...""" if mmn_is_bench(methodName): self.in_bench_mode = True else: self.in_bench_mode = False self.test_name, self.cycle, self.cvus, self.thread_id = mmn_decode( methodName) self.meta_method_name = methodName self.suite_name = self.__class__.__name__ unittest.TestCase.__init__(self, methodName=self.test_name) self._response = None self.options = options self.debug_level = getattr(options, 'debug_level', 0) self._funkload_init() self._dump_dir = getattr(options, 'dump_dir', None) self._dumping = self._dump_dir and True or False self._viewing = getattr(options, 'firefox_view', False) self._accept_invalid_links = getattr(options, 'accept_invalid_links', False) self._simple_fetch = getattr(options, 'simple_fetch', False) self._stop_on_fail = getattr(options, 'stop_on_fail', False) if self._viewing and not self._dumping: # viewing requires dumping contents self._dumping = True self._dump_dir = mkdtemp('_funkload') self._loop_mode = getattr(options, 'loop_steps', False) if self._loop_mode: if options.loop_steps.count(':'): steps = options.loop_steps.split(':') self._loop_steps = range(int(steps[0]), int(steps[1])) else: self._loop_steps = [int(options.loop_steps)] self._loop_number = options.loop_number self._loop_recording = False self._loop_records = [] def _funkload_init(self): """Initialize a funkload test case using a configuration file.""" # look into configuration file config_directory = os.getenv('FL_CONF_PATH', '.') config_path = os.path.join(config_directory, self.__class__.__name__ + '.conf') config_path = os.path.abspath(config_path) if not os.path.exists(config_path): config_path = "Missing: "+ config_path config = ConfigParser() config.read(config_path) self._config = config self._config_path = config_path self.default_user_agent = self.conf_get('main', 'user_agent', 'FunkLoad/%s' % get_version(), quiet=True) if self.in_bench_mode: section = 'bench' else: section = 'ftest' ok_codes = self.conf_getList(section, 'ok_codes', [200, 301, 302], quiet=True) self.ok_codes = map(int, ok_codes) self.sleep_time_min = self.conf_getFloat(section, 'sleep_time_min', 0) self.sleep_time_max = self.conf_getFloat(section, 'sleep_time_max', 0) self.log_to = self.conf_get(section, 'log_to', 'console file') self.log_path = self.conf_get(section, 'log_path', 'funkload.log') self.result_path = os.path.abspath( self.conf_get(section, 'result_path', 'funkload.xml')) # init loggers self.logger = get_default_logger(self.log_to, self.log_path) self.logger_result = get_default_logger(log_to="xml", log_path=self.result_path, name="FunkLoadResult") #self.logd('_funkload_init config [%s], log_to [%s],' # ' log_path [%s], result [%s].' % ( # self._config_path, self.log_to, self.log_path, self.result_path)) # init webunit browser (passing a fake methodName) self._browser = WebTestCase(methodName='log') self.clearContext() #self.logd('# FunkLoadTestCase._funkload_init done') def clearContext(self): """Resset the testcase.""" self._browser.clearContext() self._browser.css = {} self._browser.history = [] self._browser.extra_headers = [] self.step_success = True self.test_status = 'Successful' self.steps = 0 self.page_responses = 0 self.total_responses = 0 self.total_time = 0.0 self.total_pages = self.total_images = 0 self.total_links = self.total_redirects = 0 self.total_xmlrpc = 0 self.clearBasicAuth() self.clearHeaders() self.setUserAgent(self.default_user_agent) self.logdd('FunkLoadTestCase.clearContext done') #------------------------------------------------------------ # browser simulation # def _connect(self, url, params, ok_codes, rtype, description): """Handle fetching, logging, errors and history.""" t_start = time.time() try: response = self._browser.fetch(url, params, ok_codes=ok_codes) except: etype, value, tback = sys.exc_info() t_stop = time.time() t_delta = t_stop - t_start self.total_time += t_delta self.step_success = False self.test_status = 'Failure' self.logd(' Failed in %.3fs' % t_delta) if etype is HTTPError: self._log_response(value.response, rtype, description, t_start, t_stop, log_body=True) if self._dumping: self._dump_content(value.response) raise self.failureException, str(value.response) else: self._log_response_error(url, rtype, description, t_start, t_stop) if etype is SocketError: raise SocketError("Can't load %s." % url) raise t_stop = time.time() # Log response t_delta = t_stop - t_start self.total_time += t_delta if rtype in ('post', 'get'): self.total_pages += 1 elif rtype == 'redirect': self.total_redirects += 1 elif rtype == 'link': self.total_links += 1 if rtype in ('post', 'get', 'redirect'): # this is a valid referer for the next request self.setHeader('Referer', url) self._browser.history.append((rtype, url)) self.logd(' Done in %.3fs' % t_delta) self._log_response(response, rtype, description, t_start, t_stop) if self._dumping: self._dump_content(response) return response def _browse(self, url_in, params_in=None, description=None, ok_codes=None, method='post', follow_redirect=True, load_auto_links=True, sleep=True): """Simulate a browser handle redirects, load/cache css and images.""" self._response = None # Loop mode if self._loop_mode: if self.steps == self._loop_steps[0]: self._loop_recording = True self.logi('Loop mode start recording') if self._loop_recording: self._loop_records.append((url_in, params_in, description, ok_codes, method, follow_redirect, load_auto_links, False)) # ok codes if ok_codes is None: ok_codes = self.ok_codes if type(params_in) is ListType: # convert list into a dict params = {} for key, value in params_in: params[key] = params.setdefault(key, []) params[key].append(value) for key, value in params.items(): if len(value) == 1: params[key] = value[0] else: params = params_in if method == 'get' and params: url = url_in + '?' + urlencode(params) params = None else: url = url_in if method == 'get': self.logd('GET: %s\n\tPage %i: %s ...' % (url, self.steps, description or '')) else: url = url_in self.logd('POST: %s %s\n\tPage %i: %s ...' % (url, str(params), self.steps, description or '')) # Fetching response = self._connect(url, params, ok_codes, method, description) # Check redirection if follow_redirect and response.code in (301, 302): max_redirect_count = 10 thread_sleep() # give a chance to other threads while response.code in (301, 302) and max_redirect_count: # Figure the location - which may be relative newurl = response.headers['Location'] url = urljoin(url_in, newurl) self.logd(' Load redirect link: %s' % url) response = self._connect(url, None, ok_codes, 'redirect', None) max_redirect_count -= 1 if not max_redirect_count: self.logd(' WARNING Too many redirects give up.') # Load auto links (css and images) response.is_html = is_html(response.body) if load_auto_links and response.is_html and not self._simple_fetch: self.logd(' Load css and images...') page = response.body t_start = time.time() c_start = self.total_time try: # pageImages is patched to call _log_response on all links self._browser.pageImages(url, page, self) except HTTPError, error: if self._accept_invalid_links: self.logd(' ' + str(error)) else: t_stop = time.time() t_delta = t_stop - t_start self.step_success = False self.test_status = 'Failure' self.logd(' Failed in ~ %.2fs' % t_delta) # XXX The duration logged for this response is wrong self._log_response(error.response, 'link', None, t_start, t_stop, log_body=True) raise self.failureException, str(error) c_stop = self.total_time self.logd(' Done in %.3fs' % (c_stop - c_start)) if sleep: self.sleep() self._response = response # Loop mode if self._loop_mode and self.steps == self._loop_steps[-1]: self._loop_recording = False self.logi('Loop mode end recording.') t_start = self.total_time count = 0 for i in range(self._loop_number): self.logi('Loop mode replay %i' % i) for record in self._loop_records: count += 1 self.steps += 1 self._browse(*record) t_delta = self.total_time - t_start text = ('End of loop: %d pages rendered in %.3fs, ' 'avg of %.3fs per page, ' '%.3f SPPS without concurrency.' % (count, t_delta, t_delta/count, count/t_delta)) self.logi(text) trace(text + '\n') return response def post(self, url, params=None, description=None, ok_codes=None): """POST method on url with params.""" self.steps += 1 self.page_responses = 0 response = self._browse(url, params, description, ok_codes, method="post") return response def get(self, url, params=None, description=None, ok_codes=None): """GET method on url adding params.""" self.steps += 1 self.page_responses = 0 response = self._browse(url, params, description, ok_codes, method="get") return response def exists(self, url, params=None, description="Checking existence"): """Try a GET on URL return True if the page exists or False.""" resp = self.get(url, params, description=description, ok_codes=[200, 301, 302, 404, 503]) if resp.code not in [200, 301, 302]: self.logd('Page %s not found.' % url) return False self.logd('Page %s exists.' % url) return True def xmlrpc(self, url_in, method_name, params=None, description=None): """Call an xml rpc method_name on url with params.""" self.steps += 1 self.page_responses = 0 self.logd('XMLRPC: %s::%s\n\tCall %i: %s ...' % (url_in, method_name, self.steps, description or '')) response = None t_start = time.time() if self._authinfo is not None: url = url_in.replace('//', '//'+self._authinfo) else: url = url_in try: server = ServerProxy(url) method = getattr(server, method_name) if params is not None: response = method(*params) else: response = method() except: etype, value, tback = sys.exc_info() t_stop = time.time() t_delta = t_stop - t_start self.total_time += t_delta self.step_success = False self.test_status = 'Error' self.logd(' Failed in %.3fs' % t_delta) self._log_xmlrpc_response(url_in, method_name, description, response, t_start, t_stop, -1) if etype is SocketError: raise SocketError("Can't access %s." % url) raise t_stop = time.time() t_delta = t_stop - t_start self.total_time += t_delta self.total_xmlrpc += 1 self.logd(' Done in %.3fs' % t_delta) self._log_xmlrpc_response(url_in, method_name, description, response, t_start, t_stop, 200) self.sleep() return response def xmlrpc_call(self, url_in, method_name, params=None, description=None): """BBB of xmlrpc, this method will be removed for 1.6.0.""" warn('Since 1.4.0 the method "xmlrpc_call" is renamed into "xmlrpc".', DeprecationWarning, stacklevel=2) return self.xmlrpc(url_in, method_name, params, description) def waitUntilAvailable(self, url, time_out=20, sleep_time=2): """Wait until url is available. Try a get on url every sleep_time until server is reached or time is out.""" time_start = time.time() while(True): try: self._browser.fetch(url, None, ok_codes=[200, 301, 302]) except SocketError: if time.time() - time_start > time_out: self.fail('Time out service %s not available after %ss' % (url, time_out)) else: return time.sleep(sleep_time) def setBasicAuth(self, login, password): """Set http basic authentication.""" self._browser.setBasicAuth(login, password) self._authinfo = '%s:%s@' % (login, password) def clearBasicAuth(self): """Remove basic authentication.""" self._browser.clearBasicAuth() self._authinfo = None def addHeader(self, key, value): """Add an http header.""" self._browser.extra_headers.append((key, value)) def setHeader(self, key, value): """Add or override an http header. If value is None, the key is removed.""" headers = self._browser.extra_headers for i, (k, v) in enumerate(headers): if k == key: if value is not None: headers[i] = (key, value) else: del headers[i] break else: if value is not None: headers.append((key, value)) def delHeader(self, key): """Remove an http header key.""" self.setHeader(key, None) def clearHeaders(self): """Remove all http headers set by addHeader or setUserAgent. Note that the Referer is also removed.""" self._browser.extra_headers = [] def setUserAgent(self, agent): """Set User-Agent http header for the next requests. If agent is None, the user agent header is removed.""" self.setHeader('User-Agent', agent) def sleep(self): """Sleeps a random amount of time. Between the predefined sleep_time_min and sleep_time_max values. """ s_min = self.sleep_time_min s_max = self.sleep_time_max if s_max != s_min: s_val = s_min + abs(s_max - s_min) * random() else: s_val = s_min # we should always sleep something thread_sleep(s_val) #------------------------------------------------------------ # Assertion helpers # def getLastUrl(self): """Return the last accessed url taking into account redirection.""" response = self._response if response is not None: return response.url return '' def getBody(self): """Return the last response content.""" response = self._response if response is not None: return response.body return '' def listHref(self, pattern=None): """Return a list of href anchor url present in the last html response. Filtering href using the pattern regex if present.""" response = self._response ret = [] if response is not None: a_links = response.getDOM().getByName('a') if a_links: ret = [getattr(x, 'href', '') for x in a_links] if pattern is not None: pat = re.compile(pattern) ret = [href for href in ret if pat.search(href) is not None] return ret def getLastBaseUrl(self): """Return the base href url.""" response = self._response if response is not None: base = response.getDOM().getByName('base') if base: return base[0].href return '' #------------------------------------------------------------ # configuration file utils # def conf_get(self, section, key, default=_marker, quiet=False): """Return an entry from the options or configuration file.""" # check for a command line options opt_key = '%s_%s' % (section, key) opt_val = getattr(self.options, opt_key, None) if opt_val: #print('[%s] %s = %s from options.' % (section, key, opt_val)) return opt_val # check for the configuration file if opt val is None # or nul try: val = self._config.get(section, key) except (NoSectionError, NoOptionError): if not quiet: self.logi('[%s] %s not found' % (section, key)) if default is _marker: raise val = default #print('[%s] %s = %s from config.' % (section, key, val)) return val def conf_getInt(self, section, key, default=_marker, quiet=False): """Return an integer from the configuration file.""" return int(self.conf_get(section, key, default, quiet)) def conf_getFloat(self, section, key, default=_marker, quiet=False): """Return a float from the configuration file.""" return float(self.conf_get(section, key, default, quiet)) def conf_getList(self, section, key, default=_marker, quiet=False, separator=None): """Return a list from the configuration file.""" value = self.conf_get(section, key, default, quiet) if value is default: return value if separator is None: separator = ':' if value.count(separator): return value.split(separator) return [value] #------------------------------------------------------------ # Extend unittest.TestCase to provide bench cycle hook # def setUpCycle(self): """Called on bench mode before a cycle start.""" pass def tearDownCycle(self): """Called after a cycle in bench mode.""" pass #------------------------------------------------------------ # logging # def logd(self, message): """Debug log.""" self.logger.debug(self.meta_method_name +': ' +message) def logdd(self, message): """Verbose Debug log.""" if self.debug_level >= 2: self.logger.debug(self.meta_method_name +': ' +message) def logi(self, message): """Info log.""" if hasattr(self, 'logger'): self.logger.info(self.meta_method_name+': '+message) else: print self.meta_method_name+': '+message def _logr(self, message, force=False): """Log a result.""" if force or not self.in_bench_mode or recording(): self.logger_result.info(message) def _open_result_log(self, **kw): """Open the result log.""" xml = ['' % ( get_version(), datetime.now().isoformat())] for key, value in kw.items(): xml.append('' % ( key, quoteattr(str(value)))) self._logr('\n'.join(xml), force=True) def _close_result_log(self): """Close the result log.""" self._logr('', force=True) def _log_response_error(self, url, rtype, description, time_start, time_stop): """Log a response that raise an unexpected exception.""" self.total_responses += 1 self.page_responses += 1 info = {} info['cycle'] = self.cycle info['cvus'] = self.cvus info['thread_id'] = self.thread_id info['suite_name'] = self.suite_name info['test_name'] = self.test_name info['step'] = self.steps info['number'] = self.page_responses info['type'] = rtype info['url'] = quoteattr(url) info['code'] = -1 info['description'] = description and quoteattr(description) or '""' info['time_start'] = time_start info['duration'] = time_stop - time_start info['result'] = 'Error' info['traceback'] = quoteattr(' '.join( traceback.format_exception(*sys.exc_info()))) message = '''''' % info self._logr(message) def _log_response(self, response, rtype, description, time_start, time_stop, log_body=False): """Log a response.""" self.total_responses += 1 self.page_responses += 1 info = {} info['cycle'] = self.cycle info['cvus'] = self.cvus info['thread_id'] = self.thread_id info['suite_name'] = self.suite_name info['test_name'] = self.test_name info['step'] = self.steps info['number'] = self.page_responses info['type'] = rtype info['url'] = quoteattr(response.url) info['code'] = response.code info['description'] = description and quoteattr(description) or '""' info['time_start'] = time_start info['duration'] = time_stop - time_start info['result'] = self.step_success and 'Successful' or 'Failure' response_start = '''' else: response_start = response_start + '>\n ' header_xml = [] for key, value in response.headers.items(): header_xml.append('
' % ( key, quoteattr(value))) headers = '\n'.join(header_xml) + '\n ' message = '\n'.join([ response_start, headers, ' \n ' % response.body, '']) self._logr(message) def _log_xmlrpc_response(self, url, method, description, response, time_start, time_stop, code): """Log a response.""" self.total_responses += 1 self.page_responses += 1 info = {} info['cycle'] = self.cycle info['cvus'] = self.cvus info['thread_id'] = self.thread_id info['suite_name'] = self.suite_name info['test_name'] = self.test_name info['step'] = self.steps info['number'] = self.page_responses info['type'] = 'xmlrpc' info['url'] = quoteattr(url + '#' + method) info['code'] = code info['description'] = description and quoteattr(description) or '""' info['time_start'] = time_start info['duration'] = time_stop - time_start info['result'] = self.step_success and 'Successful' or 'Failure' message = '''"''' % info self._logr(message) def _log_result(self, time_start, time_stop): """Log the test result.""" info = {} info['cycle'] = self.cycle info['cvus'] = self.cvus info['thread_id'] = self.thread_id info['suite_name'] = self.suite_name info['test_name'] = self.test_name info['steps'] = self.steps info['time_start'] = time_start info['duration'] = time_stop - time_start info['connection_duration'] = self.total_time info['requests'] = self.total_responses info['pages'] = self.total_pages info['xmlrpc'] = self.total_xmlrpc info['redirects'] = self.total_redirects info['images'] = self.total_images info['links'] = self.total_links info['result'] = self.test_status if self.test_status != 'Successful': info['traceback'] = 'traceback=' + quoteattr(' '.join( traceback.format_exception(*sys.exc_info()))) + ' ' else: info['traceback'] = '' text = '''''' % info self._logr(text) def _dump_content(self, response): """Dump the html content in a file. Use firefox to render the content if we are in rt viewing mode.""" dump_dir = self._dump_dir if dump_dir is None: return if getattr(response, 'code', 301) in [301, 302]: return if not response.body: return if not os.access(dump_dir, os.W_OK): os.mkdir(dump_dir, 0775) content_type = response.headers.get('content-type') if content_type == 'text/xml': ext = '.xml' else: ext = os.path.splitext(response.url)[1] if not ext.startswith('.') or len(ext) > 4: ext = '.html' file_path = os.path.abspath( os.path.join(dump_dir, '%3.3i%s' % (self.steps, ext))) f = open(file_path, 'w') f.write(response.body) f.close() if self._viewing: cmd = 'firefox -remote "openfile(file://%s,new-tab)"' % file_path ret = os.system(cmd) if ret != 0: self.logi('Failed to remote control firefox: %s' % cmd) self._viewing = False #------------------------------------------------------------ # Overriding unittest.TestCase # def __call__(self, result=None): """Run the test method. Override to log test result.""" t_start = time.time() if result is None: result = self.defaultTestResult() result.startTest(self) testMethod = getattr(self, self._TestCase__testMethodName) try: ok = False try: self.logd('Starting -----------------------------------\n\t%s' % self.conf_get(self.meta_method_name, 'description', '')) self.setUp() except KeyboardInterrupt: raise except: result.addError(self, self._TestCase__exc_info()) self.test_status = 'Error' self._log_result(t_start, time.time()) return try: testMethod() ok = True except self.failureException: result.addFailure(self, self._TestCase__exc_info()) self.test_status = 'Failure' except KeyboardInterrupt: raise except: result.addError(self, self._TestCase__exc_info()) self.test_status = 'Error' try: self.tearDown() except KeyboardInterrupt: raise except: result.addError(self, self._TestCase__exc_info()) self.test_status = 'Error' ok = False if ok: result.addSuccess(self) finally: self._log_result(t_start, time.time()) if not ok and self._stop_on_fail: result.stop() result.stopTest(self) # ------------------------------------------------------------ # testing # class DummyTestCase(FunkLoadTestCase): """Testing Funkload TestCase.""" def test_apache(self): """Simple apache test.""" self.logd('start apache test') for i in range(2): self.get('http://localhost/') self.logd('base_url: ' + self.getLastBaseUrl()) self.logd('url: ' + self.getLastUrl()) self.logd('hrefs: ' + str(self.listHref())) self.logd("Total connection time = %s" % self.total_time) if __name__ == '__main__': unittest.main() PKT;4* X**funkload/__init__.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # # """Funkload package init. $Id: __init__.py 24649 2005-08-29 14:20:19Z bdelbosc $ """ PKT;4QL1'1'funkload/XmlRpcBase.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """Base class to build XML RPC daemon server. $Id: XmlRpcBase.py 30282 2005-12-05 12:52:30Z bdelbosc $ """ import sys, os from socket import error as SocketError from time import sleep from ConfigParser import ConfigParser, NoOptionError from SimpleXMLRPCServer import SimpleXMLRPCServer from xmlrpclib import ServerProxy import logging from optparse import OptionParser, TitledHelpFormatter from utils import create_daemon, get_default_logger, close_logger from utils import trace, get_version def is_server_running(host, port): """Check if the XML/RPC server is running checking getStatus RPC.""" server = ServerProxy("http://%s:%s" % (host, port)) try: server.getStatus() except SocketError: return False return True # ------------------------------------------------------------ # rpc to manage the server # class MySimpleXMLRPCServer(SimpleXMLRPCServer): """SimpleXMLRPCServer with allow_reuse_address.""" # this property set SO_REUSEADDR which tells the operating system to allow # code to connect to a socket even if it's waiting for other potential # packets allow_reuse_address = True # ------------------------------------------------------------ # Server # class XmlRpcBaseServer: """The base class for xml rpc server.""" usage = """%prog [options] config_file Start %prog XML/RPC daemon. """ server_name = None # list RPC Methods method_names = ['stopServer', 'getStatus'] def __init__(self, argv=None): if self.server_name is None: self.server_name = self.__class__.__name__ if argv is None: argv = sys.argv conf_path, options = self.parseArgs(argv) self.default_log_path = self.server_name + '.log' self.default_pid_path = self.server_name + '.pid' self.server = None self.quit = False # read conf conf = ConfigParser() conf.read(conf_path) self.conf_path = conf_path self.host = conf.get('server', 'host') self.port = conf.getint('server', 'port') try: self.pid_path = conf.get('server', 'pid_path') except NoOptionError: self.pid_path = self.default_pid_path try: log_path = conf.get('server', 'log_path') except NoOptionError: log_path = self.default_log_path if is_server_running(self.host, self.port): trace("Server already running on %s:%s." % (self.host, self.port)) sys.exit(0) trace('Starting %s server at http://%s:%s/' % (self.server_name, self.host, self.port)) # init logger if options.verbose: level = logging.DEBUG else: level = logging.INFO if options.debug: log_to = 'file console' else: log_to = 'file' self.logger = get_default_logger(log_to, log_path, level=level, name=self.server_name) # subclass init self._init_cb(conf, options) # daemon mode if not options.debug: trace(' as daemon.\n') close_logger(self.server_name) create_daemon() # re init the logger self.logger = get_default_logger(log_to, log_path, level=level, name=self.server_name) else: trace(' in debug mode.\n') # init rpc self.initServer() def _init_cb(self, conf, options): """init procedure intend to be implemented by subclasses. This method is called before to switch in daemon mode. conf is a ConfigParser object.""" pass def logd(self, message): """Debug log.""" self.logger.debug(message) def log(self, message): """Log information.""" self.logger.info(message) def parseArgs(self, argv): """Parse programs args.""" parser = OptionParser(self.usage, formatter=TitledHelpFormatter(), version="FunkLoad %s" % get_version()) parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") parser.add_option("-d", "--debug", action="store_true", help="debug mode, server is run in forground") options, args = parser.parse_args(argv) if len(args) != 2: parser.error("Missing configuration file argument") return args[1], options def initServer(self): """init the XMLR/PC Server.""" self.log("Init XML/RPC server %s:%s." % (self.host, self.port)) server = MySimpleXMLRPCServer((self.host, self.port)) for method_name in self.method_names: self.logd('register %s' % method_name) server.register_function(getattr(self, method_name)) self.server = server def run(self): """main server loop.""" server = self.server pid = os.getpid() open(self.pid_path, "w").write(str(pid)) self.log("XML/RPC server pid=%i running." % pid) while not self.quit: server.handle_request() sleep(.5) server.server_close() self.log("XML/RPC server pid=%i stopped." % pid) os.remove(self.pid_path) __call__ = run # RPC # def stopServer(self): """Stop the server.""" self.log("stopServer request.") self.quit = True return 1 def getStatus(self): """Return a status.""" self.logd("getStatus request.") return "%s running pid = %s" % (self.server_name, os.getpid()) # ------------------------------------------------------------ # Controller # class XmlRpcBaseController: """An XML/RPC controller.""" usage = """%prog config_file action action can be: start|startd|stop|restart|status|test Execute action on the XML/RPC server. """ # the server class server_class = XmlRpcBaseServer def __init__(self, argv=None): if argv is None: argv = sys.argv conf_path, self.action, options = self.parseArgs(argv) # read conf conf = ConfigParser() conf.read(conf_path) self.host = conf.get('server', 'host') self.conf_path = conf_path self.port = conf.getint('server', 'port') self.url = 'http://%s:%s/' % (self.host, self.port) self.verbose = not options.quiet self.server = ServerProxy(self.url) def parseArgs(self, argv): """Parse programs args.""" parser = OptionParser(self.usage, formatter=TitledHelpFormatter(), version="FunkLoad %s" % get_version()) parser.add_option("-q", "--quiet", action="store_true", help="Verbose output") options, args = parser.parse_args(argv) if len(args) != 3: parser.error("Missing argument") return args[1], args[2], options def log(self, message, force=False): """Log a message.""" if force or self.verbose: trace(str(message)) def startServer(self, debug=False): """Start an XML/RPC server.""" argv = ['cmd', self.conf_path] if debug: argv.append('-dv') daemon = self.server_class(argv) daemon.run() def __call__(self, action=None): """Call the xml rpc action""" server = self.server if action is None: action = self.action is_running = is_server_running(self.host, self.port) if action == 'status': if is_running: ret = server.getStatus() self.log('%s %s.\n' % (self.url, ret)) else: self.log('No server reachable at %s.\n' % self.url) return 0 elif action in ('stop', 'restart'): if is_running: ret = server.stopServer() self.log('Server %s is stopped.\n' % self.url) is_running = False elif action == 'stop': self.log('No server reachable at %s.\n' % self.url) if action == 'restart': self('start') elif 'start' in action: if is_running: self.log('Server %s is already running.\n' % self.url) else: return self.startServer(action=='startd') elif not is_running: self.log('No server reachable at %s.\n' % self.url) return -1 elif action == 'reload': ret = server.reloadConf() self.log('done\n') elif action == 'test': return self.test() else: raise NotImplementedError('Unknow action %s' % action) return 0 # this method is done to be overriden in sub classes def test(self): """Testing the XML/RPC. Must return an exit code, 0 for success. """ ret = self.server.getStatus() self.log('Testing getStatus: %s\n' % ret) return 0 def main(): """Main""" ctl = XmlRpcBaseController() ret = ctl() sys.exit(ret) if __name__ == '__main__': main() PKH\b4@U!U!funkload/utils.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """FunkLoad common utils. $Id: utils.py 24649 2005-08-29 14:20:19Z bdelbosc $ """ import os import sys import time import logging from time import sleep from socket import error as SocketError from xmlrpclib import ServerProxy MIN_SLEEPTIME = 0.005 # minimum sleep time to let # python threads working properly def thread_sleep(seconds=0): """Sleep seconds. Insure that seconds is at least MIN_SLEEPTIME to let threads working properly.""" #if seconds: # trace('sleep %s' % seconds) sleep(max(abs(seconds), MIN_SLEEPTIME)) # ------------------------------------------------------------ # semaphores # g_recording = False g_running = False def recording(): """A semaphore to tell the running threads when to begin recording.""" global g_recording return g_recording def set_recording_flag(value): """Enable recording.""" global g_recording g_recording = value def running(): """A semaphore to tell the running threads that it should continue running ftest.""" global g_running return g_running def set_running_flag(value): """Set running mode on.""" global g_running g_running = value # ------------------------------------------------------------ # daemon # # See the Chad J. Schroeder example for a full explanation # this version does not chdir to '/' to keep relative path def create_daemon(): """Detach a process from the controlling terminal and run it in the background as a daemon. """ try: pid = os.fork() except OSError, msg: raise Exception, "%s [%d]" % (msg.strerror, msg.errno) if (pid == 0): os.setsid() try: pid = os.fork() except OSError, msg: raise Exception, "%s [%d]" % (msg.strerror, msg.errno) if (pid == 0): os.umask(0) else: os._exit(0) else: sleep(.5) os._exit(0) import resource maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if (maxfd == resource.RLIM_INFINITY): maxfd = 1024 for fd in range(0, maxfd): try: os.close(fd) except OSError: pass os.open('/dev/null', os.O_RDWR) os.dup2(0, 1) os.dup2(0, 2) return(0) # ------------------------------------------------------------ # meta method name encodage # MMN_SEP = ':' # meta method name separator def mmn_is_bench(meta_method_name): """Is it a meta method name ?.""" return meta_method_name.count(MMN_SEP) and True or False def mmn_encode(method_name, cycle, cvus, thread_id): """Encode a extra information into a method_name.""" return MMN_SEP.join((method_name, str(cycle), str(cvus), str(thread_id))) def mmn_decode(meta_method_name): """Decode a meta method name.""" if mmn_is_bench(meta_method_name): method_name, cycle, cvus, thread_id = meta_method_name.split(MMN_SEP) return (method_name, int(cycle), int(cvus), int(thread_id)) else: return (meta_method_name, 1, 0, 1) # ------------------------------------------------------------ # logging # def get_default_logger(log_to, log_path=None, level=logging.DEBUG, name='FunkLoad'): """Get a logger.""" logger = logging.getLogger(name) if logger.handlers: # already setup return logger if log_to.count("console"): hdlr = logging.StreamHandler() logger.addHandler(hdlr) if log_to.count("file") and log_path: formatter = logging.Formatter( '%(asctime)s %(levelname)s %(message)s') hdlr = logging.FileHandler(log_path) hdlr.setFormatter(formatter) logger.addHandler(hdlr) if log_to.count("xml") and log_path: if os.access(log_path, os.F_OK): os.rename(log_path, log_path + '.bak-' + str(int(time.time()))) hdlr = logging.FileHandler(log_path) logger.addHandler(hdlr) logger.setLevel(level) return logger def close_logger(name): """Close the logger.""" logger = logging.getLogger(name) for hdlr in logger.handlers: logger.removeHandler(hdlr) def trace(message): """Simple print to stdout Not thread safe.""" sys.stdout.write(message) sys.stdout.flush() # ------------------------------------------------------------ # xmlrpc # def xmlrpc_get_credential(host, port, group=None): """Get credential thru xmlrpc credential_server.""" url = "http://%s:%s" % (host, port) server = ServerProxy(url) try: return server.getCredential(group) except SocketError: raise SocketError( 'No Credential server reachable at %s, use fl-credential-ctl ' 'to start the credential server.' % url) def xmlrpc_list_groups(host, port): """Get list of groups thru xmlrpc credential_server.""" url = "http://%s:%s" % (host, port) server = ServerProxy(url) try: return server.listGroups() except SocketError: raise SocketError( 'No Credential server reachable at %s, use fl-credential-ctl ' 'to start the credential server.' % url) def xmlrpc_list_credentials(host, port, group=None): """Get list of users thru xmlrpc credential_server.""" url = "http://%s:%s" % (host, port) server = ServerProxy(url) try: return server.listCredentials(group) except SocketError: raise SocketError( 'No Credential server reachable at %s, use fl-credential-ctl ' 'to start the credential server.' % url) # ------------------------------------------------------------ # misc # def get_version(): """Retrun the FunkLoad package version.""" from pkg_resources import get_distribution return get_distribution('funkload').version _COLOR = {'green': "\x1b[32;01m", 'red': "\x1b[31;01m", 'reset': "\x1b[0m" } def red_str(text): """Return red text.""" global _COLOR return _COLOR['red'] + text + _COLOR['reset'] def green_str(text): """Return green text.""" global _COLOR return _COLOR['green'] + text + _COLOR['reset'] def is_html(text): """Simple check that return True if the text is an html page.""" if ' self.length: mid_size = (self.length - 3) / 2 other = other[:mid_size] + self.extra + other[-mid_size:] return other def is_valid_html(html=None, file_path=None, accept_warning=False): """Ask tidy if the html is valid. Return a tuple (status, errors) """ if not file_path: fd, file_path = mkstemp(prefix='fl-tidy', suffix='.html') os.write(fd, html) os.close(fd) tidy_cmd = 'tidy -errors %s' % file_path ret, output = getstatusoutput(tidy_cmd) status = False if ret == 0: status = True elif ret == 256: # got warnings if accept_warning: status = True elif ret > 512: if 'command not found' in output: raise RuntimeError('tidy command not found, please install tidy.') raise RuntimeError('Executing [%s] return: %s ouput: %s' % (tidy_cmd, ret, output)) return status, output PKT;4CYNNfunkload/ReportRenderer.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """Classes that render statistics. $Id: ReportRenderer.py 24736 2005-08-31 08:59:54Z bdelbosc $ """ import os try: import gdchart g_has_gdchart = 1 except ImportError: g_has_gdchart = 0 from shutil import copyfile # ------------------------------------------------------------ # ReST rendering # def rst_title(title, level=1): """Return a rst title.""" rst_level = ['=', '=', '-', '~'] if level == 0: rst = [rst_level[level] * len(title)] else: rst = [''] rst.append(title) rst.append(rst_level[level] * len(title)) rst.append('') return '\n'.join(rst) class BaseRst: """Base class for ReST renderer.""" fmt_int = "%7d" fmt_float = "%7.3f" fmt_percent = "%6.2f%%" fmt_deco = "======= " header = " Not implemented" nb_cols = 1 indent = 0 image_names = [] def __init__(self, stats): self.stats = stats def __repr__(self): """Render stats.""" ret = [''] ret.append(self.render_header()) ret.append(self.render_stat()) ret.append(self.render_footer()) return '\n'.join(ret) def render_images(self): """Render images link.""" indent = ' ' * self.indent rst = [] for image_name in self.image_names: rst.append(indent + " .. image:: %s.png" % image_name) rst.append('') return '\n'.join(rst) def render_header(self, with_chart=False): """Render rst header.""" deco = ' ' + self.fmt_deco * self.nb_cols indent = ' ' * self.indent ret = [] if with_chart: ret.append(self.render_images()) ret.append(indent + deco) ret.append(indent + self.header) ret.append(indent + deco) return '\n'.join(ret) def render_footer(self): """Render rst footer.""" return ' ' * (self.indent + 1) + self.fmt_deco * self.nb_cols + '\n' def render_stat(self): """Render rst stat.""" raise NotImplemented class AllResponseRst(BaseRst): """AllResponseStat rendering.""" header = " CUs RPS maxRPS TOTAL SUCCESS " \ "ERROR MIN AVG MAX" nb_cols = 9 image_names = ['requests_rps', 'requests'] def render_stat(self): """Render rst stat.""" ret = [' ' * self.indent] stats = self.stats stats.finalize() ret.append(self.fmt_int % stats.cvus) ret.append(self.fmt_float % stats.rps) ret.append(self.fmt_float % stats.rps_max) ret.append(self.fmt_int % stats.count) ret.append(self.fmt_int % stats.success) ret.append(self.fmt_percent % stats.error_percent) ret.append(self.fmt_float % stats.min) ret.append(self.fmt_float % stats.avg) ret.append(self.fmt_float % stats.max) ret = ' '.join(ret) return ret class PageRst(AllResponseRst): """Page rendering.""" header = " CUs SPPS maxSPPS TOTAL SUCCESS " \ "ERROR MIN AVG MAX" image_names = ['pages_spps', 'pages'] class ResponseRst(BaseRst): """Response rendering.""" header = " CUs TOTAL SUCCESS ERROR MIN AVG MAX" indent = 4 nb_cols = 7 image_names = ['request_'] def __init__(self, stats): BaseRst.__init__(self, stats) # XXX quick fix for #1017 self.image_names = [name + str(stats.step) + '.' + str(stats.number) for name in self.image_names] def render_stat(self): """Render rst stat.""" stats = self.stats stats.finalize() ret = [' ' * self.indent] ret.append(self.fmt_int % stats.cvus) ret.append(self.fmt_int % stats.count) ret.append(self.fmt_int % stats.success) ret.append(self.fmt_percent % stats.error_percent) ret.append(self.fmt_float % stats.min) ret.append(self.fmt_float % stats.avg) ret.append(self.fmt_float % stats.max) ret = ' '.join(ret) return ret class TestRst(BaseRst): """Test Rendering.""" header = " CUs STPS TOTAL SUCCESS ERROR MIN AVG MAX" nb_cols = 8 image_names = ['tests'] def render_stat(self): """Render rst stat.""" stats = self.stats stats.finalize() ret = [' ' * self.indent] ret.append(self.fmt_int % stats.cvus) ret.append(self.fmt_float % stats.tps) ret.append(self.fmt_int % stats.count) ret.append(self.fmt_int % stats.success) ret.append(self.fmt_percent % stats.error_percent) ret.append(self.fmt_float % stats.min) ret.append(self.fmt_float % stats.avg) ret.append(self.fmt_float % stats.max) ret = ' '.join(ret) return ret class RenderRst: """Render stats in ReST format.""" # number of slowest requests to display slowest_items = 5 def __init__(self, config, stats, error, monitor, options): self.config = config self.stats = stats self.error = error self.monitor = monitor self.options = options self.rst = [] cycles = stats.keys() cycles.sort() self.cycles = cycles if options.html: self.with_chart = True else: self.with_chart = False def getRepresentativeCycleStat(self): """Return the cycle stat with the maximum number of steps.""" stats = self.stats max_steps = 0 cycle_r = None for cycle in self.cycles: steps = stats[cycle]['response_step'].keys() if cycle_r is None: cycle_r = stats[cycle] if len(steps) > max_steps: max_steps = steps cycle_r = stats[cycle] return cycle_r def getBestStpsCycle(self): """Return the cycle with the maximum STPS.""" stats = self.stats max_stps = -1 cycle_r = None for cycle in self.cycles: if not stats[cycle].has_key('test'): continue stps = stats[cycle]['test'].tps if stps > max_stps: max_stps = stps cycle_r = cycle if cycle_r is None and len(self.cycles): # no test ends during a cycle return the first one cycle_r = self.cycles[0] return cycle_r def append(self, text): """Append text to rst output.""" self.rst.append(text) def renderConfig(self): """Render bench configuration.""" config = self.config self.append(rst_title("FunkLoad_ bench report", 0)) self.append('') date = config['time'][:19].replace('T', ' ') self.append(':date: ' + date) description = "Bench result of ``%s.%s``: " % (config['class'], config['method']) description += config['description'] self.append(':abstract: ' + description) self.append('') self.append(".. _FunkLoad: http://funkload.nuxeo.org/") self.append(".. sectnum:: :depth: 2") self.append(".. contents:: Table of contents") self.append(rst_title("Bench configuration", 2)) self.append("* Launched: %s" % date) self.append("* Test: ``%s.py %s.%s``" % (config['module'], config['class'], config['method'])) self.append("* Server: %s" % config['server_url']) self.append("* Cycles of concurrent users: %s" % config['cycles']) self.append("* Cycle duration: %ss" % config['duration']) self.append("* Sleeptime between request: from %ss to %ss" % ( config['sleep_time_min'], config['sleep_time_max'])) self.append("* Sleeptime between test case: %ss" % config['sleep_time']) self.append("* Startup delay between thread: %ss" % config['startup_delay']) self.append("* FunkLoad_ version: %s" % config['version']) self.append("") def renderTestContent(self, test): """Render global information about test content.""" self.append(rst_title("Bench content", 2)) config = self.config self.append('The test ``%s.%s`` contains: ' % (config['class'], config['method'])) self.append('') self.append("* %s page(s)" % test.pages) self.append("* %s redirect(s)" % test.redirects) self.append("* %s link(s)" % test.links) self.append("* %s image(s)" % test.images) self.append("* %s XML RPC call(s)" % test.xmlrpc) self.append('') self.append('The bench contains:') total_tests = 0 total_tests_error = 0 total_pages = 0 total_pages_error = 0 total_responses = 0 total_responses_error = 0 stats = self.stats for cycle in self.cycles: if stats[cycle].has_key('test'): total_tests += stats[cycle]['test'].count total_tests_error += stats[cycle]['test'].error if stats[cycle].has_key('page'): stat = stats[cycle]['page'] stat.finalize() total_pages += stat.count total_pages_error += stat.error if stats[cycle].has_key('response'): total_responses += stats[cycle]['response'].count total_responses_error += stats[cycle]['response'].error self.append('') self.append("* %s tests" % total_tests + ( total_tests_error and ", %s error(s)" % total_tests_error or '')) self.append("* %s pages" % total_pages + ( total_pages_error and ", %s error(s)" % total_pages_error or '')) self.append("* %s requests" % total_responses + ( total_responses_error and ", %s error(s)" % total_responses_error or '')) self.append('') def renderCyclesStat(self, key, title, description=''): """Render a type of stats for all cycles.""" stats = self.stats first = True if key == 'test': klass = TestRst elif key == 'page': klass = PageRst elif key == 'response': klass = AllResponseRst self.append(rst_title(title, 2)) if description: self.append(description) self.append('') renderer = None for cycle in self.cycles: if not stats[cycle].has_key(key): continue renderer = klass(stats[cycle][key]) if first: self.append(renderer.render_header(self.with_chart)) first = False self.append(renderer.render_stat()) if renderer is not None: self.append(renderer.render_footer()) else: self.append('Sorry no %s have finished during a cycle, ' 'the cycle duration is too short.\n' % key) def renderCyclesStepStat(self, step): """Render a step stats for all cycle.""" stats = self.stats first = True renderer = None for cycle in self.cycles: stat = stats[cycle]['response_step'].get(step) if stat is None: continue renderer = ResponseRst(stat) if first: self.append(renderer.render_header(self.with_chart)) first = False self.append(renderer.render_stat()) if renderer is not None: self.append(renderer.render_footer()) def renderPageDetail(self, cycle_r): """Render a page detail.""" self.append(rst_title("Page detail stats", 2)) cycle_r_steps = cycle_r['response_step'] steps = cycle_r['response_step'].keys() steps.sort() self.steps = steps current_step = -1 for step_name in steps: a_step = cycle_r_steps[step_name] if a_step.step != current_step: current_step = a_step.step self.append(rst_title("PAGE %s: %s" % ( a_step.step, a_step.description or a_step.url), 3)) self.append('* Req: %s, %s, url %s' % (a_step.number, a_step.type, a_step.url)) self.append('') self.renderCyclesStepStat(step_name) def renderMonitors(self): """Render all monitored hosts.""" if not self.monitor or not self.with_chart: return self.append(rst_title("Monitored hosts", 2)) for host in self.monitor.keys(): self.renderMonitor(host) def renderMonitor(self, host): """Render a monitored host.""" description = self.config.get(host, '') self.append(rst_title("%s: %s" % (host, description), 3)) self.append("**Load average**\n\n.. image:: %s_load.png\n" % host) self.append("**Memory usage**\n\n.. image:: %s_mem.png\n" % host) self.append("**Network traffic**\n\n.. image:: %s_net.png\n" % host) def renderSlowestRequests(self, number): """Render the n slowest requests of the best cycle.""" stats = self.stats self.append(rst_title("%i Slowest requests"% number, 2)) cycle = self.getBestStpsCycle() cycle_name = None if not (cycle and stats[cycle].has_key('response_step')): return steps = stats[cycle]['response_step'].keys() items = [] for step_name in steps: stat = stats[cycle]['response_step'][step_name] stat.finalize() items.append((stat.avg, stat.step, stat.type, stat.url, stat.description)) if not cycle_name: cycle_name = stat.cvus items.sort() items.reverse() self.append('Slowest average response time during the best cycle ' 'with **%s** CUs:\n' % cycle_name) for item in items[:number]: self.append('* In page %s %s: %s took **%.3fs**\n' ' `%s`' % ( item[1], item[2], item[3], item[0], item[4])) def renderErrors(self): """Render error list.""" if not len(self.error): return self.append(rst_title("Failures and Errors", 2)) for status in ('Failure', 'Error'): if not self.error.has_key(status): continue stats = self.error[status] errors = {} for stat in stats: header = stat.header key = (stat.code, header.get('bobo-exception-file'), header.get('bobo-exception-line'), ) err_list = errors.setdefault(key, []) err_list.append(stat) err_types = errors.keys() err_types.sort() self.append(rst_title(status + 's', 3)) for err_type in err_types: stat = errors[err_type][0] if err_type[1]: self.append('* %s time(s), code: %s, %s\n' ' in %s, line %s: %s' %( len(errors[err_type]), err_type[0], header.get('bobo-exception-type'), err_type[1], err_type[2], header.get('bobo-exception-value'))) else: traceback = stat.traceback and stat.traceback.replace( 'File ', '\n File ') or 'No traceback.' self.append('* %s time(s), code: %s::\n\n' ' %s\n' %( len(errors[err_type]), err_type[0], traceback)) def __repr__(self): self.renderConfig() if not self.cycles: self.append('No cycle found') return '\n'.join(self.rst) cycle_r = self.getRepresentativeCycleStat() if cycle_r.has_key('test'): self.renderTestContent(cycle_r['test']) self.renderCyclesStat('test', 'Test stats', 'The number of Successful **Test** Per Second ' '(STPS) over Concurrent Users (CUs).') self.renderCyclesStat('page', 'Page stats', 'The number of Successful **Page** Per Second ' '(SPPS) over Concurrent Users (CUs).\n' 'Note that an XML RPC call count like a page.') self.renderCyclesStat('response', 'Request stats', 'The number of **Request** Per Second (RPS) ' 'successful or not over Concurrent Users (CUs).') self.renderSlowestRequests(self.slowest_items) self.renderMonitors() self.renderPageDetail(cycle_r) self.renderErrors() return '\n'.join(self.rst) # ------------------------------------------------------------ # HTML rendering # class RenderHtml(RenderRst): """Render stats in html. Simply render stuff in ReST than ask docutils to build an html doc. """ chart_size = (350, 250) big_chart_size = (640, 320) color_success = 0x00ff00 color_error = 0xff0000 color_time = 0x0000ff color_time_min_max = 0xccccee color_grid = 0xcccccc color_line = 0x333333 color_plot = 0x003a6b color_bg = 0xffffff color_line = 0x000000 def __init__(self, config, stats, error, monitor, options, css_file=None): RenderRst.__init__(self, config, stats, error, monitor, options) self.css_file = css_file self.report_dir = self.css_path = self.rst_path = self.html_path = None def prepareReportDirectory(self): """Create a folder to save the report.""" # init output dir output_dir = os.path.abspath(self.options.output_dir) if not os.access(output_dir, os.W_OK): os.mkdir(output_dir, 0775) # init report dir config = self.config stamp = config['time'][:19].replace(':', '-') report_dir = os.path.join(output_dir, '%s-%s' % (config['id'], stamp)) if not os.access(report_dir, os.W_OK): os.mkdir(report_dir, 0775) self.report_dir = report_dir def createRstFile(self): """Create the ReST file.""" rst_path = os.path.join(self.report_dir, 'index.rst') f = open(rst_path, 'w') f.write(str(self)) f.close() self.rst_path = rst_path def copyCss(self): """Copy the css to the report dir.""" css_file = self.css_file if css_file is not None: copyfile(css_file, css_dest_path) css_dest_path = os.path.join(self.report_dir, css_file) else: # use the one in our package_data from pkg_resources import resource_string css_content = resource_string('funkload', 'data/funkload.css') css_dest_path = os.path.join(self.report_dir, 'funkload.css') f = open(css_dest_path, 'w') f.write(css_content) f.close() self.css_path = css_dest_path def copyXmlResult(self): """Make a copy of the xml result.""" xml_src_path = self.options.xml_file xml_dest_path = os.path.join(self.report_dir, 'funkload.xml') copyfile(xml_src_path, xml_dest_path) def generateHtml(self): """Ask docutils to convert our rst file into html.""" from docutils.core import publish_cmdline html_path = os.path.join(self.report_dir, 'index.html') cmdline = "-t --stylesheet-path=%s %s %s" % (self.css_path, self.rst_path, html_path) cmd_argv = cmdline.split(' ') publish_cmdline(writer_name='html', argv=cmd_argv) self.html_path = html_path def render(self): """Create the html report.""" self.prepareReportDirectory() self.createRstFile() self.copyCss() try: self.generateHtml() pass except ImportError: print "WARNING docultils not found, no html output." return '' self.createCharts() self.copyXmlResult() return self.html_path __call__ = render # Charts ------------------------------------------------------------ # XXX need some factoring below def getChartSize(self, cvus): """Compute the right size lenght depending on the number of cvus.""" size = list(self.chart_size) len_cvus = len(cvus) if len_cvus > 7: size = list(self.big_chart_size) size[0] = min(800, 50 * len(cvus)) return tuple(size) def createCharts(self): """Create all charts.""" global g_has_gdchart if not g_has_gdchart: return self.createMonitorCharts() self.createTestChart() self.createPageChart() self.createAllResponseChart() for step_name in self.steps: self.createResponseChart(step_name) def createTestChart(self): """Create the test chart.""" image_path = str(os.path.join(self.report_dir, 'tests.png')) stats = self.stats errors = [] stps = [] cvus = [] has_error = False for cycle in self.cycles: if not stats[cycle].has_key('test'): continue test = stats[cycle]['test'] stps.append(test.tps) error = test.error_percent if error: has_error = True errors.append(error) cvus.append(str(test.cvus)) color_error = has_error and self.color_error or self.color_bg gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_success, self.color_success), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, line_color=self.color_line, title='Successful Tests Per Second', xtitle='CUs', ylabel_fmt='%.2f', ylabel2_fmt='%.2f %%', ytitle='STPS', ytitle2="Errors", ylabel_density=50, ytitle2_color=color_error, ylabel2_color=color_error, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_LINE_BAR, self.getChartSize(cvus), image_path, cvus, stps, errors) def createPageChart(self): """Create the page chart.""" image_path = str(os.path.join(self.report_dir, 'pages.png')) image2_path = str(os.path.join(self.report_dir, 'pages_spps.png')) stats = self.stats errors = [] delay = [] delay_max = [] delay_min = [] spps = [] cvus = [] has_error = False for cycle in self.cycles: page = stats[cycle]['page'] delay.append(page.avg) delay_min.append(page.min) delay_max.append(page.max) spps.append(page.rps) error = page.error_percent if error: has_error = True errors.append(error) cvus.append(str(page.cvus)) color_error = has_error and self.color_error or self.color_bg gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_time_min_max, self.color_time_min_max, self.color_time), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, grid_color=self.color_grid, line_color=self.color_line, title='Page response time', xtitle='CUs', ylabel_fmt='%.2fs', ylabel2_fmt='%.2f %%', ytitle='Duration', ytitle2="Errors", ylabel_density=50, hlc_style=gdchart.GDC_HLC_I_CAP+gdchart. GDC_HLC_CONNECTING, ytitle2_color=color_error, ylabel2_color=color_error, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_HLC_BAR, self.getChartSize(cvus), image_path, cvus, (delay_max, delay_min, delay), errors) gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_success, self.color_success), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, line_color=self.color_line, title='Successful Pages Per Second', xtitle='CUs', ylabel_fmt='%.2f', ylabel2_fmt='%.2f %%', ytitle='SPPS', ytitle2="Errors", ylabel_density=50, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_LINE_BAR, self.getChartSize(cvus), image2_path, cvus, spps, errors) def createAllResponseChart(self): """Create global responses chart.""" image_path = str(os.path.join(self.report_dir, 'requests.png')) image2_path = str(os.path.join(self.report_dir, 'requests_rps.png')) stats = self.stats errors = [] delay = [] delay_max = [] delay_min = [] rps = [] cvus = [] has_error = False for cycle in self.cycles: resp = stats[cycle]['response'] delay.append(resp.avg) delay_min.append(resp.min) delay_max.append(resp.max) rps.append(resp.rps) error = resp.error_percent if error: has_error = True errors.append(error) cvus.append(str(resp.cvus)) color_error = has_error and self.color_error or self.color_bg gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_time_min_max, self.color_time_min_max, self.color_time), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, grid_color=self.color_grid, line_color=self.color_line, title='Request response time', xtitle='CUs', ylabel_fmt='%.2fs', ylabel2_fmt='%.2f %%', ytitle='Duration', ytitle2="Errors", ylabel_density=50, hlc_style=gdchart.GDC_HLC_I_CAP+gdchart. GDC_HLC_CONNECTING, ytitle2_color=color_error, ylabel2_color=color_error, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_HLC_BAR, self.getChartSize(cvus), image_path, cvus, (delay_max, delay_min, delay), errors) gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_success, self.color_success), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, line_color=self.color_line, title='Requests Per Second', xtitle='CUs', ylabel_fmt='%.2f', ylabel2_fmt='%.2f %%', ytitle='RPS', ytitle2="Errors", ylabel_density=50, ytitle2_color=color_error, ylabel2_color=color_error, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_LINE_BAR, self.getChartSize(cvus), image2_path, cvus, rps, errors) def createResponseChart(self, step): """Create responses chart.""" stats = self.stats errors = [] delay = [] delay_max = [] delay_min = [] cvus = [] number = 0 has_error = False for cycle in self.cycles: resp = stats[cycle]['response_step'].get(step) if resp is None: delay.append(None) delay_min.append(None) delay_max.append(None) errors.append(None) cvus.append('?') else: delay.append(resp.avg) delay_min.append(resp.min) delay_max.append(resp.max) error = resp.error_percent if error: has_error = True errors.append(error) cvus.append(str(resp.cvus)) number = resp.number image_path = str(os.path.join(self.report_dir, 'request_%s.png' % step)) title = str('Request %s response time' % step) color_error = has_error and self.color_error or self.color_bg gdchart.option(format=gdchart.GDC_PNG, set_color=(self.color_time_min_max, self.color_time_min_max, self.color_time), vol_color=self.color_error, bg_color=self.color_bg, plot_color=self.color_plot, grid_color=self.color_grid, line_color=self.color_line, title=title, xtitle='CUs', ylabel_fmt='%.2fs', ylabel2_fmt='%.2f %%', ytitle='Duration', ytitle2="Errors", ylabel_density=50, hlc_style=gdchart.GDC_HLC_I_CAP+gdchart. GDC_HLC_CONNECTING, ytitle2_color=color_error, ylabel2_color=color_error, requested_ymin=0.0) gdchart.chart(gdchart.GDC_3DCOMBO_HLC_BAR, self.getChartSize(cvus), image_path, cvus, (delay_max, delay_min, delay), errors) # monitoring charts def createMonitorCharts(self): """Create all montirored server charts.""" if not self.monitor or not self.with_chart: return self.append(rst_title("Monitored hosts", 2)) for host in self.monitor.keys(): self.createMonitorChart(host) def createMonitorChart(self, host): """Create monitrored server charts.""" stats = self.monitor[host] time_start = float(stats[0].time) times = [] for stat in stats: test, cycle, cvus = stat.key.split(':') times.append(str('%ss / %s CUs' % ( int(float(stat.time) - time_start), cvus))) mem_total = int(stats[0].memTotal) mem_used = [mem_total - int(x.memFree) for x in stats] mem_used_start = mem_used[0] mem_used = [x - mem_used_start for x in mem_used] swap_total = int(stats[0].swapTotal) swap_used = [swap_total - int(x.swapFree) for x in stats] swap_used_start = swap_used[0] swap_used = [x - swap_used_start for x in swap_used] load_avg_1 = [float(x.loadAvg1min) for x in stats] load_avg_5 = [float(x.loadAvg5min) for x in stats] load_avg_15 = [float(x.loadAvg15min) for x in stats] net_in = [None] net_out = [None] cpu_usage = [0] for i in range(1, len(stats)): if not (hasattr(stats[i], 'CPUTotalJiffies') and hasattr(stats[i-1], 'CPUTotalJiffies')): cpu_usage.append(None) else: dt = ((long(stats[i].IDLTotalJiffies) + long(stats[i].CPUTotalJiffies)) - (long(stats[i-1].IDLTotalJiffies) + long(stats[i-1].CPUTotalJiffies))) if dt: ttl = (float(long(stats[i].CPUTotalJiffies) - long(stats[i-1].CPUTotalJiffies)) / dt) else: ttl = None cpu_usage.append(ttl) if not (hasattr(stats[i], 'receiveBytes') and hasattr(stats[i-1], 'receiveBytes')): net_in.append(None) else: net_in.append((int(stats[i].receiveBytes) - int(stats[i-1].receiveBytes)) / (1024 * (float(stats[i].time) - float(stats[i-1].time)))) if not (hasattr(stats[i], 'transmitBytes') and hasattr(stats[i-1], 'transmitBytes')): net_out.append(None) else: net_out.append((int(stats[i].transmitBytes) - int(stats[i-1].transmitBytes))/ (1024 * (float(stats[i].time) - float(stats[i-1].time)))) image_path = str(os.path.join(self.report_dir, '%s_load.png' % host)) title = str('%s: cpu usage (green 1=100%%) and loadavg 1(red), ' '5 and 15 min' % host) gdchart.option(format=gdchart.GDC_PNG, set_color=(0x00ff00, 0xff0000, 0x0000ff), vol_color=0xff0000, bg_color=self.color_bg, plot_color=self.color_plot, line_color=self.color_line, title=title, xtitle='time and CUs', ylabel_fmt='%.2f', ytitle='loadavg', ylabel_density=50, requested_ymin=0.0) gdchart.chart(gdchart.GDC_LINE, self.big_chart_size, image_path, times, cpu_usage, load_avg_1, load_avg_5, load_avg_15) title = str('%s memory (green) and swap (red) usage' % host) image_path = str(os.path.join(self.report_dir, '%s_mem.png' % host)) gdchart.option(format=gdchart.GDC_PNG, title=title, ylabel_fmt='%.0f kB', ytitle='memory used kB') gdchart.chart(gdchart.GDC_LINE, self.big_chart_size, image_path, times, mem_used, swap_used) title = str('%s network in (green)/out (red)' % host) image_path = str(os.path.join(self.report_dir, '%s_net.png' % host)) gdchart.option(format=gdchart.GDC_PNG, title=title, ylabel_fmt='%.0f kB/s', ytitle='network') gdchart.chart(gdchart.GDC_LINE, self.big_chart_size, image_path, times, net_in, net_out) PKH\b4&22funkload/Recorder.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """TCPWatch FunkLoad Test Recorder. Requires tcpwatch.py available at: * http://hathawaymix.org/Software/TCPWatch/tcpwatch-1.3.tar.gz Credits goes to Ian Bicking for parsing tcpwatch files. $Id: Recorder.py 33463 2006-02-24 14:09:21Z bdelbosc $ """ import os import sys import re from cStringIO import StringIO from optparse import OptionParser, TitledHelpFormatter from tempfile import mkdtemp import rfc822 from cgi import FieldStorage from urlparse import urlsplit from utils import truncate, trace, get_version class Request: """Store a tcpwatch request.""" def __init__(self, file_path): """Load a tcpwatch request file.""" self.file_path = file_path f = open(file_path, 'rb') line = f.readline().split(None, 2) if not line: trace('# Warning: empty first line on %s\n' % self.file_path) line = f.readline().split(None, 2) self.method = line[0] url = line[1] scheme, host, path, query, fragment = urlsplit(url) self.host = scheme + '://' + host self.rurl = url[len(self.host):] self.url = url self.path = path self.version = line[2].strip() self.headers = dict(rfc822.Message(f).items()) self.body = f.read() f.close() def extractParam(self): """Turn muti part encoded form into params.""" environ = { 'CONTENT_TYPE': self.headers['content-type'], 'CONTENT_LENGTH': self.headers['content-length'], 'REQUEST_METHOD': 'POST', } form = FieldStorage(fp=StringIO(self.body), environ=environ, keep_blank_values=True) params = [] try: keys = form.keys() except TypeError: trace('# Warning: skipping invalid http post param in file: %s ' 'may be an xmlrpc call ?\n' % self.file_path) return params for key in keys: if not isinstance(form[key], list): values = [form[key]] else: values = form[key] for form_value in values: filename = form_value.filename if filename is None: params.append([key, form_value.value]) else: # got a file upload filename = filename or '' params.append([key, 'Upload("%s")' % filename]) if filename: if os.path.exists(filename): trace('# Warning: uploaded file: %s already' ' exists, keep it.\n' % filename) else: trace('# Saving uploaded file: %s\n' % filename) f = open(filename, 'w') f.write(str(form_value.value)) f.close() return params def __repr__(self): params = '' if self.body: params = self.extractParam() return '' % ( self.method, self.url, str(params)) class Response: """Store a tcpwatch response.""" def __init__(self, file_path): """Load a tcpwatch response file.""" self.file_path = file_path f = open(file_path, 'rb') line = f.readline().split(None, 2) self.version = line[0] self.status_code = line[1].strip() if len(line) > 2: self.status_message = line[2].strip() else: self.status_message = '' self.headers = dict(rfc822.Message(f).items()) self.body = f.read() f.close() def __repr__(self): return '' % ( self.status_code, self.headers.get('content-type'), self.status_message) class RecorderProgram: """A tcpwatch to funkload recorder.""" USAGE = """%prog [options] [test_name] %prog launch a TCPWatch proxy and record activities, then output a FunkLoad script or generates a FunkLoad unit test if test_name is specified. The default proxy port is 8090. Note that tcpwatch.py executable must be accessible from your env. See http://funkload.nuxeo.org/ for more information. Examples ======== %prog foo_bar Run a proxy and create a FunkLoad test case, generates test_FooBar.py and FooBar.conf file. To test it: fl-run-test -dV test_FooBar.py %prog -p 9090 Run a proxy on port 9090, output script to stdout. %prog -i /tmp/tcpwatch Convert a tcpwatch capture into a script. """ def __init__(self, argv=None): if argv is None: argv = sys.argv[1:] self.verbose = False self.tcpwatch_path = None self.prefix = 'watch' self.port = "8090" self.server_url = None self.class_name = None self.test_name = None self.script_path = None self.configuration_path = None self.parseArgs(argv) def parseArgs(self, argv): """Parse programs args.""" parser = OptionParser(self.USAGE, formatter=TitledHelpFormatter(), version="FunkLoad %s" % get_version()) parser.add_option("-v", "--verbose", action="store_true", help="Verbose output") parser.add_option("-p", "--port", type="string", dest="port", default=self.port, help="The proxy port.") parser.add_option("-i", "--tcp-watch-input", type="string", dest="tcpwatch_path", default=None, help="Path to an existing tcpwatch capture.") options, args = parser.parse_args(argv) if len(args) == 1: test_name = args[0] else: test_name = None self.verbose = options.verbose self.tcpwatch_path = options.tcpwatch_path self.port = options.port if test_name: class_name = ''.join([x.capitalize() for x in re.split('_|-', test_name)]) self.test_name = test_name self.class_name = class_name self.script_path = './test_%s.py' % class_name self.configuration_path = './%s.conf' % class_name def startProxy(self): """Start a tcpwatch session.""" self.tcpwatch_path = mkdtemp('_funkload') cmd = 'tcpwatch.py -p %s -s -r %s' % (self.port, self.tcpwatch_path) if self.verbose: cmd += ' | grep "T http"' else: cmd += ' > /dev/null' trace("Hit Ctrl-C to stop recording.\n") os.system(cmd) def searchFiles(self): """Search tcpwatch file.""" items = {} prefix = self.prefix for filename in os.listdir(self.tcpwatch_path): if not filename.startswith(prefix): continue name, ext = os.path.splitext(filename) name = name[len(self.prefix):] ext = ext[1:] if ext == 'errors': trace("Error in response %s\n" % name) continue assert ext in ('request', 'response'), "Bad extension: %r" % ext items.setdefault(name, {})[ext] = os.path.join( self.tcpwatch_path, filename) items = items.items() items.sort() return [(v['request'], v['response']) for name, v in items if v.has_key('response')] def extractRequests(self, files): """Filter and extract request from tcpwatch files.""" last_code = None filter_ctypes = ('image', 'css', 'javascript') filter_url = ('.jpg', '.png', '.gif', '.css', '.js') requests = [] for request_path, response_path in files: response = Response(response_path) request = Request(request_path) if self.server_url is None: self.server_url = request.host ctype = response.headers.get('content-type', '') url = request.url if request.method != "POST" and ( last_code in ('301', '302') or [x for x in filter_ctypes if x in ctype] or [x for x in filter_url if url.endswith(x)]): last_code = response.status_code continue last_code = response.status_code requests.append(request) return requests def reindent(self, code, indent=8): """Improve indentation.""" spaces = ' ' * indent code = code.replace('], [', '],\n%s [' % spaces) code = code.replace('[[', '[\n%s [' % spaces) code = code.replace(', description=', ',\n%s description=' % spaces) code = code.replace('self.', '\n%sself.' % spaces) return code def convertToFunkLoad(self, request): """return a funkload python instruction.""" text = [] if request.host != self.server_url: text.append('self.%s("%s"' % (request.method.lower(), request.url)) else: text.append('self.%s(server_url + "%s"' % ( request.method.lower(), request.rurl)) description = "%s %s" % (request.method.capitalize(), request.path | truncate(42)) if request.body: params = ('params=%s' % request.extractParam()) params = re.sub("'Upload\(([^\)]*)\)'", "Upload(\\1)", params) text.append(', ' + params) text.append(', description="%s")' % description) return ''.join(text) def extractScript(self): """Convert a tcpwatch capture into a FunkLoad script.""" files = self.searchFiles() requests = self.extractRequests(files) code = [self.convertToFunkLoad(request) for request in requests] if not code: trace("Sorry no action recorded.\n") return '' code.insert(0, '') return self.reindent('\n'.join(code)) def writeScript(self, script): """Write the FunkLoad test script.""" trace('Creating script: %s.\n' % self.script_path) from pkg_resources import resource_string tpl = resource_string('funkload', 'data/ScriptTestCase.tpl') content = tpl % {'script': script, 'test_name': self.test_name, 'class_name': self.class_name} if os.path.exists(self.script_path): trace("Error file %s already exists.\n" % self.script_path) return f = open(self.script_path, 'w') f.write(content) f.close() def writeConfiguration(self): """Write the FunkLoad configuration test script.""" trace('Creating configuration file: %s.\n' % self.configuration_path) from pkg_resources import resource_string tpl = resource_string('funkload', 'data/ConfigurationTestCase.tpl') content = tpl % {'server_url': self.server_url, 'test_name': self.test_name, 'class_name': self.class_name} if os.path.exists(self.configuration_path): trace("Error file %s already exists.\n" % self.configuration_path) return f = open(self.configuration_path, 'w') f.write(content) f.close() def run(self): """run it.""" if self.tcpwatch_path is None: self.startProxy() script = self.extractScript() if not script: return if self.test_name is not None: self.writeScript(script) self.writeConfiguration() else: print script if __name__ == '__main__': RecorderProgram().run() PKT;4o|m<>>funkload/CredentialBase.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """Interface of a Credential Server. $Id: CredentialBase.py 30282 2005-12-05 12:52:30Z bdelbosc $ """ class CredentialBaseServer: """Interface of a Credential server.""" def getCredential(self, group=None): """Return a credential (login, password). If group is not None return a credential that belong to the group. """ def listCredentials(self, group=None): """Return a list of all credentials. If group is not None return a list of credentials that belong to the group. """ def listGroups(self): """Return a list of all groups.""" PKH\b4*88funkload/CPS340TestCase.py# (C) Copyright 2005 Nuxeo SAS # Author: bdelbosc@nuxeo.com # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as published # by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA # 02111-1307, USA. # """FunkLoad test case for Nuxeo CPS. $Id: CPSTestCase.py 24728 2005-08-31 08:13:54Z bdelbosc $ """ import time import random from Lipsum import Lipsum from ZopeTestCase import ZopeTestCase from webunit.utility import Upload class CPSTestCase(ZopeTestCase): """Common CPS tasks. setUp must set a server_url attribute.""" cps_test_case_version = (3, 4, 0) server_url = None _lipsum = Lipsum() _all_langs = ['en', 'fr', 'de', 'it', 'es', 'pt_BR', 'nl', 'mg', 'ro', 'eu'] _default_langs = _all_langs[:4] _default_extensions = ['CPSForum:default', 'CPSSkins:cps3', 'CPSSubscriptions:default'] _cps_login = None # ------------------------------------------------------------ # cps actions # def cpsLogin(self, login, password, comment=None): """Log in a user. Raise on invalid credential.""" self._cps_login = None params = [['__ac_name', login], ['__ac_password', password], ['__ac_persistent', 'on'], ['submit', 'Login'], ] self.post("%s/logged_in" % self.server_url, params, description="Log in user [%s] %s" % (login, comment or '')) # assume we are logged in if we have a logout link... self.assert_([link for link in self.listHref() if link.endswith('logout')], 'invalid credential: [%s:%s].' % (login, password)) self._cps_login = login def cpsLogout(self): """Log out the current user.""" if self._cps_login is not None: self.get('%s/logout' % self.server_url, description="Log out [%s]" % self._cps_login) def cpsCreateSite(self, admin_id, admin_pwd, manager_id, manager_password, manager_mail, langs=None, title=None, description=None, interface="portlets", zope_url=None, site_id=None, extensions=None): """Create a CPS Site. if zope_url or site_id is not provided guess them from the server_url. """ if zope_url is None or site_id is None: zope_url, site_id = self.cpsGuessZopeUrl() self.setBasicAuth(admin_id, admin_pwd) params = { 'site_id': site_id, 'title': title or "FunkLoad CPS Portal", 'manager_id': manager_id, 'password': manager_password, 'password_confirm': manager_password, 'manager_email': manager_mail, 'manager_firstname': 'Manager', 'manager_lastname': 'CPS Manager', 'extension_ids:list': extensions or self._default_extensions, 'description': description or "A funkload cps test site", 'languages:list': langs or self._default_langs, 'submit': 'Add', 'profile_id': 'CPSDefault:default'} self.post("%s/manage_addProduct/CPSDefault/addConfiguredCPSSite" % zope_url, params, description="Create a CPS Site") self.clearBasicAuth() def cpsCreateGroup(self, group_name): """Create a cps group.""" server_url = self.server_url params = [["dirname", "groups"], ["id", ""], ["widget__group", group_name], ["widget__members:tokens:default", ""], ["cpsdirectory_entry_create_form:method", "Create"]] self.post("%s/" % server_url, params) self.assert_(self.getLastUrl().find('psm_entry_created')!=-1, 'Failed to create group %s' % group_name) def cpsVerifyGroup(self, grou