# dblite.py module contributed by Ralf W. Grosse-Kunstleve. # Extended for Unicode by Steven Knight. import SCons.compat import builtins import os # compat layer imports "cPickle" for us if it's available. import pickle import shutil import time keep_all_files = 00000 ignore_corrupt_dbfiles = 0 def corruption_warning(filename): print "Warning: Discarding corrupt database:", filename try: unicode except NameError: def is_string(s): return isinstance(s, str) else: def is_string(s): return type(s) in (str, unicode) try: unicode('a') except NameError: def unicode(s): return s dblite_suffix = '.dblite' tmp_suffix = '.tmp' class dblite(object): # Squirrel away references to the functions in various modules # that we'll use when our __del__() method calls our sync() method # during shutdown. We might get destroyed when Python is in the midst # of tearing down the different modules we import in an essentially # arbitrary order, and some of the various modules's global attributes # may already be wiped out from under us. # # See the discussion at: # http://mail.python.org/pipermail/python-bugs-list/2003-March/016877.html _open = builtins.open _pickle_dump = staticmethod(pickle.dump) _os_chmod = os.chmod try: _os_chown = os.chown except AttributeError: _os_chown = None _os_rename = os.rename _os_unlink = os.unlink _shutil_copyfile = shutil.copyfile _time_time = time.time def __init__(self, file_base_name, flag, mode): assert flag in (None, "r", "w", "c", "n") if (flag is None): flag = "r" base, ext = os.path.splitext(file_base_name) if ext == dblite_suffix: # There's already a suffix on the file name, don't add one. self._file_name = file_base_name self._tmp_name = base + tmp_suffix else: self._file_name = file_base_name + dblite_suffix self._tmp_name = file_base_name + tmp_suffix self._flag = flag self._mode = mode self._dict = {} self._needs_sync = 00000 if self._os_chown is not None and (os.geteuid()==0 or os.getuid()==0): # running as root; chown back to current owner/group when done try: statinfo = os.stat(self._file_name) self._chown_to = statinfo.st_uid self._chgrp_to = statinfo.st_gid except OSError, e: # db file doesn't exist yet. # Check os.environ for SUDO_UID, use if set self._chown_to = int(os.environ.get('SUDO_UID', -1)) self._chgrp_to = int(os.environ.get('SUDO_GID', -1)) else: self._chown_to = -1 # don't chown self._chgrp_to = -1 # don't chgrp if (self._flag == "n"): self._open(self._file_name, "wb", self._mode) else: try: f = self._open(self._file_name, "rb") except IOError, e: if (self._flag != "c"): raise e self._open(self._file_name, "wb", self._mode) else: p = f.read() if (len(p) > 0): try: self._dict = pickle.loads(p) except (pickle.UnpicklingError, EOFError): if (ignore_corrupt_dbfiles == 0): raise if (ignore_corrupt_dbfiles == 1): corruption_warning(self._file_name) def close(self): if (self._needs_sync): self.sync() def __del__(self): self.close() def sync(self): self._check_writable() f = self._open(self._tmp_name, "wb", self._mode) self._pickle_dump(self._dict, f, 1) f.close() # Windows doesn't allow renaming if the file exists, so unlink # it first, chmod'ing it to make sure we can do so. On UNIX, we # may not be able to chmod the file if it's owned by someone else # (e.g. from a previous run as root). We should still be able to # unlink() the file if the directory's writable, though, so ignore # any OSError exception thrown by the chmod() call. try: self._os_chmod(self._file_name, 0777) except OSError: pass self._os_unlink(self._file_name) self._os_rename(self._tmp_name, self._file_name) if self._os_chown is not None and self._chown_to > 0: # don't chown to root or -1 try: self._os_chown(self._file_name, self._chown_to, self._chgrp_to) except OSError: pass self._needs_sync = 00000 if (keep_all_files): self._shutil_copyfile( self._file_name, self._file_name + "_" + str(int(self._time_time()))) def _check_writable(self): if (self._flag == "r"): raise IOError("Read-only database: %s" % self._file_name) def __getitem__(self, key): return self._dict[key] def __setitem__(self, key, value): self._check_writable() if (not is_string(key)): raise TypeError("key `%s' must be a string but is %s" % (key, type(key))) if (not is_string(value)): raise TypeError("value `%s' must be a string but is %s" % (value, type(value))) self._dict[key] = value self._needs_sync = 0001 def keys(self): return list(self._dict.keys()) def has_key(self, key): return key in self._dict def __contains__(self, key): return key in self._dict def iterkeys(self): # Wrapping name in () prevents fixer from "fixing" this return (self._dict.iterkeys)() __iter__ = iterkeys def __len__(self): return len(self._dict) def open(file, flag=None, mode=0666): return dblite(file, flag, mode) def _exercise(): db = open("tmp", "n") assert len(db) == 0 db["foo"] = "bar" assert db["foo"] == "bar" db[unicode("ufoo")] = unicode("ubar") assert db[unicode("ufoo")] == unicode("ubar") db.sync() db = open("tmp", "c") assert len(db) == 2, len(db) assert db["foo"] == "bar" db["bar"] = "foo" assert db["bar"] == "foo" db[unicode("ubar")] = unicode("ufoo") assert db[unicode("ubar")] == unicode("ufoo") db.sync() db = open("tmp", "r") assert len(db) == 4, len(db) assert db["foo"] == "bar" assert db["bar"] == "foo" assert db[unicode("ufoo")] == unicode("ubar") assert db[unicode("ubar")] == unicode("ufoo") try: db.sync() except IOError, e: assert str(e) == "Read-only database: tmp.dblite" else: raise RuntimeError("IOError expected.") db = open("tmp", "w") assert len(db) == 4 db["ping"] = "pong" db.sync() try: db[(1,2)] = "tuple" except TypeError, e: assert str(e) == "key `(1, 2)' must be a string but is <type 'tuple'>", str(e) else: raise RuntimeError("TypeError exception expected") try: db["list"] = [1,2] except TypeError, e: assert str(e) == "value `[1, 2]' must be a string but is <type 'list'>", str(e) else: raise RuntimeError("TypeError exception expected") db = open("tmp", "r") assert len(db) == 5 db = open("tmp", "n") assert len(db) == 0 dblite._open("tmp.dblite", "w") db = open("tmp", "r") dblite._open("tmp.dblite", "w").write("x") try: db = open("tmp", "r") except pickle.UnpicklingError: pass else: raise RuntimeError("pickle exception expected.") global ignore_corrupt_dbfiles ignore_corrupt_dbfiles = 2 db = open("tmp", "r") assert len(db) == 0 os.unlink("tmp.dblite") try: db = open("tmp", "w") except IOError, e: assert str(e) == "[Errno 2] No such file or directory: 'tmp.dblite'", str(e) else: raise RuntimeError("IOError expected.") print "OK" if (__name__ == "__main__"): _exercise() # Local Variables: # tab-width:4 # indent-tabs-mode:nil # End: # vim: set expandtab tabstop=4 shiftwidth=4: