Contact: fumanchu@aminus.org

Log in as guest/misc to create tickets

root/pyconquer.py

Revision 140 (checked in by fumanchu, 3 years ago)

Better tracebacks for pyconquer.

  • Property svn:eol-style set to native
Line 
1 #!/usr/bin/python
2 """PyConquer, a concurrent-thread analyzer."""
3
4 usage = """PyConquer tracing tool
5     
6     Usage:
7         pyconquer.py [-h] [-e=<event list>] [-f=path] [-n] [-r=.*] MODULE.py [ARG1 ARG2 ...]
8     
9     -e --events=<event list>: Trace the given events (separate by commas).
10         Available events:
11             'c_events' = ('c_call', 'c_return', 'c_exception')
12             'non_c_events' = ('call', 'return', 'exception', 'line')
13             'all_events' = c_events + non_c_events
14             'all_call_events' = ('call', 'return', 'c_call', 'c_return')
15     
16     -f --filename=<path>: the file where trace output will be written.
17         If omitted, output goes to stdout.
18     
19     -n --notimes: do not emit call times in the output.
20     
21     -r --regex=<regular expression>: limit traced modules to filenames
22         matching the given regular expression. If omitted, all modules
23         are traced. If the given regex begins with "!", filenames which
24         do NOT match will be traced.
25     """
26
27 import os
28 import re
29
30 try:
31     set
32 except NameError:
33     from sets import Set as set
34
35 import sys
36 import thread
37 import threading
38 import time
39 import traceback
40 import warnings
41
42
# Named groups of trace event names. Any of these tuples can be passed as
# the 'events' argument of a tracer, and the command line accepts the group
# names as shortcuts for the '-e' option.
# Events a Python-level trace hook can receive:
non_c_events = ('call', 'return', 'exception', 'line')
# Events for calls into C functions; Tracer.start() installs a profile hook
# when any of these is requested, since tracing does not support C events.
c_events = ('c_call', 'c_return', 'c_exception')
# Every event name above.
all_events = c_events + non_c_events
# Only entering and leaving functions, for both Python and C calls.
all_call_events = ('call', 'return', 'c_call', 'c_return')
47
def scope_name(frame):
    """Return a short label for the source file that owns 'frame'.

    The label is the file's base name without its extension. A package
    initialiser is labelled "<containing directory>/__init__" instead, so
    that different packages can be told apart.
    """
    directory, filename = os.path.split(frame.f_code.co_filename)
    stem = os.path.splitext(filename)[0]
    if stem != "__init__":
        return stem
    # Prefix the name of the directory that holds the __init__ file.
    package = os.path.split(directory)[1]
    return "/".join((package, stem))
56
def is_thread_start(frame, event):
    """Return True if current frame/event is threading.Thread.start().

    That is: a 'call' event for a function named 'start' whose local
    'self' is a (truthy) threading.Thread instance.
    """
    if event != 'call':
        return False
    if frame.f_code.co_name != 'start':
        return False
    candidate = frame.f_locals.get('self')
    if not candidate:
        return False
    return isinstance(candidate, threading.Thread)
64
def is_thread_stop(frame, event):
    """Return True if current frame/event is threading.Thread.__stop().

    That is: a 'return' event for a function named '__stop' whose local
    'self' is a (truthy) threading.Thread instance.
    """
    if event != 'return':
        return False
    if frame.f_code.co_name != '__stop':
        return False
    candidate = frame.f_locals.get('self')
    if not candidate:
        return False
    return isinstance(candidate, threading.Thread)
73
74
# Values for Swimlane.state. A lane is created RUNNING; Logger marks it
# TERMINATED when it sees the thread's 'threadstop' event.
TERMINATED = 0
RUNNING = 1
77
class Swimlane:
    """Per-thread bookkeeping for a traced thread.

    Thread ids get re-used, so a fresh Swimlane object (rather than the
    id) is what tells one thread apart from an earlier one.
    """

    def __init__(self, index):
        # Lane number; Logger shifts a lane's output right by
        # tabsize * index columns.
        self.index = index
        # Every lane starts out RUNNING.
        self.state = RUNNING
        # Current call depth, used to indent log lines.
        self.indent = 0
        # Stacks of start times for pending Python calls and C calls.
        self.calltimes = []
        self.ccalltimes = []
