+ update tests to use Python iterator protocol

This commit is contained in:
artemp 2012-08-08 17:31:30 +01:00
parent 7847af51e7
commit 92eff33433
7 changed files with 59 additions and 53 deletions

View file

@@ -21,7 +21,7 @@ if 'csv' in mapnik.DatasourceCache.instance().plugin_names():
broken = glob.glob("../data/csv/fails/*.*")
broken.extend(glob.glob("../data/csv/warns/*.*"))
# Add a filename that doesn't exist
# Add a filename that doesn't exist
broken.append("../data/csv/fails/does_not_exist.csv")
for csv in broken:
@@ -107,13 +107,15 @@ if 'csv' in mapnik.DatasourceCache.instance().plugin_names():
eq_(ds.fields(),['x', 'y', 'text', 'date', 'integer', 'boolean', 'float', 'time', 'datetime', 'empty_column'])
eq_(ds.field_types(),['int', 'int', 'str', 'str', 'int', 'str', 'float', 'str', 'str', 'str'])
fs = ds.featureset()
feat = fs.next()
attr = {'x': 0, 'empty_column': u'', 'text': u'a b', 'float': 1.0, 'datetime': u'1971-01-01T04:14:00', 'y': 0, 'boolean': u'True', 'time': u'04:14:00', 'date': u'1971-01-01', 'integer': 40}
eq_(feat.attributes,attr)
while feat:
first = True
for feat in fs:
if first:
first=False
eq_(feat.attributes,attr)
eq_(len(feat),10)
eq_(feat['empty_column'],u'')
feat = fs.next()
desc = ds.describe()
eq_(desc['geometry_type'],mapnik.DataGeometryType.Point)
eq_(desc['name'],'csv')

View file

@@ -5,6 +5,7 @@ from nose.tools import *
from utilities import execution_path, Todo
import os, sys, glob, mapnik
import itertools
def setup():
# All of the paths used are relative, if we run the tests
@@ -23,17 +24,12 @@ def compare_shape_between_mapnik_and_ogr(shapefile,query=None):
fs1 = ds1.featureset()
fs2 = ds2.featureset()
count = 0;
while(True):
for feat1,feat2 in itertools.izip(fs1,fs2):
count += 1
feat1 = fs1.next()
feat2 = fs2.next()
if not feat1:
break
#import pdb;pdb.set_trace()
eq_(feat1.id(),feat2.id(),
'%s : ogr feature id %s "%s" does not equal shapefile feature id %s "%s"'
'%s : ogr feature id %s "%s" does not equal shapefile feature id %s "%s"'
% (count,feat1.id(),str(feat1.attributes), feat2.id(),str(feat2.attributes)))
return True
return True
def test_shapefile_line_featureset_id():

Binary file not shown.

Before

Width:  |  Height:  |  Size: 135 KiB

After

Width:  |  Height:  |  Size: 124 KiB

View file

@@ -15,10 +15,9 @@ def test_add_feature():
featureset = md.features_at_point(mapnik.Coord(2,3))
retrieved = []
feat = featureset.next()
while feat:
for feat in featureset:
retrieved.append(feat)
feat = featureset.next()
eq_(len(retrieved), 1)
f = retrieved[0]
@@ -26,8 +25,7 @@ def test_add_feature():
featureset = md.features_at_point(mapnik.Coord(20,30))
retrieved = []
feat = featureset.next()
while feat:
for feat in featureset:
retrieved.append(feat)
eq_(len(retrieved), 0)

View file

@@ -27,12 +27,9 @@ if 'shape' in plugins and 'ogr' in plugins:
fs1 = ds1.featureset()
fs2 = ds2.featureset()
count = 0;
while(True):
import itertools
for feat1,feat2 in itertools.izip(fs1, fs2):
count += 1
feat1 = fs1.next()
feat2 = fs2.next()
if not feat1:
break
eq_(str(feat1),str(feat2))
# TODO - revisit this: https://github.com/mapnik/mapnik/issues/1093
#eq_(feat1.geometries().to_wkt(),feat2.geometries().to_wkt())

View file

