Index: trunk/__init__.py =================================================================== --- (revision ) +++ trunk/__init__.py (revision 1) @@ -1,0 +1,689 @@ +import ConfigParser +import containers +import logic +import xray + +from dejavu.analysis import * +from dejavu.readme import * + + +########################################################################### +## ## +## Units ## +## ## +########################################################################### + + +# All Units currently must possess an 'ID' UnitProperty. The sequencing of +# IDs depends upon their type and the particular needs of the class. Pick +# one of these UnitSequencers to fit your subclass. + +# At the moment, no ID sequences are allowed None as a value, since this +# signals a Unit which needs to be sequenced when memorized. In addition, +# you should aim to create new sequencers which generate IDs that obey +# the builtin max() and min() functions. + +class UnitSequencerNull(object): + """A null sequencer for Unit IDs. Sequencing will error. + + In many cases, ID values simply have no algorithmic sequence; + for example, a set of Employee Units might use Social Security + Numbers for IDs (which you should never, ever do ;). + + In other cases, sequencing will be best handled by custom algorithms + within application code; that is, the job of abstracting the sequence + logic would not be worth the effort. + """ + + def __init__(self, type=unicode): + self.type = type + + def next(self, sequence): + raise UnableToFulfillRequestError("No sequence defined.") + + +class UnitSequencerInteger(object): + """A sequencer for Unit IDs, where id[i+1] == id[i] + 1.""" + + def __init__(self, type=int, initial=1): + self.type = type + self.initial = initial + + def next(self, sequence): + if sequence: + m = max(sequence) + if m is not None: + return m + 1 + return self.initial + + +class UnitSequencerUnicode(object): + """A sequencer for Unit IDs, where e.g. next(['abc']) == 'abd'.""" + + def __init__(self, type=unicode, width=6, + range="abcdefghijklmnopqrstuvwxyz"): + self.type = type + self.width = width + self.range = range + + def next(self, sequence): + r = self.range + if sequence: + maxid = max(sequence) + if len(maxid) != self.width: + raise ValueError("'%s' is not of width %s." % + (maxid, self.width)) + for i in range(self.width - 1, -1, -1): + pos = r.index(maxid[i]) + 1 + if pos >= len(r) or pos < 0: + maxid = maxid[:i] + r[0] + maxid[i+1:] + else: + maxid = maxid[:i] + r[pos] + maxid[i+1:] + break + else: + raise OverflowError("Next ID exceeds width %s." % self.width) + return maxid + return r[0] * self.width + + +class UnitProperty(object): + """Dejavu.Unit data which will persist in storage. + + pre, post: Override these with functions to provide custom behaviors + upon attribute modification. They are not called if sandbox is None. + If you need a behavior which fires regardless, you should override + __set__. They also are not called if the new value matches the + existing value. Again, override __set__ if you need an exception. + + hints: A dictionary which provides named hints to Storage Managers + concerning the nature of the data. A common use, for example, + is to inform Managers that would usually store unicode strings + as strings of length 255, that a particular value should be + a larger object; this is done with a 'Size' mapping, such as: + hints = {u'Size': 0}, where 0 implies no limit. Canonical storage + hint names and implementation details may be found in storage.py + documentation. + """ + + pre = None + post = None + + def __init__(self, cls, key, type, index=False, hints={}): + self.key = key + self.type = type + self.index = index + self.hints = hints + cls._properties[key] = None + cls._property_types[key] = type + + def __get__(self, unit, unitclass=None): + if unit is None: + return self + else: + return unit._properties[self.key] + + def __set__(self, unit, value): + if self.coerce: + value = self.coerce(unit, value) + + oldvalue = unit._properties[self.key] + # This test is expensive, but it saves us a lot of + # unnecessary save() operations later on. + if value != oldvalue: + if unit.sandbox and self.pre: + self.pre(unit, value) + unit.dirty = True + unit._properties[self.key] = value + if unit.sandbox and self.post: + self.post(unit, value) + + def coerce(self, unit, value): + if value is not None and not isinstance(value, self.type): + # Try to coerce the value. + try: + value = self.type(value) + except Exception, x: + x.args += (value, type(value)) + raise x + return value + + def __delete__(self, unit): + raise AttributeError("Unit Properties may not be deleted.") + + +class MetaUnit(type): + def __init__(cls, name, bases, dct): + cls._associations = {} + + # Make a copy of the parent class' _properties keys, and store + # it in the _properties attribute of this subclass. In this + # manner, Unit Properties should propagate down to subclasses, + # but not back up to superclasses. + cls._properties = dict.fromkeys(cls._properties.keys()) + cls._property_types = cls._property_types.copy() + + +class Unit(object): + """A generic object, the building-block of Dejavu. + + These are purposefully lightweight, relying on Sandboxes to cache + them, which in turn rely on Storage Managers to load and save them. + + They maintain their own "schema" via UnitProperty objects, so that the + Storage Managers don't need to know every detail about every Unit. + Storage Managers for simple databases, for example, will simply create + a single flat table for each unit type. If you write a custom Storage + Manager, you can do as you like; the only place you might run into a + problem is if you write a custom Storage Manager for custom Unit types, + because the knowledge between the two is indeterminate. For example, + if we provide a standard StorageManagerForLotusNotes, and you create + custom Units which interface with it, you should probably subclass and + extend our StorageManagerForLotusNotes with some custom storage logic. + + sandbox: The sandbox in which the Unit "lives". Also serves as a flag + indicating whether this Unit has finished the initial creation + process. While sandbox is None, pre and post descriptor functions + will not be called. + + Sandboxes receive Units during recall() and memorize(); + these processes should set the sandbox attribute. + + dirty: a flag indicating whether elements in the _properties dictionary + have been modified. This flag is used by Sandboxes to optimize + forget(): they do not ask Storage Managers to save data for Units + which have not been modified. Because SM's may cache Units, no code + should set this flag other than UnitProperty.__set__ and SM's. + + temporary: a flag indicating that the Unit should NOT be saved to + permanent storage. This should be False for most Units; some Units + may require an additional condition (usually a confirmation by the + user) that their state should persist. Note that the implementation + of 'temporary' is left to Storage Managers, not Sandboxes. + """ + __metaclass__ = MetaUnit + _properties = {} + _property_types = {} + sequencer = UnitSequencerInteger() + + def __init__(self): + # Copy the class _properties dict into self. + self._properties = self.__class__._properties.copy() + + self.sandbox = None + self.dirty = False + self.temporary = False + + def set_property(cls, key, type=unicode, index=False, + descriptor=UnitProperty): + """Set a Unit Property for cls.""" +## if hasattr(cls, key): +## raise ValueError(u"%s already has an attribute named '%s'." +## % (cls, key)) + setattr(cls, key, descriptor(cls, key, type, index)) + set_property = classmethod(set_property) + + def set_properties(cls, types={}, descriptor=UnitProperty): + """Set Unit Properties for cls.""" + for key, type in types.items(): + cls.set_property(key, type, False, descriptor) + set_properties = classmethod(set_properties) + + def indices(cls): + product = [] + for key in cls.properties(): + try: + if getattr(cls, key).index: + product.append(key) + except AttributeError: + raise StandardError(cls, key) + return tuple(product) + indices = classmethod(indices) + + def properties(cls): + return cls._properties.iterkeys() + properties = classmethod(properties) + + def property_type(cls, key): +## return getattr(cls, key).type + return cls._property_types[key] + property_type = classmethod(property_type) + + def adjust(self, **values): + for key, val in values.iteritems(): + setattr(self, key, val) + + def repress(self): + self.sandbox.repress(self) + + def forget(self): + self.sandbox.forget(self) + + def __copy__(self): + newUnit = self.__class__() + for key in self.__class__.properties(): + if key != u'ID': + newUnit._properties[key] = self._properties[key] + newUnit.ID = None + newUnit.sandbox = None + return newUnit + +# The default ID type is int. If you wish to use a different type for +# the ID's of a subclass of Unit, just overwrite ID, e.g.: +# UnitSubclass.set_property('ID', unicode, index=True) +# You will probably also want to override Unit.sequencer in the class body. +Unit.set_property(u'ID', int, index=True) + + + +########################################################################### +## ## +## Arenas ## +## ## +########################################################################### + + +class Arena(object): + """A namespace/workspace for a Dejavu application.""" + + def __init__(self): + self.defaultStore = None + self.stores = {} + self.roster = containers.Prism('name', 'cls', 'store') + self.associations = containers.Graph() + self.engine_functions = {} + + def load(self, configFileName): + """Load StorageManagers.""" + parser = ConfigParser.ConfigParser() + # Make names case-sensitive by overriding optionxform. + parser.optionxform = unicode + parser.read(configFileName) + + stores = [] + for section in parser.sections(): + opts = dict(parser.items(section)) + stores.append((int(opts.get("Load Order", "0")), section, opts)) + stores.sort() + + for order, name, options in stores: + self.add_store(name, options) + + def add_store(self, name, options): + """Load and attach the requested StoreManager.""" + storage_mgr_class = xray.classes(options[u'Class']) + self.stores[name] = store = storage_mgr_class(name, self, options) + + store.shutdownOrder = int(options.get('Shutdown Order', '0')) + + # Fill Roster, a Prism of class-associated data. + unitClasses = options.get('Units', '') + if unitClasses: + for clsname in unitClasses.split(","): + clsname = clsname.strip() + if clsname: + self.roster.add(name=clsname, cls=None, store=store) + else: + self.defaultStore = store + + def shutdown(self): + # Tell all stores to shut down. + stores = [(x.shutdownOrder, x) for x in self.stores.itervalues()] + stores.sort() + for order, store in stores: + store.shutdown() + + def new_sandbox(self): + return Sandbox(self) + + ########################################### + ## Unit Class Registration ## + ########################################### + + def register(self, cls): + """register(cls) -> Assert that Units of class 'cls' will be handled.""" + try: + row = self.roster.row_number(name=cls.__name__) + except ValueError: + self.roster.add(name=cls.__name__, cls=cls, store=self.defaultStore) + else: + # We left cls == None in _load(). Set it now. + self.roster.facets['cls'][row] = cls + + def class_by_name(self, classname): + return self.roster.cls(name=classname) + + def storage(self, cls): + return self.roster.store(cls=cls) + + def create_storage(self, cls): + self.storage(cls).create_storage(cls) + + def associate(self, cls, key, farClass, farKey, + nearFactory=None, farFactory=None): + """Associate one Unit class with another by relating attributes. + + cls, key: The 'near' class and its key. + farClass, farKey: the 'far' class and its key. + + Far Units will be recalled if their farKey matches cls.key. + However, if cls.key is empty or None, no Units will be recalled. + """ + + # Disallow overwriting of existing attributes. + if hasattr(cls, farClass.__name__): + raise ValueError(u"%s already has an attribute named '%s'." + % (cls, farClass.__name__)) + if hasattr(farClass, cls.__name__): + raise ValueError(u"%s already has an attribute named '%s'." + % (farClass, cls.__name__)) + + # Assert that both classes are registered. + self.register(cls) + self.register(farClass) + + # Add a method to cls which retrieves farClass synapses + if nearFactory is None: + nearFactory = _synapses_func + func = nearFactory(key, farClass, farKey) + setattr(cls, farClass.__name__, func) + cls._associations[farClass] = (key, farKey) + + # Add a method to farClass which retrieves cls synapses + if farFactory is None: + farFactory = _synapses_func + func = farFactory(farKey, cls, key) + setattr(farClass, cls.__name__, func) + farClass._associations[cls] = (farKey, key) + + # Register the association(s) in an undirected graph. + self.associations.connect(cls, farClass) + + +# You can use this arena instance if you are +# deploying a single application per process. +# Otherwise, you should create your own instance +# per application. +arena = Arena() + +def _synapses_func(key, farClass, farKey): + """Produce a new synapses() function to be bound to a Unit class.""" + def synapses(self, expr=None): + """Recall associated '%(farname)s' Units. + + %(farname)s Units will be recalled if their %(farkey)s matches + self.%(key)s. However, if self.%(key)s is None, + no Units will be recalled. + """ % {'farname': farClass.__name__, + 'farkey': farKey, + 'key': key, + } + value = getattr(self, key) + if value is None: + return iter([]) + + # kwargs won't take unicode keys + f = logic.filter(**{str(farKey): value}) + if expr is not None: + f += expr + return self.sandbox.recall(farClass, f) + return synapses + + +########################################################################### +## ## +## Sandboxes ## +## ## +########################################################################### + + +class Sandbox(object): + """Data sandbox for Dejavu arenas. + + Each consumer (that is, each UI process) maintains a Sandbox for + managing Units. Sandboxes populate themselves with Units on a lazy + basis, allowing UI code to request data as it's needed. However, once + obtained, such Units are persisted (usually for the lifetime of the + thread); this important detail means that multiple requests for the + same Units result in multiple references to the same objects, rather + than multiple objects. Sandboxes are basically what Fowler calls + Identity Maps. + + The *REALLY* important thing to understand if you're customizing this + is that Sandboxes won't survive sharing across threads--DON'T TRY IT. + If you need to share unit data across requests, use or make an SM which + persists the data, and chain it with another, more normal SM. + + _cache(), _caches, and _stores are private for a reason--don't access + them from interface code--tell the Sandbox to do it for you. + """ + + def __init__(self, arena): + self.arena = arena + self._caches = {} + + def memorize(self, unit): + """Attach a unit to this sandbox so that it will persist.""" + cls = unit.__class__ + unit.sandbox = self + + # Ask the store to accept the unit, assigning it an ID if + # necessary. The store should also set unit.dirty to False + # if it saves the whole unit state on this call or not. + store = self.arena.storage(cls) + if store: + store.reserve(unit) + + # Insert the unit into the cache. + self._cache(cls)[unit.ID] = unit + + # Do this at the end of the func, since most on_memorize + # will want to have an ID when called. + if hasattr(unit, "on_memorize"): + unit.on_memorize() + + def forget(self, unit): + """Destroy the unit, both in the cache and storage.""" + cls = unit.__class__ + store = self.arena.storage(cls) + if store: + store.destroy(unit) + + del self._cache(cls)[unit.ID] + + if hasattr(unit, "on_forget"): + unit.on_forget() + + unit.sandbox = None + + def recall(self, cls, expr=None, *args): + """recall(cls, expr=None) -> Recall units of cls which match expr. + + If additional args are supplied: + + 1) They shall be of the form: (cls, expr, cls, expr, cls, [expr]). + The final expr is optional. + 2) Each such secondary cls/expr pair will be recalled; however, + only those secondary Units which are associated with Units + in the primary set will be returned. + 3) Instead of single Units, the yielded value will be a tuple of + Units, in the same order as the cls args were supplied. This + facilitates consumer code like: + + for invoice, price in sandbox.recall(Invoice, f, Price): + deal_with(invoice) + deal_with(price) + + 4) If any secondary Units are found, all combinations of those + Units, together with each primary Unit, will be returned. + 5) If no secondary Units are recalled, then a token tuple will be + returned of the form (primary_unit, None). For example: + + for invoice, price in sandbox.recall(Invoice, f, Price): + if price is None: + handle_no_prices(invoice) + + """ + + store = self.arena.storage(cls) + + if args: + # Format extra class/expr pairs more rigorously + pairs = [] + args = list(args) + while args: + c = args.pop(0) + if self.arena.storage(c) is not store: + raise ValueError(u"recall() does not currently support " + u"multiple classes in disparate stores.") + if args: + e = args.pop(0) + else: + e = None + pairs.append((c, e)) + + # Give up on in-memory techniques, flush it all, + # and ask the SM to give us what we want. + self.flush(cls) + for c, e in pairs: + self.flush(c) + for units in store.recall(cls, expr, pairs): + for unit in units: + if unit is not None: + ID = unit.ID + cache = self._cache(unit.__class__) + if ID not in cache: + cache[ID] = unit + unit.sandbox = self + yield units + raise StopIteration + + # Run through our cache first. + cache = self._cache(cls) + skip_cache = False + + if expr: + fc = expr.func.func_code + if (fc.co_code == '|\x00\x00i\x01\x00d\x01\x00j\x02\x00S' + and fc.co_names[-1] == 'ID'): + # Special-case the scenario where one Unit is + # expected and called by ID. We should be able + # to save a database hit. + expectedID = fc.co_consts[-1] + if cache.has_key(expectedID): + yield cache[expectedID] + raise StopIteration + else: + skip_cache = True + + if not skip_cache: + for unit in cache.itervalues(): + if expr is None or expr.evaluate(unit): + yield unit + + # Query Storage. + if store: + for unit in store.recall(cls, expr): + ID = unit.ID + # Very important that we check for existing unit, as its + # state may have changed in memory but not in storage. + if ID not in cache: + cache[ID] = unit + unit.sandbox = self + yield unit + + def unit(self, cls, **kwargs): + """Recall a single Unit, else None. + + **kwargs will be combined into an Expression via logic.filter. + The first Unit matching that expression is returned; if no + Units match, None is returned. + + If you need a single Unit which matches a more complex + expression, use recall().next(). + """ + expr = None + if kwargs: + expr = logic.filter(**kwargs) + try: + return self.recall(cls, expr).next() + except StopIteration: + return None + + def distinct(self, cls, fields, expr=None): + """Recall distinct Unit property values. + + If only one field is specified, a list of values will be returned. + If more than one field is specified, a zipped list will be returned. + + Notice that you can also use this function as a count() function + (in fact it's the only way to do it) by using fields = ['ID']. + """ + seen = {} + cache = self._cache(cls) + for unit in cache.itervalues(): + if expr is None or expr.evaluate(unit): + row = tuple([getattr(unit, field) for field in fields]) + if row not in seen: + seen[row] = None + + store = self.arena.storage(cls) + if store: + for row in store.distinct(cls, fields, expr): + if row not in seen: + seen[row] = None + + seen = seen.keys() + seen.sort() + if len(fields) == 1: + seen = [x[0] for x in seen] + return seen + + def count(self, cls, expr): + return len(self.distinct(cls, ['ID'], expr)) + + #################################### + ## Cache Management ## + #################################### + + def _cache(self, cls): + """Return the cache for the specified class. + + This base class creates a new cache for each cls per request. + """ + if cls not in self._caches: + self._caches[cls] = {} + return self._caches[cls] + + def purge(self, cls): + del self._caches[cls] + + def flush(self, cls): + """flush(cls) -> Repress all units of the specified class.""" + cache = self._cache(cls) + store = self.arena.storage(cls) + while cache: + id, unit = cache.popitem() + + if hasattr(unit, "on_repress"): + unit.on_repress() + + if store and unit.dirty: + store.save(unit) + + def flush_all(self): + """flush_all() -> repress() all units.""" + for cls in self._caches.iterkeys(): + self.flush(cls) + + def repress(self, unit): + """repress(unit) -> Remove unit from cache (but don't destroy).""" + if hasattr(unit, "on_repress"): + unit.on_repress() + + cls = unit.__class__ + store = self.arena.storage(cls) + if store and unit.dirty: + store.save(unit) + + del self._cache(cls)[unit.ID] + Index: trunk/analysis.py =================================================================== --- (revision ) +++ trunk/analysis.py (revision 1) @@ -1,0 +1,120 @@ +"""Analysis tools for dejavu Units.""" + +def sort(attrs, descending=False): + """sort(attrs, descending=False) -> create a function for list.sort() for Units.""" + if isinstance(attrs, (str, unicode)): + attrs = (attrs, ) + def sort_func(x, y): + for attr in attrs: + xv = getattr(x, attr) + if callable(xv): + xv = xv() + if xv is None: + diff = -1 + else: + yv = getattr(y, attr) + if callable(yv): + yv = yv() + if yv is None: + diff = 1 + else: + diff = cmp(xv, yv) + if descending: + diff = -diff + if diff != 0: + return diff + return 0 + return sort_func + + +def _force_function(attr): + if callable(attr): + return attr + + def g(obj): + return getattr(obj, attr) + + return g + + +def SUM(attribute): + """sum(attribute) -> create an aggregate function for use with crosstab(). + + 'attribute' can be either the name of an attribute defined for + all objects in self.source, or a further callable to which each obj + is passed and evaluated. + """ + if callable(attribute): + def aggfunc(obj, current_agg_value): + a, b = current_agg_value, attribute(obj) + if a is None: + return b + if b is None: + return a + return a + b + else: + def aggfunc(obj, current_agg_value): + a, b = current_agg_value, getattr(obj, attribute) + if a is None: + return b + if b is None: + return a + return a + b + return aggfunc + +def COUNT(obj, current_agg_value): + """count -> an aggregate function for use with crosstab().""" + return (current_agg_value or 0) + 1 + + +class CrossTab(list): + """Tool to form crosstabs of Unit property values. + + Example: + data = CrossTab(source, (lambda x: x.FirstDate.month,), + 'Field', CrossTab.sum('Size')).results() + """ + + def __init__(self, source=[], groups=[], pivot=None, aggfunc=COUNT): + """CrossTab(source, groups, pivot, aggfunc=count) + + groups: a sequence of attribute names or callables, + which will form the rows of the result. + + pivot: either an attribute name or a callable, which will + form the columns of the result. + """ + # Iterate through generator if provided. We do this here rather + # than results() because we want to allow multiple calls to + # results() without exhausting the generator. + self.source = [x for x in source] + + if not isinstance(groups, (tuple, list)): + groups = [groups,] + self.groups = groups + + self.pivot = pivot + self.aggfunc = aggfunc + + def results(self): + # Force all groups to functions. The reason we do it here instead + # of __init__ is so consumers can still read self.groups as strings + # if that's what they supplied. + groups = [_force_function(group) for group in self.groups] + pivot = _force_function(self.pivot) + aggfunc = self.aggfunc + + data = {} + columns = {} + for obj in self.source: + key = tuple([group(obj) for group in groups]) + val = pivot(obj) + columns[val] = None + + row = data.setdefault(key, {}) + row[val] = aggfunc(obj, row.get(val)) + + columns = columns.keys() + columns.sort() + return data, columns + Index: trunk/doc/api.html =================================================================== --- (revision ) +++ trunk/doc/api.html (revision 1) @@ -1,0 +1,86 @@ +This doc is unfinished. + +Compare to Python's SQLObject: +http://blog.colorstudy.com/ianb/weblog/2004/09/06.html#P154 + +Which was a reply to Ruby's ActiveRecord: +http://www.loudthinking.com/arc/000297.html + +Which was a reply to Java's Hibernate: +http://informit.com/guides/content.asp?g=java&seqNum=127&f1=rss + +
Using dejavu, the application developer supplies the following code +to define the Units and their relationships:
+import dejavu
+import fixedpoint # or decimal, for Python 2.4+
+import datetime
+
+class Book(dejavu.Unit): pass
+# The ID field is alreday set to 'int' for all Unit subclasses.
+Book.set_properties({'title': str,
+ 'price': fixedpoint.Fixedpoint,
+ 'publishDate': datetime.datetime,
+ 'publisher': int,
+ })
+
+class Publisher(dejavu.Unit): pass
+Publisher.set_property('name', str)
+
+class Author(dejavu.Unit):
+ def addAuthor(self, author):
+ a = Authorship()
+ a.authorID = author.ID
+ a.bookID = self.ID
+ self.sandbox.memorize(a)
+Author.set_property('name', str)
+
+class Authorship(dejavu.Unit): pass
+Authorship.set_properties({'authorID': int,
+ 'bookID': int,
+ })
+
+arena = dejavu.Arena()
+arena.associate(Book, 'publisher', Publisher, 'ID')
+arena.associate(Authorship, 'bookID', Book, 'ID')
+arena.associate(Authorship, 'authorID', Author, 'ID')
+
+
+
+The deployer would write in a .conf file:
+[Books] +Class: dejavu.storage.storepgsql +Connection: postgresql:///testdb+ +
To create the tables, the deployer would run the code:
+arena.create_storage(Author) +arena.create_storage(Publisher) +arena.create_storage(Book)+ +
The app developer's runtime code reads as follows:
++ppython = Book +ppython.title = 'Programming Python' +ppython.price = fixedpoint.Fixedpoint(20) +ppython.publishDate = datetime.datetime(2001, 3, 1) +# This next line is redundant; all properties default to None. +# But explicitness is rarely a bad thing. +ppython.publisher = None + +print ppython.title # output: 'Programming Python' + +mlutz = Author +mlutz.name = 'Mark Lutz' +ppython.addAuthor(mlutz) + +len(ppython.authors) # output: 1 +ppython.Authorship().next().Author().next().name # output: 'Mark Lutz' + +oreilly = Publisher(name="O'Reilly") + +ppython.publisher = oreilly +ppython.publisher # output:Index: trunk/engines.py =================================================================== --- (revision ) +++ trunk/engines.py (revision 1) @@ -1,0 +1,482 @@ +""" +Notice in particular that UnitCollection, UnitEngineRule, and UnitEngine +are all _temporary_ Units. Even when you memorize them, they won't be +persistent unless you mark each instance as no longer temporary. If you +use UnitEngine.permanent(), it will make all of its rules permanent +(not temporary) as well. +""" + +import threading +import datetime +try: + import cPickle as pickle +except ImportError: + import pickle +import dejavu +import logic +import sets +import xray + + +class UnitCollection(dejavu.Unit): + """A Set of Unit IDs. + + Type: Unit Type of all Units referenced by this collection. + + The Unit Collection is primarily for use as an index for Units. + Unit Engines use Expressions and other rules to transform a Collection + as a whole. These classes consume and produce Unit Collections. + The Unit Collection provides special methods for iteration, whether + reading or writing, to avoid errors common with multi-process/ + multi-threaded access. + + UnitCollection is a subclass of Unit, so that it can be managed by + Sandboxes. However, due to the structure of the data contained in a + UnitCollection, it is recommended that Storage Managers use different + techniques to store and retrieve Unit Collections. They do not need + more than the ID's of their contained Units stored, since they will + recall such Units as needed. Not every Storage Manager is going to be + able to handle this kind of dynamic storage; deployers-- examine your + Storage Managers and make sure they can! + """ + + _IDs = None + + def __init__(self, Type=None): + dejavu.Unit.__init__(self) + self._IDs = sets.Set() + self._mutex = threading.RLock() + self.Type = Type + + def acquire(self): + self._mutex.acquire(True) + + def release(self): + self._mutex.release() + + def __len__(self): + return len(self._IDs) + + def add(self, ID): + self.acquire() + try: + self._IDs.add(ID) + finally: + self.release() + + def unit_class(self): + return self.sandbox.arena.class_by_name(self.Type) + + def ids(self): + self.acquire() + for eachID in self._IDs: + yield eachID + self.release() + + def units(self, quota=None): + cls = self.unit_class() + self.acquire() + for i, eachID in enumerate(self._IDs): + if quota and i >= quota: + break + unit = self.sandbox.unit(cls, ID=eachID) + if unit: + yield unit + self.release() + + def xdict(self, attr): + """Return a dictionary of {Unit.attr: [Unit, Unit, ...]}.""" + product = {} + self.acquire() + try: + for unit in self.units(): + key = getattr(unit, attr) + product.setdefault(key, []).append(unit) + finally: + self.release() + return product + + def __copy__(self): + newUnit = dejavu.Unit.__copy__(self) + newUnit._IDs = self._IDs.copy() + return newUnit + +UnitCollection.set_property(u'EngineID', int, index=True) +UnitCollection.set_properties({u'Type': unicode, + u'Timestamp': datetime.datetime, + }) + + +operations = [ # OPERAND + 'COPY', # SetID of mixin + 'CREATE', # New type (= class.__name__) + 'DIFFERENCE', # SetID of mixin + 'FILTER', # logic.Expression + 'FUNCTION', # key into arena.engine_functions dict + 'INTERSECTION', # SetID of mixin + 'RETURN', # + 'TRANSFORM', # New type (= class.__name__) + 'UNION', # SetID of mixin + ] + + +class UnitEngineRule(dejavu.Unit): + """A Rule for Unit Engines.""" + + def __init__(self, Operation=None, SetID=None, Operand=None): + """(Operation, SetID, Operand=(Type | logic.Expression | otherSet) + + Expressions: + If the Operation is a logic.Expression, then the snapshot + will consist of the IDs of units which match the Expression. + + Everything else: + transforms: the snapshot will consist of IDs of all units + which are associated with the current snapshot. + union, difference, and intersection: these all take a setID. + + So, a typical Engine might have a set of rules which look like: + --Operation-- --Set-- --Operand-- + CREATE 1 Invoice # Full set + FILTER 1 (Expression) # modifies Set 1 + CREATE 2 Inventory # Full set + FILTER 2 (Expression) # modifies Set 2 + FILTER 2 (Expression) # modifies Set 2 + TRANSFORM 2 Invoice # modifies Set 2 + DIFFERENCE 1 2 # Set1 -= Set2 + RETURN 1 # This is optional. + + The last RETURN statement is optional. If omitted, the last Set + touched will be returned. + + For all operations, the Set ID indicates which Set will be + modified by the operation. Using the above example, you can + see that for the DIFFERENCE operation, the Set which is modified + is Set 1. + """ + dejavu.Unit.__init__(self) + self.Operation = Operation + self.SetID = SetID + if Operation == 'FILTER': + if not isinstance(Operand, (str, unicode)): + Operand = pickle.dumps(Operand) + self.Operand = Operand + + def __repr__(self): + op = self.Operand + if self.Operation == 'FILTER': + op = pickle.loads(op) + return ("dejavu.engines.UnitEngineRule(%s, %s, %s)" + % (self.Operation, self.SetID, repr(op))) + + def expr(self): + if self.Operation == 'FILTER': + op = self.Operand + return pickle.loads(op) + return None + +class RuleProperty(dejavu.UnitProperty): + def post(self, unit, value): + eng = unit.sandbox.unit(UnitEngine, ID=unit.EngineID) + if eng: + eng.update_final_class() +UnitEngineRule.set_property(u'Operation', str, descriptor=RuleProperty) +UnitEngineRule.set_property(u'SetID', int, descriptor=RuleProperty) +UnitEngineRule.set_property(u'Operand', str, descriptor=RuleProperty) +UnitEngineRule.Operand.hints = {u'Size': 0} +UnitEngineRule.set_property(u'Sequence', int, descriptor=RuleProperty) +UnitEngineRule.set_property(u'EngineID', int, index=True) + + +class UnitEngine(dejavu.Unit): + """A factory for Unit Collections.""" + + def __init__(self): + dejavu.Unit.__init__(self) + self.Created = datetime.datetime.today() + self.Owner = u'' + + def on_forget(self): + # Rules and Snapshots shouldn't persist past + # the life of their Engines. Forget them. + for rule in self.rules(): + rule.forget() + for snap in self.snapshots(): + snap.forget() + + def update_final_class(self): + results = {} + last_set = 1 + for rule in self.rules(): + last_set = rule.SetID + operation = rule.Operation + if operation in ('CREATE', 'TRANSFORM'): + results[last_set] = rule.Operand + if operation == 'RETURN': + break + if last_set in results: + self.FinalClassName = results[last_set] + + def final_class(self): + return self.sandbox.arena.class_by_name(self.FinalClassName) + + def rules(self): + """An ordered list of Rules for this Engine.""" + f = logic.filter(EngineID=self.ID) + allrules = [x for x in self.sandbox.recall(UnitEngineRule, f)] + allrules.sort(dejavu.sort(u'Sequence')) + return allrules + + def add_rule(self, Operation, SetID=None, Operand=None): + allrules = self.rules() + if isinstance(Operation, UnitEngineRule): + newRule = Operation + else: + if SetID is None: + try: + SetID = allrules[-1].SetID + except IndexError: + SetID = 1 + newRule = UnitEngineRule(Operation, SetID, Operand) + + try: + nextSeq = allrules[-1].Sequence + 1 + except IndexError: + nextSeq = 0 + newRule.Sequence = nextSeq + + newRule.EngineID = self.ID + newRule.temporary = self.temporary + self.sandbox.memorize(newRule) + self.update_final_class() + + def snapshots(self): + """Unit Collections obtained by executing the rules sometime in the past.""" + f = logic.filter(EngineID=self.ID) + allSnap = [x for x in self.sandbox.recall(UnitCollection, f)] + allSnap.sort(dejavu.sort(u'Timestamp')) + return allSnap + + def take_snapshot(self, args={}): + """Execute the rules and return a Unit Collection (or None).""" + allrules = self.rules() + snap = RuleProcessor(self.sandbox).process(allrules, args) + if snap is not None: + snap.EngineID = self.ID + snap.Timestamp = datetime.datetime.now() + snap.temporary = True + self.sandbox.memorize(snap) + return snap + + def last_snapshot(self, args={}): + allSnaps = self.snapshots() + if len(allSnaps) == 0: + aSnap = self.take_snapshot(args) + else: + aSnap = allSnaps[-1] + return aSnap + + def permanent(self): + self.temporary = False + for rule in self.rules(): + rule.temporary = False + + def __copy__(self): + newUnit = dejavu.Unit.__copy__(self) + newUnit.Name = "Copy of %s" % newUnit.Name + newUnit.Created = datetime.datetime.now() + return newUnit + + def clone(self, user, temporary=True): + """Copy self and all Rules of self. Memorize automatically.""" + newUnit = self.__copy__() + newUnit.Owner = user + newUnit.temporary = temporary + self.sandbox.memorize(newUnit) + for rule in self.rules(): + newRule = rule.__copy__() + newRule.EngineID = newUnit.ID + newRule.temporary = temporary + self.sandbox.memorize(newRule) + return newUnit + + def permit(self, user, write_access=True): + if write_access: + return self.Owner in (u'Public', user) + else: + return self.Owner in ('System', 'Public', user) + +UnitEngine.set_properties({u'Owner': unicode, + u'Name': unicode, + u'Created': datetime.datetime, + u'FinalClassName': unicode, + }) + + +class RuleProcessor(object): + """Processor for the Rules of a Unit Engine.""" + + def __init__(self, sandbox): + self.sandbox = sandbox + self.arena = sandbox.arena + + def process(self, rules, args): + """Execute the rules and return a Unit Collection (or None).""" + self.sets = {} + self.args = args + final = None + for rule in rules: + operation = rule.Operation + func = getattr(self, 'visit_' + operation) + final = rule.SetID + func(final, rule.Operand) + if final is None: + return None + else: + return self.sets[final] + + def visit_COPY(self, setID, operand): + """Copy the set whose ID = operand into another set, whose ID = setID.""" + A = self.sets[setID] + operand = int(operand) + if operand in self.sets: + # Overwrite the existing set. + B = self.sets[operand] + else: + # Create a new set. + B = UnitCollection(A.Type) + self.sets[operand] = B + B.empty = A.empty + A.acquire() + B.acquire() + try: + B._IDs = A._IDs.copy() + finally: + A.release() + B.release() + + def visit_CREATE(self, setID, operand): + """Create an empty set. The next instruction is responsible to fill it.""" + newset = UnitCollection(operand) + newset.empty = True + self.sets[setID] = newset + + def realize_empty(self, setID): + """realize_empty(setID). Populate the specified set only if empty.""" + A = self.sets[setID] + if hasattr(A, 'empty') and A.empty: + A.empty = False + A.acquire() + try: + for unit in self.sandbox.recall(self.arena.class_by_name(A.Type)): + A._IDs.add(unit.ID) + finally: + A.release() + + def visit_DIFFERENCE(self, setID, operand): + self.realize_empty(setID) + A = self.sets[setID] + B = self.sets[int(operand)] + A.acquire() + B.acquire() + try: + A._IDs = A._IDs.difference(B._IDs) + finally: + A.release() + B.release() + + def visit_FILTER(self, setID, operand): + expr = pickle.loads(operand) + expr.bind_args(**self.args) + A = self.sets[setID] + if hasattr(A, 'empty') and A.empty: + A.empty = False + A.acquire() + try: + cls = self.arena.class_by_name(A.Type) + for unit in self.sandbox.recall(cls, expr): + A._IDs.add(unit.ID) + finally: + A.release() + else: + A.acquire() + try: + cls = self.arena.class_by_name(A.Type) + newset = sets.Set() + for id in A._IDs: + unit = self.sandbox.unit(cls, ID=id) + if unit and expr.evaluate(unit): + newset.add(id) + A._IDs = newset + finally: + A.release() + + def visit_FUNCTION(self, setID, operand): + func = self.arena.engine_functions[operand] + + A = self.sets[setID] + A.acquire() + try: + func(A) + finally: + A.release() + + def visit_INTERSECTION(self, setID, operand): + self.realize_empty(setID) + A = self.sets[setID] + B = self.sets[int(operand)] + A.acquire() + B.acquire() + try: + A._IDs = A._IDs.intersection(B._IDs) + finally: + A.release() + B.release() + + def visit_RETURN(self, setID, operand): + self.realize_empty(setID) + + def visit_TRANSFORM(self, setID, operand): + """operand=far class name. Multiple hops are supported.""" + self.realize_empty(setID) + A = self.sets[setID] + start = self.arena.class_by_name(A.Type) + end = self.arena.class_by_name(operand) + nodes = self.arena.associations.shortest_path(start, end) + if nodes is None: + raise KeyError("No association found between '%s' and '%s'" + % (start, end)) + + # Skip the first node, which should be A.Type + nodes.pop(0) + A.acquire() + try: + for eachType in nodes: + # Add all associated Units to the collection A. + oppfunc = getattr(start, eachType.__name__) + cls = self.arena.class_by_name(A.Type) + newset = sets.Set() + for id in A._IDs: + unit = self.sandbox.unit(cls, ID=id) + if unit: + for farUnit in oppfunc(unit): + newset.add(farUnit.ID) + A._IDs = newset + start = eachType + A.Type = eachType.__name__ + finally: + A.release() + + def visit_UNION(self, setID, operand): + self.realize_empty(setID) + A = self.sets[setID] + B = self.sets[int(operand)] + A.acquire() + B.acquire() + try: + A._IDs = A._IDs.union(B._IDs) + finally: + A.release() + B.release() + + Index: trunk/readme.py =================================================================== --- (revision ) +++ trunk/readme.py (revision 1) @@ -1,0 +1,204 @@ +""" +This work, including the source code, documentation +and related data, is placed into the public domain. + +The original author is Robert Brewer, Amor Ministries. + +THIS SOFTWARE IS PROVIDED AS-IS, WITHOUT WARRANTY +OF ANY KIND, NOT EVEN THE IMPLIED WARRANTY OF +MERCHANTABILITY. THE AUTHOR OF THIS SOFTWARE +ASSUMES _NO_ RESPONSIBILITY FOR ANY CONSEQUENCE +RESULTING FROM THE USE, MODIFICATION, OR +REDISTRIBUTION OF THIS SOFTWARE. + + +Dejavu is an Object-Relational Mapper. Objects are called "Units", +and are served into Sandboxes within an Arena. Each Unit instance +has a class, which maintains its schema via Unit Properties. + +"Dejavu", to quote Flying Circus episode 16, means "that strange feeling +we sometimes get that we've lived through something before." What better +name for an object server? Our terminology reflects this cognitive bent: +sandboxes "memorize", "recall" and "forget" Units. + +Most Unit lifecycles follow the same pattern: + aUnit = sandbox.unit(cls, ID=ID) + val = aUnit.propertyName + aUnit.propertyName = newValue + del aUnit # or otherwise release the reference, e.g. close the scope. + +When creating new Units, a similar pattern would be: + newUnit = unit_class() + newUnit.propertyName = newValue + sandbox.memorize(newUnit) + del newUnit # or otherwise release the reference. + +Using recall(), you get an iterator: + for unit in sandbox.recall(cls, expr): + do_something_with(unit) + +You destroy a Unit via Unit.forget(). + +Applications only need to call Unit.repress() when they wish to stop +caching the object, returning it to storage. This is very rare, and +should really only be performed within dejavu code. +""" + +todo = """ +1. Add a convenience function to Unit for associations. That is, + when you have a class A with an association to class B, provide: + a = A() + b = B() + a.add(b) + Since the keys are known, take whichever is None and supply it + from the other Unit instance. +2. Convenience function for retrieving all far classes from an association + and treating them as a list, with len, slicing, etc. +3. Allow Unit properties to be set in __init__() by **kwargs. + SM's should still go the manual route, but app developers could + use it to good effect. +4. Batched triggers: + Some pre/post triggers should be delayed until end of request, + particularly those which affect related Units. Also on_forget(), etc. + There should be a generic trigger mechanism with the ability for + client code to register new trigger points. Probably each Unit class + should maintain an attribute triggers = {}, where each key is a string + and each value is a callable which gets passed "self". Standard + triggers would include: + 1. Pre-modify property X + 2. Post-modify property X. + 3. Pre-forget/post-forget. + 4. on_flush (end of request) +5. For UnitProperty types which are mutable (like dicts), we currently have to + explicitly set dirty, because dirty only sets automatically on rebind, + not mutate. Fix if possible, or doc the heck out of it. +5. Speed issues: + 1. Chained, related Unit classes would be nice to have in Expressions. + lambda x: x.Amount > 0 and x.Characteristic.Type = 'grr' + 2. It would be nice to declare e.g. Job has only one Project; + this would allow job.Project() to set quota = 1 on recall. + 3. Batch memorize when adding several units with auto ID's. + +2.0: + Classes are associated across Arenas. :/ But that shouldn't be a + huge issue--it only affects those who wish to deploy the same app + twice within the same process--an odd corner case. + Add the new (python 2.3) Decimal type. + UnitProperty.pre should be able to modify value. + http://sqlobject.org/docs/FAQ.html would be a great page to rewrite + with respect to dejavu instead of sqlobject. +""" + +version = "1.2.3" + +changelog = """ +1.2.3 (9/3/04): + 1. Decoupled construction of stores from arena._load() into add_store(). + 2. Exceptions in SocketServer now get passed back to the calling app + and reraised. + 3. Fixed nasty latent bug where engine FILTER rules weren't filtering + if the set already had elements; it would just pass through every + existing element without evaluating them against the filter operand. + 4. Wrote dejavu.analysis.py, which provides aggregation and analysis + tools to dejavu applications. Moved dejavu.unit_sort to analysis.sort. + 5. storeado.CALL_FUNCTION failed on some logic functions. If the func + object was stored in co_consts, it would error on .endswith. Now + falls through, setting imperfect, so dejavu can test with the lambda. + +1.2.2 (8/18/04): + 1. Changed storage/sockets to use blocking sockets, resulting in a 40% speedup. + 2. Use faster CPickle if possible in sockets, storeado, engines. + 3. Moved on_forget calls after the unit is forgotten. All known use + cases are OK with this; some needed it to update parent sums. + +1.2.1 (8/10/04): + 1. Got rid of unit.has(), which didn't fit the new unit formation process. + 2. Rewrote CachingProxy.sweep() to destroy temporary units (which rolls + back the reserve(), in effect). + 3. CachingProxy.shutdown duplicated sweep_all(). + 4. Sandbox was setting Unit.dirty = False on recall; that should be left + to SM's, since they might cache Units. + 5. storeado.AdapterToADOFields.coerce_datetime() didn't trap + value == None (tried to add None + None). + +1.2.0 (8/9/04): + 1. Sandbox.forget had a NameError: store = self.arena.storage(cls) + 2. Storage.CachingProxy.sweep errored on some _recallTimes[id] + 3. for job, project in recall(Job, [expr, [Project, [expr]]]). + Recalls associated Units in a single SM query. + 4. Added a special case for Expression(lambda x: x.ID == CONST), + to save the database hit if it's found in the cache. + 5. Profiling: Changed storeado.py, storage/sockets.py to set + unit._properties directly to save __set__ overhead (which isn't + really needed until the unit is concrete). + 7. Profiling: added unit._property_types for faster lookup. + +1.2.0 RC (8/5/04): + 1. Did another big rework. Sandboxes now use straight dicts for caching + Units, and are never shared across requests. Any sharing has been + pushed into subclasses of StorageManager, for example, CachingProxy. + This should also give us the framework to write multi-Unit recall. + 2. Given the new chainable StorageManagers, added "Load Order" and + "Shutdown Order" to config options. + 3. Merged config "Cache" sections with "StorageManager" sections (since + there are no more Cache classes). + 4. Changed arena.roster to be a Prism; removed RegisteredUnitClass class. + +1.1.3 (7/30/04): + 1. Dang. Forgot to have memorize() set dirty with all the refactoring. + 2. Times and datetimes not being saved correctly (math/parens typo). + 3. Times and datetimes not being retrieved correctly due to naive + rounding of times, which are fractional. + +1.1.2 (7/29/04): + 1. Change storeado to drop UnitCollection tables instead of emptying them; + this makes modifying the type of the ID column easier. + 2. engines.take_snapshot(): nontemp Engines now create temp snapshots. + 3. UnitEngine.on_repress removed--it isn't necessary. + +1.1.1 (7/28/04): + 1. storeado was failing on load_collection when looking up class. + 2. storeado wasn't retrieving currencies correctly (lost fractions). + +1.1.0 (7/27/04): + 1. storeado.load_collection had a bug setting idtype. + 2. Added shutdown() to Storage Managers. + +1.1.0 RC (7/26/04): + 1. distinct() functions at sandbox, cache, storage levels. + 2. Completely reworked caching mechanism so apps could flush caches + after each request (which turns out to be most efficient for + web apps, rather than having each thread serialize access to + shared caches). UnitServer is gone, and Unit caches are now owned + by Sandboxes, which are doled out to apps. + 3. Sandbox attribute of Units gets set on memorize, not init. No more + Unit.memorize(); instead, it's sandbox.memorize(unit), and any + custom unit code that was in overridden Unit.memorize functions + now needs to be in Unit.on_memorize(). + 4. storeado.save_collection: Unit.ID.type was not being detected + correctly; hence no save of UnitCollections. + +1.0.3 (7/21/04): + 1. Since cache wasn't being indexed anyway, went back to a plain dict. + 2. Added an option to storeado to create a new ADO Connection + for each thread. Doesn't seem to make a difference on speed. + 3. Added UnitServerWeakref. + 4. Units were being saved on repress() even if not dirty. + 5. Changed default UnitServer to no cache. UnitServerCaching should + be considered broken until its cache management can be made + thread-safe (that is, it's far too slow because it has to + synchronize every recall). + 6. Modified Arena to handle unit class->server mappings more + statically, with a 'roster' dictionary. ALL unit classes must + now be registered via arena.register() or .associate(). + 7. Made Unit.server an attribute, set at __init__, rather than + a method. Consequently, was able to remove some of the litmus + testing done in UnitServer methods--the litmus should be tested + long before those calls; i.e., when arena.roster is populated. + +1.0.2 (7/19/04): + 1. Reworked recall by removing Recallers completely. Big speedup. + +1.0.1 (7/16/04): + 1. Added the COPY rule to engines.py. +""" Index: trunk/storage/__init__.py =================================================================== --- (revision ) +++ trunk/storage/__init__.py (revision 1) @@ -1,0 +1,281 @@ +"""Storage Managers for Dejavu.""" + +import datetime +import threading +import thread +import recur + + +class StorageManager(object): + """A Manager base class for storing and retrieving Units. + + The base StorageManager class doesn't actually store anything; + it needs to be subclassed. + """ + + name = u'' + shutdownOrder = 0 + + def __init__(self, name, arena, allOptions={}): + raise NotImplementedError + + def recall(self, unitClass, expr=None): + """Return an iterable object which will populate Units.""" + raise NotImplementedError + + def save(self, unit, forceSave=False): + """Store the object's data{} dictionary.""" + raise NotImplementedError + + def destroy(self, unit): + """Delete the object.""" + raise NotImplementedError + + def create_storage(self, unitClass): + raise NotImplementedError + + def reserve(self, unit): + """Reserve storage space for the Unit.""" + raise AttributeError + + def shutdown(self): + pass + + +class ProxyStorage(StorageManager): + """A Storage Manager which passes calls to another Storage Manager.""" + + nextstore = None + + def __init__(self, name, arena, allOptions={}): + self.name = name + nextstore = allOptions.get('Next Store') + if nextstore: + self.nextstore = arena.stores[nextstore] + + def recall(self, unitClass, expr=None): + if self.nextstore: + for unit in self.nextstore.recall(unitClass, expr): + yield unit + + def save(self, unit, forceSave=False): + """Store the unit.""" + if self.nextstore: + self.nextstore.save(unit, forceSave) + + def destroy(self, unit): + """Delete the unit.""" + if self.nextstore: + self.nextstore.destroy(unit) + + def create_storage(self, unitClass): + if self.nextstore: + self.nextstore.create_storage(unitClass) + + def reserve(self, unit): + """Reserve storage space for the Unit.""" + if self.nextstore: + self.nextstore.reserve(unit) + + +class CachingProxy(ProxyStorage): + """A Proxy Storage Manager which recalls and keeps Units in memory.""" + + def __init__(self, name, arena, allOptions={}): + ProxyStorage.__init__(self, name, arena, allOptions) + + self._caches = {} + self._cache_locks = {} + self._recallTimes = {} + + # Create and motivate a worker to sweep out idle Units. + lifetime = allOptions.get('Lifetime', '') + if lifetime: + recurrence = recur.Recurrence(None, lifetime) + sweeper = Sweeper(self, recurrence) + sweeper.motivate() + + def cachelen(self, cls): + return len(self._caches.get(cls, {})) + + def cached_units(self, cls): + return self._caches.get(cls, {}).values() + + def recall(self, unitClass, expr=None): + """Return a Unit iterator.""" + if unitClass not in self._caches: + self._caches[unitClass] = {} + self._cache_locks[unitClass] = thread.allocate_lock() + lock = self._cache_locks[unitClass] + + currentTime = datetime.datetime.now() + lock.acquire(True) + try: + cache = self._caches[unitClass] + seen = {} + + # Run through our cache first. Hopefully, this will save us + # calling expr.evaluate twice for each unit. + for id, unit in cache.iteritems(): + if expr is None or expr.evaluate(unit): + seen[id] = unit + + if self.nextstore: + for unit in self.nextstore.recall(unitClass, expr): + if unit.ID not in seen: + seen[unit.ID] = unit + if unit.ID not in cache: + cache[unit.ID] = unit + self._recallTimes[unit.ID] = currentTime + + return iter(seen.values()) + finally: + lock.release() + + def save(self, unit, forceSave=False): + """Store the unit.""" + # Defer saves to either Sweepers or shutdown(). + pass + + def destroy(self, unit): + """Delete the unit.""" + unitClass = unit.__class__ + if unitClass not in self._caches: + self._caches[unitClass] = {} + self._cache_locks[unitClass] = thread.allocate_lock() + lock = self._cache_locks[unitClass] + + lock.acquire(True) + try: + cache = self._caches[unitClass] + if self.nextstore: + self.nextstore.destroy(unit) + if unit.ID in cache: + del cache[unit.ID] + del self._recallTimes[unit.ID] + finally: + lock.release() + + def reserve(self, unit): + """Reserve storage space for the Unit.""" + unitClass = unit.__class__ + if unitClass not in self._caches: + self._caches[unitClass] = {} + self._cache_locks[unitClass] = thread.allocate_lock() + lock = self._cache_locks[unitClass] + + lock.acquire(True) + try: + cache = self._caches[unitClass] + if self.nextstore: + self.nextstore.reserve(unit) + cache[unit.ID] = unit + self._recallTimes[unit.ID] = datetime.datetime.now() + finally: + lock.release() + + def sweep(self, cls, lastSweepTime=None): + if cls not in self._caches: + self._caches[cls] = {} + self._cache_locks[cls] = thread.allocate_lock() + cache = self._caches[cls] + lock = self._cache_locks[cls] + lock.acquire(True) + try: + for id in cache.keys(): + lastRecall = self._recallTimes.setdefault(id, None) + if (lastRecall is None or lastSweepTime is None + or lastRecall < lastSweepTime): + unit = cache[id] + if unit.temporary: + self.nextstore.destroy(unit) + elif unit.dirty: + self.nextstore.save(unit) + + del cache[id] + del self._recallTimes[id] + finally: + lock.release() + + def sweep_all(self, lastSweepTime=None): + for cls in self._caches: + self.sweep(cls, lastSweepTime) + + def shutdown(self): + self.sweep_all() + + +class Sweeper(object): + """Repress idle Units on a schedule.""" + + def __init__(self, store, recurrence): + self.store = store + self.recurrence = recurrence + if self.recurrence: + # Throw away the first occurrence value, + # which is almost always .now() + self.recurrence.next() + self.createdate = datetime.datetime.now() + self.lastrun = None + self.active = True + + def motivate(self): + """Start a new immediate or recurring thread for work.""" + if self.recurrence: + # Start a recurring, timed Thread. + threading.Timer(self.recurrence.interval(), self._cycle).start() + else: + # Start a single, non-recurring thread. + threading.Thread(None, self.run).start() + + def _cycle(self): + """Run the worker on a schedule.""" + self.motivate() + self.run() + + def run(self): + """Prepare for work.""" + if self.active: + if self.lastrun: + lastSweepTime = self.lastrun + else: + lastSweepTime = self.createdate + self.store.sweep_all(lastSweepTime) + self.lastrun = datetime.datetime.now() + + +class BurnedProxy(CachingProxy): + """A Caching Proxy Storage Manager which recalls and caches ALL Units. + + The big performance difference for a burned cache is that, once _any_ + Units have been recalled, further recalls won't hit the next store. + Notice we didn't say "performance _benefit_" ;) That would depend to + a great extent on the proxied store. + """ + + def recall(self, unitClass, expr=None): + """Return a Unit iterator.""" + if unitClass not in self._caches: + self._caches[unitClass] = {} + self._cache_locks[unitClass] = thread.allocate_lock() + lock = self._cache_locks[unitClass] + + lock.acquire(True) + try: + cache = self._caches[unitClass] + + if (not cache) and self.nextstore: + # Read ALL units from storage. + now = datetime.datetime.now() + for unit in self.nextstore.recall(unitClass, None): + cache[unit.ID] = unit + self._recallTimes[unit.ID] = now + + if expr is None: + return iter(cache.values()) + else: + return iter([unit for unit in cache.itervalues() + if expr.evaluate(unit)]) + finally: + lock.release() + Index: trunk/storage/sockets.py =================================================================== --- (revision ) +++ trunk/storage/sockets.py (revision 1) @@ -1,0 +1,144 @@ +import fixedpoint +try: + import cPickle as pickle +except ImportError: + import pickle +import socket +import select + + +def stream(unit): + data = {} + if isinstance(unit, dict): + for key, value in unit.iteritems(): + if isinstance(value, fixedpoint.FixedPoint): + # Unpicklable + data[key] = (value.n, value.p) + else: + data[key] = value + else: + # Assume it's a dejavu.Unit. + for key in unit.__class__.properties(): + value = getattr(unit, key) + if isinstance(value, fixedpoint.FixedPoint): + # Unpicklable + data[key] = (value.n, value.p) + else: + data[key] = value + return pickle.dumps(data) + +def destream(unit, data): + # data will be a pickled dictionary of properties for a unit. + attrdict = pickle.loads(data) + cls = unit.__class__ + for key in cls.properties(): + value = attrdict[key] + if cls.property_type(key) == fixedpoint.FixedPoint: + if value is not None: + n, p = value + value = fixedpoint.FixedPoint(0, 2) + value.n, value.p = n, p +## setattr(unit, key, value) + # Experimental: set the attribute directly to avoid __set__ overhead. + unit._properties[key] = value + +def dechunk(data): + """Extract chunks from a stream of data.""" + chunks = [] + while data: + size = (256 * int(ord(data[0]))) + int(ord(data[1])) + chunks.append(data[2:size + 2]) + data = data[size + 2:] + return chunks + +def sendall(conn, msg): + size = len(msg) + if size > (256 * 256): + raise ValueError("msg to send is too large (%s)." % size) + conn.sendall(chr(size >> 8)) + conn.sendall(chr(size & 0xFF)) + conn.sendall(msg) + + +class SocketClient(object): + """Client (dejavu) end of a socket for handling dejavu Units.""" + + # For some reason, the default of '' for localhost doesn't work on Win2k. + def __init__(self, host='127.0.0.1', port=51111, timeout=5): + self.host = host + self.port = port + self.timeout = timeout + + def query(self, msg): + """Send msg to the peer and return the response(s) in a list. + + Socket responses specify their length in the first two + bytes (hi, lo). Then, the data follows. If the socket wishes to + return another "object", it will follow the data with another + length+data stream. + """ + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +## conn.settimeout(self.timeout) + conn.connect((self.host, self.port)) + sendall(conn, msg) + conn.shutdown(1) + + # Receive reply. + data = [] + while True: + try: + chunk = conn.recv(1024) + except Exception, x: + if x.args[0] != 10035: + raise x + else: + if chunk == '': + break + data.append(chunk) + + conn.close() + + data = dechunk(''.join(data)) + if data and data[0] == 'ERROR': + raise pickle.loads(data[1]) + return data + + +class SocketServer(object): + """Server end of a socket for handling dejavu Units. + + Use this class to build wrappers for non-standard databases or + other sources of dejavu Units. + """ + + def open(self, host='127.0.0.1', port=51111, backlog=5): + """open(host='127.0.0.1', port=51111, backlog=5).""" + self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +## self.conn.setblocking(0) + self.conn.bind((host, port)) + self.conn.listen(backlog) + + def read(self): + """read(). Read from the socket.""" + readables, writables, e = select.select([self.conn], [self.conn], [], 1) + if self.conn in readables: + (sockobj, address) = self.conn.accept() + + data = [] + while True: + try: + chunk = sockobj.recv(1024) + except Exception, x: + if x.args[0] != 10035: + raise x + else: + if chunk == '': + break + data.append(chunk) + return sockobj, dechunk(''.join(data))[0] + else: + return None, '' + + def close(self): + self.conn.close() + Index: trunk/storage/storeado.py =================================================================== --- (revision ) +++ trunk/storage/storeado.py (revision 1) @@ -1,0 +1,1061 @@ +import sys +# Put COM in free-threaded mode +sys.coinit_flags = 0 + +import win32com.client +import fixedpoint +import pywintypes +import pythoncom +import threading +import codewalk +import datetime +try: + import cPickle as pickle +except ImportError: + import pickle +import recur +import logic +import new + +from dejavu import storage + +adOpenForwardOnly = 0 +adOpenKeyset = 1 +adOpenDynamic = 2 +adOpenStatic = 3 + +adLockReadOnly = 1 +adLockPessimistic = 2 +adLockOptimistic = 3 +adLockBatchOptimistic = 4 + +adModeShareExclusive = 12 + +adStateClosed = 0 +adStateOpen = 1 +adStateConnecting = 2 +adStateExecuting = 4 +adStateFetching = 8 + +# 12/30/1899, the zero-Date for ADO = 693594 +zeroHour = datetime.date(1899, 12, 30).toordinal() + + +def time_from_com(com_date): + """Return a valid (day, datetime.time) from a COM date or time object.""" + hour, min = divmod(86400 * (float(com_date) % 1), 3600) + min, sec = divmod(min, 60) + # Must do both int() and round() or we'll be up to 1 second off. + hour = int(round(hour)) + min = int(round(min)) + sec = int(round(sec)) + return recur.sane_time(0, hour, min, sec) + + +class AdapterFromADO(logic.Adapter): + """Coerce incoming values from ADO to Dejavu datatypes.""" + def __init__(self, unit=None): + self.default_processor = self.coerce_unicode + self.processors = {datetime.datetime: self.coerce_datetime, + datetime.date: self.coerce_date, + datetime.time: self.coerce_time, + float: self.coerce_float, + int: self.coerce_int, + bool: self.coerce_int, + fixedpoint.FixedPoint: self.coerce_fixed, + dict: self.pickle, + list: self.pickle, + str: self.coerce_str, + } + self.unit = unit + + def consume(self, key, value): + expectedType = self.unit.__class__.property_type(key) + value = self.coerce(value, expectedType) +## setattr(self.unit, key, value) + # Experimental: set the attribute directly to avoid __set__ overhead. + self.unit._properties[key] = value + + def pickle(self, value): + aType, value = value + if value is None: + return None + else: + # Coerce to str for pickle.loads restriction. + value = str(value) + return pickle.loads(value) + + def coerce_datetime(self, value): + # Illegal Date/Time values will crash the + # app when using value.Format(). Therefore, + # grab the value and figure the date ourselves. + # Use 1-second resolution only. + aType, value = value + if value is None: + return None + elif isinstance(value, basestring): + return datetime.datetime(int(value[0:4]), int(value[4:6]), + int(value[6:8])) + else: + # For some reason, we need both float and int. + aDate = datetime.date.fromordinal(int(float(value)) + zeroHour) + day, aTime = time_from_com(value) + return datetime.datetime.combine(aDate, aTime) + + def coerce_date(self, value): + # See coerce_datetime + aType, value = value + if value is None: + return None + elif isinstance(value, basestring): + return datetime.date(int(value[0:4]), int(value[4:6]), + int(value[6:8])) + else: + return datetime.date.fromordinal(int(float(value)) + zeroHour) + + def coerce_time(self, value): + # See coerce_datetime + aType, value = value + if value is None: + return None + else: + day, aTime = time_from_com(value) + return aTime + + def coerce_float(self, value): + aType, value = value + if value is None: + return None + if aType == 0x06: + # Currency + value = value[1] / 10000.0 + return float(value) + + def coerce_int(self, value): + aType, value = value + if value is None: + return None + if aType == 0x0b: + # Boolean + return value != 0 + return int(value) + + def coerce_fixed(self, value): + aType, value = value + if value is None: + return None + if aType == 0x06: + # Currency + value = value[1] / 10000.0 + return fixedpoint.FixedPoint(value) + + def coerce_str(self, value): + aType, value = value + if value is None: + return None + return str(value) + + def coerce_unicode(self, value): + aType, value = value + if value is None: + return None + if isinstance(value, unicode): + # For some reason, inValue is already a unicode object. + return value + if isinstance(value, str): + try: + return unicode(value, "ISO-8859-1") + except UnicodeError: + raise StandardError(type(value)) + return unicode(value) + + + +class AdapterToADOFields(logic.Adapter): + """Coerce outgoing values from Dejavu datatypes to ADO.Field types.""" + + def __init__(self): + self.default_processor = self.coerce_str + self.processors = {datetime.datetime: self.coerce_datetime, + datetime.date: self.coerce_date, + datetime.time: self.coerce_time, + fixedpoint.FixedPoint: self.coerce_fixed, + dict: self.pickle, + list: self.pickle, + bool: self.coerce_bool, + } + + def coerce_bool(self, value): + if value: + return True + return False + + def coerce_fixed(self, value): + if value is None: + return None + return float(value) + + def coerce_datetime(self, value): + if value is None: + return None + return self.coerce_date(value) + self.coerce_time(value) + + def coerce_date(self, value): + if value is None: + return None + return value.toordinal() - zeroHour + + def coerce_time(self, value): + if value is None: + return None + return ((value.second + (value.minute * 60) + (value.hour * 3600)) + / 86400.0) + + def coerce_str(self, value): + return value + + def pickle(self, value): + # We must not use a pickle format other than 0, because binary + # strings are not safe for all DB string fields. + return pickle.dumps(value) + + +class AdapterToADOSQL(logic.Adapter): + """Coerce Expression constants to ADO SQL.""" + + def __init__(self): + self.default_processor = str # NOT coerce_str + self.processors = {datetime.datetime: self.coerce_datetime, + datetime.date: self.coerce_date, + datetime.time: self.coerce_time, + datetime.timedelta: self.coerce_timedelta, + fixedpoint.FixedPoint: self.coerce_fixed, + str: self.coerce_str, + unicode: self.coerce_str, + tuple: self.coerce_tuple, + bool: self.coerce_bool, + type(None): self.coerce_none, + } + + def coerce_none(self, value): + return "Null" + + def coerce_bool(self, value): + if value: + return 'True' + return 'False' + + def coerce_datetime(self, value): + return (u'#%s/%s/%s %02d:%02d:%02d#' % + (value.month, value.day, value.year, + value.hour, value.minute, value.second)) + + def coerce_date(self, value): + return u'#%s/%s/%s#' % (value.month, value.day, value.year) + + def coerce_time(self, value): + return u'#%02d:%02d:%02d#' % (value.hour, value.minute, value.second) + + def coerce_timedelta(self, value): + float_val = value.days + (value.seconds / 86400.0) + return repr(float_val) + + def coerce_fixed(self, value): + return str(value) + + def coerce_str(self, value): + value = value.replace(u"'", u"''") + value = value.replace("%", "[%]") + value = value.replace("_", "[_]") + return "'" + value + "'" + + def coerce_tuple(self, value): + return "(" + ", ".join([self.coerce(x) for x in value]) + ")" + + +def icontainedby(op1, op2, notin=False): + # This test doesn't work right, now that we use lists as + # well as tuples with IN. Need a way to mark field refs. + if op2.startswith("[") and op2.endswith("]"): + # Looking for text in a field. Use Like (reverse terms). + value = op2 + " Like '%" + op1[1:-1] + "%'" + else: + # Looking for field in (a, b, c) + value = op1 + " in " + op2 + if notin: + value = "not " + value + return value + + +class ADOSQLDecompiler(codewalk.LambdaDecompiler): + """ADOSQLDecompiler(store, unitClass, expr, adapter=AdapterToADOSQL()). + + Produce SQL from a supplied lambda of the form: + lambda x, **kw: ... + + Attributes of x (or whatever the name of the first argument is) will be + mapped to table columns. Keyword arguments should be bound to the + Expression before calling this decompiler. + """ + + sql_cmp_op = ('<', '<=', '=', '!=', '>', '>=', 'in', 'not in') + functions = {logic.icontains: lambda x, y: x + " Like '%" + y[1:-1] + "%'", + logic.icontainedby: icontainedby, + logic.istartswith: lambda x, y: x + " Like '" + y[1:-1] + "%'", + logic.iendswith: lambda x, y: x + " Like '%" + y[1:-1] + "'", + logic.ieq: lambda x, y: x + " = " + y, + logic.now: lambda: "getdate()", + logic.today: lambda: "DATEADD(dd, DATEDIFF(dd,0,getdate()), 0)", + logic.year: lambda x: "YEAR(" + x + ")", + } + + def __init__(self, store, unitClass, expr, adapter=AdapterToADOSQL()): + self.store = store + self.unitClass = unitClass + self.expr = expr + self.adapter = adapter + obj = expr.func + codewalk.LambdaDecompiler.__init__(self, obj) + + def code(self): + self.imperfect = False + self.walk() + result = self.stack[0] + if result is None: + result = 'True' + return result, self.imperfect + + def visit_target(self, terms): +## terms.reverse() + comp = self.stack.pop() + while terms: + term, operation = terms.pop() + # All this checking of None is done so that a function + # (like logic.iscurrentweek) can be labeled imperfect-- + # all Units (which match the rest of the Expression) + # will be recalled. They can then be compared in + # expr.evaluate(unit). + if comp is None: + if term is not None: + comp = term + else: + if term is not None: + comp = "(%s) %s (%s)" % (term, operation, comp) + self.stack.append(comp) + + def visit_LOAD_GLOBAL(self, lo, hi): + pass + + def visit_LOAD_FAST(self, lo, hi): + pass + + def visit_LOAD_ATTR(self, lo, hi): + name = self.co_names[lo + (hi << 8)] + self.stack.append("[%s%s].[%s]" % + (self.store.prefix, self.unitClass.__name__, name)) + + def visit_LOAD_CONST(self, lo, hi): + val = self.co_consts[lo + (hi << 8)] + # Some constants are function or class objects, + # which should not be coerced. + no_coerce = (new.function, type) + if not isinstance(val, no_coerce): + val = self.adapter.coerce(val) + self.stack.append(val) + + def visit_BUILD_TUPLE(self, lo, hi): + terms = ", ".join([self.stack.pop() for i in range(lo + hi << 8)]) + self.stack.append("(" + terms + ")") + + def visit_BUILD_LIST(self, lo, hi): + self.visit_BUILD_TUPLE(lo, hi) + + def visit_CALL_FUNCTION(self, lo, hi): + kwargs = {} + for i in range(hi): + val = self.stack.pop() + key = self.stack.pop() + kwargs[key] = val + kwargs = [k + "=" + v for k, v in kwargs.iteritems()] + + args = [] + for i in range(lo): + arg = self.stack.pop() + args.append(arg) + args.reverse() + + if kwargs: + args += kwargs + + func = self.stack.pop() + + # Handle function objects. + if func in self.functions: + self.stack.append(self.functions[func](*args)) + else: + if isinstance(func, basestring): + if func.endswith("[startswith]"): + self.stack[-1] = self.stack[-1] + " Like '" + args[0][1:-1] + "%'" + self.imperfect = True + return + elif func.endswith("[endswith]"): + self.stack[-1] = self.stack[-1] + " Like '%" + args[0][1:-1] + "'" + self.imperfect = True + return + elif func == '+ +len(oreilly.books) # output: 1 + +print 'Hi,', oreilly.books[0].authors[0].name +