87
88
class Tracer:
    """Base class that installs and removes trace/profile hooks.

    Subclasses override hook() to receive events. 'fileregex' and 'events'
    are only stored here; it is up to the subclass's hook() to apply them.
    """

    def __init__(self, fileregex=".*", events=None):
        """Store the filters; nothing is hooked until start() is called.

        fileregex: regular expression (as a string) for filenames to trace.
        events: iterable of event names; defaults to call/return/exception.
        """
        self.active = False
        # Initialised here as well as in start() so that stop() can be
        # called safely even if start() never ran (e.g. from a 'finally').
        self._using_profile_hook = False

        # Maps thread id -> Swimlane.
        self.swimlanes = {}
        self.numlanes = 0
        self.tabsize = 8

        self.fileregex = fileregex
        if events is None:
            events = ['call', 'return', 'exception']
        self.events = set(events)

    def start(self, caller=None):
        """Install the hooks for this thread and for threads started later.

        caller: the frame whose local trace function should be set to
        our hook; defaults to the frame that called start().
        """
        self._using_profile_hook = False
        for e in c_events:
            if e in self.events:
                # Must also use the profile hook,
                # since tracing does not support C events.
                threading.setprofile(self.profhook)
                sys.setprofile(self.profhook)
                self._using_profile_hook = True
                break

        # Set the trace hook for all events other than C events
        # (it's easier to clean up nicely).
        threading.settrace(self.hook)
        sys.settrace(self.hook)

        # Set the calling frame's local trace func to our hook.
        if caller is None:
            caller = sys._getframe().f_back
        caller.f_trace = self.hook

        # Set '_caller_lineno' to help with writing tests.
        self._caller_lineno = caller.f_lineno

        self.active = True

    def stop(self, caller=None):
        """Deactivate and remove whichever hooks are still ours.

        caller: the frame that was passed to (or implied by) start();
        defaults to the frame that called stop().
        """
        self.active = False

        # Only clear the threading-level hooks if nobody replaced them.
        if threading._trace_hook == self.hook:
            threading.settrace(None)
        if threading._profile_hook == self.profhook:
            threading.setprofile(None)

        # Ugly hack to get around no sys.gettrace().
        if caller is None:
            caller = sys._getframe().f_back
        if caller.f_trace == self.hook:
            sys.settrace(None)
        if caller.f_back and caller.f_back.f_trace == self.hook:
            sys.settrace(None)

        # Ugly hack to get around no sys.getprofile().
        if self._using_profile_hook:
            sys.setprofile(None)

    def profhook(self, frame, event, arg):
        """Profile callback: forward only the C events ("c_*") to hook()."""
        if not event.startswith("c_"):
            return
        self.hook(frame, event, arg)

    def hook(self, frame, event, arg):
        """Receive one trace event. Base method; must be overridden."""
        pass
157
158
159
160 #                                 Logging                                 #
161
162
class WatchedValue:
    """Remembers the last value seen for obj.attr, under a display name."""

    def __init__(self, obj, attr, default=None, name=None):
        self.obj = obj
        self.attr = attr
        # Value reported while obj has no such attribute.
        self.default = default

        if name is None:
            # Derive "<module>.<class>.<attr>"; objects that lack the
            # needed class attributes are labelled by their repr instead.
            try:
                cls = obj.__class__
                owner = "%s.%s" % (cls.__module__, cls.__name__)
            except AttributeError:
                owner = repr(obj)
            name = "%s.%s" % (owner, attr)
        self.name = name

        # Baseline against which later values are compared.
        self.oldvalue = self.getvalue()

    def getvalue(self):
        """Return the current value of obj.attr (or the default)."""
        return getattr(self.obj, self.attr, self.default)
