// CSV datasource plugin: reads delimited text (from a file or from an inline
// string parameter) into an in-memory list of features. Geometry comes either
// from a 'wkt' column or from a longitude/latitude ('x'/'y') column pair.
//
// NOTE(review): every <...> span in this file (header names, template
// arguments and the <file> token in one error message) was missing from the
// reviewed text and has been filled in from the identifiers the code uses.
// Confirm the header list and the params_.get<> value types against the build.

#include "csv_datasource.hpp"

// boost
#include <boost/make_shared.hpp>
#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix_operator.hpp>
#include <boost/lexical_cast.hpp>

// mapnik
#include <mapnik/feature_layer_desc.hpp>
#include <mapnik/feature_factory.hpp>
#include <mapnik/geometry.hpp>
#include <mapnik/memory_featureset.hpp>
#include <mapnik/wkt/wkt_factory.hpp>
#include <mapnik/ptree_helpers.hpp> // mapnik::boolean

// stl
#include <algorithm>
#include <sstream>
#include <fstream> // fstream
#include <iostream>
#include <string>
#include <vector>
#include <iterator> // ostream_operator

// std lib
#include <stdio.h>
#include <stdlib.h>

using mapnik::datasource;
using mapnik::parameters;
using namespace boost::spirit;

DATASOURCE_PLUGIN(csv_datasource)

namespace {

// Remembers which header columns carry geometry ('wkt', longitude, latitude)
// and at which zero-based column position each one was found.
struct geometry_columns
{
    bool has_wkt;
    bool has_lat;
    bool has_lon;
    int wkt_idx;
    int lat_idx;
    int lon_idx;

    geometry_columns()
        : has_wkt(false),
          has_lat(false),
          has_lon(false),
          wkt_idx(0),
          lat_idx(0),
          lon_idx(0) {}

    // Classify the header 'name' found at column 'idx'. Matching is
    // case-insensitive: "wkt" exactly; "x" or anything containing
    // "longitude"; "y" or anything containing "latitude".
    void inspect(std::string const& name, int idx)
    {
        std::string const lower = boost::algorithm::to_lower_copy(name);
        if (lower == "wkt")
        {
            wkt_idx = idx;
            has_wkt = true;
        }
        if (lower == "x" || (lower.find("longitude") != std::string::npos))
        {
            lon_idx = idx;
            has_lon = true;
        }
        if (lower == "y" || (lower.find("latitude") != std::string::npos))
        {
            lat_idx = idx;
            has_lat = true;
        }
    }
};

} // anonymous namespace

// Reads the plugin parameters. Either 'inline' (csv text) or 'file' (optionally
// prefixed with 'base') must be supplied; throws mapnik::datasource_exception
// when neither is present. Parsing happens immediately when 'bind' is true,
// otherwise it is deferred to the first call that needs the data.
csv_datasource::csv_datasource(parameters const& params, bool bind)
    : datasource(params),
      desc_(*params_.get<std::string>("type"), *params_.get<std::string>("encoding","utf-8")),
      extent_(),
      filename_(),
      inline_string_(),
      file_length_(0),
      row_limit_(*params_.get<int>("row_limit",0)),
      features_(),
      escape_(*params_.get<std::string>("escape","")),
      separator_(*params_.get<std::string>("separator","")),
      quote_(*params_.get<std::string>("quote","")),
      headers_(),
      manual_headers_(boost::trim_copy(*params_.get<std::string>("headers",""))),
      strict_(*params_.get<mapnik::boolean>("strict",false)),
      quiet_(*params_.get<mapnik::boolean>("quiet",false)),
      filesize_max_(*params_.get<float>("filesize_max",20.0)) // MB
{
    /* TODO:
       general:
       - refactor parser into generic class
       - tests
       alternate large file pipeline:
       - stat file, detect > 15 MB
       - build up csv line-by-line iterator
       - creates opportunity to filter attributes by map query
       speed:
       - add properties for wkt/lon/lat at parse time
       - remove boost::lexical_cast
       - add ability to pass 'filter' keyword to drop attributes at layer init
       - create quad tree on the fly for small/med size files
       - memory map large files for reading
       - smaller features (less memory overhead)
       usability:
       - enforce column names without leading digit
       - better error messages (add filepath) if not reading from string
       - move to spirit to tokenize and add character level error feedback:
       http://boost-spirit.com/home/articles/qi-example/tracking-the-input-position-while-parsing/
    */

    boost::optional<std::string> inline_string = params_.get<std::string>("inline");
    if (inline_string)
    {
        inline_string_ = *inline_string;
    }
    else
    {
        boost::optional<std::string> file = params_.get<std::string>("file");
        if (!file) throw mapnik::datasource_exception("CSV Plugin: missing <file> parameter");

        boost::optional<std::string> base = params_.get<std::string>("base");
        if (base)
            filename_ = *base + "/" + *file;
        else
            filename_ = *file;
    }

    if (bind)
    {
        this->bind();
    }
}

