[mappy] Parallel feature requests supports - PostGIS async requests

This commit is contained in:
David Marteau 2013-05-16 17:08:42 +02:00 committed by Alexandre Bonnasseau
parent f3b864541c
commit 7495d2f381
12 changed files with 934 additions and 265 deletions

View file

@ -30,6 +30,7 @@
#include <mapnik/query.hpp>
#include <mapnik/feature_layer_desc.hpp>
#include <mapnik/noncopyable.hpp>
#include <mapnik/feature_style_processor_context.hpp>
// boost
#include <boost/shared_ptr.hpp>
@ -109,6 +110,13 @@ public:
* @return The type of the datasource (Vector or Raster)
*/
virtual datasource_t type() const = 0;
virtual processor_context_ptr get_context(feature_style_context_map&) const { return processor_context_ptr(); }
virtual featureset_ptr features_with_context(const query& q,processor_context_ptr ctx= processor_context_ptr()) const
{
// default implementation without context use features method
return features(q);
}
virtual featureset_ptr features(query const& q) const = 0;
virtual featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const = 0;
virtual box2d<double> envelope() const = 0;

View file

@ -27,6 +27,7 @@
#include <mapnik/datasource.hpp> // for featureset_ptr
#include <mapnik/config.hpp>
// stl
#include <set>
#include <string>
@ -41,6 +42,7 @@ class projection;
class proj_transform;
class feature_type_style;
class rule_cache;
struct layer_rendering_material;
enum eAttributeCollectionPolicy
{
@ -67,6 +69,7 @@ public:
void apply(mapnik::layer const& lyr,
std::set<std::string>& names,
double scale_denom_override=0.0);
/*!
* \brief render a layer given a projection and scale.
*/
@ -91,6 +94,26 @@ private:
featureset_ptr features,
proj_transform const& prj_trans);
/*!
* \brief prepare features for rendering asynchronously.
*/
void prepare_layer(layer_rendering_material & mat,
feature_style_context_map & ctx_map,
Processor & p,
projection const& proj0,
double scale,
double scale_denom,
unsigned width,
unsigned height,
box2d<double> const& extent,
int buffer_size,
std::set<std::string>& names);
/*!
* \brief render features list queued when they are available.
*/
void render_material(layer_rendering_material & mat, Processor & p );
Map const& m_;
};
}

View file

@ -0,0 +1,47 @@
/*****************************************************************************
*
* This file is part of Mapnik (c++ mapping toolkit)
*
* Copyright (C) 2013 Artem Pavlenko
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*****************************************************************************/
#ifndef FEATURE_STYLE_PROCESSOR_CONTEXT_HPP
#define FEATURE_STYLE_PROCESSOR_CONTEXT_HPP
// boost
#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
// stl
#include <map>
#include <string>
namespace mapnik {
class IProcessorContext {
public:
virtual ~IProcessorContext() {}
};
typedef boost::shared_ptr<IProcessorContext> processor_context_ptr;
typedef std::map<std::string, processor_context_ptr > feature_style_context_map;
}
#endif /* FEATURE_STYLE_PROCESSOR_CONTEXT_HPP */

View file