182
183
class Logger(Tracer):
    """Tracer that writes an indented, per-thread event log to self.out.

    Each thread gets a Swimlane. A lane's index decides how far its lines
    are shifted right (tabsize columns per lane) and its indent follows
    the call depth. Line prefixes written by this class:
    ">" call, "<" return, "[" C call, "]" C return, "E" exception,
    "e" C exception, "." line, "*" thread start, "X" thread stop,
    "=" watched value.
    """

    def __init__(self, fileregex=".*", events=None):
        Tracer.__init__(self, fileregex, events)
        # Placeholder lane (index -1 gives an empty left margin) used
        # until the first traced event selects a real lane.
        self.currentlane = Swimlane(-1)
        # Maps (obj, attr) -> WatchedValue.
        self.watches = {}
        # File-like object that receives the log.
        self.out = sys.stdout
        # When True, return lines carry the elapsed call time.
        self.time_calls = True

        # Don't use an RLock here! hook() will hang because Thread.__delete
        # will acquire the _active_limbo_lock, and then RLock.acquire()
        # will call currentThread (which creates a _DummyThread, which will
        # try in its __init__ method to acquire the _active_limbo_lock).
        self._mutex = threading.Lock()

    def start(self):
        """Write the start banner and all watches, then install the hooks."""
        self.out.write("\n\n________ pyconquer tracing started ________\n")
        self.write_watches()
        Tracer.start(self, sys._getframe().f_back)

    def stop(self):
        """Remove the hooks and write the stop banner."""
        self.currentlane = Swimlane(-1)
        Tracer.stop(self, sys._getframe().f_back)
        self.out.write("\n\n________ pyconquer tracing stopped ________\n")

    def _is_own_file(self, filename):
        """Return True if 'filename' is this module's own source/bytecode.

        Compares the base name without extension, treating both "/" and
        "\\" as separators, so it does not depend on the platform's path
        style or on the path being absolute.
        """
        base = os.path.basename(filename.replace("\\", "/"))
        return os.path.splitext(base)[0] == "pyconquer"

    def hook(self, frame, event, arg):
        """Trace/profile callback: log the event if it passes the filters.

        Returns itself so the frame keeps being traced, or None while the
        logger is inactive. Any exception raised here is written to
        self.out and then re-raised.
        """
        try:
            if not self.active:
                return

            self._mutex.acquire(True)
            try:
                filename = frame.f_code.co_filename

                # Deny tracing this module itself.
                if self._is_own_file(filename):
                    return self.hook

                # Handle this event if requested.
                if event in self.events:
                    try:
                        # A leading "!" inverts the filename filter.
                        if self.fileregex.startswith("!"):
                            process = not re.search(self.fileregex[1:], filename)
                        else:
                            process = re.search(self.fileregex, filename)
                    except GeneratorExit:
                        warnings.warn("Generator bug hit; proceeding anyway. "
                                      "See http://bugs.python.org/issue1454.")
                        process = True
                    if not process:
                        # Thread start/stop is always reported, even when
                        # the file that implements it is filtered out.
                        if is_thread_start(frame, event):
                            process = True
                            event = 'threadstart'
                        elif is_thread_stop(frame, event):
                            process = True
                            event = 'threadstop'

                    if process:
                        self.process_event(frame, event, arg)

                # See if any of our watched attributes have changed.
                self.write_watches(changes_only=True)

                return self.hook
            finally:
                self._mutex.release()
        except:
            self.out.write("\n\n________ pyconquer traceback start ________\n")
            self.out.write(traceback.format_exc())
            self.out.write("\n\n________ pyconquer traceback end ________\n")
            raise

    def write_lane(self, msg, indent=None):
        """Write 'msg' on a new line in the current lane.

        indent: number of "-" characters before the message; defaults to
        twice the lane's current call depth.
        """
        lane = self.currentlane
        tab = " " * (self.tabsize * lane.index)
        if indent is None:
            indent = lane.indent * 2
        self.out.write("\n%s%s%s" % (tab, "-" * indent, msg))


    #                              Processors                              #

    def process_event(self, frame, event, arg):
        """Select the calling thread's lane, then dispatch to process_<event>."""
        # Locate the current lane.
        ct = thread.get_ident()
        lane = self.swimlanes.get(ct)
        if lane is None:
            # Since we're logging only (and not verifying), re-use
            # the lane index to reduce right-shift.
            for k, v in list(self.swimlanes.items()):
                if v.state == TERMINATED:
                    index = v.index
                    # Forget the dead lane; otherwise every later new
                    # thread would claim this same index and several live
                    # threads would end up writing in one column.
                    del self.swimlanes[k]
                    break
            else:
                index = self.numlanes
                self.numlanes += 1
            lane = self.swimlanes[ct] = Swimlane(index)
        elif lane.state == TERMINATED:
            # Terminated threads may have their ID's re-used;
            # but use the same lane to reduce right-shift
            lane = self.swimlanes[ct] = Swimlane(lane.index)

        if event == 'threadstop':
            lane.state = TERMINATED

        if not (lane is self.currentlane):
            self.currentlane = lane

            # Write a header row with the current thread's name.
            self.out.write("\n")
            self.write_lane("[ %s ]" % threading.currentThread().getName())

        getattr(self, "process_" + event)(frame, arg)

    def _elapsed(self, starts):
        """Pop the latest start time off 'starts'; return " N.NNNms" or "".

        Returns "" when timing is off, or when there is no recorded start
        (a return seen for a call that began before tracing did).
        """
        if not self.time_calls:
            return ""
        end = time.clock()
        if not starts:
            return ""
        return " %.3fms" % ((end - starts.pop()) * 1000)

    def _c_func_name(self, func):
        """Return a display name for the C function object 'func'."""
        name = type(func.__self__).__name__
        if name == 'NoneType':
            # Not bound to an object: fall back to the function's repr.
            return repr(func)
        return name + "." + func.__name__

    def process_call(self, frame, arg):
        """Log entry into a Python function and deepen the indent."""
        self.currentlane.indent += 1
        name = frame.f_code.co_name
        if name == "?":
            self.write_lane("> import %s" % frame.f_code.co_filename)
        else:
            self.write_lane("> %s (%s:%s)" %
                            (name, scope_name(frame), frame.f_lineno))
        if self.time_calls:
            self.currentlane.calltimes.append(time.clock())

    def process_return(self, frame, arg):
        """Log a Python function's return (value and time) and dedent."""
        diff = self._elapsed(self.currentlane.calltimes)

        retval = ""
        if arg is not None:
            retval = ": %s" % repr(arg)

        name = frame.f_code.co_name
        if name == "?":
            self.write_lane("< import %s (%s lines)" %
                            (frame.f_code.co_filename, frame.f_lineno))
        else:
            self.write_lane("< %s (%s:%s)%s%s" %
                            (name, scope_name(frame), frame.f_lineno,
                             retval, diff))
        self.currentlane.indent -= 1

    def process_c_call(self, frame, arg):
        """Log a call into a C function and deepen the indent."""
        self.currentlane.indent += 1

        # arg should be the called C function
        name = self._c_func_name(arg)

        self.write_lane("[ %s (%s:%s)" % (name, scope_name(frame),
                                          frame.f_lineno))
        if self.time_calls:
            self.currentlane.ccalltimes.append(time.clock())

    def process_c_return(self, frame, arg):
        """Log the return from a C function (with time) and dedent."""
        diff = self._elapsed(self.currentlane.ccalltimes)

        # arg should be the called C function
        name = self._c_func_name(arg)

        self.write_lane("] %s (%s:%s) %s" % (name, scope_name(frame),
                                             frame.f_lineno, diff))
        self.currentlane.indent -= 1

    def process_exception(self, frame, arg):
        """Log an exception raised in (or passing through) a Python frame."""
        exc_class = arg[0]
        # A string exception arrives as the string itself, not a class,
        # so this must be an isinstance (not issubclass) check.
        if isinstance(exc_class, basestring):
            retval = repr(exc_class)
        else:
            retval = exc_class.__name__
        self.write_lane("E %s (%s:%s): %s" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno, retval))

    def process_c_exception(self, frame, arg):
        """Log an exception raised by a C function, with time, and dedent."""
        # Note that Python exceptions will also (eventually)
        # call process_return, but C exceptions won't.
        # So we figure time and dedent here.
        diff = self._elapsed(self.currentlane.ccalltimes)

        # arg should be the called C function
        self.write_lane("e %s (%s:%s) %s: %s" %
                        (frame.f_code.co_name, scope_name(frame),
                         frame.f_lineno, diff, arg))
        self.currentlane.indent -= 1

    def process_line(self, frame, arg):
        """Log the execution of a single source line."""
        self.write_lane(". %s (%s:%s)" % (frame.f_code.co_name,
                                          scope_name(frame), frame.f_lineno))

    def process_threadstart(self, frame, arg):
        """Log that the calling code started a new thread."""
        threadname = frame.f_locals['self'].getName()
        # Report the location of the code that called Thread.start().
        frame = frame.f_back
        self.write_lane("* %s (%s:%s) New Thread: %s"
                       % (frame.f_code.co_name, scope_name(frame),
                          frame.f_lineno, threadname))

    def process_threadstop(self, frame, arg):
        """Log that the current thread has terminated."""
        threadname = frame.f_locals['self'].getName()
        self.write_lane("X Thread terminated: %s" % threadname)

    #                               Watches                               #

    def watch(self, obj, attr, default=None, name=None):
        """Log the state of obj.attr each time it changes."""
        self.watches[(obj, attr)] = WatchedValue(obj, attr, default, name)

    def write_watches(self, changes_only=False):
        """Write watched values; with changes_only, just those that changed."""
        for w in self.watches.itervalues():
            newvalue = w.getvalue()
            if (not changes_only) or newvalue != w.oldvalue:
                w.oldvalue = newvalue
                self.write_lane("= %s: %s" % (w.name, repr(newvalue)))
