This notebook was put together by Anderson Banihirwe as part of the 2017 CISL/SIParCS Research Project: PySpark for Big Atmospheric & Oceanic Data Analysis
Introduction¶
GeoPandas¶
- GeoPandas is an open source project to make working with geospatial data in python easier.
GeoPandas adds a spatial geometry data type to Pandas and enables spatial operations on these types, using shapely. GeoPandas leverages Pandas together with several core open source geospatial packages and practices to provide a uniquely simple and convenient framework for handling geospatial feature data, operating on both geometries and attributes jointly, and as with Pandas, largely eliminating the need to iterate over features (rows). Also as with Pandas, it adds a very convenient and fine-tuned plotting method, and read/write methods that handle multiple file and "serialization" formats.
NOTES:
- Like shapely, these spatial data types are limited to discrete entities/features and do not address continuously varying rasters or fields.
- While GeoPandas spatial objects can be assigned a Coordinate Reference System (CRS), operations cannot be performed across CRSs. Also, geodetic ("unprojected", lat-lon) CRSs are not handled in a special way; the area of a geodetic polygon will be in square degrees.
GeoPandas is still young, but it builds on mature, stable, and widely used packages (Pandas, shapely, etc.). Expect kinks and continued growth!
When should you use GeoPandas?
- For exploratory data analysis, including in Jupyter notebooks.
- For highly compact and readable code, which in turn improves reproducibility.
- If you're comfortable with Pandas, R dataframes, or tabular/relational approaches.
When may it not be the best tool?
- If you need high performance (though I'm not completely sure -- it uses a nice rtree index).
- For polished map creation and multi-layer, interactive visualization; if you're comfortable with GIS, use a desktop GIS like QGIS! You can generate intermediate GIS files and plots with GeoPandas, then shift over to QGIS. Or refine the plots in Python with matplotlib or additional packages.
WKT/WKB: Well Known Text and Well Known Binary¶
- Well-known text (WKT) is a text markup language for representing vector geometry objects on a map, spatial reference systems of spatial objects and transformations between spatial reference systems.
- A binary equivalent, known as well-known binary (WKB), is used to transfer and store the same information on databases, such as PostGIS, Microsoft SQL Server and DB2. The formats were originally defined by the Open Geospatial Consortium (OGC) and described in their Simple Feature Access and Coordinate Transformation Service specifications.
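As a minimal illustration (the point coordinates here are arbitrary), a shapely geometry exposes both encodings directly:
from shapely.geometry import Point
from shapely import wkb
pt = Point(-105.27, 40.02)
print(pt.wkt)                   # text (WKT) form, e.g. POINT (-105.27 40.02)
print(wkb.dumps(pt, hex=True))  # hex-encoded binary (WKB) form of the same geometry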
Implications for PySpark data de/serialization & un/marshalling¶
- When using PySpark we have to send data back and forth between the master node and the workers, which run jobs on the JVM.
- To simplify this, rather than sending Python (more precisely, Shapely) objects, we will use WKT strings.
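A minimal sketch of that round trip using shapely alone (no Spark involved yet): the geometry becomes a plain WKT string, which is cheap to ship across the Python/JVM boundary, and is parsed back into an equivalent geometry on the other side.
from shapely.geometry import Polygon
from shapely import wkt
poly = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
wkt_str = poly.wkt           # plain string, easy to serialize
restored = wkt.loads(wkt_str)
restored.equals(poly)        # True -- nothing was lost in the round trip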
Import Packages¶
import os.path, json, io
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (16, 20)
from retrying import retry # for exponential backoff when calling the TurboOverdrive API
import pyspark.sql.functions as func # reuse as func.coalesce, for example
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType
from pyspark.sql import SparkSession
import pandas as pd
from geopandas import GeoDataFrame # Loading boundaries Data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data
from ast import literal_eval as make_tuple # used to decode data from java
# Create a SparkSession and get its SparkContext
spark = SparkSession.builder.appName("pyspark-geopandas").getOrCreate()
sc = spark.sparkContext
Load Data into a GeoDataFrame¶
NOTE/HIGHLIGHT:
- It's worth noting that a GeoDataFrame can be described as a Feature Collection, where each row is a Feature, a geometry column is defined (though the name of the column doesn't have to be "geometry"), and the attribute Properties are simply the other columns (the Pandas DataFrame part, if you will).
- More than one column can store geometry objects!
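As a quick sketch of that structure (the names and coordinates below are made up for illustration), a GeoDataFrame can be built by hand from ordinary attribute columns plus a column of shapely geometries:
from geopandas import GeoDataFrame
from shapely.geometry import Point
toy = GeoDataFrame({'name': ['a', 'b'],                       # attribute (Properties) column
                    'geometry': [Point(0, 0), Point(1, 1)]})  # geometry column
toy.head()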
# Load the boundaries data
geo_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/states/')
geo_df.head()
Task 1: Find out what the schema for the data is¶
geo_df.columns
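If you also want the column types (including the geometry column), the usual pandas dtypes attribute works on a GeoDataFrame as well:
geo_df.dtypes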
Task 2: Plot the geometries¶
Now we finally plot a real map (or blobs, depending on your aesthetics), from a dataset of US states stored in "geographic" (latitude & longitude) coordinates.
geo_df.plot(column='STATE_ABBR', categorical=True, legend=True)
plt.show()
geo_df.plot(column='SUB_REGION', categorical=True, legend=True)
plt.show()
Task 3: Print out the WKT version of the geometries¶
wkts = [geom.wkt for geom in geo_df.geometry]  # WKT string for each geometry
wkts[0]
type(geo_df.geometry)
Task 4: Setting a Projection¶
Setting a projection is how one tells geopandas how to interpret coordinates. If no CRS is set, geopandas geometry operations will still work, but coordinate transformations will not be possible and exported files may not be interpreted correctly by other software.
By setting the CRS, geopandas knows how to interpret the coordinates.
You can find the codes for most commonly used projections from (http://spatialreference.org/).
geo_df.crs = {'init': 'epsg:4326'}
geo_df.crs
Task 5: Calculate the area of the states we are using¶
geo_df.geometry.area
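Since the data are still in geographic (lat/lon) coordinates, the areas above are in square degrees (see the note at the top). A sketch of getting areas in square meters instead, assuming an equal-area projection such as EPSG:5070 (NAD83 / Conus Albers) is appropriate for these states:
equal_area = geo_df.to_crs(epsg=5070)   # reproject to an equal-area CRS
equal_area.geometry.area.head()         # areas in square meters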
Task 6: Preparing our data for Spark¶
SpatialSpark expects a geospatial column as a WKT string; internally it uses it to create OGC geometries via the Java Topology Suite (JTS). So, in order to use SpatialSpark, we will add a WKT column to our data.
# Store each geometry as a WKT string in a new column
geo_df['wkt'] = pd.Series(
    [geom.wkt for geom in geo_df['geometry']],
    index=geo_df.index)
# Drop the geometry column because Spark can't infer a schema for shapely objects
df = geo_df.drop('geometry', axis=1)
df.columns
Task 7: Create a pyspark dataframe from GeoPandas¶
spark_df = spark.createDataFrame(df).cache()
spark_df.printSchema()
Task 8: Let's find Colorado¶
CO_rdd = spark_df.filter(spark_df['STATE_NAME'] == 'Colorado')
CO_rdd.first()
wkt.loads(CO_rdd.take(1)[0].wkt)
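Once the WKT is parsed back into a shapely geometry on the driver, ordinary shapely operations are available again. A small sketch (the coordinates are roughly Denver and are only illustrative):
co_geom = wkt.loads(CO_rdd.take(1)[0].wkt)
co_geom.contains(Point(-104.99, 39.74))   # True if the point falls inside Colorado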
Task 9: Print out the states sorted by their unemployed population¶
states_pop = spark_df.select(
spark_df['STATE_NAME'],
spark_df['UNEMPLOY'].cast(IntegerType()))
states_pop = states_pop.sort(states_pop['UNEMPLOY'].desc())
states_pop.show()
Task 10 (SQL version): Print out the states sorted by their unemployed population¶
states_pop.createOrReplaceTempView("states")
spark.sql("""
SELECT STATE_NAME, UNEMPLOY
FROM states
ORDER BY UNEMPLOY desc""").show()
df = states_pop.toPandas()
df.head()
df.plot.bar(x='STATE_NAME', y='UNEMPLOY', legend=True)
plt.show()