mapnik/plugins/input/postgis/asyncresultset.hpp
2021-01-05 11:59:41 +00:00

230 lines
5.5 KiB
C++

/*****************************************************************************
*
* This file is part of Mapnik (c++ mapping toolkit)
*
* Copyright (C) 2021 Artem Pavlenko
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
*****************************************************************************/
#ifndef POSTGIS_ASYNCRESULTSET_HPP
#define POSTGIS_ASYNCRESULTSET_HPP
#include <mapnik/debug.hpp>
#include <mapnik/datasource.hpp>
#include "connection_manager.hpp"
#include "resultset.hpp"
#include <queue>
#include <memory>
class postgis_processor_context;
using postgis_processor_context_ptr = std::shared_ptr<postgis_processor_context>;
class AsyncResultSet : public IResultSet, private mapnik::util::noncopyable
{
public:
AsyncResultSet(postgis_processor_context_ptr const& ctx,
std::shared_ptr< Pool<Connection,ConnectionCreator> > const& pool,
std::shared_ptr<Connection> const& conn, std::string const& sql )
: ctx_(ctx),
pool_(pool),
conn_(conn),
sql_(sql),
is_closed_(false)
{
}
virtual bool use_connection() { return true; }
virtual ~AsyncResultSet()
{
close();
}
void abort()
{
if(conn_ && conn_->isPending() )
{
MAPNIK_LOG_DEBUG(postgis) << "AsyncResultSet: aborting pending connection - " << conn_.get();
// there is no easy way to abort a pending connection, so we close it : this will ensure that
// the connection will be recycled in the pool
conn_->close();
}
}
virtual void close()
{
if (!is_closed_)
{
rs_.reset();
is_closed_ = true;
if (conn_)
{
if(conn_->isPending())
{
abort();
}
conn_.reset();
}
}
}
virtual int getNumFields() const
{
return rs_->getNumFields();
}
virtual bool next()
{
bool next_res = false;
if (!rs_)
{
// Ensure connection is valid
if (conn_ && conn_->isOK())
{
rs_ = conn_->getAsyncResult();
}
else
{
throw mapnik::datasource_exception("invalid connection in AsyncResultSet::next");
}
}
next_res = rs_->next();
if (!next_res)
{
rs_.reset();
rs_ = conn_->getNextAsyncResult();
if (rs_ && rs_->next())
{
return true;
}
close();
prepare_next();
}
return next_res;
}
virtual const char* getFieldName(int index) const
{
return rs_->getFieldName(index);
}
virtual int getFieldLength(int index) const
{
return rs_->getFieldLength(index);
}
virtual int getFieldLength(const char* name) const
{
return rs_->getFieldLength(name);
}
virtual int getTypeOID(int index) const
{
return rs_->getTypeOID(index);
}
virtual int getTypeOID(const char* name) const
{
return rs_->getTypeOID(name);
}
virtual bool isNull(int index) const
{
return rs_->isNull(index);
}
virtual const char* getValue(int index) const
{
return rs_->getValue(index);
}
virtual const char* getValue(const char* name) const
{
return rs_->getValue(name);
}
private:
postgis_processor_context_ptr ctx_;
std::shared_ptr< Pool<Connection,ConnectionCreator> > pool_;
std::shared_ptr<Connection> conn_;
std::string sql_;
std::shared_ptr<ResultSet> rs_;
bool is_closed_;
void prepare()
{
conn_ = pool_->borrowObject();
if (conn_ && conn_->isOK())
{
conn_->executeAsyncQuery(sql_, 1);
}
else
{
throw mapnik::datasource_exception("Postgis Plugin: bad connection");
}
}
void prepare_next();
};
class postgis_processor_context : public mapnik::IProcessorContext
{
public:
postgis_processor_context()
: num_async_requests_(0) {}
~postgis_processor_context() {}
void add_request(std::shared_ptr<AsyncResultSet> const& req)
{
q_.push(req);
}
std::shared_ptr<AsyncResultSet> pop_next_request()
{
std::shared_ptr<AsyncResultSet> r;
if (!q_.empty())
{
r = q_.front();
q_.pop();
}
return r;
}
int num_async_requests_;
private:
using async_queue = std::queue<std::shared_ptr<AsyncResultSet> >;
async_queue q_;
};
inline void AsyncResultSet::prepare_next()
{
// ensure cnx pool has unused cnx
std::shared_ptr<AsyncResultSet> next = ctx_->pop_next_request();
if (next)
{
next->prepare();
}
}
#endif // POSTGIS_ASYNCRESULTSET_HPP