420
421
def log(callback, filename, fileregex=".*", events=None):
    """Call callback() while a Logger writes its trace to 'filename'.

    The file is opened for binary writing; tracing is stopped and the
    file closed even if callback() raises.
    """
    tracer = Logger(fileregex, events=events)
    tracer.out = open(filename, "wb")
    try:
        tracer.start()
        callback()
    finally:
        tracer.stop()
        tracer.out.close()
431
def log_times(globpath, pattern=".*"):
    """Return a dict of matching (float(time in ms), line) pairs for each file.

    globpath: glob pattern selecting the log files to read.
    pattern: regular expression; only lines it matches are considered.

    The result maps each file name to a list of (milliseconds, text)
    tuples, one per line that ends in a "<number>ms" token, where 'text'
    is the line without that token. Lines that end in "ms" without a
    number in front (for example a thread name) are skipped.
    """
    import glob

    results = {}
    for fname in glob.glob(globpath):
        timed = []
        # Text mode, so lines compare against str patterns; trailing
        # newline characters are stripped below either way.
        f = open(fname, 'r')
        try:
            for line in f:
                line = line.rstrip()
                if not (line.endswith("ms") and re.search(pattern, line)):
                    continue
                parts = line.rsplit(" ", 1)
                if len(parts) != 2:
                    # No text before the final token; nothing to report.
                    continue
                try:
                    ms = float(parts[1][:-2])
                except ValueError:
                    # Ends in "ms" by coincidence; not a timing line.
                    continue
                timed.append((ms, parts[0]))
        finally:
            # Close the file even if reading or matching fails.
            f.close()
        results[fname] = timed
    return results
