OpenStreetMap (OSM) provides a wealth of data with broad coverage and a deep history. That richness comes at the price of very large file sizes, which can make the power of OSM hard to access. VectorPipe helps by making OSM processing possible in Apache Spark, leveraging large computing clusters to churn through the volume of, say, an OSM full-history file.
For applications that need to process incoming changes, VectorPipe also provides streaming Spark
DataSources for changesets, OsmChange files, and Augmented diffs generated by Overpass.
For ease of use, the output of a VectorPipe import is a Spark DataFrame containing columns of JTS
Geometry objects, enabled by the user-defined types provided by GeoMesa. That package also provides functions for manipulating those geometries via Spark SQL directives.
The final important contribution is a set of functions for exporting geometries to vector tiles. This leans on the
Add the following to your
libraryDependencies += "com.azavea.geotrellis" %% "vectorpipe" % "2.2.0"
Note: VectorPipe releases from version 2.0.0 onward are hosted on Sonatype. Earlier releases can be found on Bintray. If using SBT for older releases, you will also need to include
resolvers += Resolver.bintrayRepo("azavea", "maven") in your
With a REPL
The fastest way to get started with VectorPipe in a REPL is to invoke
spark-shell --packages com.azavea.geotrellis:vectorpipe_2.11:2.2.0
This will download the required components and set up a REPL with VectorPipe available, at which point you may issue
// Make JTS types available to Spark
import org.locationtech.geomesa.spark.jts._
spark.withJTS
import vectorpipe._
and begin using the package.
A Note on Cluster Computing
Your local machine is probably insufficient for dealing with very large OSM files. We recommend using Amazon's Elastic MapReduce (EMR) service to provision substantial clusters of computing resources. You'll want to supply Spark, Hive, and Hadoop to your cluster, with Spark version 2.3. Creating a cluster with an EMR release between 5.13 and 5.19 should suffice. From there,
ssh into the master node and run
spark-shell as above for an interactive environment, or use
spark-submit for batch jobs. (You may submit Steps to the EMR cluster using
spark-submit as well.)
Batch analysis can be performed in a few different ways. Perhaps the fastest way is to procure an OSM PBF file from a source such as Geofabrik, which supplies various extracts of OSM, including the full planet's worth of data.
VectorPipe cannot read OSM PBF files directly, however, so the source file must first be converted to a format it can use. We suggest using
osm2orc to convert your source file to the ORC format, which Spark can read natively:
val df = spark.read.orc(path)
The resulting DataFrame can then be processed with VectorPipe.
It is also possible to read from a cache of OsmChange files directly rather than converting a PBF file:
import vectorpipe.sources.Source
val df = spark.read
  .format(Source.Changes)
  .options(Map[String, String](
    Source.BaseURI -> "https://download.geofabrik.de/europe/isle-of-man-updates/",
    Source.StartSequence -> "2080",
    Source.EndSequence -> "2174",
    Source.BatchSize -> "1"))
  .load
  .persist // recommended to avoid rereading
(Note that the start and end sequence numbers will shift over time for Geofabrik. Please navigate to the base URI to determine these values; otherwise, timeouts may occur.) This approach may issue errors, but should complete. It is much slower than using ORC files and is much touchier, but it stands as an option.
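When looking up start and end sequence numbers, it can help to know how replication directories are usually laid out. The sketch below assumes the common OSM replication convention, in which a sequence number is zero-padded to nine digits and split into three path segments, with the server's latest sequence recorded in a state.txt file under the base URI. The object and method names are illustrative and not part of VectorPipe, and a given mirror may deviate from this layout.

```scala
object ReplicationPaths {
  // Assumed layout: sequence 2080 -> "000/002/080.osc.gz".
  // This mirrors the usual OSM replication convention; verify it
  // against the base URI you actually intend to read from.
  def changePath(sequence: Int): String = {
    require(sequence >= 0 && sequence <= 999999999, "sequence must fit in nine digits")
    val padded = f"$sequence%09d"
    s"${padded.substring(0, 3)}/${padded.substring(3, 6)}/${padded.substring(6, 9)}.osc.gz"
  }

  // Joins a base URI and a sequence number, tolerating a missing trailing slash.
  def changeUrl(baseUri: String, sequence: Int): String = {
    val base = if (baseUri.endsWith("/")) baseUri else baseUri + "/"
    base + changePath(sequence)
  }
}
```

Checking that the URLs for the intended start and end sequences still resolve is a quick way to confirm that the range is available before starting a long read.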
It is also possible to build a dataframe from a stream of changesets in a similar manner as above. Changesets carry additional metadata regarding the author of the changes, but none of the geometric information. These tables can be joined on
In either case, a useful place to start is to convert the incoming dataframe into a more usable format. We recommend calling
val geoms = OSM.toGeometry(df)
which will produce a frame consisting of "top-level" entities, which is to say nodes that don't participate in a way, ways that don't participate in relations, and a subset of the relations from the OSM data. The resulting dataframe will represent these entities with JTS geometries in the
The toGeometry function keeps elements that fit one of the following descriptions:
- points from tagged nodes (including tags that really ought to be dropped—e.g.
- polygons derived from ways with tags that cause them to be considered as areas;
- lines from ways lacking area tags;
- multipolygons from multipolygon or boundary relations; and
- multilinestrings from route relations.
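The rules in the list above can be read as a small decision function. The following is a simplified model for illustration only, not VectorPipe's implementation: the element types, the kindOf function, and especially the isArea test are hypothetical, and the real area rules are more involved than the three tags checked here.

```scala
object GeometryKinds {
  sealed trait Element { def tags: Map[String, String] }
  final case class Node(tags: Map[String, String]) extends Element
  final case class Way(tags: Map[String, String]) extends Element
  final case class Relation(tags: Map[String, String]) extends Element

  // Hypothetical, heavily abbreviated area test; an assumption for this sketch.
  def isArea(tags: Map[String, String]): Boolean =
    tags.get("area").contains("yes") || tags.contains("building") || tags.contains("landuse")

  // Returns the geometry kind an element would map to, or None if it is dropped.
  def kindOf(element: Element): Option[String] = element match {
    case Node(tags) if tags.nonEmpty => Some("Point")
    case Node(_)                     => None
    case Way(tags) if isArea(tags)   => Some("Polygon")
    case Way(_)                      => Some("LineString")
    case Relation(tags) =>
      tags.get("type") match {
        case Some("multipolygon") | Some("boundary") => Some("MultiPolygon")
        case Some("route")                           => Some("MultiLineString")
        case _                                       => None
      }
  }
}
```

The model ignores whether a node or way already participates in a larger element, and whether a way is actually closed, both of which matter in practice.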
It is also possible to filter the results based on information in the tags. For instance, all buildings can be found as
import vectorpipe.functions.osm._
val buildings = geoms.filter(isBuilding('tags))
Again, the JTS user-defined types allow for easier manipulation of, and calculation from, geometric types. See here for a list of functions that operate on geometries.
A Note on Geocoding
VectorPipe provides the means to tag geometries with the country codes of the countries they interact with, but it does not provide the boundaries used to do the coding. That gives the user the option to select geometries appropriate to the task at hand—low resolution geometries for less fussy applications, high resolution when precision is important.
In order for an application to make use of
vectorpipe.util.Geocode, it must supply a
countries.geojson in the root of its project's
resources directory. That GeoJSON file must contain a
FeatureCollection, with each entry having an
ADM0_A3 entry in its
One may employ the Natural Earth Admin 0 resource for low-precision tasks, or use something like the Global LSIB Polygons for more precise tasks (though the latter resource does not tag its elements with the
ADM0_A3 three-letter codes, so some preprocessing would be required).
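To make the expected file shape concrete, here is a minimal sketch of a countries.geojson with a single feature, held as a Scala string, together with a rough sanity check. It assumes the ADM0_A3 code sits in each feature's properties member, as is standard for GeoJSON features. The polygon is a crude placeholder box rather than a real boundary, "IMN" is assumed to be the code for the Isle of Man, and the regex check is a stand-in for proper JSON parsing.

```scala
object CountriesSample {
  // Placeholder content: one feature with a rough bounding box, not a real border.
  val geojson: String =
    """{
      |  "type": "FeatureCollection",
      |  "features": [
      |    {
      |      "type": "Feature",
      |      "properties": { "ADM0_A3": "IMN" },
      |      "geometry": {
      |        "type": "Polygon",
      |        "coordinates": [[[-4.9, 54.0], [-4.3, 54.0], [-4.3, 54.4], [-4.9, 54.4], [-4.9, 54.0]]]
      |      }
      |    }
      |  ]
      |}""".stripMargin

  // Matches "ADM0_A3": "XXX" pairs; a quick textual check, not a JSON parser.
  private val codePattern = "\"ADM0_A3\"\\s*:\\s*\"([A-Z0-9]{3})\"".r

  // Lists every three-character ADM0_A3 code found in the given text.
  def countryCodes(json: String): List[String] =
    codePattern.findAllMatchIn(json).map(_.group(1)).toList
}
```

Running countryCodes over a candidate file's contents gives a quick indication of whether its features carry the codes at all, which is useful when preprocessing a source that lacks them.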
While most users will rely solely on the features exposed by the
OSM object, finer-grained control of the output of the process—say, if one does not need relations, for example—is available through the
There is a significant caveat here: two schemas can appear in the system when working with imported OSM dataframes. They differ in the type of a sub-field of the
members list. This can cause errors of the form
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Byte
when using the
internal package methods.
These type problems can be fixed by calling
vectorpipe.functions.osm.ensureCompressedMembers on the input OSM data frame before passing it to any relation-generating functions, such as
reconstructRelationGeometries. Top-level functions in the
OSM object handle this conversion for you. Note that this only affects the data frames carrying the initially imported OSM data.
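The cast error itself can be reproduced outside Spark. The sketch below assumes that the sub-field in question holds a member's element type, stored either as a string or as a compact byte code. That choice of field, the code values, and all names here are illustrative assumptions and are not taken from VectorPipe.

```scala
object MemberTypeSketch {
  // Hypothetical byte codes for the three OSM element types.
  val typeCodes: Map[String, Byte] =
    Map("node" -> 1.toByte, "way" -> 2.toByte, "relation" -> 3.toByte)

  // Code that expects the compressed schema simply casts the value;
  // a String value fails here with a ClassCastException.
  def readCompressed(value: Any): Byte = value.asInstanceOf[Byte]

  // Normalizing first makes both schemas safe to read.
  def compress(value: Any): Byte = value match {
    case b: Byte   => b
    case s: String => typeCodes.getOrElse(s, throw new IllegalArgumentException(s"unknown member type: $s"))
    case other     => throw new IllegalArgumentException(s"unexpected member type value: $other")
  }
}
```

Calling readCompressed("way") throws the ClassCastException, while readCompressed(compress("way")) succeeds, which is the same normalize-before-use pattern the text recommends for relation-generating functions.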
If you intend to contribute to VectorPipe, you may need to work with a development version. In that case, instead of loading a published release, you will need to build a fat jar using
and following that,
spark-shell --jars target/scala-2.11/vectorpipe.jar
When developing with IntelliJ IDEA, the sbt plugin will see Spark dependencies as provided, which will prevent them from being indexed properly, resulting in errors / warnings within the IDE. To fix this, create
idea.sbt at the root of the project:
import Dependencies._
lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  libraryDependencies ++= Seq(
    sparkSql % Compile
  )
)