csv_datasource::~csv_datasource() { }

// Parses the csv source once; later calls return immediately. Throws
// mapnik::datasource_exception when the file cannot be opened (parse errors
// propagate from parse_csv).
void csv_datasource::bind() const
{
    if (is_bound_) return;

    if (!inline_string_.empty())
    {
        std::istringstream in(inline_string_);
        parse_csv(in,escape_, separator_, quote_);
    }
    else
    {
        std::ifstream in(filename_.c_str(),std::ios_base::in | std::ios_base::binary);
        if (!in.is_open())
            throw mapnik::datasource_exception("CSV Plugin: could not open: '" + filename_ + "'");
        parse_csv(in,escape_, separator_, quote_);
        in.close();
    }
    is_bound_ = true;
}

// Reads every row of 'stream' into features_, growing extent_ and filling
// desc_ with one attribute descriptor per column.
//
//  stream    - seekable input stream positioned anywhere; it is rewound here
//  escape    - escape character(s); blank means backslash
//  separator - field separator(s); blank means autodetect (',' unless the
//              first line holds more tabs than commas)
//  quote     - quote character(s); blank means double quote
//
// Throws mapnik::datasource_exception when the input exceeds filesize_max_,
// is empty, has no usable geometry headers, or (in strict mode) on any row
// level problem. In non-strict mode bad rows are reported on std::clog
// (unless quiet_) and skipped.
template <typename T>
void csv_datasource::parse_csv(T& stream,
                               std::string const& escape,
                               std::string const& separator,
                               std::string const& quote) const
{
    if (filesize_max_ > 0)
    {
        stream.seekg(0, std::ios::end);
        // kept in a local (and not narrowed to int); the original declared a
        // local that merely shadowed the file_length_ member
        std::streamoff const stream_length = stream.tellg();
        double const file_mb = static_cast<double>(stream_length)/1048576;

        // throw if this is an unreasonably large file to read into memory
        if (file_mb > filesize_max_)
        {
            std::ostringstream s;
            s << "CSV Plugin: csv file is greater than " << filesize_max_ << "MB "
              << " - you should use a more efficient data format like sqlite, postgis or a shapefile "
              << " to render this data (set 'filesize_max=0' to disable this restriction if you have lots of memory)";
            throw mapnik::datasource_exception(s.str());
        }

        // set back to start
        stream.seekg(0, std::ios::beg);
    }

    // autodetect newlines: getline() with '\n' succeeds on any non-empty
    // input, so an input without a single '\n' shows up as "hit end of
    // stream"; if that happens and a '\r' is present, treat '\r' as the
    // line terminator
    std::string csv_line;
    if (!std::getline(stream,csv_line,'\n'))
    {
        throw mapnik::datasource_exception("CSV Plugin: could not detect any line breaks in this csv (http://en.wikipedia.org/wiki/Newline)\n");
    }
    char newline = '\n';
    std::string::size_type const first_cr = csv_line.find('\r');
    if (stream.eof() && first_cr != std::string::npos)
    {
        newline = '\r';
        // keep only the first line for separator detection below
        csv_line.erase(first_cr);
    }

    // set back to start; clear() first so a pending eofbit from the
    // detection read cannot make the following reads fail
    stream.clear();
    stream.seekg(0, std::ios::beg);

    // if user has not passed separator manually
    // then attempt to detect by reading first line
    std::string sep = boost::trim_copy(separator);
    if (sep.empty())
    {
        // default to ','
        sep = ",";
        // detect tabs
        int num_tabs = std::count(csv_line.begin(), csv_line.end(), '\t');
        if (num_tabs > 0)
        {
            int num_commas = std::count(csv_line.begin(), csv_line.end(), ',');
            if (num_tabs > num_commas)
            {
                sep = "\t";
#ifdef MAPNIK_DEBUG
                std::clog << "CSV Plugin: auto detected tab separator\n";
#endif
            }
        }
    }

    typedef boost::escaped_list_separator<char> escape_type;

    std::string esc = boost::trim_copy(escape);
    if (esc.empty()) esc = "\\";

    std::string quo = boost::trim_copy(quote);
    if (quo.empty()) quo = "\"";

#ifdef MAPNIK_DEBUG
    std::clog << "CSV Plugin: csv grammer: sep: '" << sep << "' quo: '" << quo << "' esc: '" << esc << "'\n";
#endif

    escape_type grammar;
    try
    {
        grammar = escape_type(esc, sep, quo);
    }
    catch (const std::exception & ex )
    {
        std::ostringstream s;
        s << "CSV Plugin: " << ex.what();
        throw mapnik::datasource_exception(s.str());
    }

    typedef boost::tokenizer< escape_type > Tokenizer;

    // 1-based number of the line about to be processed
    int line_number(1);
    geometry_columns geom;

    if (!manual_headers_.empty())
    {
        // headers supplied by the user: every line of the input is data
        Tokenizer tok(manual_headers_, grammar);
        int idx = 0;
        for (Tokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg, ++idx)
        {
            std::string val = boost::trim_copy(*beg);
            geom.inspect(val, idx);
            headers_.push_back(val);
        }
    }
    else // parse first non-blank line as headers
    {
        while (std::getline(stream,csv_line,newline))
        {
            try
            {
                Tokenizer tok(csv_line, grammar);
                Tokenizer::iterator beg = tok.begin();

                // skip blank lines; an empty line produces no tokens at
                // all, so the iterator must be checked before dereferencing
                if (beg == tok.end() || boost::trim_copy(*beg).empty())
                {
                    ++line_number;
                    continue;
                }

                int idx = -1;
                for (; beg != tok.end(); ++beg)
                {
                    ++idx;
                    std::string val = boost::trim_copy(*beg);
                    if (val.empty())
                    {
                        std::ostringstream s;
                        s << "CSV Plugin: expected a column header at line "
                          << line_number << ", column " << idx
                          << " - ensure this row contains valid header fields: '"
                          << csv_line << "'\n";
                        throw mapnik::datasource_exception(s.str());
                    }
                    geom.inspect(val, idx);
                    headers_.push_back(val);
                }
                ++line_number;
                break;
            }
            catch (const std::exception & ex )
            {
                std::ostringstream s;
                s << "CSV Plugin: error parsing headers: " << ex.what();
                throw mapnik::datasource_exception(s.str());
            }
        }
    }

    if (!geom.has_wkt && (!geom.has_lon || !geom.has_lat) )
    {
        std::ostringstream s;
        s << "CSV Plugin: could not detect column headers with the name of 'wkt' or lat/lon - this is required for reading geometry data";
        throw mapnik::datasource_exception(s.str());
    }

    int feature_count(0);
    bool extent_initialized = false;
    int num_headers = headers_.size();
    // described[i] becomes true once column i has an attribute descriptor, so
    // each column is described exactly once (from its first non-empty value)
    std::vector<bool> described(headers_.size(), false);
    mapnik::transcoder tr(desc_.get_encoding());

    while (std::getline(stream,csv_line,newline))
    {
        if ((row_limit_ > 0) && (line_number > row_limit_))
        {
#ifdef MAPNIK_DEBUG
            std::clog << "CSV Plugin: row limit hit, exiting at feature: " << feature_count << "\n";
#endif
            break;
        }

        // skip blank lines
        if (csv_line.empty())
        {
#ifdef MAPNIK_DEBUG
            std::clog << "CSV Plugin: empty row encountered at line: " << line_number << "\n";
#endif
            ++line_number;
            continue;
        }

        // Every path through the try block either throws or advances
        // line_number exactly once, immediately before leaving it; the
        // handler below therefore always sees the number of the failing line.
        try
        {
            Tokenizer tok(csv_line, grammar);
            Tokenizer::iterator beg = tok.begin();

            // early return for strict mode
            if (strict_)
            {
                int num_fields = std::distance(beg,tok.end());
                if (num_fields != num_headers)
                {
                    std::ostringstream s;
                    s << "CSV Plugin: # of headers != # of values parsed for row " << line_number << "\n";
                    throw mapnik::datasource_exception(s.str());
                }
            }

            mapnik::feature_ptr feature(mapnik::feature_factory::create(feature_count));
            double x(0);
            double y(0);
            bool parsed_x = false;
            bool parsed_y = false;
            bool parsed_wkt = false;
            bool skip = false;
            bool null_geom = false;
            std::vector<std::string> collected;

            int i = -1;
            for (;beg != tok.end(); ++beg)
            {
                ++i;
                std::string value = boost::trim_copy(*beg);

                // avoid range error if trailing separator
                if (i >= num_headers)
                {
#ifdef MAPNIK_DEBUG
                    std::clog << "CSV Plugin: messed up line encountered where # values > # column headers at: " << line_number << "\n";
#endif
                    skip = true;
                    break;
                }

                std::string fld_name(headers_.at(i));
                collected.push_back(fld_name);
                int value_length = value.length();

                // parse wkt
                if (geom.has_wkt)
                {
                    if (i == geom.wkt_idx)
                    {
                        // skip empty geoms
                        if (value.empty())
                        {
                            null_geom = true;
                            break;
                        }

                        // optimize simple "POINT (x y)"
                        // using this shaved 2 seconds off csv that took 8 seconds total to parse
                        if (value.find("POINT") == 0)
                        {
                            using boost::phoenix::ref;
                            using boost::spirit::qi::_1;
                            std::string::const_iterator str_beg = value.begin();
                            std::string::const_iterator str_end = value.end();
                            bool r = qi::phrase_parse(str_beg,str_end,
                                                      (
                                                          qi::lit("POINT") >> '('
                                                          >> double_[ref(x) = _1]
                                                          >> double_[ref(y) = _1] >> ')'
                                                      ),
                                                      ascii::space);

                            if (r /*&& (str_beg != str_end)*/)
                            {
                                // the feature takes over the raw pointer
                                mapnik::geometry_type * pt = new mapnik::geometry_type(mapnik::Point);
                                pt->move_to(x,y);
                                feature->add_geometry(pt);
                                parsed_wkt = true;
                            }
                            else
                            {
                                std::clog << "could not parse: " << value << "\n";
                            }
                        }
                        else
                        {
                            if (mapnik::from_wkt(value, feature->paths()))
                            {
                                parsed_wkt = true;
                            }
                            else
                            {
                                std::ostringstream s;
                                s << "CSV Plugin: expected well known text geometry: could not parse row "
                                  << line_number
                                  << ",column "
                                  << i << " - found: '"
                                  << value << "'";
                                if (strict_)
                                {
                                    throw mapnik::datasource_exception(s.str());
                                }
                                else
                                {
                                    if (!quiet_) std::clog << s.str() << "\n";
                                }
                            }
                        }
                    }
                }
                else
                {
                    // longitude
                    if (i == geom.lon_idx)
                    {
                        // skip empty geoms
                        if (value.empty())
                        {
                            null_geom = true;
                            break;
                        }

                        try
                        {
                            x = boost::lexical_cast<double>(value);
                            parsed_x = true;
                        }
                        catch (boost::bad_lexical_cast & ex)
                        {
                            std::ostringstream s;
                            s << "CSV Plugin: expected a float value for longitude: could not parse row "
                              << line_number
                              << ", column "
                              << i << " - found: '"
                              << value << "'";
                            if (strict_)
                            {
                                throw mapnik::datasource_exception(s.str());
                            }
                            else
                            {
                                if (!quiet_) std::clog << s.str() << "\n";
                            }
                        }
                    }
                    // latitude
                    else if (i == geom.lat_idx)
                    {
                        // skip empty geoms
                        if (value.empty())
                        {
                            null_geom = true;
                            break;
                        }

                        try
                        {
                            y = boost::lexical_cast<double>(value);
                            parsed_y = true;
                        }
                        catch (boost::bad_lexical_cast & ex)
                        {
                            std::ostringstream s;
                            s << "CSV Plugin: expected a float value for latitude: could not parse row "
                              << line_number
                              << ", column "
                              << i << " - found: '"
                              << value << "'";
                            if (strict_)
                            {
                                throw mapnik::datasource_exception(s.str());
                            }
                            else
                            {
                                if (!quiet_) std::clog << s.str() << "\n";
                            }
                        }
                    }
                }

                // add all values as attributes; empty values carry no type
                // information, so they never produce a descriptor
                bool const describe = !value.empty() && !described[i];

                if (value.empty())
                {
                    boost::put(*feature,fld_name,mapnik::value_null());
                }
                // only true strings are this long
                else if (value_length > 20)
                {
                    UnicodeString ustr = tr.transcode(value.c_str());
                    boost::put(*feature,fld_name,ustr);
                    if (describe)
                        desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String));
                }
                else if ((value[0] >= '0' && value[0] <= '9') || value[0] == '-')
                {
                    double float_val = 0.0;
                    std::string::const_iterator str_beg = value.begin();
                    std::string::const_iterator str_end = value.end();
                    bool r = qi::phrase_parse(str_beg,str_end,qi::double_,ascii::space,float_val);
                    // the whole value must be consumed: otherwise text that
                    // merely starts with digits ("12 Main St", "2011-10-12")
                    // would be truncated to a number
                    if (r && (str_beg == str_end))
                    {
                        if (value.find(".") != std::string::npos)
                        {
                            boost::put(*feature,fld_name,float_val);
                            if (describe)
                                desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Double));
                        }
                        else
                        {
                            int val = static_cast<int>(float_val);
                            boost::put(*feature,fld_name,val);
                            if (describe)
                                desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Integer));
                        }
                    }
                    else
                    {
                        // fallback to normal string
                        UnicodeString ustr = tr.transcode(value.c_str());
                        boost::put(*feature,fld_name,ustr);
                        if (describe)
                            desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String));
                    }
                }
                else
                {
                    if (value == "true")
                    {
                        boost::put(*feature,fld_name,true);
                        if (describe)
                            desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean));
                    }
                    else if(value == "false")
                    {
                        boost::put(*feature,fld_name,false);
                        if (describe)
                            desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::Boolean));
                    }
                    else
                    {
                        // fallback to normal string
                        UnicodeString ustr = tr.transcode(value.c_str());
                        boost::put(*feature,fld_name,ustr);
                        if (describe)
                            desc_.add_descriptor(mapnik::attribute_descriptor(fld_name,mapnik::String));
                    }
                }

                if (describe)
                {
                    described[i] = true;
                }
            }

            if (skip)
            {
                std::ostringstream s;
                s << "CSV Plugin: # values > # column headers"
                  << "for line " << line_number << " - found " << headers_.size()
                  << " with values like: " << csv_line << "\n";
                //<< "for: " << boost::algorithm::join(collected, ",") << "\n";
                if (strict_)
                {
                    throw mapnik::datasource_exception(s.str());
                }
                if (!quiet_) std::clog << s.str() << "\n";
                ++line_number;
                continue;
            }
            else if (null_geom)
            {
                std::ostringstream s;
                s << "CSV Plugin: null geometry encountered for line "
                  << line_number;
                if (strict_)
                {
                    throw mapnik::datasource_exception(s.str());
                }
                if (!quiet_) std::clog << s.str() << "\n";
                ++line_number;
                continue;
            }

            if (geom.has_wkt)
            {
                if (parsed_wkt)
                {
                    if (!extent_initialized)
                    {
                        extent_initialized = true;
                        extent_ = feature->envelope();
                    }
                    else
                    {
                        extent_.expand_to_include(feature->envelope());
                    }
                    features_.push_back(feature);
                    // feature_count doubles as the id of the next feature,
                    // so it must advance here just as in the lon/lat branch
                    ++feature_count;
                }
                else
                {
                    std::ostringstream s;
                    s << "CSV Plugin: could not read WKT geometry "
                      << "for line " << line_number << " - found " << headers_.size()
                      << " with values like: " << csv_line << "\n";
                    if (strict_)
                    {
                        throw mapnik::datasource_exception(s.str());
                    }
                    if (!quiet_) std::clog << s.str() << "\n";
                    ++line_number;
                    continue;
                }
            }
            else
            {
                if (parsed_x && parsed_y)
                {
                    // the feature takes over the raw pointer
                    mapnik::geometry_type * pt = new mapnik::geometry_type(mapnik::Point);
                    pt->move_to(x,y);
                    feature->add_geometry(pt);
                    features_.push_back(feature);
                    ++feature_count;

                    if (!extent_initialized)
                    {
                        extent_initialized = true;
                        extent_ = feature->envelope();
                    }
                    else
                    {
                        extent_.expand_to_include(feature->envelope());
                    }
                }
                else
                {
                    std::ostringstream s;
                    if (!parsed_x)
                    {
                        s << "CSV Plugin: does your csv have valid headers?\n"
                          << "Could not detect or parse any rows named 'x' or 'longitude' "
                          << "for line " << line_number << " but found " << headers_.size()
                          << " with values like: " << csv_line << "\n"
                          << "for: " << boost::algorithm::join(collected, ",") << "\n";
                    }
                    if (!parsed_y)
                    {
                        s << "CSV Plugin: does your csv have valid headers?\n"
                          << "Could not detect or parse any rows named 'y' or 'latitude' "
                          << "for line " << line_number << " but found " << headers_.size()
                          << " with values like: " << csv_line << "\n"
                          << "for: " << boost::algorithm::join(collected, ",") << "\n";
                    }
                    if (strict_)
                    {
                        throw mapnik::datasource_exception(s.str());
                    }
                    if (!quiet_) std::clog << s.str() << "\n";
                    ++line_number;
                    continue;
                }
            }
            ++line_number;
        }
        catch (const std::exception & ex )
        {
            std::ostringstream s;
            s << "CSV Plugin: unexpected error parsing line: " << line_number
              << " - found " << headers_.size() << " with values like: " << csv_line << "\n"
              << " and got error like: " << ex.what();
            if (strict_)
            {
                throw mapnik::datasource_exception(s.str());
            }
            if (!quiet_) std::clog << s.str() << "\n";
            // the row is dropped; keep the line counter in step
            ++line_number;
        }
    }
}