448
449 #                                Verifier                                 #
450
# Additional Swimlane.state value used by the Verifier: a lane in this
# state is held inside Verifier.hook() until it is set RUNNING again.
SLEEPING = 2
452
class Node:
    """One step in an execution tree, linked to the steps that can follow."""

    def __init__(self, data=None, exhausted=False, children=None):
        self.data = data
        self.exhausted = exhausted
        self.result = None
        # The 'children' dict will eventually have one entry for each lane.
        # An empty or missing argument gets a fresh dict.
        if children:
            self.children = children
        else:
            self.children = {}

    def __repr__(self):
        fields = (self.data, self.exhausted, self.children, self.result)
        return "Node(%s, %s, %s, %s)" % fields
466
467
468 class Verifier(Tracer):
469     """A tool for proving the thread-safety of closed, finite machines."""
470    
471     def __init__(self, fileregex="!(threading.py)", events=None):
472         if events is None:
473             events = non_c_events
474         Tracer.__init__(self, fileregex, events)
475        
476         self.exhausted = False
477         self.debug = False
478    
479     def start(self, caller=None):
480         # Overridden because we don't need to set hooks on the main thread.
481         self._using_profile_hook = False
482         for e in c_events:
483             if e in self.events:
484                 # Must also use the profile hook,
485                 # since tracing does not support C events.
486                 threading.setprofile(self.profhook)
487                 self._using_profile_hook = True
488                 break
489        
490         # Set the trace hook for all events other than C events
491         # (it's easier to clean up nicely).
492         threading.settrace(self.hook)
493        
494         self.active = True
495    
496     def hook(self, frame, event, arg):
497         """Trace hook."""
498        
499         if self.exhausted or not self.active:
500             return
501        
502         filename = frame.f_code.co_filename
503        
504         # Locate the current lane.
505         ct = thread.get_ident()
506         lane = self.swimlanes.get(ct)
507         if lane is None:
508             # event from thread_wrapper. ignore it.
509             return None
510        
511         # Deny tracing this module itself.
512         if filename.endswith(r"\pyconquer.py"):
513             process = False
514         elif self.fileregex.startswith("!"):
515             process = not re.search(self.fileregex[1:], filename)
516         else:
517             process = re.search(self.fileregex, filename)
518        
519         if process:
520             if lane.state != TERMINATED:
521                 while lane.state == SLEEPING:
522                     # TODO: why .001 here, but pass is OK in test()?
523                     time.sleep(.001)
524                 if self.exhausted:
525                     return None
526                
527                 # Handle this event if requested.
528                 if event in self.events:
529                     child = self.currentnode.children.get(lane.index)
530                     if child is None:
531                         data = {'index': lane.index,
532                                 'filename': frame.f_code.co_filename,
533                                 'function': frame.f_code.co_name,
534                                 'lineno': frame.f_lineno,
535                                 }
536                         child = Node(data)
537                         self.currentnode.children[lane.index] = child
538                     # Continue down the tree.
539                     self.currentnode = child
540                    
541                     if child.exhausted:
542                         self.abort_path()
543                     else:
544                         self._select_subpath(child, lane)
545             if self.debug:
546                 print ("e %s:%s" % (lane.index, frame.f_lineno)),
547         return self.hook
548    
549     def _select_subpath(self, node, lane):
550         """Sleep this lane and wake another."""
551         if self.debug:
552             print ("[%s?" % sys._getframe().f_back.f_code.co_name),
553        
554         for nextlane in self.swimlanes.values():
555             if nextlane.state != TERMINATED:
556                 child = node.children.get(nextlane.index)
557                 if child and child.exhausted:
558                     # This subpath has been fully explored.
559                     continue
560                
561                 if nextlane is lane:
562                     # Continue with the current lane.
563                     if self.debug:
564                         print ("= %s]" % lane.index),
565                 else:
566                     # Switch processing to a different lane.
567                     if lane.state != TERMINATED:
568                         lane.state = SLEEPING
569                     if self.debug:
570                         print ("> %s]" % nextlane.index),
571                     nextlane.state = RUNNING
572                 return
573         else:
574             # We've tried all possible subpaths from this node.
575             # Tell all remaining threads to finish normally; the
576             # current node will get marked "exhausted" inside test()
577             # after all threads have terminated.
578             self.abort_path()
579    
580     def abort_path(self):
581         self.exhausted = True
582         if self.debug:
583             print ("X]"),
584         for lane in self.swimlanes.values():
585             if lane.state == SLEEPING:
586                 lane.state = RUNNING
587    
588     def compile(self, func, concurrency=2):
589         """Graph all possible execution paths for 'func' in self.tree.
590         
591         'func' must be a closed system, with no external stimuli.
592         External state is fine, as long as it is only modified from
593         within 'func' during the test.
594         
595         If 'concurrency' is 2 or greater, then the code will be run by
596         that many threads 'simultaneously'. The default is 2 threads.
597         
598         'func' CANNOT be allowed to spawn its own threads.
599         If 'concurrency' is 1, then 'func' will be run once,
600         and the test will not prove anything.
601         """
602        
603         self.tree = Node(data={})
604         self.func_under_test = func
605        
606         try:
607             self.start()
608             ts = []
609             while True:
610                 if self.debug:
611                     print "\n----------------------------------"
612                 self.swimlanes = {}
613                 self.numlanes = 0
614                 self.currentnode = self.tree
615                 self.exhausted = False
616                
617                 # Start concurrent calls
618                 ts = [threading.Thread(target=self.thread_wrapper)
619                       for x in range(concurrency)]
620                 for t in ts:
621                     t.start()
622                 # Wait for all our threads to start and sleep
623                 while self.numlanes != concurrency:
624                     pass
625                
626                 # All threads are sleeping; wake one up.
627                 self._select_subpath(self.tree, Swimlane(-1))
628                 # Wait for all threads to complete.
629                 for t in ts:
630                     t.join()
631                
632                 self.currentnode.exhausted = True
633                 if self.tree.exhausted:
634                     # Done with all runs.
635                     break
636         finally:
637             self.stop()
638    
639     def thread_wrapper(self):
640         self.swimlanes[thread.get_ident()] = lane = Swimlane(self.numlanes)
641         lane.state = SLEEPING
642         self.numlanes += 1
643         try:
644             self.currentnode.result = self.func_under_test()
645         except Exception, x:
646             lane.state = TERMINATED
647             if self.debug:
648                 print "E", lane.index,
649             self.currentnode.result = x
650             self.abort_path()
651         else:
652             lane.state = TERMINATED
653             if self.debug:
654                 print "T", lane.index,
655             if not self.exhausted:
656                 # Wake another thread
657                 self._select_subpath(self.currentnode, lane)
658    
659     def ptree(self, node=None):
660         """Return a pretty-printed version of the execution tree."""
661         def descend(node, depth):
662             for k, v in node.children.iteritems():
663                 output.append(("  " * depth) + repr(v.data))
664                 descend(v, depth + 1)
665        
666         output = []
667         if node is None:
668             node = self.tree
669         descend(node, 0)
670         return "\n".join(output)
671    
672     def paths(self, node=None, tests=None):
673         """Generate a list of paths which pass the given test(s)."""
674         def descend(node, trail):
675             trail = trail + [node,]
676             success = True
677             if tests:
678                 for test in tests:
679                     if not test(node):
680                         success = False
681                         break
682             if success:
683                 yield trail
684             for k, v in node.children.iteritems():
685                 for item in descend(v, trail):
686                     yield item
687        
688         if node is None:
689             node = self.tree
690         for path in descend(node, []):
691             yield path
692
def has_errors(exc=(AssertionError,)):
    """Build a Verifier test that is True for nodes which ended in an error.

    exc: exception class, or tuple of classes, that counts as an error.
    The returned function takes a node and reports whether node.result
    is an instance of 'exc'.
    """
    def err_test(node):
        return isinstance(node.result, exc)
    return err_test