@@ -197,7 +197,11 @@ if 'postgis' in mapnik.DatasourceCache.instance().plugin_names() \
def test_empty_db():
ds = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table='empty')
fs = ds.featureset()
feature = fs.next()
feature = None
try:
feature = fs.next()
except StopIteration:
pass
eq_(feature,None)
eq_(ds.describe()['geometry_type'],mapnik.DataGeometryType.Collection)
@@ -397,22 +401,16 @@ if 'postgis' in mapnik.DatasourceCache.instance().plugin_names() \
geometry_field='geom',
autodetect_key_field=True)
fs = ds.featureset()
eq_(fs.next().id(),1)
eq_(fs.next().id(),2)
eq_(fs.next().id(),3)
eq_(fs.next().id(),4)
eq_(fs.next(),None)
for id in range(1,5):
eq_(fs.next().id(),id)
def test_querying_subquery_with_mixed_case():
ds = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table='(SeLeCt * FrOm "tableWithMixedCase") as MixedCaseQuery',
geometry_field='geom',
autodetect_key_field=True)
fs = ds.featureset()
eq_(fs.next().id(),1)
eq_(fs.next().id(),2)
eq_(fs.next().id(),3)
eq_(fs.next().id(),4)
eq_(fs.next(),None)
for id in range(1,5):
eq_(fs.next().id(),id)
def test_bbox_token_in_subquery1():
ds = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table='''
@@ -420,11 +418,8 @@ if 'postgis' in mapnik.DatasourceCache.instance().plugin_names() \
geometry_field='geom',
autodetect_key_field=True)
fs = ds.featureset()
eq_(fs.next().id(),1)
eq_(fs.next().id(),2)
eq_(fs.next().id(),3)
eq_(fs.next().id(),4)
eq_(fs.next(),None)
for id in range(1,5):
eq_(fs.next().id(),id)
def test_bbox_token_in_subquery2():
ds = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table='''
@@ -432,11 +427,8 @@ if 'postgis' in mapnik.DatasourceCache.instance().plugin_names() \
geometry_field='geom',
autodetect_key_field=True)
fs = ds.featureset()
eq_(fs.next().id(),1)
eq_(fs.next().id(),2)
eq_(fs.next().id(),3)
eq_(fs.next().id(),4)
eq_(fs.next(),None)
for id in range(1,5):
eq_(fs.next().id(),id)
atexit.register(postgis_takedown)

View file

@@ -34,14 +34,18 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
'''
)
fs = ds.featureset()
feature = fs.next()
feature = None
try :
feature = fs.next()
except StopIteration:
pass
# the above should not throw but will result in no features
eq_(feature,None)
def test_attachdb_with_absolute_file():
# The point table and index is in the qgis_spatiallite.sqlite
# database. If either is not found, then this fails
ds = mapnik.SQLite(file=os.getcwd() + '/../data/sqlite/world.sqlite',
ds = mapnik.SQLite(file=os.getcwd() + '/../data/sqlite/world.sqlite',
table='point',
attachdb='scratch@qgis_spatiallite.sqlite'
)
@@ -59,8 +63,13 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
insert into scratch.idx_attachedtest_the_geom values (1,-7799225.5,-7778571.0,1393264.125,1417719.375);
'''
)
fs = ds.featureset()
feature = fs.next()
feature = None
try :
feature = fs.next()
except StopIteration:
pass
eq_(feature,None)
def test_attachdb_with_explicit_index():
@@ -75,7 +84,11 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
'''
)
fs = ds.featureset()
feature = fs.next()
feature = None
try:
feature = fs.next()
except StopIteration:
pass
eq_(feature,None)
def test_attachdb_with_sql_join():
@@ -127,7 +140,7 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
eq_(feature[str(k)],v)
except:
#import pdb;pdb.set_trace()
print 'invalid key/v %s/%s for: %s' % (k,v,feature)
print 'invalid key/v %s/%s for: %s' % (k,v,feature)
def test_attachdb_with_sql_join_count():
ds = mapnik.SQLite(file='../data/sqlite/world.sqlite',
@@ -267,16 +280,20 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
eq_(feature['fips'],u'AC')
def test_empty_db():
ds = mapnik.SQLite(file='../data/sqlite/empty.db',
ds = mapnik.SQLite(file='../data/sqlite/empty.db',
table='empty',
)
fs = ds.featureset()
feature = fs.next()
feature = None
try:
feature = fs.next()
except StopIteration:
pass
eq_(feature,None)
@raises(RuntimeError)
def test_that_nonexistant_query_field_throws(**kwargs):
ds = mapnik.SQLite(file='../data/sqlite/empty.db',
ds = mapnik.SQLite(file='../data/sqlite/empty.db',
table='empty',
)
eq_(len(ds.fields()),25)
@@ -310,7 +327,11 @@ if 'sqlite' in mapnik.DatasourceCache.instance().plugin_names():
table='(select * from empty where "a"!="b" and !intersects!)',
)
fs = ds.featureset()
feature = fs.next()
feature = None
try :
feature = fs.next()
except StopIteration:
pass
eq_(feature,None)
if __name__ == "__main__":