Contact: fumanchu@aminus.org

Log in as guest/misc to create tickets

Changeset 184

Show
Ignore:
Timestamp:
09/20/10 17:18:03
Author:
fumanchu
Message:

Futility: better SQLA integration, more controls.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • futility/__init__.py

    r183 r184  
    1111from sqlalchemy.types import Integer, String, DateTime, Boolean 
    1212from sqlalchemy.orm import relation 
    13 from sqlalchemy.sql import and_ 
     13from sqlalchemy.sql import and_, desc 
    1414# If you want to use threadlocals, import sqlalchemy.mods.threadlocal before you 
    1515# import this module, so that the following line imports the threadlocal mapper: 
     
    2020    increment_test[0] = increment_test[0] + amt 
    2121task_executors = {'Increment Test': _incr} 
    22 current_efforts = {} 
    2322 
    2423 
     
    4746        if not self.paused: 
    4847            # Make a new effort to complete the task. 
    49             e = FutileEffort(taskid=task.id) 
    50             e.flush() 
    51             e.start() 
     48            start_effort(task) 
    5249     
    5350    def run_next(self): 
     
    5855        tasks = FutileTask.select( 
    5956            and_(FutileTask.c.node==self.node, 
    60                  FutileTask.c.nexteffort <= now), 
     57                 FutileTask.c.nexteffort <= now, 
     58                 ), 
    6159            order_by=['nexteffort'], 
    6260            limit=1) 
    6361        if tasks: 
     62            print 'Running', tasks[0].id 
    6463            self.run_task(tasks[0]) 
    6564            return 
     
    6867        tasks = FutileTask.select( 
    6968            and_(FutileTask.c.node==self.node, 
    70                  FutileTask.c.nexteffort >= now), 
     69                 FutileTask.c.nexteffort >= now, 
     70                 ), 
    7171            order_by=['nexteffort'], 
    7272            limit=1) 
     
    7575            delay = (delay.days * 86400) + delay.seconds 
    7676            if delay <= 0: 
    77                 delay = 0 
     77                self.run_task(tasks[0]) 
     78                return 
     79             
    7880            if delay > self.idle_poll_rate: 
    7981                # Why not just: 
     
    9496            delay = self.idle_poll_rate 
    9597         
     98        print 'Sleeping', delay 
    9699        time.sleep(delay) 
    97100     
     
    105108        threading.Thread(target=self.run_continuously).start() 
    106109 
     110def start_effort(task): 
     111    """Start a futile effort to perform the given futile task in its own thread.""" 
     112    e = FutileEffort(taskid=task.id) 
     113    t = threading.Thread( 
     114        target=run_effort, args=(e.id, task.name, task.kwargs)) 
     115    t.setName("FutileEffort %d (%r)" % (e.id, task.name)) 
     116    t.start() 
     117 
     118def run_effort(effortid, taskname, kwargs): 
     119    """Run the given futile effort to perform a futile task.""" 
     120    executor = None 
     121    try: 
     122        futileefforts = FutileEffort.mapper.local_table 
     123        futileefforts.update(futileefforts.c.id==effortid, 
     124                             {'starttime': datetime.datetime.now()}).execute() 
     125        executor = task_executors[taskname] 
     126        executor(**kwargs) 
     127    except: 
     128        if hasattr(executor, 'on_error'): 
     129            executor.on_error() 
     130    else: 
     131        if hasattr(executor, 'on_success'): 
     132            executor.on_success() 
     133    finally: 
     134        futileefforts.update(futileefforts.c.id==effortid, 
     135                             {'endtime': datetime.datetime.now()}).execute() 
     136 
    107137 
    108138class FutileTask(object): 
    109139    """A futile task which is attempted at regular intervals.""" 
     140     
     141    def __init__(self, **kwargs): 
     142        for k, v in kwargs.iteritems(): 
     143            setattr(self, k, v) 
     144        self.flush() 
    110145     
    111146    def _get_kwargs(self): 
     
    124159class FutileEffort(object): 
    125160     
    126     def start(self): 
    127         """Start this futile effort to perform a futile task in its own thread.""" 
    128         t = threading.Thread(target=self.run) 
    129         t.setName("FutileEffort %d (%r)" % (self.id, self.task.name)) 
    130         current_efforts[self.id] = self 
    131         t.start() 
     161    def __init__(self, **kwargs): 
     162        for k, v in kwargs.iteritems(): 
     163            setattr(self, k, v) 
     164        self.flush() 
    132165     
    133     def run(self): 
    134         """Run this futile effort to perform a futile task.""" 
    135         try: 
    136             self.starttime = datetime.datetime.now() 
    137             self.flush() 
    138             self.executor = task_executors[self.task.name] 
    139             self.executor(**self.task.kwargs) 
    140         except: 
    141             self.error = traceback.format_exc() 
    142             if hasattr(self.executor, 'on_error'): 
    143                 self.executor.on_error() 
    144         else: 
    145             if hasattr(self.executor, 'on_success'): 
    146                 self.executor.on_success() 
    147         finally: 
    148             self.endtime = datetime.datetime.now() 
    149             self.flush() 
    150  
     166    def pending_effort(cls, taskid): 
     167        """Return the most recent unfinished effort, or None.""" 
     168        efforts = cls.select( 
     169            and_(cls.c.taskid==taskid, cls.c.starttime!=None, cls.c.endtime==None), 
     170            order_by=[desc('starttime')], 
     171            limit=1) 
     172        if efforts: 
     173            return efforts[0] 
     174        return None 
     175    pending_effort = classmethod(pending_effort) 
    151176 
    152177def connect(metadata): 
     
    158183        Column('nexteffort', DateTime), 
    159184        Column('interval', Integer), 
    160         Column('enabled', Boolean), 
    161185    ) 
    162186     
  • futility/root.py

    r183 r184  
    88 
    99import datetime 
     10delta_zero = datetime.timedelta(0) 
    1011import os 
    1112thisdir = os.path.abspath(os.path.dirname(__file__)) 
     
    1415import cherrypy 
    1516import futility 
    16 from futility import FutileTask, FutileEffort 
    17  
    1817futility._incr.on_error = lambda: cherrypy.log('Error in increment test', traceback=True) 
     18 
     19from futility import FutileTask, FutileEffort, start_effort 
     20 
     21from sqlalchemy import objectstore 
     22 
    1923 
    2024def formatdt(dt): 
     
    2226        return '' 
    2327    return dt.strftime('%Y-%m-%d %H:%M:%S') 
     28 
     29def formattd(td): 
     30    if td is None: 
     31        return '' 
     32    if td.days: 
     33        return '%d d' % td.days 
     34    elif td.seconds > 60: 
     35        return '%d m %d s' % divmod(td.seconds, 60) 
     36    elif td.seconds: 
     37        return '%d s' % td.seconds 
     38    return '%d ms' % int(td.microseconds / 1000) 
     39 
     40 
     41def cleanup_sqla(debug=False): 
     42    if debug: 
     43        cherrypy.log('Running cleanup.', 'TOOLS.SQLA') 
     44    try: 
     45        objectstore.flush() 
     46    finally: 
     47        objectstore.clear() 
     48cherrypy.tools.sqla = cherrypy.Tool('before_finalize', cleanup_sqla) 
    2449 
    2550 
     
    3863                'node': task.node, 
    3964                'interval': str(task.interval), 
    40                 'enabled': ' checked="checked"' if task.enabled else '', 
    4165                } 
    4266     
    4367    @cherrypy.expose 
    4468    def default(self, task, name=None, kwargs=None, node=None, 
    45                 interval=None, enabled=None): 
     69                interval=None): 
    4670        if cherrypy.request.method == 'POST' and name is not None: 
    4771            task.name = (None if name == 'None' else name) 
     
    4973            task.node = int(node) 
    5074            task.interval = int(interval) 
    51             if enabled == 'True': 
    52                 task.enabled = True 
    53                 now = datetime.datetime.now() 
    54                 task.nexteffort = now + datetime.timedelta(seconds=int(interval)) 
    55             else: 
    56                 task.enabled = False 
    57                 task.nexteffort = None 
    58             task.flush() 
    5975            raise cherrypy.HTTPRedirect('/tasks/') 
    60         return open(self.default_template, 'rb').read() % self.default_info(task) 
     76        cherrypy.response.headers['Cache-Control'] = 'no-cache' 
     77        info = self.default_info(task) 
     78        return open(self.default_template, 'rb').read() % info 
    6179     
    6280    @cherrypy.expose 
    6381    def run(self, task): 
    64         # Advance the task's 'nexteffort' attribute 
    65         if task.enabled
     82        # Advance the task's 'nexteffort' attribute. 
     83        if task.nexteffort is not None
    6684            task.nexteffort = (datetime.datetime.now() + 
    6785                               datetime.timedelta(seconds=task.interval)) 
    68             task.flush() 
    6986         
    7087        # Make a new effort to complete the task. 
    71         e = FutileEffort(taskid=task.id) 
    72         e.flush() 
    73         e.start() 
     88        start_effort(task) 
     89        raise cherrypy.HTTPRedirect('/tasks/') 
     90     
     91    @cherrypy.expose 
     92    def start(self, task): 
     93        # Advance the task's 'nexteffort' attribute. 
     94        task.nexteffort = (datetime.datetime.now() + 
     95                           datetime.timedelta(seconds=task.interval)) 
     96        raise cherrypy.HTTPRedirect('/tasks/') 
     97     
     98    @cherrypy.expose 
     99    def stop(self, task): 
     100        task.nexteffort = None 
    74101        raise cherrypy.HTTPRedirect('/tasks/') 
    75102taskresource = TaskResource() 
     
    84111        for task in FutileTask.select(order_by=['name']): 
    85112            interval = datetime.timedelta(seconds=task.interval) 
     113             
     114            effort = FutileEffort.pending_effort(task.id) 
     115            if effort: 
     116                cureffort = ('%s (%s)' % (formatdt(effort.starttime), 
     117                    formattd(datetime.datetime.now() - effort.starttime))) 
     118            else: 
     119                cureffort = ( 
     120                    '<form action="/tasks/%s/run" method="POST" style="display: inline">' 
     121                    '<input type="submit" value="Run Once" /></form>' % task.id) 
     122             
     123            if task.nexteffort: 
     124                nexteffort = formatdt(task.nexteffort) 
     125                td = task.nexteffort - datetime.datetime.now() 
     126                if td >= delta_zero: 
     127                    nexteffort += ' (%s)' % formattd(td) 
     128                else: 
     129                    nexteffort += ' (now)' 
     130                nexteffort += ( 
     131                    '<form action="/tasks/%s/stop" method="POST" style="display: inline">' 
     132                    '<input type="submit" value="Stop" /></form>' % task.id) 
     133            else: 
     134                nexteffort = ( 
     135                    '<form action="/tasks/%s/start" method="POST" style="display: inline">' 
     136                    '<input type="submit" value="Start" /></form>' % task.id) 
     137             
    86138            tasks.append('    <tr><td><a href="%s/">%s</a></td><td>%s</td>' 
    87139                         '<td>%s</td><td>%s</td><td>%s</td><td>%s</td>' 
    88140                         '<td>%s</td></tr>' % 
    89141                         (task.id, task.id, task.name, task.jsonkwargs, 
    90                           str(task.node), str(interval), formatdt(task.nexteffort), 
    91                           str(task.enabled))) 
     142                          str(task.node), str(interval), cureffort, nexteffort)) 
    92143        return {'tasks': '\n'.join(tasks), 
    93144                'increment_test': futility.increment_test[0], 
     
    96147    @cherrypy.expose 
    97148    def index(self): 
     149        cherrypy.response.headers['Cache-Control'] = 'no-cache' 
    98150        return open(self.index_template, 'rb').read() % self.index_info() 
    99151     
    100152    @cherrypy.expose 
    101153    def new(self): 
    102         task = FutileTask(name=None, kwargs='{}', interval=3600, enabled=False) 
    103         task.flush() 
     154        task = FutileTask(name=None, kwargs={}, interval=3600) 
    104155        raise cherrypy.HTTPRedirect('/tasks/%d' % task.id) 
    105156     
     
    141192 
    142193app_config = { 
     194    '/': { 
     195        'tools.sqla.on': True, 
     196    }, 
    143197    '/futility.css': { 
    144198        'tools.staticfile.on': True, 
  • futility/task.html

    r183 r184  
    2020    <tr><th>Interval</th> 
    2121        <td><input type="text" name="interval" value="%(interval)s" size="20" /> seconds</td></tr> 
    22     <tr><th>Enabled</th> 
    23         <td><input type="checkbox" name="enabled" value="True" %(enabled)s /></td></tr> 
    2422</table> 
    2523<input type="submit" value="Submit" /> 
    2624</form> 
    2725<form action="/tasks/%(taskid)s/run" method="POST"> 
    28     <input type="submit" value="Run Now" /> 
     26    <input type="submit" value="Run Once" /> 
    2927</form> 
    3028</body> 
  • futility/tasks.html

    r183 r184  
    4545<table class='tasks'> 
    4646    <tr><th>ID</th><th>Name</th><th>Args</th><th>Node</th> 
    47         <th>Interval</th><th>Next Effort</th><th>Enabled</th></tr> 
     47        <th>Interval</th><th>Current Effort</th> 
     48        <th>Next Effort</th></tr> 
    4849%(tasks)s 
    4950</table>