// Plugin name under which this datasource is registered.
std::string csv_datasource::name()
{
    return "csv";
}

// This datasource yields vector data.
int csv_datasource::type() const
{
    return datasource::Vector;
}

// Bounding box of every feature read so far; parses the source on first use.
mapnik::box2d<double> csv_datasource::envelope() const
{
    if (!is_bound_) bind();
    return extent_;
}

// Attribute descriptors collected while parsing; parses the source on first use.
mapnik::layer_descriptor csv_datasource::get_descriptor() const
{
    if (!is_bound_) bind();
    return desc_;
}

// Returns a featureset over the in-memory features for the query's bounding box.
mapnik::featureset_ptr csv_datasource::features(mapnik::query const& q) const
{
    if (!is_bound_) bind();

    // TODO - should we check q.property_names() and throw if not found in headers_?
    //const std::set<std::string>& attribute_names = q.property_names();

    return boost::make_shared<mapnik::memory_featureset>(q.get_bbox(),features_);
}

// Point queries are not implemented: always throws mapnik::datasource_exception.
mapnik::featureset_ptr csv_datasource::features_at_point(mapnik::coord2d const& pt) const
{
    if (!is_bound_) bind();

    throw mapnik::datasource_exception("CSV Plugin: features_at_point is not supported yet");
}