@ -45,6 +45,7 @@
#include <mapnik/proj_transform.hpp>
#include <mapnik/util/featureset_buffer.hpp>
// boost
#include <boost/variant/apply_visitor.hpp>
#include <boost/variant/static_visitor.hpp>
@ -126,6 +127,25 @@ struct has_process
);
};
// Store material for layer rendering in a two step process
struct layer_rendering_material {
layer const& lay_;
projection proj0_;
projection proj1_;
box2d<double> layer_ext2_;
std::vector<feature_type_style const*> active_styles_;
std::vector<featureset_ptr> featureset_ptr_list_;
boost::ptr_vector<rule_cache> rule_caches_;
layer_rendering_material(layer const& lay, projection const& dest) :
lay_(lay),
proj0_(dest),
proj1_(lay.srs(),true) {}
};
typedef boost::shared_ptr<layer_rendering_material> layer_rendering_material_ptr;
template <typename Processor>
feature_style_processor<Processor>::feature_style_processor(Map const& m, double scale_factor)
: m_(m)
@ -148,27 +168,54 @@ void feature_style_processor<Processor>::apply(double scale_denom)
scale_denom = mapnik::scale_denominator(m_.scale(),proj.is_geographic());
scale_denom *= p.scale_factor();
// Asynchronous query supports:
// This is a two steps process,
// first we setup all queries at layer level
// in a second time, we fetch the results and
// do the actual rendering
std::vector<layer_rendering_material_ptr> mat_list;
// Define processing context map used by datasources
// implementing asynchronous queries
feature_style_context_map ctx_map;
BOOST_FOREACH ( layer const& lyr, m_.layers() )
{
if (lyr.visible(scale_denom))
{
std::set<std::string> names;
apply_to_layer(lyr,
p,
proj,
m_.scale(),
scale_denom,
m_.width(),
m_.height(),
m_.get_current_extent(),
m_.buffer_size(),
names);
layer_rendering_material_ptr mat = boost::make_shared<layer_rendering_material>(lyr, proj);
prepare_layer(*mat,
ctx_map,
p,
proj,
m_.scale(),
scale_denom,
m_.width(),
m_.height(),
m_.get_current_extent(),
m_.buffer_size(),
names);
// Store active material
if (!mat->active_styles_.empty())
{
mat_list.push_back(mat);
}
}
}
BOOST_FOREACH ( layer_rendering_material_ptr mat, mat_list )
{
if (!mat->active_styles_.empty())
{
render_material(*mat,p);
}
}
p.end_map_processing(m_);
}
template <typename Processor>
@ -199,8 +246,12 @@ void feature_style_processor<Processor>::apply(mapnik::layer const& lyr,
p.end_map_processing(m_);
}
/*!
* \brief render a layer given a projection and scale.
*/
template <typename Processor>
void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Processor & p,
void feature_style_processor<Processor>::apply_to_layer(layer const& lay,
Processor & p,
projection const& proj0,
double scale,
double scale_denom,
@ -210,6 +261,42 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
int buffer_size,
std::set<std::string>& names)
{
feature_style_context_map ctx_map;
layer_rendering_material mat(lay, proj0);
prepare_layer(mat,
ctx_map,
p,
proj0,
m_.scale(),
scale_denom,
m_.width(),
m_.height(),
m_.get_current_extent(),
m_.buffer_size(),
names);
if (!mat.active_styles_.empty())
{
render_material(mat,p);
}
}
template <typename Processor>
void feature_style_processor<Processor>::prepare_layer(layer_rendering_material & mat,
feature_style_context_map & ctx_map,
Processor & p,
projection const& proj0,
double scale,
double scale_denom,
unsigned width,
unsigned height,
box2d<double> const& extent,
int buffer_size,
std::set<std::string>& names)
{
layer const& lay = mat.lay_;
std::vector<std::string> const& style_names = lay.styles();
unsigned int num_styles = style_names.size();
@ -228,8 +315,8 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
return;
}
projection proj1(lay.srs(),true);
proj_transform prj_trans(proj0,proj1);
processor_context_ptr current_ctx = ds->get_context(ctx_map);
proj_transform prj_trans(mat.proj0_,mat.proj1_);
box2d<double> query_ext = extent; // unbuffered
box2d<double> buffered_query_ext(query_ext); // buffered
@ -288,6 +375,8 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
early_return = true;
}
std::vector<feature_type_style const*> & active_styles = mat.active_styles_;
if (early_return)
{
// check for styles needing compositing operations applied
@ -299,13 +388,13 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
{
continue;
}
if (style->comp_op() || style->image_filters().size() > 0)
{
if (style->active(scale_denom))
{
// trigger any needed compositing ops
p.start_style_processing(*style);
p.end_style_processing(*style);
// we'll have to handle compositing ops
active_styles.push_back(&(*style));
}
}
}
@ -319,7 +408,9 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
query_ext.clip(*maximum_extent);
}
box2d<double> layer_ext2 = lay.envelope();
box2d<double> & layer_ext2 = mat.layer_ext2_;
layer_ext2 = lay.envelope();
if (fw_success)
{
if (prj_trans.forward(query_ext, PROJ_ENVELOPE_POINTS))
@ -344,9 +435,8 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
height/qh);
query q(layer_ext,res,scale_denom,extent);
std::vector<feature_type_style const*> active_styles;
boost::ptr_vector<rule_cache> & rule_caches = mat.rule_caches_;
attribute_collector collector(names);
boost::ptr_vector<rule_cache> rule_caches;
// iterate through all named styles collecting active styles and attribute names
BOOST_FOREACH(std::string const& style_name, style_names)
@ -406,57 +496,90 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
{
q.add_property_name(group_by);
}
}
bool cache_features = lay.cache_features() && active_styles.size() > 1;
bool cache_features = lay.cache_features() && active_styles.size() > 1;
std::string group_by = lay.group_by();
// Render incrementally when the column that we group by changes value.
if (!group_by.empty())
std::vector<featureset_ptr> & featureset_ptr_list = mat.featureset_ptr_list_;
if ( (group_by != "") || cache_features)
{
featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx));
}
else
{
for(size_t i = 0 ; i < active_styles.size(); i++)
{
featureset_ptr features = ds->features(q);
if (features)
{
boost::shared_ptr<featureset_buffer> cache = boost::make_shared<featureset_buffer>();
feature_ptr feature, prev;
while ((feature = features->next()))
{
if (prev && prev->get(group_by) != feature->get(group_by))
{
// We're at a value boundary, so render what we have
// up to this point.
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
cache->prepare();
render_style(p, style, rule_caches[i], cache, prj_trans);
i++;
}
cache->clear();
}
cache->push(feature);
prev = feature;
}
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
cache->prepare();
render_style(p, style, rule_caches[i], cache, prj_trans);
i++;
}
}
featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx));
}
else if (cache_features)
}
}
template <typename Processor>
void feature_style_processor<Processor>::render_material(layer_rendering_material & mat, Processor & p )
{
std::vector<feature_type_style const*> & active_styles = mat.active_styles_;
std::vector<featureset_ptr> & featureset_ptr_list = mat.featureset_ptr_list_;
if (featureset_ptr_list.empty())
{
// The datasource wasn't querried because of early return
// but we have to apply compositing operations on styles
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
featureset_ptr features = ds->features(q);
p.start_style_processing(*style);
p.end_style_processing(*style);
}
return;
}
p.start_layer_processing(mat.lay_, mat.layer_ext2_);
layer const& lay = mat.lay_;
boost::ptr_vector<rule_cache> & rule_caches = mat.rule_caches_;
proj_transform prj_trans(mat.proj0_,mat.proj1_);
bool cache_features = lay.cache_features() && active_styles.size() > 1;
datasource_ptr ds = lay.datasource();
std::string group_by = lay.group_by();
// Render incrementally when the column that we group by
// changes value.
if (group_by != "")
{
featureset_ptr features = *featureset_ptr_list.begin();
if (features) {
// Cache all features into the memory_datasource before rendering.
boost::shared_ptr<featureset_buffer> cache = boost::make_shared<featureset_buffer>();
if (features)
feature_ptr feature, prev;
while ((feature = features->next()))
{
feature_ptr feature;
while ((feature = features->next()))
if (prev && prev->get(group_by) != feature->get(group_by))
{
cache->push(feature);
// We're at a value boundary, so render what we have
// up to this point.
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
cache->prepare();
render_style(p, style,
rule_caches[i],
cache,
prj_trans);
i++;
}
cache->clear();
}
cache->push(feature);
prev = feature;
}
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
@ -466,20 +589,48 @@ void feature_style_processor<Processor>::apply_to_layer(layer const& lay, Proces
}
cache->clear();
}
// We only have a single style and no grouping.
else
{
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
}
else if (cache_features)
{
boost::shared_ptr<featureset_buffer> cache = boost::make_shared<featureset_buffer>();
featureset_ptr features = *featureset_ptr_list.begin();
if (features) {
// Cache all features into the memory_datasource before rendering.
feature_ptr feature;
while ((feature = features->next()))
{
render_style(p, style, rule_caches[i], ds->features(q), prj_trans);
i++;
cache->push(feature);
}
}
int i = 0;
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
cache->prepare();
render_style(p, style,
rule_caches[i],
cache, prj_trans);
i++;
}
}
// We only have a single style and no grouping.
else
{
int i = 0;
std::vector<featureset_ptr>::iterator featuresets = featureset_ptr_list.begin();
BOOST_FOREACH (feature_type_style const* style, active_styles)
{
featureset_ptr features = *featuresets++;
render_style(p, style,
rule_caches[i],
features,
prj_trans);
i++;
}
}
p.end_layer_processing(lay);
}
p.end_layer_processing(mat.lay_);
}
template <typename Processor>
void feature_style_processor<Processor>::render_style(

View file

@ -0,0 +1,212 @@
/*****************************************************************************
*
* This file is part of Mapnik (c++ mapping toolkit)
*
* Copyright (C) 2013 Artem Pavlenko
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*****************************************************************************/
#ifndef POSTGIS_ASYNCRESULTSET_HPP
#define POSTGIS_ASYNCRESULTSET_HPP
#include <mapnik/debug.hpp>
#include <mapnik/datasource.hpp>
#include "connection_manager.hpp"
#include "resultset.hpp"
#include <queue>
class postgis_processor_context;
typedef boost::shared_ptr<postgis_processor_context> postgis_processor_context_ptr;
class AsyncResultSet : public IResultSet, private boost::noncopyable
{
public:
AsyncResultSet(postgis_processor_context_ptr const& ctx,
boost::shared_ptr< Pool<Connection,ConnectionCreator> > const& pool,
boost::shared_ptr<Connection> const &conn, const std::string& sql )
: ctx_(ctx),
pool_(pool),
conn_(conn),
sql_(sql),
is_closed_(false)
{
}
virtual bool use_connection() { return true; }
virtual ~AsyncResultSet()
{
close();
}
virtual void close()
{
if (!is_closed_)
{
rs_.reset();
is_closed_ = true;
if (conn_)
{
conn_.reset();
}
}
}
virtual int getNumFields() const
{
return rs_->getNumFields();
}
virtual bool next()
{
bool next_res = false;
if (!rs_)
{
// Ensure connection is valid
if (conn_ && conn_->isOK())
{
rs_ = conn_->getAsyncResult();
}
else
{
throw mapnik::datasource_exception("invalid connection in AsyncResultSet::next");
}
}
next_res = rs_->next();
if (!next_res)
{
rs_.reset();
rs_ = conn_->getNextAsyncResult();
if (rs_ && rs_->next())
{
return true;
}
close();
prepare_next();
}
return next_res;
}
virtual const char* getFieldName(int index) const
{
return rs_->getFieldName(index);
}
virtual int getFieldLength(int index) const
{
return rs_->getFieldLength(index);
}
virtual int getFieldLength(const char* name) const
{
return rs_->getFieldLength(name);
}
virtual int getTypeOID(int index) const
{
return rs_->getTypeOID(index);
}
virtual int getTypeOID(const char* name) const
{
return rs_->getTypeOID(name);
}
virtual bool isNull(int index) const
{
return rs_->isNull(index);
}
virtual const char* getValue(int index) const
{
return rs_->getValue(index);
}
virtual const char* getValue(const char* name) const
{
return rs_->getValue(name);
}
private:
postgis_processor_context_ptr ctx_;
boost::shared_ptr< Pool<Connection,ConnectionCreator> > pool_;
boost::shared_ptr<Connection> conn_;
std::string sql_;
boost::shared_ptr<ResultSet> rs_;
bool is_closed_;
void prepare()
{
conn_ = pool_->borrowObject();
if (conn_ && conn_->isOK())
{
conn_->executeAsyncQuery(sql_, 1);
}
else
{
throw mapnik::datasource_exception("Postgis Plugin: bad connection");
}
}
void prepare_next();
};
class postgis_processor_context : public mapnik::IProcessorContext
{
public:
postgis_processor_context():num_async_requests_(0) {}
~postgis_processor_context() {}
void add_request(boost::shared_ptr<AsyncResultSet> const& req)
{
q_.push(req);
}
boost::shared_ptr<AsyncResultSet> pop_next_request()
{
boost::shared_ptr<AsyncResultSet> r;
if (!q_.empty())
{
r = q_.front();
q_.pop();
}
return r;
}
int num_async_requests_;
private:
typedef std::queue<boost::shared_ptr<AsyncResultSet> > asynch_queue;
asynch_queue q_;
};
inline void AsyncResultSet::prepare_next()
{
// ensure cnx pool has unused cnx
boost::shared_ptr<AsyncResultSet> next = ctx_->pop_next_request();
if (next)
{
next->prepare();
}
}
#endif // POSTGIS_ASYNCRESULTSET_HPP

View file

@ -108,7 +108,7 @@ public:
if (! result || (PQresultStatus(result) != PGRES_TUPLES_OK))
{
std::string err_msg = status();
err_msg += "\nFull sql was: '";
err_msg += "\nin executeQuery Full sql was: '";
err_msg += sql;
err_msg += "'\n";
if (result)
@ -136,6 +136,77 @@ public:
return status;
}
bool executeAsyncQuery(std::string const& sql, int type = 0)
{
int result = 0;
if (type == 1)
{
result = PQsendQueryParams(conn_,sql.c_str(), 0, 0, 0, 0, 0, 1);
}
else
{
result = PQsendQuery(conn_, sql.c_str());
}
if (result != 1)
{
std::string err_msg = status();
err_msg += "\nin executeAsyncQuery Full sql was: '";
err_msg += sql;
err_msg += "'\n";
clearAsyncResult(PQgetResult(conn_));
close();
throw mapnik::datasource_exception(err_msg);
}
return result;
}
boost::shared_ptr<ResultSet> getNextAsyncResult()
{
PGresult *result = PQgetResult(conn_);
if( result && (PQresultStatus(result) != PGRES_TUPLES_OK))
{
std::string err_msg = status();
err_msg += "\nin getNextAsyncResult";
clearAsyncResult(result);
// We need to be guarded against losing the connection
// (i.e db restart), we invalidate the full connection
close();
throw mapnik::datasource_exception(err_msg);
}
return boost::make_shared<ResultSet>(result);
}
boost::shared_ptr<ResultSet> getAsyncResult()
{
PGresult *result = PQgetResult(conn_);
if ( !result || (PQresultStatus(result) != PGRES_TUPLES_OK))
{
std::string err_msg = status();
err_msg += "\nin getAsyncResult Full sql was: '";
clearAsyncResult(result);
// We need to be guarded against losing the connection
// (i.e db restart), we invalidate the full connection
close();
throw mapnik::datasource_exception(err_msg);
}
return boost::make_shared<ResultSet>(result);
}
std::string client_encoding() const
{
return PQparameterStatus(conn_, "client_encoding");
@ -169,6 +240,16 @@ private:
PGconn *conn_;
int cursorId;
bool closed_;
void clearAsyncResult(PGresult *result) const
{
// Clear all pending results
while(result)
{
PQclear(result);
result = PQgetResult(conn_);
}
}
};
#endif //CONNECTION_HPP

View file

@ -101,8 +101,12 @@ private:
class ConnectionManager : public singleton <ConnectionManager,CreateStatic>
{
friend class CreateStatic<ConnectionManager>;
public:
typedef Pool<Connection,ConnectionCreator> PoolType;
private:
friend class CreateStatic<ConnectionManager>;
typedef std::map<std::string,boost::shared_ptr<PoolType> > ContType;
typedef boost::shared_ptr<Connection> HolderType;
ContType pools_;

View file

@ -28,56 +28,23 @@
#include "connection.hpp"
#include "resultset.hpp"
class CursorResultSet : public IResultSet
class CursorResultSet : public IResultSet, private mapnik::noncopyable
{
public:
CursorResultSet(boost::shared_ptr<Connection> const &conn, std::string cursorName, int fetch_count)
: conn_(conn),
cursorName_(cursorName),
fetch_size_(fetch_count),
is_closed_(false),
refCount_(new int(1))
is_closed_(false)
{
getNextResultSet();
}
CursorResultSet(const CursorResultSet& rhs)
: conn_(rhs.conn_),
cursorName_(rhs.cursorName_),
rs_(rhs.rs_),
fetch_size_(rhs.fetch_size_),
is_closed_(rhs.is_closed_),
refCount_(rhs.refCount_)
{
(*refCount_)++;
}
virtual ~CursorResultSet()
{
if (--(*refCount_)==0)
{
close();
delete refCount_,refCount_=0;
}
close();
}
CursorResultSet& operator=(const CursorResultSet& rhs)
{
if (this==&rhs) return *this;
if (--(refCount_)==0)
{
close();
delete refCount_,refCount_=0;
}
conn_=rhs.conn_;
cursorName_=rhs.cursorName_;
rs_=rhs.rs_;
refCount_=rhs.refCount_;
fetch_size_=rhs.fetch_size_;
is_closed_ = false;
(*refCount_)++;
return *this;
}
virtual void close()
{
@ -92,6 +59,7 @@ public:
conn_->execute(s.str());
is_closed_ = true;
conn_.reset();
}
}
@ -105,6 +73,7 @@ public:
if (rs_->next()) {
return true;
} else if (rs_->size() == 0) {
close();
return false;
} else {
getNextResultSet();
@ -171,7 +140,8 @@ private:
boost::shared_ptr<ResultSet> rs_;
int fetch_size_;
bool is_closed_;
int *refCount_;
};
#endif // POSTGIS_CURSORRESULTSET_HPP

View file

@ -78,8 +78,11 @@ postgis_datasource::postgis_datasource(parameters const& params)
scale_denom_token_("!scale_denominator!"),
pixel_width_token_("!pixel_width!"),
pixel_height_token_("!pixel_height!"),
pool_max_size_(*params_.get<int>("max_size", 5)),
persist_connection_(*params.get<mapnik::boolean>("persist_connection", true)),
extent_from_subquery_(*params.get<mapnik::boolean>("extent_from_subquery", false)),
max_async_connections_(*params_.get<int>("max_async_connection", 1)),
asynchronous_request_(false),
// params below are for testing purposes only (will likely be removed at any time)
intersect_min_scale_(*params.get<int>("intersect_min_scale", 0)),
intersect_max_scale_(*params.get<int>("intersect_max_scale", 0))
@ -98,16 +101,29 @@ postgis_datasource::postgis_datasource(parameters const& params)
extent_initialized_ = extent_.from_string(*ext);
}
// NOTE: In multithread environment, pool_max_size_ should be
// max_async_connections_ * num_threads
if(max_async_connections_ > 1)
{
if(max_async_connections_ > pool_max_size_)
{
std::ostringstream err;
err << "PostGIS Plugin: Error: 'max_async_connections_ must be > pool_max_size_\n";
throw mapnik::datasource_exception(err.str());
}
asynchronous_request_ = true;
}
boost::optional<int> initial_size = params.get<int>("initial_size", 1);
boost::optional<int> max_size = params.get<int>("max_size", 10);
boost::optional<mapnik::boolean> autodetect_key_field = params.get<mapnik::boolean>("autodetect_key_field", false);
boost::optional<mapnik::boolean> estimate_extent = params.get<mapnik::boolean>("estimate_extent", false);
estimate_extent_ = estimate_extent && *estimate_extent;
boost::optional<mapnik::boolean> simplify_opt = params.get<mapnik::boolean>("simplify_geometries", false);
simplify_geometries_ = simplify_opt && *simplify_opt;
ConnectionManager::instance().registerPool(creator_, *initial_size, *max_size);
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
ConnectionManager::instance().registerPool(creator_, *initial_size, pool_max_size_);
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();
@ -413,6 +429,10 @@ postgis_datasource::postgis_datasource(parameters const& params)
rs->close();
}
// Close explicitly the connection so we can 'fork()' without sharing open connections
conn->close();
}
}
@ -420,7 +440,7 @@ postgis_datasource::~postgis_datasource()
{
if (! persist_connection_)
{
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();
@ -557,156 +577,220 @@ std::string postgis_datasource::populate_tokens(std::string const& sql, double s
}
boost::shared_ptr<IResultSet> postgis_datasource::get_resultset(boost::shared_ptr<Connection> const &conn, std::string const& sql) const
boost::shared_ptr<IResultSet> postgis_datasource::get_resultset(boost::shared_ptr<Connection> &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx) const
{
if (cursor_fetch_size_ > 0)
if (!ctx)
{
// cursor
std::ostringstream csql;
std::string cursor_name = conn->new_cursor_name();
csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY";
if (! conn->execute(csql.str()))
// ! asynchronous_request_
if (cursor_fetch_size_ > 0)
{
// TODO - better error
throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." );
// cursor
std::ostringstream csql;
std::string cursor_name = conn->new_cursor_name();
csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY";
if (! conn->execute(csql.str()))
{
// TODO - better error
throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." );
}
return boost::make_shared<CursorResultSet>(conn, cursor_name, cursor_fetch_size_);
}
else
{
// no cursor
return conn->executeQuery(sql, 1);
}
return boost::make_shared<CursorResultSet>(conn, cursor_name, cursor_fetch_size_);
}
else
{
// no cursor
return conn->executeQuery(sql, 1);
{ // asynchronous requests
boost::shared_ptr<postgis_processor_context> pgis_ctxt = boost::static_pointer_cast<postgis_processor_context>(ctx);
if (conn)
{
// lauch async req & create asyncresult with conn
conn->executeAsyncQuery(sql, 1);
return boost::make_shared<AsyncResultSet>(pgis_ctxt, pool, conn, sql);
}
else
{
// create asyncresult with null connection
boost::shared_ptr<AsyncResultSet> res = boost::make_shared<AsyncResultSet>(pgis_ctxt, pool, conn, sql);
pgis_ctxt->add_request(res);
return res;
}
}
}
processor_context_ptr postgis_datasource::get_context(feature_style_context_map & ctx) const
{
std::string ds_name(name());
if (! asynchronous_request_)
return processor_context_ptr();
if (!ctx.count(ds_name))
{
processor_context_ptr pgis_ctx = boost::make_shared<postgis_processor_context>();
ctx[ds_name] = pgis_ctx;
return ctx[ds_name];
}
else
return ctx[ds_name];
}
featureset_ptr postgis_datasource::features(const query& q) const
{
// if the driver is in asynchronous mode, return the appropriate fetaures
if ( asynchronous_request_ )
{
return features_with_context(q,boost::make_shared<postgis_processor_context>());
}
else
{
return features_with_context(q);
}
}
featureset_ptr postgis_datasource::features_with_context(const query& q,processor_context_ptr proc_ctx) const
{
#ifdef MAPNIK_STATS
mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features");
mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_with_context");
#endif
box2d<double> const& box = q.get_bbox();
double scale_denom = q.scale_denominator();
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();
if (conn && conn->isOK())
shared_ptr<Connection> conn;
if ( asynchronous_request_ )
{
if (geometryColumn_.empty())
// limit use to num_async_request_ => if reached don't borrow the last connexion object
boost::shared_ptr<postgis_processor_context> pgis_ctxt = boost::static_pointer_cast<postgis_processor_context>(proc_ctx);
if ( pgis_ctxt->num_async_requests_ < max_async_connections_ )
{
std::ostringstream s_error;
s_error << "PostGIS: geometry name lookup failed for table '";
if (! schema_.empty())
{
s_error << schema_ << ".";
}
s_error << geometry_table_
<< "'. Please manually provide the 'geometry_field' parameter or add an entry "
<< "in the geometry_columns for '";
if (! schema_.empty())
{
s_error << schema_ << ".";
}
s_error << geometry_table_ << "'.";
throw mapnik::datasource_exception(s_error.str());
conn = pool->borrowObject();
pgis_ctxt->num_async_requests_++;
}
std::ostringstream s;
const double px_gw = 1.0 / boost::get<0>(q.resolution());
const double px_gh = 1.0 / boost::get<1>(q.resolution());
s << "SELECT ST_AsBinary(";
if (simplify_geometries_) {
s << "ST_Simplify(";
}
s << "\"" << geometryColumn_ << "\"";
if (simplify_geometries_) {
// 1/20 of pixel seems to be a good compromise to avoid
// drop of collapsed polygons.
// See https://github.com/mapnik/mapnik/issues/1639
const double tolerance = std::min(px_gw, px_gh) / 20.0;
s << ", " << tolerance << ")";
}
s << ") AS geom";
mapnik::context_ptr ctx = boost::make_shared<mapnik::context_type>();
std::set<std::string> const& props = q.property_names();
std::set<std::string>::const_iterator pos = props.begin();
std::set<std::string>::const_iterator end = props.end();
if (! key_field_.empty())
}
else
{
// Always get a connection in synchronous mode
conn = pool->borrowObject();
if(!conn )
{
mapnik::sql_utils::quote_attr(s, key_field_);
ctx->push(key_field_);
for (; pos != end; ++pos)
{
if (*pos != key_field_)
{
mapnik::sql_utils::quote_attr(s, *pos);
ctx->push(*pos);
}
}
throw mapnik::datasource_exception("Postgis Plugin: Null connection");
}
else
}
if (geometryColumn_.empty())
{
std::ostringstream s_error;
s_error << "PostGIS: geometry name lookup failed for table '";
if (! schema_.empty())
{
for (; pos != end; ++pos)
s_error << schema_ << ".";
}
s_error << geometry_table_
<< "'. Please manually provide the 'geometry_field' parameter or add an entry "
<< "in the geometry_columns for '";
if (! schema_.empty())
{
s_error << schema_ << ".";
}
s_error << geometry_table_ << "'.";
throw mapnik::datasource_exception(s_error.str());
}
std::ostringstream s;
const double px_gw = 1.0 / boost::get<0>(q.resolution());
const double px_gh = 1.0 / boost::get<1>(q.resolution());
s << "SELECT ST_AsBinary(";
if (simplify_geometries_) {
s << "ST_Simplify(";
}
s << "\"" << geometryColumn_ << "\"";
if (simplify_geometries_) {
// 1/20 of pixel seems to be a good compromise to avoid
// drop of collapsed polygons.
// See https://github.com/mapnik/mapnik/issues/1639
const double tolerance = std::min(px_gw, px_gh) / 2.0;
s << ", " << tolerance << ")";
}
s << ") AS geom";
mapnik::context_ptr ctx = boost::make_shared<mapnik::context_type>();
std::set<std::string> const& props = q.property_names();
std::set<std::string>::const_iterator pos = props.begin();
std::set<std::string>::const_iterator end = props.end();
if (! key_field_.empty())
{
mapnik::sql_utils::quote_attr(s, key_field_);
ctx->push(key_field_);
for (; pos != end; ++pos)
{
if (*pos != key_field_)
{
mapnik::sql_utils::quote_attr(s, *pos);
ctx->push(*pos);
}
}
std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh);
s << " FROM " << table_with_bbox;
if (row_limit_ > 0)
{
s << " LIMIT " << row_limit_;
}
boost::shared_ptr<IResultSet> rs = get_resultset(conn, s.str());
return boost::make_shared<postgis_featureset>(rs, ctx, desc_.get_encoding(), !key_field_.empty());
}
else
{
std::string err_msg = "Postgis Plugin:";
if (conn)
for (; pos != end; ++pos)
{
err_msg += conn->status();
mapnik::sql_utils::quote_attr(s, *pos);
ctx->push(*pos);
}
else
{
err_msg += " Null connection";
}
throw mapnik::datasource_exception(err_msg);
}
std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh);
s << " FROM " << table_with_bbox;
if (row_limit_ > 0)
{
s << " LIMIT " << row_limit_;
}
boost::shared_ptr<IResultSet> rs = get_resultset(conn, s.str(), pool, proc_ctx);
return boost::make_shared<postgis_featureset>(rs, ctx, desc_.get_encoding(), !key_field_.empty());
}
return featureset_ptr();
}
featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double tol) const
{
#ifdef MAPNIK_STATS
mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_at_point");
#endif
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();
@ -775,7 +859,7 @@ featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double t
s << " LIMIT " << row_limit_;
}
boost::shared_ptr<IResultSet> rs = get_resultset(conn, s.str());
boost::shared_ptr<IResultSet> rs = get_resultset(conn, s.str(), pool);
return boost::make_shared<postgis_featureset>(rs, ctx, desc_.get_encoding(), !key_field_.empty());
}
}
@ -790,7 +874,7 @@ box2d<double> postgis_datasource::envelope() const
return extent_;
}
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();
@ -881,7 +965,7 @@ boost::optional<mapnik::datasource::geometry_t> postgis_datasource::get_geometry
{
boost::optional<mapnik::datasource::geometry_t> result;
shared_ptr< Pool<Connection,ConnectionCreator> > pool = ConnectionManager::instance().getPool(creator_.id());
CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id());
if (pool)
{
shared_ptr<Connection> conn = pool->borrowObject();

View file

@ -46,9 +46,12 @@
#include "connection_manager.hpp"
#include "resultset.hpp"
#include "cursorresultset.hpp"
#include "asyncresultset.hpp"
using mapnik::transcoder;
using mapnik::datasource;
using mapnik::feature_style_context_map;
using mapnik::processor_context_ptr;
using mapnik::box2d;
using mapnik::layer_descriptor;
using mapnik::featureset_ptr;
@ -57,6 +60,8 @@ using mapnik::query;
using mapnik::parameters;
using mapnik::coord2d;
typedef boost::shared_ptr< ConnectionManager::PoolType> CnxPool_ptr;
class postgis_datasource : public datasource
{
public:
@ -64,6 +69,8 @@ public:
~postgis_datasource();
mapnik::datasource::datasource_t type() const;
static const char * name();
processor_context_ptr get_context(feature_style_context_map &) const;
featureset_ptr features_with_context(const query& q, processor_context_ptr ctx= processor_context_ptr()) const;
featureset_ptr features(const query& q) const;
featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const;
mapnik::box2d<double> envelope() const;
@ -74,8 +81,7 @@ private:
std::string sql_bbox(box2d<double> const& env) const;
std::string populate_tokens(std::string const& sql, double scale_denom, box2d<double> const& env, double pixel_width, double pixel_height) const;
std::string populate_tokens(std::string const& sql) const;
boost::shared_ptr<IResultSet> get_resultset(boost::shared_ptr<Connection> const &conn, std::string const& sql) const;
boost::shared_ptr<IResultSet> get_resultset(boost::shared_ptr<Connection> &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx= processor_context_ptr()) const;
static const std::string GEOMETRY_COLUMNS;
static const std::string SPATIAL_REF_SYS;
static const double FMAX;
@ -102,12 +108,16 @@ private:
const std::string scale_denom_token_;
const std::string pixel_width_token_;
const std::string pixel_height_token_;
int pool_max_size_;
bool persist_connection_;
bool extent_from_subquery_;
bool estimate_extent_;
int max_async_connections_;
bool asynchronous_request_;
// params below are for testing purposes only (will likely be removed at any time)
int intersect_min_scale_;
int intersect_max_scale_;
};
#endif // POSTGIS_DATASOURCE_HPP

View file

@ -45,49 +45,16 @@ public:
virtual const char* getValue(const char* name) const = 0;
};
class ResultSet : public IResultSet
class ResultSet : public IResultSet, private mapnik::noncopyable
{
public:
ResultSet(PGresult *res)
: res_(res),
pos_(-1),
refCount_(new int(1))
pos_(-1)
{
numTuples_ = PQntuples(res_);
}
ResultSet(const ResultSet& rhs)
: res_(rhs.res_),
pos_(rhs.pos_),
numTuples_(rhs.numTuples_),
refCount_(rhs.refCount_)
{
(*refCount_)++;
}
ResultSet& operator=(const ResultSet& rhs)
{
if (this == &rhs)
{
return *this;
}
if (--(refCount_) == 0)
{
close();
delete refCount_;
refCount_ = 0;
}
res_ = rhs.res_;
pos_ = rhs.pos_;
numTuples_ = rhs.numTuples_;
refCount_ = rhs.refCount_;
(*refCount_)++;
return *this;
}
virtual void close()
{
PQclear(res_);
@ -96,13 +63,7 @@ public:
virtual ~ResultSet()
{
if (--(*refCount_) == 0)
{
PQclear(res_);
delete refCount_;
refCount_ = 0;
}
PQclear(res_);
}
virtual int getNumFields() const
@ -184,7 +145,6 @@ private:
PGresult* res_;
int pos_;
int numTuples_;
int *refCount_;
};
#endif // POSTGIS_RESULTSET_HPP

View file

@ -0,0 +1,119 @@
#!/usr/bin/env python
from nose.tools import *
import sys
import time
from utilities import execution_path
from subprocess import Popen, PIPE
import os, mapnik
MAPNIK_TEST_DBNAME = 'mapnik-tmp-postgis-async-test-db'
POSTGIS_TEMPLATE_DBNAME = 'template_postgis'
def setup():
# All of the paths used are relative, if we run the tests
# from another directory we need to chdir()
os.chdir(execution_path('.'))
def call(cmd,silent=False):
stdin, stderr = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE).communicate()
if not stderr:
return stdin.strip()
elif not silent and not 'NOTICE' in stderr:
raise RuntimeError(stderr.strip())
def psql_can_connect():
"""Test ability to connect to a postgis template db with no options.
Basically, to run these tests your user must have full read
access over unix sockets without supplying a password. This
keeps these tests simple and focused on postgis not on postgres
auth issues.
"""
try:
call('psql %s -c "select postgis_version()"' % POSTGIS_TEMPLATE_DBNAME)
return True
except RuntimeError, e:
print 'Notice: skipping postgis tests (connection)'
return False
def createdb_and_dropdb_on_path():
"""Test for presence of dropdb/createdb on user path.
We require these programs to setup and teardown the testing db.
"""
try:
call('createdb --help')
call('dropdb --help')
return True
except RuntimeError, e:
print 'Notice: skipping postgis tests (createdb/dropdb)'
return False
insert_table_1 = """
CREATE TABLE test1(gid serial PRIMARY KEY, label varchar(40), geom geometry);
INSERT INTO test1(label,geom) values ('label_1',GeomFromEWKT('SRID=4326;POINT(0 0)'));
INSERT INTO test1(label,geom) values ('label_2',GeomFromEWKT('SRID=4326;POINT(-2 2)'));
INSERT INTO test1(label,geom) values ('label_3',GeomFromEWKT('SRID=4326;MULTIPOINT(2 1,1 2)'));
INSERT INTO test1(label,geom) values ('label_4',GeomFromEWKT('SRID=4326;LINESTRING(0 0,1 1,1 2)'));
INSERT INTO test1(label,geom) values ('label_5',GeomFromEWKT('SRID=4326;MULTILINESTRING((1 0,0 1,3 2),(3 2,5 4))'));
INSERT INTO test1(label,geom) values ('label_6',GeomFromEWKT('SRID=4326;POLYGON((0 0,4 0,4 4,0 4,0 0),(1 1, 2 1, 2 2, 1 2,1 1))'));
INSERT INTO test1(label,geom) values ('label_7',GeomFromEWKT('SRID=4326;MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1),(1 1,2 1,2 2,1 2,1 1)), ((-1 -1,-1 -2,-2 -2,-2 -1,-1 -1)))'));
INSERT INTO test1(label,geom) values ('label_8',GeomFromEWKT('SRID=4326;GEOMETRYCOLLECTION(POLYGON((1 1, 2 1, 2 2, 1 2,1 1)),POINT(2 3),LINESTRING(2 3,3 4))'));
"""
insert_table_2 = """
CREATE TABLE test2(gid serial PRIMARY KEY, label varchar(40), geom geometry);
INSERT INTO test2(label,geom) values ('label_1',GeomFromEWKT('SRID=4326;POINT(0 0)'));
INSERT INTO test2(label,geom) values ('label_2',GeomFromEWKT('SRID=4326;POINT(-2 2)'));
INSERT INTO test2(label,geom) values ('label_3',GeomFromEWKT('SRID=4326;MULTIPOINT(2 1,1 2)'));
INSERT INTO test2(label,geom) values ('label_4',GeomFromEWKT('SRID=4326;LINESTRING(0 0,1 1,1 2)'));
INSERT INTO test2(label,geom) values ('label_5',GeomFromEWKT('SRID=4326;MULTILINESTRING((1 0,0 1,3 2),(3 2,5 4))'));
INSERT INTO test2(label,geom) values ('label_6',GeomFromEWKT('SRID=4326;POLYGON((0 0,4 0,4 4,0 4,0 0),(1 1, 2 1, 2 2, 1 2,1 1))'));
INSERT INTO test2(label,geom) values ('label_7',GeomFromEWKT('SRID=4326;MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1),(1 1,2 1,2 2,1 2,1 1)), ((-1 -1,-1 -2,-2 -2,-2 -1,-1 -1)))'));
INSERT INTO test2(label,geom) values ('label_8',GeomFromEWKT('SRID=4326;GEOMETRYCOLLECTION(POLYGON((1 1, 2 1, 2 2, 1 2,1 1)),POINT(2 3),LINESTRING(2 3,3 4))'));
"""
def postgis_setup():
call('dropdb %s' % MAPNIK_TEST_DBNAME,silent=True)
call('createdb -T %s %s' % (POSTGIS_TEMPLATE_DBNAME,MAPNIK_TEST_DBNAME),silent=False)
call('''psql -q %s -c "%s"''' % (MAPNIK_TEST_DBNAME,insert_table_1),silent=False)
call('''psql -q %s -c "%s"''' % (MAPNIK_TEST_DBNAME,insert_table_2),silent=False)
if 'postgis' in mapnik.DatasourceCache.plugin_names() \
and createdb_and_dropdb_on_path() \
and psql_can_connect():
# initialize test database
postgis_setup()
def test_psql_error_should_not_break_connection_pool():
# Bad request, will trig an error when returning result
ds_bad = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table="""(SELECT geom as geom,label::int from public.test1) as failure_table""",
max_async_connection=5,geometry_field='geom',srid=4326,trace=False)
# Good request
ds_good = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table="test1",
max_async_connection=5,geometry_field='geom',srid=4326,trace=False)
# This will/should trig a PSQL error
failed = False
try:
fs = ds_bad.featureset()
for feature in fs:
pass
except RuntimeError:
failed = True
assert_true(failed)
# Should be ok
fs = ds_good.featureset()
for feature in fs:
pass
if __name__ == "__main__":
setup()
run_all(eval(x) for x in dir() if x.startswith("test_"))