230 lines
5.5 KiB
C++
230 lines
5.5 KiB
C++
/*****************************************************************************
|
|
*
|
|
* This file is part of Mapnik (c++ mapping toolkit)
|
|
*
|
|
* Copyright (C) 2015 Artem Pavlenko
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public
|
|
* License along with this library; if not, write to the Free Software
|
|
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*
|
|
*****************************************************************************/
|
|
|
|
#ifndef POSTGIS_ASYNCRESULTSET_HPP
|
|
#define POSTGIS_ASYNCRESULTSET_HPP
|
|
|
|
#include <mapnik/debug.hpp>
|
|
#include <mapnik/datasource.hpp>
|
|
|
|
#include "connection_manager.hpp"
|
|
#include "resultset.hpp"
|
|
#include <queue>
|
|
#include <memory>
|
|
|
|
class postgis_processor_context;
|
|
using postgis_processor_context_ptr = std::shared_ptr<postgis_processor_context>;
|
|
|
|
class AsyncResultSet : public IResultSet, private mapnik::util::noncopyable
|
|
{
|
|
public:
|
|
AsyncResultSet(postgis_processor_context_ptr const& ctx,
|
|
std::shared_ptr< Pool<Connection,ConnectionCreator> > const& pool,
|
|
std::shared_ptr<Connection> const& conn, std::string const& sql )
|
|
: ctx_(ctx),
|
|
pool_(pool),
|
|
conn_(conn),
|
|
sql_(sql),
|
|
is_closed_(false)
|
|
{
|
|
}
|
|
|
|
virtual bool use_connection() { return true; }
|
|
|
|
virtual ~AsyncResultSet()
|
|
{
|
|
close();
|
|
}
|
|
|
|
|
|
void abort()
|
|
{
|
|
if(conn_ && conn_->isPending() )
|
|
{
|
|
MAPNIK_LOG_DEBUG(postgis) << "AsyncResultSet: aborting pending connection - " << conn_.get();
|
|
// there is no easy way to abort a pending connection, so we close it : this will ensure that
|
|
// the connection will be recycled in the pool
|
|
conn_->close();
|
|
}
|
|
}
|
|
|
|
virtual void close()
|
|
{
|
|
if (!is_closed_)
|
|
{
|
|
rs_.reset();
|
|
is_closed_ = true;
|
|
if (conn_)
|
|
{
|
|
if(conn_->isPending())
|
|
{
|
|
abort();
|
|
}
|
|
conn_.reset();
|
|
}
|
|
}
|
|
}
|
|
|
|
virtual int getNumFields() const
|
|
{
|
|
return rs_->getNumFields();
|
|
}
|
|
|
|
virtual bool next()
|
|
{
|
|
bool next_res = false;
|
|
if (!rs_)
|
|
{
|
|
// Ensure connection is valid
|
|
if (conn_ && conn_->isOK())
|
|
{
|
|
rs_ = conn_->getAsyncResult();
|
|
}
|
|
else
|
|
{
|
|
throw mapnik::datasource_exception("invalid connection in AsyncResultSet::next");
|
|
}
|
|
}
|
|
|
|
next_res = rs_->next();
|
|
if (!next_res)
|
|
{
|
|
rs_.reset();
|
|
rs_ = conn_->getNextAsyncResult();
|
|
if (rs_ && rs_->next())
|
|
{
|
|
return true;
|
|
}
|
|
close();
|
|
prepare_next();
|
|
}
|
|
return next_res;
|
|
}
|
|
|
|
virtual const char* getFieldName(int index) const
|
|
{
|
|
return rs_->getFieldName(index);
|
|
}
|
|
|
|
virtual int getFieldLength(int index) const
|
|
{
|
|
return rs_->getFieldLength(index);
|
|
}
|
|
|
|
virtual int getFieldLength(const char* name) const
|
|
{
|
|
return rs_->getFieldLength(name);
|
|
}
|
|
|
|
virtual int getTypeOID(int index) const
|
|
{
|
|
return rs_->getTypeOID(index);
|
|
}
|
|
|
|
virtual int getTypeOID(const char* name) const
|
|
{
|
|
return rs_->getTypeOID(name);
|
|
}
|
|
|
|
virtual bool isNull(int index) const
|
|
{
|
|
return rs_->isNull(index);
|
|
}
|
|
|
|
virtual const char* getValue(int index) const
|
|
{
|
|
return rs_->getValue(index);
|
|
}
|
|
|
|
virtual const char* getValue(const char* name) const
|
|
{
|
|
return rs_->getValue(name);
|
|
}
|
|
|
|
private:
|
|
postgis_processor_context_ptr ctx_;
|
|
std::shared_ptr< Pool<Connection,ConnectionCreator> > pool_;
|
|
std::shared_ptr<Connection> conn_;
|
|
std::string sql_;
|
|
std::shared_ptr<ResultSet> rs_;
|
|
bool is_closed_;
|
|
|
|
void prepare()
|
|
{
|
|
conn_ = pool_->borrowObject();
|
|
if (conn_ && conn_->isOK())
|
|
{
|
|
conn_->executeAsyncQuery(sql_, 1);
|
|
}
|
|
else
|
|
{
|
|
throw mapnik::datasource_exception("Postgis Plugin: bad connection");
|
|
}
|
|
}
|
|
|
|
void prepare_next();
|
|
|
|
};
|
|
|
|
|
|
class postgis_processor_context : public mapnik::IProcessorContext
|
|
{
|
|
public:
|
|
postgis_processor_context()
|
|
: num_async_requests_(0) {}
|
|
~postgis_processor_context() {}
|
|
|
|
void add_request(std::shared_ptr<AsyncResultSet> const& req)
|
|
{
|
|
q_.push(req);
|
|
}
|
|
|
|
std::shared_ptr<AsyncResultSet> pop_next_request()
|
|
{
|
|
std::shared_ptr<AsyncResultSet> r;
|
|
if (!q_.empty())
|
|
{
|
|
r = q_.front();
|
|
q_.pop();
|
|
}
|
|
return r;
|
|
}
|
|
|
|
int num_async_requests_;
|
|
|
|
private:
|
|
using async_queue = std::queue<std::shared_ptr<AsyncResultSet> >;
|
|
async_queue q_;
|
|
|
|
};
|
|
|
|
inline void AsyncResultSet::prepare_next()
|
|
{
|
|
// ensure cnx pool has unused cnx
|
|
std::shared_ptr<AsyncResultSet> next = ctx_->pop_next_request();
|
|
if (next)
|
|
{
|
|
next->prepare();
|
|
}
|
|
}
|
|
|
|
#endif // POSTGIS_ASYNCRESULTSET_HPP
|