by Eliot http://www.saltycrane.com/blog/
2010-06-17
This presentation is an HTML5 website (copied from http://ihumanable.com/jquery/presentation.html)
Use the right arrow key to advance; Ctrl +/- to zoom
Source code: http://github.com/saltycrane/live-log-analyzer
Live demo: http://184.73.64.212:9000/graph
(the right-hand plots update every 5 seconds; the left-hand plots update every 5 minutes)
Starts a subprocess and runs an ssh command to get data from a remote server [sources.py]
from subprocess import Popen, PIPE, STDOUT

class SourceBase(object):
    def start_stream(self):
        self._assemble_ssh_command()
        self.p = Popen(self.ssh_cmd, shell=True, stdout=PIPE, stderr=STDOUT)

    def _assemble_ssh_command(self):
        """Use the dict, self.ssh_params, and return the ssh command to run
        """

    def get_line(self):
        """Use self.p.stdout.readline() to get a line of data
        """

    def filter(self, line):
        """Return the line after optionally altering it
        """

class SourceLog(SourceBase):
    def __init__(self, ssh_params, filepath, encoding='utf-8'):
        self.ssh_params = ssh_params
        self.encoding = encoding
        self.cmd = 'tail --follow=name %s' % filepath
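The two stubs are the interesting part. A minimal sketch of how they might be filled in (hypothetical code, not copied from sources.py; it assumes ssh_params carries the 'hostname', 'user', and 'identityfile' keys shown in settings.py below):

# class SourceBase(object):  # continued (hypothetical sketch)

    def _assemble_ssh_command(self):
        # build something like: ssh -i <key> <user>@<host> "tail --follow=name <file>"
        p = self.ssh_params
        self.ssh_cmd = 'ssh -i %s %s@%s "%s"' % (
            p['identityfile'], p['user'], p['hostname'], self.cmd)

    def get_line(self):
        # readline() blocks until the remote tail emits a line
        raw = self.p.stdout.readline()
        return self.filter(raw.decode(self.encoding))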
Uses regular expressions to parse data (e.g. from a log file) [parsers.py]
class BaseParser(object):
    @classmethod
    def parse_line(cls, line):
        """Parse one line of data using the regex pattern, cls.pattern
        Return the data as a dict
        """

    @classmethod
    def convert_time(cls, time_str):
        """Convert a date string to a datetime object
        """

    @classmethod
    def post_process(cls, data):
        """Return optionally modified data
        """

class NginxCacheParser(BaseParser):
    date_format = "%d/%b/%Y:%H:%M:%S"
    pattern = ' '.join([
        r'\*\*\*(?P<time>\S+ -\d{4})',
        r'\[(?P<ip>[\d\.]+)\]',
        r'(?P<status>HIT|MISS|EXPIRED|UPDATING|STALE|-)',
        r'ups_ad: (?P<ups_ad>.*)',
        r'ups_rt: (?P<ups_rt>.*)',
        # ...
    ])
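Filled in, the BaseParser stubs might look something like this (a sketch; it assumes convert_time must drop the UTC-offset part of the time field, since date_format has no offset directive):

import re
from datetime import datetime

# Hypothetical sketch of the BaseParser stubs; the real code is in parsers.py
class BaseParser(object):
    pattern = None       # set by subclasses, e.g. NginxCacheParser
    date_format = None

    @classmethod
    def parse_line(cls, line):
        m = re.match(cls.pattern, line)
        if m is None:
            return None  # skip lines that don't match the pattern
        data = m.groupdict()
        data['time'] = cls.convert_time(data['time'])
        return cls.post_process(data)

    @classmethod
    def convert_time(cls, time_str):
        # e.g. '17/Jun/2010:10:30:02 -0700' -> drop the offset, then parse
        return datetime.strptime(time_str.split()[0], cls.date_format)

    @classmethod
    def post_process(cls, data):
        return data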
Parameters for getting and storing the data [settings.py]
from sources import SourceLog
from parsers import NginxCacheParser

NG_CACHE_COLL = 'ng_cache'  # the name of the MongoDB collection

HOSTS = {
    'us-ng1': {'host': 'us-ng1',
               'hostname': '111.111.111.15',
               'identityfile': '/home/saltycrane/sshkeys/myprivatekey',
               'user': 'myusername',
               },
    # ...
}

SOURCES_SETTINGS = [
    {'source': (SourceLog, {'ssh_params': HOSTS['us-ng1'],
                            'filepath': '/var/log/nginx/cache.log',
                            'encoding': 'latin-1'}),
     'parser': NginxCacheParser,
     'collection': NG_CACHE_COLL,
     },
    {'source': (SourceLog, {'ssh_params': HOSTS['us-ng2'],
                            'filepath': '/var/log/nginx/cache.log',
                            'encoding': 'latin-1'}),
     'parser': NginxCacheParser,
     'collection': NG_CACHE_COLL,
     },
    # ...
]
Executive that gets data, parses it, and stores it in MongoDB [sourceexecutive.py]
class SourceExecutive(object):
    def __init__(self, settings):
        self.collection = settings['collection']
        self.parser = settings['parser']
        self.source_class = settings['source'][0]
        self.kwargs = settings['source'][1]

    def start(self):
        self.start_source_stream()
        self.connect_to_mongo()
        self.store_data()

    def start_source_stream(self):
        self.source = self.source_class(**self.kwargs)
        self.source.start_stream()

    def connect_to_mongo(self):
        """Connect to the MongoDB collection, self.collection
        """

    def store_data(self):
        while True:
            line = self.source.get_line()
            data = self.parser.parse_line(line)
            self.mongo.insert(data)
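connect_to_mongo is left as a stub; with the pymongo of that era, a minimal version might be (host, port, and database name are placeholders, not from the project):

from pymongo import Connection

# class SourceExecutive(object):  # continued (hypothetical sketch)

    def connect_to_mongo(self):
        connection = Connection('localhost', 27017)        # assumed local MongoDB
        self.mongo = connection['logdb'][self.collection]  # 'logdb' is a placeholder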
from settings import SOURCES_SETTINGS

for ss in SOURCES_SETTINGS:
    s = SourceExecutive(ss)
    s.start()
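Note that store_data() loops forever, so a strictly serial loop would only ever start the first source; one way to run every source concurrently (a sketch using multiprocessing, which may differ from what the project actually does):

from multiprocessing import Process

from settings import SOURCES_SETTINGS
from sourceexecutive import SourceExecutive

# hypothetical: run each blocking SourceExecutive.start() in its own process
processes = []
for ss in SOURCES_SETTINGS:
    s = SourceExecutive(ss)
    p = Process(target=s.start)
    p.start()
    processes.append(p)
for p in processes:
    p.join()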
This class queries MongoDB for the average upstream response time, grouped by upstream server [analyzers.py]
import re
import textwrap

from pymongo import ASCENDING

class AvgUpstreamResponseTimePerServerLoggedIn(object):
    def __init__(self, mongo_collection, logged_in):
        self.mongo = mongo_collection
        self.logged_in = logged_in

    def run(self, time_limit):
        self.mongo.ensure_index([('time', ASCENDING),
                                 ('ups_rt', ASCENDING),
                                 ('ups_ad', ASCENDING),
                                 ('wp_login', ASCENDING),
                                 ])
        result = self.mongo.group(
            key=['ups_ad'],
            condition={'time': {'$gt': time_limit[0], '$lt': time_limit[1]},
                       'ups_rt': {'$ne': '-'},
                       'wp_login': re.compile(self.logged_in),
                       },
            initial={'count': 0, 'total': 0},
            reduce=textwrap.dedent('''
                function(doc, out) {
                    out.count++;
                    out.total += parseFloat(doc.ups_rt);
                }'''),
            finalize='function(out) {out.avg = out.total / out.count}',
        )
        result = sorted(result, key=lambda item: item['ups_ad'])
        self.data = [r['avg'] for r in result]
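Used on its own, the analyzer might be driven like this (a sketch; the database name is a placeholder, and the r'^\s*$' pattern presumably selects requests with an empty wp_login field, i.e. users who are not logged in):

from datetime import datetime, timedelta
from pymongo import Connection

coll = Connection('localhost', 27017)['logdb']['ng_cache']  # placeholder db name
analyzer = AvgUpstreamResponseTimePerServerLoggedIn(coll, logged_in=r'^\s*$')

# average upstream response time over the last 5 minutes
now = datetime.now()
analyzer.run(time_limit=(now - timedelta(minutes=5), now))
print analyzer.data  # one average per upstream server, sorted by ups_ad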
Assembles a timeline of datapoints to send to the browser for plotting [reportgenerators.py]
class FlotReportGenerator(object):
    def __init__(self, settings, index, processed_collection):
        """Set attributes to parameters from the settings module and
        connect to MongoDB
        """

    def connect_to_mongo(self):
        """Connect to the MongoDB collection for storing history data
        """

    def run(self):
        """Generate all the data to be passed to flot
        """
        self.create_metadata()
        self.calc_window_endpoints()
        self.run_analyzers_for_all_groups()
        self.write_datapoint_to_mongodb()
        self.get_and_assemble_history_data_for_flot()
        self.assemble_current_datapoints()
        self.prepare_output_data()

    def create_metadata(self):
        """Assemble metadata data structures to send to the web frontend
        """

    def calc_window_endpoints(self):
        """Calculate the window start and end points to be passed to
        each analyzer
        """
# class FlotReportGenerator(object):  # continued...

    def run_analyzers_for_all_groups(self):
        """Loop through the list of groups (plots) to create and
        generate datapoints
        """

    def write_datapoint_to_mongodb(self):
        """Write the datapoint data structure to MongoDB
        """

    def get_and_assemble_history_data_for_flot(self):
        """Get history data from MongoDB and assemble it for
        transmission to a flot stacked bar chart
        """

    def assemble_current_datapoints(self):
        """Assemble a data structure of all the data points for the
        current time, formatted according to
        self.groups[groupname]['format']
        """

    def prepare_output_data(self):
        """Assign all data to a single data structure, self.out
        """
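As one concrete illustration, calc_window_endpoints presumably turns the window length from the settings into the (start, end) tuple that each analyzer's run() expects; a sketch (self.window_length is an assumed attribute set from 'default_window_length'):

from datetime import datetime, timedelta

# class FlotReportGenerator(object):  # continued (hypothetical sketch)

    def calc_window_endpoints(self):
        end = datetime.now()
        start = end - timedelta(seconds=self.window_length)
        self.window_endpoints = (start, end)  # passed to each analyzer's run()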
Parameters for analyzing the data [settings.py]
from analyzers import AvgUpstreamResponseTimePerServerLoggedIn, CacheStatus

PLOT_SET = {
    'cache0': {'label': 'Cache status (non-media)',
               'format': '%.1f%%',
               'collection': NG_CACHE_COLL,
               'flot_options': {'yaxis': {'max': 100}},
               'analyzers': [
                   (CacheStatus, {'status': 'HIT', 'media': '0'}),
                   (CacheStatus, {'status': 'MISS', 'media': '0'}),
                   (CacheStatus, {'status': 'EXPIRED', 'media': '0'}),
                   (CacheStatus, {'status': 'UPDATING', 'media': '0'}),
                   (CacheStatus, {'status': 'STALE', 'media': '0'}),
               ],
               },
    'aurt': {'label': 'Avg Upstream Resp Time',
             'format': '%.2f',
             'collection': NG_CACHE_COLL,
             'analyzers': [
                 (AvgUpstreamResponseTimePerServerLoggedIn,
                  {'logged_in': r'^\s*$'}),
             ],
             },
    # ...
}
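CacheStatus is referenced above but never shown in this deck. A sketch of what it might look like, assuming the same interface as AvgUpstreamResponseTimePerServerLoggedIn (constructor takes the collection plus the keyword arguments from PLOT_SET; run() sets self.data) and a 'media' field in the parsed documents:

# Hypothetical sketch of CacheStatus: the percentage of requests in the
# window with the given cache status ('media' distinguishes media requests)
class CacheStatus(object):
    def __init__(self, mongo_collection, status, media):
        self.mongo = mongo_collection
        self.status = status
        self.media = media

    def run(self, time_limit):
        window = {'time': {'$gt': time_limit[0], '$lt': time_limit[1]},
                  'media': self.media}
        total = self.mongo.find(window).count()
        matched = self.mongo.find(dict(window, status=self.status)).count()
        self.data = [100.0 * matched / total if total else 0.0]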
ANALYSIS_SETTINGS = {
    'channel_name': '/topic/graph',
    'time_periods': [
        {'interval': 5 * 60,  # in seconds
         'history_length': 144,  # number of processed data points to save
         'default_window_length': 5 * 60 + 5,  # in seconds
         'default_flot_options': {
             'series': {'stack': 0,
                        'bars': {'show': True,
                                 # flot time axes are in milliseconds:
                                 # bars span 80% of the 5-minute interval
                                 'barWidth': (5 * 60) * (0.8 * 1000),
                                 'lineWidth': 1,
                                 },
                        },
             'xaxis': {'mode': "time",
                       'timeformat': "%H:%M",
                       'labelWidth': 20,
                       'labelHeight': 8,
                       },
             'yaxis': {'min': 0},
         },
         'groups': PLOT_SET,
         },
        # ...
    ],
}
Queries MongoDB, assembles the data, and sends it to the browser [analyzerexecutive.py]
import json

from stompservice import StompClientFactory
from twisted.internet import reactor
from twisted.internet.task import LoopingCall

from reportgenerators import FlotReportGenerator
from settings import ANALYSIS_SETTINGS

class AnalyzerExecutive(StompClientFactory):
    def __init__(self, settings):
        self.channel_name = settings['channel_name']
        self.time_periods = settings['time_periods']
        self.intervals = []
        self.report_generators = []
        self.instantiate_report_generators()

    def instantiate_report_generators(self):
        """Do this once for each report generator because we want to
        connect to MongoDB only once
        """
        for i, settings in enumerate(self.time_periods):
            processed_collection = 'processed_%d' % i
            rg = FlotReportGenerator(settings, i, processed_collection)
            self.intervals.append(settings['interval'])
            self.report_generators.append(rg)
# class AnalyzerExecutive(StompClientFactory):  # continued...

    def recv_connected(self, msg):
        """Start infinite loops, one for each of the time periods
        """
        for i in range(len(self.report_generators)):
            self.timer = LoopingCall(self.generate_and_send_data, i)
            self.timer.start(self.intervals[i])

    def generate_and_send_data(self, i):
        """Called on every loop iteration: generate the report data and
        send it to the browser as JSON
        """
        self.report_generators[i].run()
        jsondata = json.dumps(self.report_generators[i].out)
        self.send(self.channel_name, jsondata)

ae = AnalyzerExecutive(ANALYSIS_SETTINGS)
reactor.connectTCP('localhost', 61613, ae)
reactor.run()
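Port 61613 is the standard STOMP port; the JSON payload published to '/topic/graph' is picked up in the browser (via a comet setup such as Orbited) and handed to flot for plotting.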