touch sqlite tests further
This commit is contained in:
parent
f8e6b4f27d
commit
f977134afd
1 changed files with 41 additions and 7 deletions
|
@ -73,25 +73,58 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
|
|||
|
||||
def test_geometry_round_trip():
|
||||
test_db = '/tmp/mapnik-sqlite-point.db'
|
||||
ogr_metadata = True
|
||||
|
||||
# create test db
|
||||
conn = sqlite3.connect(test_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute('''
|
||||
CREATE TABLE IF NOT EXISTS "point_table"
|
||||
(id INTEGER PRIMARY KEY AUTOINCREMENT, geometry BLOB, "name" varchar)
|
||||
CREATE TABLE IF NOT EXISTS point_table
|
||||
(id INTEGER PRIMARY KEY AUTOINCREMENT, geometry BLOB, name varchar)
|
||||
''')
|
||||
# optional: but nice if we want to read with ogr
|
||||
if ogr_metadata:
|
||||
cur.execute('''CREATE TABLE IF NOT EXISTS geometry_columns (
|
||||
f_table_name VARCHAR,
|
||||
f_geometry_column VARCHAR,
|
||||
geometry_type INTEGER,
|
||||
coord_dimension INTEGER,
|
||||
srid INTEGER,
|
||||
geometry_format VARCHAR )''')
|
||||
cur.execute('''INSERT INTO geometry_columns
|
||||
(f_table_name, f_geometry_column, geometry_format,
|
||||
geometry_type, coord_dimension, srid) VALUES
|
||||
('point_table','geometry','WKB', 1, 1, 4326)''')
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
# add a point as wkb to match how an ogr created db looks
|
||||
cur = conn.cursor()
|
||||
wkb = mapnik.Path.from_wkt('POINT(-122 48)').to_wkb(mapnik.wkbByteOrder.XDR)
|
||||
# add a point as wkb (using mapnik) to match how an ogr created db looks
|
||||
x = -122 # longitude
|
||||
y = 48 # latitude
|
||||
wkt = 'POINT(%s %s)' % (x,y)
|
||||
# little endian wkb (mapnik will auto-detect and ready either little or big endian (XDR))
|
||||
wkb = mapnik.Path.from_wkt(wkt).to_wkb(mapnik.wkbByteOrder.NDR)
|
||||
values = (None,sqlite3.Binary(wkb),"test point")
|
||||
cur = conn.cursor()
|
||||
cur.execute('''INSERT into "point_table" (id,geometry,name) values (?,?,?)''',values)
|
||||
conn.commit()
|
||||
cur.close()
|
||||
|
||||
def make_wkb_point(x,y):
|
||||
import struct
|
||||
byteorder = 1; # little endian
|
||||
endianess = ''
|
||||
if byteorder == 1:
|
||||
endianess = '<'
|
||||
else:
|
||||
endianess = '>'
|
||||
geom_type = 1; # for a point
|
||||
return struct.pack('%sbldd' % endianess, byteorder, geom_type, x, y)
|
||||
|
||||
# confirm the wkb matches a manually formed wkb
|
||||
wkb2 = make_wkb_point(x,y)
|
||||
eq_(wkb,wkb2)
|
||||
|
||||
# ensure we can read this data back out properly with mapnik
|
||||
ds = mapnik.Datasource(**{'type':'sqlite','file':test_db, 'table':'point_table'})
|
||||
fs = ds.featureset()
|
||||
|
@ -107,18 +140,19 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
|
|||
cur.execute('''SELECT * from point_table''')
|
||||
conn.commit()
|
||||
result = cur.fetchone()
|
||||
cur.close()
|
||||
feat_id = result[0]
|
||||
eq_(feat_id,1)
|
||||
name = result[2]
|
||||
eq_(name,'test point')
|
||||
geom_wkb_blob = result[1]
|
||||
eq_(str(geom_wkb_blob),geoms.to_wkb(mapnik.wkbByteOrder.XDR))
|
||||
eq_(str(geom_wkb_blob),geoms.to_wkb(mapnik.wkbByteOrder.NDR))
|
||||
new_geom = mapnik.Path.from_wkb(str(geom_wkb_blob))
|
||||
eq_(new_geom.to_wkt(),geoms.to_wkt())
|
||||
|
||||
# cleanup
|
||||
cur.close()
|
||||
os.unlink(test_db)
|
||||
os.unlink(test_db + '.index')
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup()
|
||||
|
|
Loading…
Reference in a new issue