#!/usr/bin/env python from nose.tools import eq_ from utilities import execution_path, run_all import threading import os, mapnik import sqlite3 def setup(): # All of the paths used are relative, if we run the tests # from another directory we need to chdir() os.chdir(execution_path('.')) NUM_THREADS = 10 TOTAL = 245 def create_ds(test_db,table): ds = mapnik.SQLite(file=test_db,table=table) ds.all_features() del ds if 'sqlite' in mapnik.DatasourceCache.plugin_names(): def test_rtree_creation(): test_db = '../data/sqlite/world.sqlite' index = test_db +'.index' table = 'world_merc' if os.path.exists(index): os.unlink(index) threads = [] for i in range(NUM_THREADS): t = threading.Thread(target=create_ds,args=(test_db,table)) t.start() threads.append(t) for i in threads: i.join() eq_(os.path.exists(index),True) conn = sqlite3.connect(index) cur = conn.cursor() try: cur.execute("Select count(*) from idx_%s_GEOMETRY" % table.replace("'","")) conn.commit() eq_(cur.fetchone()[0],TOTAL) except sqlite3.OperationalError: # don't worry about testing # of index records if # python's sqlite module does not support rtree pass cur.close() conn.close() ds = mapnik.SQLite(file=test_db,table=table) fs = ds.all_features() del ds eq_(len(fs),TOTAL) os.unlink(index) ds = mapnik.SQLite(file=test_db,table=table,use_spatial_index=False) fs = ds.all_features() del ds eq_(len(fs),TOTAL) eq_(os.path.exists(index),False) ds = mapnik.SQLite(file=test_db,table=table,use_spatial_index=True) fs = ds.all_features() #TODO - this loop is not releasing something # because it causes the unlink below to fail on windows # as the file is still open #for feat in fs: # query = mapnik.Query(feat.envelope()) # selected = ds.features(query) # eq_(len(selected.features)>=1,True) del ds eq_(os.path.exists(index),True) os.unlink(index) def test_geometry_round_trip(): test_db = '/tmp/mapnik-sqlite-point.db' ogr_metadata = True # create test db conn = sqlite3.connect(test_db) cur = conn.cursor() cur.execute(''' CREATE TABLE IF NOT EXISTS point_table (id INTEGER PRIMARY KEY AUTOINCREMENT, geometry BLOB, name varchar) ''') # optional: but nice if we want to read with ogr if ogr_metadata: cur.execute('''CREATE TABLE IF NOT EXISTS geometry_columns ( f_table_name VARCHAR, f_geometry_column VARCHAR, geometry_type INTEGER, coord_dimension INTEGER, srid INTEGER, geometry_format VARCHAR )''') cur.execute('''INSERT INTO geometry_columns (f_table_name, f_geometry_column, geometry_format, geometry_type, coord_dimension, srid) VALUES ('point_table','geometry','WKB', 1, 1, 4326)''') conn.commit() cur.close() # add a point as wkb (using mapnik) to match how an ogr created db looks x = -122 # longitude y = 48 # latitude wkt = 'POINT(%s %s)' % (x,y) # little endian wkb (mapnik will auto-detect and ready either little or big endian (XDR)) wkb = mapnik.Geometry.from_wkt(wkt).to_wkb(mapnik.wkbByteOrder.NDR) values = (None,sqlite3.Binary(wkb),"test point") cur = conn.cursor() cur.execute('''INSERT into "point_table" (id,geometry,name) values (?,?,?)''',values) conn.commit() cur.close() conn.close() def make_wkb_point(x,y): import struct byteorder = 1; # little endian endianess = '' if byteorder == 1: endianess = '<' else: endianess = '>' geom_type = 1; # for a point return struct.pack('%sbldd' % endianess, byteorder, geom_type, x, y) # confirm the wkb matches a manually formed wkb wkb2 = make_wkb_point(x,y) eq_(wkb,wkb2) # ensure we can read this data back out properly with mapnik ds = mapnik.Datasource(**{'type':'sqlite','file':test_db, 'table':'point_table'}) fs = ds.featureset() feat = fs.next() eq_(feat.id(),1) eq_(feat['name'],'test point') geom = feat.geometry; eq_(geom.to_wkt(),'POINT(-122 48)') del ds # ensure it matches data read with just sqlite conn = sqlite3.connect(test_db) cur = conn.cursor() cur.execute('''SELECT * from point_table''') conn.commit() result = cur.fetchone() cur.close() feat_id = result[0] eq_(feat_id,1) name = result[2] eq_(name,'test point') geom_wkb_blob = result[1] eq_(str(geom_wkb_blob),geom.to_wkb(mapnik.wkbByteOrder.NDR)) new_geom = mapnik.Geometry.from_wkb(str(geom_wkb_blob)) eq_(new_geom.to_wkt(),geom.to_wkt()) conn.close() os.unlink(test_db) if __name__ == "__main__": setup() returncode = run_all(eval(x) for x in dir() if x.startswith("test_")) exit(returncode)