close sqlite db after testing

This commit is contained in:
Dane Springmeyer 2014-10-14 17:35:59 -07:00
parent 09f91a24a0
commit a98158e97d

View file

@ -15,24 +15,24 @@ def setup():
NUM_THREADS = 10 NUM_THREADS = 10
TOTAL = 245 TOTAL = 245
DB = '../data/sqlite/world.sqlite'
TABLE= 'world_merc'
def create_ds(): def create_ds(test_db,table):
ds = mapnik.SQLite(file=DB,table=TABLE) ds = mapnik.SQLite(file=test_db,table=table)
fs = ds.all_features() fs = ds.all_features()
if 'sqlite' in mapnik.DatasourceCache.plugin_names(): if 'sqlite' in mapnik.DatasourceCache.plugin_names():
def test_rtree_creation(): def test_rtree_creation():
test_db = '../data/sqlite/world.sqlite'
index = test_db +'.index'
table = 'world_merc'
index = DB +'.index'
if os.path.exists(index): if os.path.exists(index):
os.unlink(index) os.unlink(index)
threads = [] threads = []
for i in range(NUM_THREADS): for i in range(NUM_THREADS):
t = threading.Thread(target=create_ds) t = threading.Thread(target=create_ds,args=(test_db,table))
t.start() t.start()
threads.append(t) threads.append(t)
@ -43,7 +43,7 @@ if 'sqlite' in mapnik.DatasourceCache.plugin_names():
conn = sqlite3.connect(index) conn = sqlite3.connect(index)
cur = conn.cursor() cur = conn.cursor()
try: try:
cur.execute("Select count(*) from idx_%s_GEOMETRY" % TABLE.replace("'","")) cur.execute("Select count(*) from idx_%s_GEOMETRY" % table.replace("'",""))
conn.commit() conn.commit()
eq_(cur.fetchone()[0],TOTAL) eq_(cur.fetchone()[0],TOTAL)
except sqlite3.OperationalError: except sqlite3.OperationalError:
@ -51,23 +51,26 @@ if 'sqlite' in mapnik.DatasourceCache.plugin_names():
# python's sqlite module does not support rtree # python's sqlite module does not support rtree
pass pass
cur.close() cur.close()
conn.close()
ds = mapnik.SQLite(file=DB,table=TABLE) ds = mapnik.SQLite(file=test_db,table=table)
fs = ds.all_features() fs = ds.all_features()
eq_(len(fs),TOTAL) eq_(len(fs),TOTAL)
os.unlink(index) os.unlink(index)
ds = mapnik.SQLite(file=DB,table=TABLE,use_spatial_index=False) ds = mapnik.SQLite(file=test_db,table=table,use_spatial_index=False)
fs = ds.all_features() fs = ds.all_features()
eq_(len(fs),TOTAL) eq_(len(fs),TOTAL)
eq_(os.path.exists(index),False) eq_(os.path.exists(index),False)
ds = mapnik.SQLite(file=DB,table=TABLE,use_spatial_index=True) ds = mapnik.SQLite(file=test_db,table=table,use_spatial_index=True)
fs = ds.all_features() fs = ds.all_features()
for feat in fs: for feat in fs:
query = mapnik.Query(feat.envelope()) query = mapnik.Query(feat.envelope())
selected = ds.features(query) selected = ds.features(query)
eq_(len(selected.features)>=1,True) eq_(len(selected.features)>=1,True)
del ds
eq_(os.path.exists(index),True) eq_(os.path.exists(index),True)
os.unlink(index) os.unlink(index)
@ -109,6 +112,7 @@ if 'sqlite' in mapnik.DatasourceCache.plugin_names():
cur.execute('''INSERT into "point_table" (id,geometry,name) values (?,?,?)''',values) cur.execute('''INSERT into "point_table" (id,geometry,name) values (?,?,?)''',values)
conn.commit() conn.commit()
cur.close() cur.close()
conn.close()
def make_wkb_point(x,y): def make_wkb_point(x,y):
import struct import struct
@ -134,8 +138,10 @@ if 'sqlite' in mapnik.DatasourceCache.plugin_names():
geoms = feat.geometries() geoms = feat.geometries()
eq_(len(geoms),1) eq_(len(geoms),1)
eq_(geoms.to_wkt(),'Point(-122 48)') eq_(geoms.to_wkt(),'Point(-122 48)')
del ds
# ensure it matches data read with just sqlite # ensure it matches data read with just sqlite
conn = sqlite3.connect(test_db)
cur = conn.cursor() cur = conn.cursor()
cur.execute('''SELECT * from point_table''') cur.execute('''SELECT * from point_table''')
conn.commit() conn.commit()
@ -149,11 +155,11 @@ if 'sqlite' in mapnik.DatasourceCache.plugin_names():
eq_(str(geom_wkb_blob),geoms.to_wkb(mapnik.wkbByteOrder.NDR)) eq_(str(geom_wkb_blob),geoms.to_wkb(mapnik.wkbByteOrder.NDR))
new_geom = mapnik.Path.from_wkb(str(geom_wkb_blob)) new_geom = mapnik.Path.from_wkb(str(geom_wkb_blob))
eq_(new_geom.to_wkt(),geoms.to_wkt()) eq_(new_geom.to_wkt(),geoms.to_wkt())
conn.close()
# cleanup # cleanup
os.unlink(test_db) os.unlink(test_db)
os.unlink(test_db + '.index')
if __name__ == "__main__": if __name__ == "__main__":
setup() setup()
exit(run_all(eval(x) for x in dir() if x.startswith("test_"))) returncode = run_all(eval(x) for x in dir() if x.startswith("test_"))
exit(returncode)