From 7495d2f38141c0e0132c6b7649b8a28bedcb70d8 Mon Sep 17 00:00:00 2001 From: David Marteau Date: Thu, 16 May 2013 17:08:42 +0200 Subject: [PATCH] [mappy] Parallel feature requests supports - PostGIS async requests --- include/mapnik/datasource.hpp | 8 + include/mapnik/feature_style_processor.hpp | 23 ++ .../feature_style_processor_context.hpp | 47 +++ .../mapnik/feature_style_processor_impl.hpp | 291 +++++++++++++---- plugins/input/postgis/asyncresultset.hpp | 212 ++++++++++++ plugins/input/postgis/connection.hpp | 83 ++++- plugins/input/postgis/connection_manager.hpp | 6 +- plugins/input/postgis/cursorresultset.hpp | 44 +-- plugins/input/postgis/postgis_datasource.cpp | 306 +++++++++++------- plugins/input/postgis/postgis_datasource.hpp | 14 +- plugins/input/postgis/resultset.hpp | 46 +-- tests/python_tests/postgis_async_test.py | 119 +++++++ 12 files changed, 934 insertions(+), 265 deletions(-) create mode 100644 include/mapnik/feature_style_processor_context.hpp create mode 100644 plugins/input/postgis/asyncresultset.hpp create mode 100644 tests/python_tests/postgis_async_test.py diff --git a/include/mapnik/datasource.hpp b/include/mapnik/datasource.hpp index b2edb91d4..947fc056a 100644 --- a/include/mapnik/datasource.hpp +++ b/include/mapnik/datasource.hpp @@ -30,6 +30,7 @@ #include #include #include +#include // boost #include @@ -109,6 +110,13 @@ public: * @return The type of the datasource (Vector or Raster) */ virtual datasource_t type() const = 0; + + virtual processor_context_ptr get_context(feature_style_context_map&) const { return processor_context_ptr(); } + virtual featureset_ptr features_with_context(const query& q,processor_context_ptr ctx= processor_context_ptr()) const + { + // default implementation without context use features method + return features(q); + } virtual featureset_ptr features(query const& q) const = 0; virtual featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const = 0; virtual box2d envelope() const = 0; diff --git a/include/mapnik/feature_style_processor.hpp b/include/mapnik/feature_style_processor.hpp index 93a3114bb..6a652150c 100644 --- a/include/mapnik/feature_style_processor.hpp +++ b/include/mapnik/feature_style_processor.hpp @@ -27,6 +27,7 @@ #include // for featureset_ptr #include + // stl #include #include @@ -41,6 +42,7 @@ class projection; class proj_transform; class feature_type_style; class rule_cache; +struct layer_rendering_material; enum eAttributeCollectionPolicy { @@ -67,6 +69,7 @@ public: void apply(mapnik::layer const& lyr, std::set& names, double scale_denom_override=0.0); + /*! * \brief render a layer given a projection and scale. */ @@ -91,6 +94,26 @@ private: featureset_ptr features, proj_transform const& prj_trans); + /*! + * \brief prepare features for rendering asynchronously. + */ + void prepare_layer(layer_rendering_material & mat, + feature_style_context_map & ctx_map, + Processor & p, + projection const& proj0, + double scale, + double scale_denom, + unsigned width, + unsigned height, + box2d const& extent, + int buffer_size, + std::set& names); + + /*! + * \brief render features list queued when they are available. + */ + void render_material(layer_rendering_material & mat, Processor & p ); + Map const& m_; }; } diff --git a/include/mapnik/feature_style_processor_context.hpp b/include/mapnik/feature_style_processor_context.hpp new file mode 100644 index 000000000..376886b9d --- /dev/null +++ b/include/mapnik/feature_style_processor_context.hpp @@ -0,0 +1,47 @@ +/***************************************************************************** + * + * This file is part of Mapnik (c++ mapping toolkit) + * + * Copyright (C) 2013 Artem Pavlenko + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + *****************************************************************************/ + +#ifndef FEATURE_STYLE_PROCESSOR_CONTEXT_HPP +#define FEATURE_STYLE_PROCESSOR_CONTEXT_HPP + +// boost +#include +#include + +// stl +#include +#include + +namespace mapnik { + + +class IProcessorContext { +public: + virtual ~IProcessorContext() {} +}; + +typedef boost::shared_ptr processor_context_ptr; +typedef std::map feature_style_context_map; + +} + +#endif /* FEATURE_STYLE_PROCESSOR_CONTEXT_HPP */ diff --git a/include/mapnik/feature_style_processor_impl.hpp b/include/mapnik/feature_style_processor_impl.hpp index 76f4f190c..58dca7b13 100644 --- a/include/mapnik/feature_style_processor_impl.hpp +++ b/include/mapnik/feature_style_processor_impl.hpp @@ -45,6 +45,7 @@ #include #include + // boost #include #include @@ -126,6 +127,25 @@ struct has_process ); }; +// Store material for layer rendering in a two step process +struct layer_rendering_material { + layer const& lay_; + projection proj0_; + projection proj1_; + box2d layer_ext2_; + std::vector active_styles_; + std::vector featureset_ptr_list_; + boost::ptr_vector rule_caches_; + + layer_rendering_material(layer const& lay, projection const& dest) : + lay_(lay), + proj0_(dest), + proj1_(lay.srs(),true) {} +}; + +typedef boost::shared_ptr layer_rendering_material_ptr; + + template feature_style_processor::feature_style_processor(Map const& m, double scale_factor) : m_(m) @@ -148,27 +168,54 @@ void feature_style_processor::apply(double scale_denom) scale_denom = mapnik::scale_denominator(m_.scale(),proj.is_geographic()); scale_denom *= p.scale_factor(); + // Asynchronous query supports: + // This is a two steps process, + // first we setup all queries at layer level + // in a second time, we fetch the results and + // do the actual rendering + + std::vector mat_list; + + // Define processing context map used by datasources + // implementing asynchronous queries + feature_style_context_map ctx_map; + BOOST_FOREACH ( layer const& lyr, m_.layers() ) { if (lyr.visible(scale_denom)) { std::set names; - apply_to_layer(lyr, - p, - proj, - m_.scale(), - scale_denom, - m_.width(), - m_.height(), - m_.get_current_extent(), - m_.buffer_size(), - names); + layer_rendering_material_ptr mat = boost::make_shared(lyr, proj); + prepare_layer(*mat, + ctx_map, + p, + proj, + m_.scale(), + scale_denom, + m_.width(), + m_.height(), + m_.get_current_extent(), + m_.buffer_size(), + names); + + // Store active material + if (!mat->active_styles_.empty()) + { + mat_list.push_back(mat); + } + } + } + + BOOST_FOREACH ( layer_rendering_material_ptr mat, mat_list ) + { + if (!mat->active_styles_.empty()) + { + render_material(*mat,p); } } p.end_map_processing(m_); - } template @@ -199,8 +246,12 @@ void feature_style_processor::apply(mapnik::layer const& lyr, p.end_map_processing(m_); } +/*! + * \brief render a layer given a projection and scale. + */ template -void feature_style_processor::apply_to_layer(layer const& lay, Processor & p, +void feature_style_processor::apply_to_layer(layer const& lay, + Processor & p, projection const& proj0, double scale, double scale_denom, @@ -210,6 +261,42 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces int buffer_size, std::set& names) { + feature_style_context_map ctx_map; + layer_rendering_material mat(lay, proj0); + + prepare_layer(mat, + ctx_map, + p, + proj0, + m_.scale(), + scale_denom, + m_.width(), + m_.height(), + m_.get_current_extent(), + m_.buffer_size(), + names); + + if (!mat.active_styles_.empty()) + { + render_material(mat,p); + } +} + +template +void feature_style_processor::prepare_layer(layer_rendering_material & mat, + feature_style_context_map & ctx_map, + Processor & p, + projection const& proj0, + double scale, + double scale_denom, + unsigned width, + unsigned height, + box2d const& extent, + int buffer_size, + std::set& names) +{ + layer const& lay = mat.lay_; + std::vector const& style_names = lay.styles(); unsigned int num_styles = style_names.size(); @@ -228,8 +315,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces return; } - projection proj1(lay.srs(),true); - proj_transform prj_trans(proj0,proj1); + processor_context_ptr current_ctx = ds->get_context(ctx_map); + proj_transform prj_trans(mat.proj0_,mat.proj1_); box2d query_ext = extent; // unbuffered box2d buffered_query_ext(query_ext); // buffered @@ -288,6 +375,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces early_return = true; } + std::vector & active_styles = mat.active_styles_; + if (early_return) { // check for styles needing compositing operations applied @@ -299,13 +388,13 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces { continue; } + if (style->comp_op() || style->image_filters().size() > 0) { if (style->active(scale_denom)) { - // trigger any needed compositing ops - p.start_style_processing(*style); - p.end_style_processing(*style); + // we'll have to handle compositing ops + active_styles.push_back(&(*style)); } } } @@ -319,7 +408,9 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces query_ext.clip(*maximum_extent); } - box2d layer_ext2 = lay.envelope(); + box2d & layer_ext2 = mat.layer_ext2_; + + layer_ext2 = lay.envelope(); if (fw_success) { if (prj_trans.forward(query_ext, PROJ_ENVELOPE_POINTS)) @@ -344,9 +435,8 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces height/qh); query q(layer_ext,res,scale_denom,extent); - std::vector active_styles; + boost::ptr_vector & rule_caches = mat.rule_caches_; attribute_collector collector(names); - boost::ptr_vector rule_caches; // iterate through all named styles collecting active styles and attribute names BOOST_FOREACH(std::string const& style_name, style_names) @@ -406,57 +496,90 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces { q.add_property_name(group_by); } + } - bool cache_features = lay.cache_features() && active_styles.size() > 1; + bool cache_features = lay.cache_features() && active_styles.size() > 1; + std::string group_by = lay.group_by(); - // Render incrementally when the column that we group by changes value. - if (!group_by.empty()) + std::vector & featureset_ptr_list = mat.featureset_ptr_list_; + if ( (group_by != "") || cache_features) + { + featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx)); + } + else + { + for(size_t i = 0 ; i < active_styles.size(); i++) { - featureset_ptr features = ds->features(q); - if (features) - { - boost::shared_ptr cache = boost::make_shared(); - feature_ptr feature, prev; - while ((feature = features->next())) - { - if (prev && prev->get(group_by) != feature->get(group_by)) - { - // We're at a value boundary, so render what we have - // up to this point. - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) - { - cache->prepare(); - render_style(p, style, rule_caches[i], cache, prj_trans); - i++; - } - cache->clear(); - } - cache->push(feature); - prev = feature; - } - - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) - { - cache->prepare(); - render_style(p, style, rule_caches[i], cache, prj_trans); - i++; - } - } + featureset_ptr_list.push_back(ds->features_with_context(q,current_ctx)); } - else if (cache_features) + } +} + + +template +void feature_style_processor::render_material(layer_rendering_material & mat, Processor & p ) +{ + std::vector & active_styles = mat.active_styles_; + std::vector & featureset_ptr_list = mat.featureset_ptr_list_; + if (featureset_ptr_list.empty()) + { + // The datasource wasn't querried because of early return + // but we have to apply compositing operations on styles + BOOST_FOREACH (feature_type_style const* style, active_styles) { - featureset_ptr features = ds->features(q); + p.start_style_processing(*style); + p.end_style_processing(*style); + } + + return; + } + + p.start_layer_processing(mat.lay_, mat.layer_ext2_); + + layer const& lay = mat.lay_; + + boost::ptr_vector & rule_caches = mat.rule_caches_; + + proj_transform prj_trans(mat.proj0_,mat.proj1_); + + bool cache_features = lay.cache_features() && active_styles.size() > 1; + + datasource_ptr ds = lay.datasource(); + std::string group_by = lay.group_by(); + + // Render incrementally when the column that we group by + // changes value. + if (group_by != "") + { + featureset_ptr features = *featureset_ptr_list.begin(); + if (features) { + // Cache all features into the memory_datasource before rendering. boost::shared_ptr cache = boost::make_shared(); - if (features) + feature_ptr feature, prev; + + while ((feature = features->next())) { - feature_ptr feature; - while ((feature = features->next())) + if (prev && prev->get(group_by) != feature->get(group_by)) { - cache->push(feature); + // We're at a value boundary, so render what we have + // up to this point. + int i = 0; + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + + cache->prepare(); + render_style(p, style, + rule_caches[i], + cache, + prj_trans); + i++; + } + cache->clear(); } + cache->push(feature); + prev = feature; } + int i = 0; BOOST_FOREACH (feature_type_style const* style, active_styles) { @@ -466,20 +589,48 @@ void feature_style_processor::apply_to_layer(layer const& lay, Proces } cache->clear(); } - // We only have a single style and no grouping. - else - { - int i = 0; - BOOST_FOREACH (feature_type_style const* style, active_styles) + } + else if (cache_features) + { + boost::shared_ptr cache = boost::make_shared(); + featureset_ptr features = *featureset_ptr_list.begin(); + if (features) { + // Cache all features into the memory_datasource before rendering. + feature_ptr feature; + while ((feature = features->next())) { - render_style(p, style, rule_caches[i], ds->features(q), prj_trans); - i++; + + cache->push(feature); } } + int i = 0; + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + cache->prepare(); + render_style(p, style, + rule_caches[i], + cache, prj_trans); + i++; + } + } + // We only have a single style and no grouping. + else + { + int i = 0; + std::vector::iterator featuresets = featureset_ptr_list.begin(); + BOOST_FOREACH (feature_type_style const* style, active_styles) + { + featureset_ptr features = *featuresets++; + render_style(p, style, + rule_caches[i], + features, + prj_trans); + i++; + } } - p.end_layer_processing(lay); -} + p.end_layer_processing(mat.lay_); +} template void feature_style_processor::render_style( diff --git a/plugins/input/postgis/asyncresultset.hpp b/plugins/input/postgis/asyncresultset.hpp new file mode 100644 index 000000000..aa217f629 --- /dev/null +++ b/plugins/input/postgis/asyncresultset.hpp @@ -0,0 +1,212 @@ +/***************************************************************************** + * + * This file is part of Mapnik (c++ mapping toolkit) + * + * Copyright (C) 2013 Artem Pavlenko + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + *****************************************************************************/ + +#ifndef POSTGIS_ASYNCRESULTSET_HPP +#define POSTGIS_ASYNCRESULTSET_HPP + +#include +#include + +#include "connection_manager.hpp" +#include "resultset.hpp" +#include + +class postgis_processor_context; +typedef boost::shared_ptr postgis_processor_context_ptr; + +class AsyncResultSet : public IResultSet, private boost::noncopyable +{ +public: + AsyncResultSet(postgis_processor_context_ptr const& ctx, + boost::shared_ptr< Pool > const& pool, + boost::shared_ptr const &conn, const std::string& sql ) + : ctx_(ctx), + pool_(pool), + conn_(conn), + sql_(sql), + is_closed_(false) + { + } + + virtual bool use_connection() { return true; } + + virtual ~AsyncResultSet() + { + close(); + } + + virtual void close() + { + if (!is_closed_) + { + rs_.reset(); + is_closed_ = true; + if (conn_) + { + conn_.reset(); + } + } + } + + virtual int getNumFields() const + { + return rs_->getNumFields(); + } + + virtual bool next() + { + bool next_res = false; + if (!rs_) + { + // Ensure connection is valid + if (conn_ && conn_->isOK()) + { + rs_ = conn_->getAsyncResult(); + } + else + { + throw mapnik::datasource_exception("invalid connection in AsyncResultSet::next"); + } + } + + next_res = rs_->next(); + if (!next_res) + { + rs_.reset(); + rs_ = conn_->getNextAsyncResult(); + if (rs_ && rs_->next()) + { + return true; + } + close(); + prepare_next(); + } + return next_res; + } + + virtual const char* getFieldName(int index) const + { + return rs_->getFieldName(index); + } + + virtual int getFieldLength(int index) const + { + return rs_->getFieldLength(index); + } + + virtual int getFieldLength(const char* name) const + { + return rs_->getFieldLength(name); + } + + virtual int getTypeOID(int index) const + { + return rs_->getTypeOID(index); + } + + virtual int getTypeOID(const char* name) const + { + return rs_->getTypeOID(name); + } + + virtual bool isNull(int index) const + { + return rs_->isNull(index); + } + + virtual const char* getValue(int index) const + { + return rs_->getValue(index); + } + + virtual const char* getValue(const char* name) const + { + return rs_->getValue(name); + } + +private: + postgis_processor_context_ptr ctx_; + boost::shared_ptr< Pool > pool_; + boost::shared_ptr conn_; + std::string sql_; + boost::shared_ptr rs_; + bool is_closed_; + + void prepare() + { + conn_ = pool_->borrowObject(); + if (conn_ && conn_->isOK()) + { + conn_->executeAsyncQuery(sql_, 1); + } + else + { + throw mapnik::datasource_exception("Postgis Plugin: bad connection"); + } + } + + void prepare_next(); + +}; + + +class postgis_processor_context : public mapnik::IProcessorContext +{ +public: + postgis_processor_context():num_async_requests_(0) {} + ~postgis_processor_context() {} + + void add_request(boost::shared_ptr const& req) + { + q_.push(req); + } + + boost::shared_ptr pop_next_request() + { + boost::shared_ptr r; + if (!q_.empty()) + { + r = q_.front(); + q_.pop(); + } + return r; + } + + int num_async_requests_; + +private: + typedef std::queue > asynch_queue; + asynch_queue q_; + +}; + +inline void AsyncResultSet::prepare_next() +{ + // ensure cnx pool has unused cnx + boost::shared_ptr next = ctx_->pop_next_request(); + if (next) + { + next->prepare(); + } +} + +#endif // POSTGIS_ASYNCRESULTSET_HPP diff --git a/plugins/input/postgis/connection.hpp b/plugins/input/postgis/connection.hpp index d8726aef0..720f98ef5 100644 --- a/plugins/input/postgis/connection.hpp +++ b/plugins/input/postgis/connection.hpp @@ -108,7 +108,7 @@ public: if (! result || (PQresultStatus(result) != PGRES_TUPLES_OK)) { std::string err_msg = status(); - err_msg += "\nFull sql was: '"; + err_msg += "\nin executeQuery Full sql was: '"; err_msg += sql; err_msg += "'\n"; if (result) @@ -136,6 +136,77 @@ public: return status; } + bool executeAsyncQuery(std::string const& sql, int type = 0) + { + int result = 0; + if (type == 1) + { + result = PQsendQueryParams(conn_,sql.c_str(), 0, 0, 0, 0, 0, 1); + } + else + { + result = PQsendQuery(conn_, sql.c_str()); + } + + if (result != 1) + { + + std::string err_msg = status(); + err_msg += "\nin executeAsyncQuery Full sql was: '"; + err_msg += sql; + err_msg += "'\n"; + + clearAsyncResult(PQgetResult(conn_)); + close(); + + throw mapnik::datasource_exception(err_msg); + } + + return result; + } + + + boost::shared_ptr getNextAsyncResult() + { + PGresult *result = PQgetResult(conn_); + if( result && (PQresultStatus(result) != PGRES_TUPLES_OK)) + { + std::string err_msg = status(); + err_msg += "\nin getNextAsyncResult"; + + clearAsyncResult(result); + // We need to be guarded against losing the connection + // (i.e db restart), we invalidate the full connection + close(); + + throw mapnik::datasource_exception(err_msg); + } + + return boost::make_shared(result); + } + + boost::shared_ptr getAsyncResult() + { + PGresult *result = PQgetResult(conn_); + + if ( !result || (PQresultStatus(result) != PGRES_TUPLES_OK)) + { + + std::string err_msg = status(); + err_msg += "\nin getAsyncResult Full sql was: '"; + + clearAsyncResult(result); + // We need to be guarded against losing the connection + // (i.e db restart), we invalidate the full connection + close(); + + throw mapnik::datasource_exception(err_msg); + + } + + return boost::make_shared(result); + } + std::string client_encoding() const { return PQparameterStatus(conn_, "client_encoding"); @@ -169,6 +240,16 @@ private: PGconn *conn_; int cursorId; bool closed_; + + void clearAsyncResult(PGresult *result) const + { + // Clear all pending results + while(result) + { + PQclear(result); + result = PQgetResult(conn_); + } + } }; #endif //CONNECTION_HPP diff --git a/plugins/input/postgis/connection_manager.hpp b/plugins/input/postgis/connection_manager.hpp index 4fbcce2d6..d457da393 100644 --- a/plugins/input/postgis/connection_manager.hpp +++ b/plugins/input/postgis/connection_manager.hpp @@ -101,8 +101,12 @@ private: class ConnectionManager : public singleton { - friend class CreateStatic; +public: typedef Pool PoolType; + +private: + friend class CreateStatic; + typedef std::map > ContType; typedef boost::shared_ptr HolderType; ContType pools_; diff --git a/plugins/input/postgis/cursorresultset.hpp b/plugins/input/postgis/cursorresultset.hpp index 611e394f8..59b4e2dfc 100644 --- a/plugins/input/postgis/cursorresultset.hpp +++ b/plugins/input/postgis/cursorresultset.hpp @@ -28,56 +28,23 @@ #include "connection.hpp" #include "resultset.hpp" -class CursorResultSet : public IResultSet +class CursorResultSet : public IResultSet, private mapnik::noncopyable { public: CursorResultSet(boost::shared_ptr const &conn, std::string cursorName, int fetch_count) : conn_(conn), cursorName_(cursorName), fetch_size_(fetch_count), - is_closed_(false), - refCount_(new int(1)) + is_closed_(false) { getNextResultSet(); } - CursorResultSet(const CursorResultSet& rhs) - : conn_(rhs.conn_), - cursorName_(rhs.cursorName_), - rs_(rhs.rs_), - fetch_size_(rhs.fetch_size_), - is_closed_(rhs.is_closed_), - refCount_(rhs.refCount_) - { - (*refCount_)++; - } - virtual ~CursorResultSet() { - if (--(*refCount_)==0) - { - close(); - delete refCount_,refCount_=0; - } + close(); } - CursorResultSet& operator=(const CursorResultSet& rhs) - { - if (this==&rhs) return *this; - if (--(refCount_)==0) - { - close(); - delete refCount_,refCount_=0; - } - conn_=rhs.conn_; - cursorName_=rhs.cursorName_; - rs_=rhs.rs_; - refCount_=rhs.refCount_; - fetch_size_=rhs.fetch_size_; - is_closed_ = false; - (*refCount_)++; - return *this; - } virtual void close() { @@ -92,6 +59,7 @@ public: conn_->execute(s.str()); is_closed_ = true; + conn_.reset(); } } @@ -105,6 +73,7 @@ public: if (rs_->next()) { return true; } else if (rs_->size() == 0) { + close(); return false; } else { getNextResultSet(); @@ -171,7 +140,8 @@ private: boost::shared_ptr rs_; int fetch_size_; bool is_closed_; - int *refCount_; + + }; #endif // POSTGIS_CURSORRESULTSET_HPP diff --git a/plugins/input/postgis/postgis_datasource.cpp b/plugins/input/postgis/postgis_datasource.cpp index 4b3be44ce..f5bb87395 100644 --- a/plugins/input/postgis/postgis_datasource.cpp +++ b/plugins/input/postgis/postgis_datasource.cpp @@ -78,8 +78,11 @@ postgis_datasource::postgis_datasource(parameters const& params) scale_denom_token_("!scale_denominator!"), pixel_width_token_("!pixel_width!"), pixel_height_token_("!pixel_height!"), + pool_max_size_(*params_.get("max_size", 5)), persist_connection_(*params.get("persist_connection", true)), extent_from_subquery_(*params.get("extent_from_subquery", false)), + max_async_connections_(*params_.get("max_async_connection", 1)), + asynchronous_request_(false), // params below are for testing purposes only (will likely be removed at any time) intersect_min_scale_(*params.get("intersect_min_scale", 0)), intersect_max_scale_(*params.get("intersect_max_scale", 0)) @@ -98,16 +101,29 @@ postgis_datasource::postgis_datasource(parameters const& params) extent_initialized_ = extent_.from_string(*ext); } + // NOTE: In multithread environment, pool_max_size_ should be + // max_async_connections_ * num_threads + if(max_async_connections_ > 1) + { + if(max_async_connections_ > pool_max_size_) + { + std::ostringstream err; + err << "PostGIS Plugin: Error: 'max_async_connections_ must be > pool_max_size_\n"; + throw mapnik::datasource_exception(err.str()); + } + + asynchronous_request_ = true; + } + boost::optional initial_size = params.get("initial_size", 1); - boost::optional max_size = params.get("max_size", 10); boost::optional autodetect_key_field = params.get("autodetect_key_field", false); boost::optional estimate_extent = params.get("estimate_extent", false); estimate_extent_ = estimate_extent && *estimate_extent; boost::optional simplify_opt = params.get("simplify_geometries", false); simplify_geometries_ = simplify_opt && *simplify_opt; - ConnectionManager::instance().registerPool(creator_, *initial_size, *max_size); - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + ConnectionManager::instance().registerPool(creator_, *initial_size, pool_max_size_); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -413,6 +429,10 @@ postgis_datasource::postgis_datasource(parameters const& params) rs->close(); } + + // Close explicitly the connection so we can 'fork()' without sharing open connections + conn->close(); + } } @@ -420,7 +440,7 @@ postgis_datasource::~postgis_datasource() { if (! persist_connection_) { - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -557,156 +577,220 @@ std::string postgis_datasource::populate_tokens(std::string const& sql, double s } -boost::shared_ptr postgis_datasource::get_resultset(boost::shared_ptr const &conn, std::string const& sql) const +boost::shared_ptr postgis_datasource::get_resultset(boost::shared_ptr &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx) const { - if (cursor_fetch_size_ > 0) + + if (!ctx) { - // cursor - std::ostringstream csql; - std::string cursor_name = conn->new_cursor_name(); - - csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY"; - - if (! conn->execute(csql.str())) + // ! asynchronous_request_ + if (cursor_fetch_size_ > 0) { - // TODO - better error - throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." ); + // cursor + std::ostringstream csql; + std::string cursor_name = conn->new_cursor_name(); + + csql << "DECLARE " << cursor_name << " BINARY INSENSITIVE NO SCROLL CURSOR WITH HOLD FOR " << sql << " FOR READ ONLY"; + + if (! conn->execute(csql.str())) + { + // TODO - better error + throw mapnik::datasource_exception("Postgis Plugin: error creating cursor for data select." ); + } + + return boost::make_shared(conn, cursor_name, cursor_fetch_size_); + + } + else + { + // no cursor + return conn->executeQuery(sql, 1); } - - return boost::make_shared(conn, cursor_name, cursor_fetch_size_); - } else - { - // no cursor - return conn->executeQuery(sql, 1); + { // asynchronous requests + + boost::shared_ptr pgis_ctxt = boost::static_pointer_cast(ctx); + if (conn) + { + // lauch async req & create asyncresult with conn + conn->executeAsyncQuery(sql, 1); + return boost::make_shared(pgis_ctxt, pool, conn, sql); + } + else + { + // create asyncresult with null connection + boost::shared_ptr res = boost::make_shared(pgis_ctxt, pool, conn, sql); + pgis_ctxt->add_request(res); + return res; + } } } +processor_context_ptr postgis_datasource::get_context(feature_style_context_map & ctx) const +{ + std::string ds_name(name()); + + if (! asynchronous_request_) + return processor_context_ptr(); + + if (!ctx.count(ds_name)) + { + processor_context_ptr pgis_ctx = boost::make_shared(); + ctx[ds_name] = pgis_ctx; + return ctx[ds_name]; + } + else + return ctx[ds_name]; +} + featureset_ptr postgis_datasource::features(const query& q) const { + // if the driver is in asynchronous mode, return the appropriate fetaures + if ( asynchronous_request_ ) + { + return features_with_context(q,boost::make_shared()); + } + else + { + return features_with_context(q); + } +} + +featureset_ptr postgis_datasource::features_with_context(const query& q,processor_context_ptr proc_ctx) const +{ + #ifdef MAPNIK_STATS - mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features"); + mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_with_context"); #endif + box2d const& box = q.get_bbox(); double scale_denom = q.scale_denominator(); - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); + if (pool) { - shared_ptr conn = pool->borrowObject(); - if (conn && conn->isOK()) + shared_ptr conn; + + if ( asynchronous_request_ ) { - if (geometryColumn_.empty()) + // limit use to num_async_request_ => if reached don't borrow the last connexion object + boost::shared_ptr pgis_ctxt = boost::static_pointer_cast(proc_ctx); + if ( pgis_ctxt->num_async_requests_ < max_async_connections_ ) { - std::ostringstream s_error; - s_error << "PostGIS: geometry name lookup failed for table '"; - - if (! schema_.empty()) - { - s_error << schema_ << "."; - } - s_error << geometry_table_ - << "'. Please manually provide the 'geometry_field' parameter or add an entry " - << "in the geometry_columns for '"; - - if (! schema_.empty()) - { - s_error << schema_ << "."; - } - s_error << geometry_table_ << "'."; - - throw mapnik::datasource_exception(s_error.str()); + conn = pool->borrowObject(); + pgis_ctxt->num_async_requests_++; } - - std::ostringstream s; - - const double px_gw = 1.0 / boost::get<0>(q.resolution()); - const double px_gh = 1.0 / boost::get<1>(q.resolution()); - - s << "SELECT ST_AsBinary("; - - if (simplify_geometries_) { - s << "ST_Simplify("; - } - - s << "\"" << geometryColumn_ << "\""; - - if (simplify_geometries_) { - // 1/20 of pixel seems to be a good compromise to avoid - // drop of collapsed polygons. - // See https://github.com/mapnik/mapnik/issues/1639 - const double tolerance = std::min(px_gw, px_gh) / 20.0; - s << ", " << tolerance << ")"; - } - - s << ") AS geom"; - - mapnik::context_ptr ctx = boost::make_shared(); - std::set const& props = q.property_names(); - std::set::const_iterator pos = props.begin(); - std::set::const_iterator end = props.end(); - - if (! key_field_.empty()) + } + else + { + // Always get a connection in synchronous mode + conn = pool->borrowObject(); + if(!conn ) { - mapnik::sql_utils::quote_attr(s, key_field_); - ctx->push(key_field_); - - for (; pos != end; ++pos) - { - if (*pos != key_field_) - { - mapnik::sql_utils::quote_attr(s, *pos); - ctx->push(*pos); - } - } + throw mapnik::datasource_exception("Postgis Plugin: Null connection"); } - else + } + + + if (geometryColumn_.empty()) + { + std::ostringstream s_error; + s_error << "PostGIS: geometry name lookup failed for table '"; + + if (! schema_.empty()) { - for (; pos != end; ++pos) + s_error << schema_ << "."; + } + s_error << geometry_table_ + << "'. Please manually provide the 'geometry_field' parameter or add an entry " + << "in the geometry_columns for '"; + + if (! schema_.empty()) + { + s_error << schema_ << "."; + } + s_error << geometry_table_ << "'."; + + throw mapnik::datasource_exception(s_error.str()); + } + + std::ostringstream s; + + const double px_gw = 1.0 / boost::get<0>(q.resolution()); + const double px_gh = 1.0 / boost::get<1>(q.resolution()); + + s << "SELECT ST_AsBinary("; + + if (simplify_geometries_) { + s << "ST_Simplify("; + } + + s << "\"" << geometryColumn_ << "\""; + + if (simplify_geometries_) { + // 1/20 of pixel seems to be a good compromise to avoid + // drop of collapsed polygons. + // See https://github.com/mapnik/mapnik/issues/1639 + const double tolerance = std::min(px_gw, px_gh) / 2.0; + s << ", " << tolerance << ")"; + } + + s << ") AS geom"; + + mapnik::context_ptr ctx = boost::make_shared(); + std::set const& props = q.property_names(); + std::set::const_iterator pos = props.begin(); + std::set::const_iterator end = props.end(); + + if (! key_field_.empty()) + { + mapnik::sql_utils::quote_attr(s, key_field_); + ctx->push(key_field_); + + for (; pos != end; ++pos) + { + if (*pos != key_field_) { mapnik::sql_utils::quote_attr(s, *pos); ctx->push(*pos); } } - - std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh); - - s << " FROM " << table_with_bbox; - - if (row_limit_ > 0) - { - s << " LIMIT " << row_limit_; - } - - boost::shared_ptr rs = get_resultset(conn, s.str()); - return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); } else { - std::string err_msg = "Postgis Plugin:"; - if (conn) + for (; pos != end; ++pos) { - err_msg += conn->status(); + mapnik::sql_utils::quote_attr(s, *pos); + ctx->push(*pos); } - else - { - err_msg += " Null connection"; - } - throw mapnik::datasource_exception(err_msg); } + + std::string table_with_bbox = populate_tokens(table_, scale_denom, box, px_gw, px_gh); + + s << " FROM " << table_with_bbox; + + if (row_limit_ > 0) + { + s << " LIMIT " << row_limit_; + } + + boost::shared_ptr rs = get_resultset(conn, s.str(), pool, proc_ctx); + return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); + } return featureset_ptr(); } + featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double tol) const { #ifdef MAPNIK_STATS mapnik::progress_timer __stats__(std::clog, "postgis_datasource::features_at_point"); #endif - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -775,7 +859,7 @@ featureset_ptr postgis_datasource::features_at_point(coord2d const& pt, double t s << " LIMIT " << row_limit_; } - boost::shared_ptr rs = get_resultset(conn, s.str()); + boost::shared_ptr rs = get_resultset(conn, s.str(), pool); return boost::make_shared(rs, ctx, desc_.get_encoding(), !key_field_.empty()); } } @@ -790,7 +874,7 @@ box2d postgis_datasource::envelope() const return extent_; } - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); @@ -881,7 +965,7 @@ boost::optional postgis_datasource::get_geometry { boost::optional result; - shared_ptr< Pool > pool = ConnectionManager::instance().getPool(creator_.id()); + CnxPool_ptr pool = ConnectionManager::instance().getPool(creator_.id()); if (pool) { shared_ptr conn = pool->borrowObject(); diff --git a/plugins/input/postgis/postgis_datasource.hpp b/plugins/input/postgis/postgis_datasource.hpp index d0f5a2beb..4e991d455 100644 --- a/plugins/input/postgis/postgis_datasource.hpp +++ b/plugins/input/postgis/postgis_datasource.hpp @@ -46,9 +46,12 @@ #include "connection_manager.hpp" #include "resultset.hpp" #include "cursorresultset.hpp" +#include "asyncresultset.hpp" using mapnik::transcoder; using mapnik::datasource; +using mapnik::feature_style_context_map; +using mapnik::processor_context_ptr; using mapnik::box2d; using mapnik::layer_descriptor; using mapnik::featureset_ptr; @@ -57,6 +60,8 @@ using mapnik::query; using mapnik::parameters; using mapnik::coord2d; +typedef boost::shared_ptr< ConnectionManager::PoolType> CnxPool_ptr; + class postgis_datasource : public datasource { public: @@ -64,6 +69,8 @@ public: ~postgis_datasource(); mapnik::datasource::datasource_t type() const; static const char * name(); + processor_context_ptr get_context(feature_style_context_map &) const; + featureset_ptr features_with_context(const query& q, processor_context_ptr ctx= processor_context_ptr()) const; featureset_ptr features(const query& q) const; featureset_ptr features_at_point(coord2d const& pt, double tol = 0) const; mapnik::box2d envelope() const; @@ -74,8 +81,7 @@ private: std::string sql_bbox(box2d const& env) const; std::string populate_tokens(std::string const& sql, double scale_denom, box2d const& env, double pixel_width, double pixel_height) const; std::string populate_tokens(std::string const& sql) const; - boost::shared_ptr get_resultset(boost::shared_ptr const &conn, std::string const& sql) const; - + boost::shared_ptr get_resultset(boost::shared_ptr &conn, std::string const& sql, CnxPool_ptr const& pool, processor_context_ptr ctx= processor_context_ptr()) const; static const std::string GEOMETRY_COLUMNS; static const std::string SPATIAL_REF_SYS; static const double FMAX; @@ -102,12 +108,16 @@ private: const std::string scale_denom_token_; const std::string pixel_width_token_; const std::string pixel_height_token_; + int pool_max_size_; bool persist_connection_; bool extent_from_subquery_; bool estimate_extent_; + int max_async_connections_; + bool asynchronous_request_; // params below are for testing purposes only (will likely be removed at any time) int intersect_min_scale_; int intersect_max_scale_; + }; #endif // POSTGIS_DATASOURCE_HPP diff --git a/plugins/input/postgis/resultset.hpp b/plugins/input/postgis/resultset.hpp index c8734f538..600fc96dd 100644 --- a/plugins/input/postgis/resultset.hpp +++ b/plugins/input/postgis/resultset.hpp @@ -45,49 +45,16 @@ public: virtual const char* getValue(const char* name) const = 0; }; -class ResultSet : public IResultSet +class ResultSet : public IResultSet, private mapnik::noncopyable { public: ResultSet(PGresult *res) : res_(res), - pos_(-1), - refCount_(new int(1)) + pos_(-1) { numTuples_ = PQntuples(res_); } - ResultSet(const ResultSet& rhs) - : res_(rhs.res_), - pos_(rhs.pos_), - numTuples_(rhs.numTuples_), - refCount_(rhs.refCount_) - { - (*refCount_)++; - } - - ResultSet& operator=(const ResultSet& rhs) - { - if (this == &rhs) - { - return *this; - } - - if (--(refCount_) == 0) - { - close(); - - delete refCount_; - refCount_ = 0; - } - - res_ = rhs.res_; - pos_ = rhs.pos_; - numTuples_ = rhs.numTuples_; - refCount_ = rhs.refCount_; - (*refCount_)++; - return *this; - } - virtual void close() { PQclear(res_); @@ -96,13 +63,7 @@ public: virtual ~ResultSet() { - if (--(*refCount_) == 0) - { - PQclear(res_); - - delete refCount_; - refCount_ = 0; - } + PQclear(res_); } virtual int getNumFields() const @@ -184,7 +145,6 @@ private: PGresult* res_; int pos_; int numTuples_; - int *refCount_; }; #endif // POSTGIS_RESULTSET_HPP diff --git a/tests/python_tests/postgis_async_test.py b/tests/python_tests/postgis_async_test.py new file mode 100644 index 000000000..6721b97fe --- /dev/null +++ b/tests/python_tests/postgis_async_test.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python + +from nose.tools import * +import sys +import time +from utilities import execution_path +from subprocess import Popen, PIPE +import os, mapnik + +MAPNIK_TEST_DBNAME = 'mapnik-tmp-postgis-async-test-db' +POSTGIS_TEMPLATE_DBNAME = 'template_postgis' + +def setup(): + # All of the paths used are relative, if we run the tests + # from another directory we need to chdir() + os.chdir(execution_path('.')) + +def call(cmd,silent=False): + stdin, stderr = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE).communicate() + if not stderr: + return stdin.strip() + elif not silent and not 'NOTICE' in stderr: + raise RuntimeError(stderr.strip()) + +def psql_can_connect(): + """Test ability to connect to a postgis template db with no options. + + Basically, to run these tests your user must have full read + access over unix sockets without supplying a password. This + keeps these tests simple and focused on postgis not on postgres + auth issues. + """ + try: + call('psql %s -c "select postgis_version()"' % POSTGIS_TEMPLATE_DBNAME) + return True + except RuntimeError, e: + print 'Notice: skipping postgis tests (connection)' + return False + +def createdb_and_dropdb_on_path(): + """Test for presence of dropdb/createdb on user path. + + We require these programs to setup and teardown the testing db. + """ + try: + call('createdb --help') + call('dropdb --help') + return True + except RuntimeError, e: + print 'Notice: skipping postgis tests (createdb/dropdb)' + return False + +insert_table_1 = """ +CREATE TABLE test1(gid serial PRIMARY KEY, label varchar(40), geom geometry); +INSERT INTO test1(label,geom) values ('label_1',GeomFromEWKT('SRID=4326;POINT(0 0)')); +INSERT INTO test1(label,geom) values ('label_2',GeomFromEWKT('SRID=4326;POINT(-2 2)')); +INSERT INTO test1(label,geom) values ('label_3',GeomFromEWKT('SRID=4326;MULTIPOINT(2 1,1 2)')); +INSERT INTO test1(label,geom) values ('label_4',GeomFromEWKT('SRID=4326;LINESTRING(0 0,1 1,1 2)')); +INSERT INTO test1(label,geom) values ('label_5',GeomFromEWKT('SRID=4326;MULTILINESTRING((1 0,0 1,3 2),(3 2,5 4))')); +INSERT INTO test1(label,geom) values ('label_6',GeomFromEWKT('SRID=4326;POLYGON((0 0,4 0,4 4,0 4,0 0),(1 1, 2 1, 2 2, 1 2,1 1))')); +INSERT INTO test1(label,geom) values ('label_7',GeomFromEWKT('SRID=4326;MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1),(1 1,2 1,2 2,1 2,1 1)), ((-1 -1,-1 -2,-2 -2,-2 -1,-1 -1)))')); +INSERT INTO test1(label,geom) values ('label_8',GeomFromEWKT('SRID=4326;GEOMETRYCOLLECTION(POLYGON((1 1, 2 1, 2 2, 1 2,1 1)),POINT(2 3),LINESTRING(2 3,3 4))')); +""" + +insert_table_2 = """ +CREATE TABLE test2(gid serial PRIMARY KEY, label varchar(40), geom geometry); +INSERT INTO test2(label,geom) values ('label_1',GeomFromEWKT('SRID=4326;POINT(0 0)')); +INSERT INTO test2(label,geom) values ('label_2',GeomFromEWKT('SRID=4326;POINT(-2 2)')); +INSERT INTO test2(label,geom) values ('label_3',GeomFromEWKT('SRID=4326;MULTIPOINT(2 1,1 2)')); +INSERT INTO test2(label,geom) values ('label_4',GeomFromEWKT('SRID=4326;LINESTRING(0 0,1 1,1 2)')); +INSERT INTO test2(label,geom) values ('label_5',GeomFromEWKT('SRID=4326;MULTILINESTRING((1 0,0 1,3 2),(3 2,5 4))')); +INSERT INTO test2(label,geom) values ('label_6',GeomFromEWKT('SRID=4326;POLYGON((0 0,4 0,4 4,0 4,0 0),(1 1, 2 1, 2 2, 1 2,1 1))')); +INSERT INTO test2(label,geom) values ('label_7',GeomFromEWKT('SRID=4326;MULTIPOLYGON(((1 1,3 1,3 3,1 3,1 1),(1 1,2 1,2 2,1 2,1 1)), ((-1 -1,-1 -2,-2 -2,-2 -1,-1 -1)))')); +INSERT INTO test2(label,geom) values ('label_8',GeomFromEWKT('SRID=4326;GEOMETRYCOLLECTION(POLYGON((1 1, 2 1, 2 2, 1 2,1 1)),POINT(2 3),LINESTRING(2 3,3 4))')); +""" + +def postgis_setup(): + call('dropdb %s' % MAPNIK_TEST_DBNAME,silent=True) + call('createdb -T %s %s' % (POSTGIS_TEMPLATE_DBNAME,MAPNIK_TEST_DBNAME),silent=False) + call('''psql -q %s -c "%s"''' % (MAPNIK_TEST_DBNAME,insert_table_1),silent=False) + call('''psql -q %s -c "%s"''' % (MAPNIK_TEST_DBNAME,insert_table_2),silent=False) + +if 'postgis' in mapnik.DatasourceCache.plugin_names() \ + and createdb_and_dropdb_on_path() \ + and psql_can_connect(): + + # initialize test database + postgis_setup() + + def test_psql_error_should_not_break_connection_pool(): + # Bad request, will trig an error when returning result + ds_bad = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table="""(SELECT geom as geom,label::int from public.test1) as failure_table""", + max_async_connection=5,geometry_field='geom',srid=4326,trace=False) + + # Good request + ds_good = mapnik.PostGIS(dbname=MAPNIK_TEST_DBNAME,table="test1", + max_async_connection=5,geometry_field='geom',srid=4326,trace=False) + + # This will/should trig a PSQL error + failed = False + try: + fs = ds_bad.featureset() + for feature in fs: + pass + except RuntimeError: + failed = True + + assert_true(failed) + + # Should be ok + fs = ds_good.featureset() + for feature in fs: + pass + + + +if __name__ == "__main__": + setup() + run_all(eval(x) for x in dir() if x.startswith("test_"))