700
def failed_paths(func, tests=(has_errors(),), concurrency=2):
    """Generate every execution path of 'func' that passes 'tests'.

    A new Verifier compiles 'func' with the given concurrency. With the
    default tests, the paths generated are those that resulted in an
    AssertionError.
    """
    verifier = Verifier()
    verifier.compile(func, concurrency)
    for trail in verifier.paths(tests=tests):
        yield trail
707
708
709 if __name__ == '__main__':
710     _log = Logger()
711    
712     argv = sys.argv[1:]
713     import getopt
714     optmap = {
715         '-e:': 'events=',
716         '-f:': 'filename=',
717         '-h': 'help',
718         '-n': 'notimes',
719         '-r:': 'regex=',
720         }
721     short_opts = ''.join([k[1:] for k in optmap])
722     long_opts = optmap.values()
723     options, args = getopt.getopt(argv, short_opts, long_opts)
724     for o, a in options:
725         if o == '-e' or o == '--events':
726             if a in ('non_c_events', 'c_events', 'all_events', 'all_call_events'):
727                 _log.events = set(globals()[a])
728             else:
729                 _log.events = set([x.strip() for x in a.split(",") if x.strip()])
730         elif o == '-f' or o == '--filename':
731             _log.out = open(a, "wb")
732         elif o == '-h' or o == '--help':
733             print usage
734             sys.exit()
735         elif o == '-n' or o == '--notimes':
736             _log.time_calls = False
737         elif o == '-r' or o == '--regex':
738             _log.fileregex = a
739    
740     __file__ = args[0]
741     sys.path[0] = os.path.dirname(__file__)
742     sys.argv = args
743     try:
744         _log.start()
745         execfile(args[0])
746     finally:
747         _log.stop()
748         _log.out.close()
Note: See TracBrowser for help on using the browser.