002-geopandas-and-spark

This notebook was put together by Anderson Banihirwe as part of the 2017 CISL/SIParCS Research Project: PySpark for Big Atmospheric & Oceanic Data Analysis.

Introduction

GeoPandas

  • GeoPandas is an open source project to make working with geospatial data in Python easier.

GeoPandas adds a spatial geometry data type to Pandas and enables spatial operations on these types, using shapely. GeoPandas leverages Pandas together with several core open source geospatial packages and practices to provide a uniquely simple and convenient framework for handling geospatial feature data, operating on both geometries and attributes jointly, and as with Pandas, largely eliminating the need to iterate over features (rows). Also as with Pandas, it adds a very convenient and fine-tuned plotting method, and read/write methods that handle multiple file and "serialization" formats.

NOTES:

  • Like shapely, these spatial data types are limited to discrete entities/features and do not address continuously varying rasters or fields.
  • While GeoPandas spatial objects can be assigned a Coordinate Reference System (CRS), operations cannot be performed across CRSs. Also, geodetic ("unprojected", lat-lon) CRSs are not handled in a special way; the area of a geodetic polygon will be in square degrees.

GeoPandas is still young, but it builds on mature, stable, and widely used packages (Pandas, shapely, etc.). Expect kinks and continued growth!

When should you use GeoPandas?

  • For exploratory data analysis, including in Jupyter notebooks.
  • For highly compact and readable code, which in turn improves reproducibility.
  • If you're comfortable with Pandas, R dataframes, or tabular/relational approaches.

When might it not be the best tool?

  • If you need high performance (though I'm not completely sure -- it uses a nice rtree index).
  • For polished map creation and multi-layer, interactive visualization; if you're comfortable with GIS, use a desktop GIS like QGIS! You can generate intermediate GIS files and plots with GeoPandas, then shift over to QGIS. Or refine the plots in Python with matplotlib or additional packages.

Source: https://nbviewer.jupyter.org/github/geohackweek/vector/blob/gh-pages/docker/notebooks/geopandas_intro.ipynb

WKT/WKB: Well Known Text and Well Known Binary

  • Well-known text (WKT) is a text markup language for representing vector geometry objects on a map, spatial reference systems of spatial objects, and transformations between spatial reference systems; a few example WKT literals are shown after this list.
  • A binary equivalent, known as well-known binary (WKB), is used to transfer and store the same information on databases, such as PostGIS, Microsoft SQL Server and DB2. The formats were originally defined by the Open Geospatial Consortium (OGC) and described in their Simple Feature Access and Coordinate Transformation Service specifications.
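
For reference, here are a few standard WKT literals (illustrative values only, unrelated to the dataset used below):

POINT (30 10)
LINESTRING (30 10, 10 30, 40 40)
POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))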

Implications for PySpark data de/serialization & un/marshalling

  • When using PySpark we have to send data back and forth between the master node and the workers, which run jobs on the JVM. To keep this simple, rather than sending Python (more precisely, Shapely) objects, we will send WKT strings; a minimal round-trip sketch follows.
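
A minimal round-trip sketch with Shapely (the point coordinates here are made up purely for illustration):

from shapely.geometry import Point
from shapely import wkt

wkt_string = Point(-105.27, 40.01).wkt  # serialize a Shapely geometry to a WKT string
geom = wkt.loads(wkt_string)            # parse the WKT string back into a Shapely geometry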

Import Packages

In [1]:
import os.path, json, io
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (16, 20)

from retrying import retry # for exponential backoff when calling external APIs

import pyspark.sql.functions as func # reuse as func.coalesce, for example
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType
from pyspark.sql import SparkSession

import pandas as pd
from geopandas import GeoDataFrame # Loading boundaries Data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data
from ast import literal_eval as make_tuple # used to decode data from Java
In [2]:
# Create a SparkSession and get its SparkContext
spark = SparkSession.builder.appName("pyspark-geopandas").getOrCreate()
sc = spark.sparkContext

Load Data in a GeoDataFrame

NOTE/HIGHLIGHT:

  • It's worth noting that a GeoDataFrame can be described as a Feature Collection, where each row is a Feature, a geometry column is defined (though the name of the column doesn't have to be "geometry"), and the attribute Properties are simply the other columns (the Pandas DataFrame part, if you will). A minimal hand-built example follows this list.
  • More than one column can store geometry objects!
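
A minimal hand-built sketch of that idea (toy column names, purely for illustration):

from geopandas import GeoDataFrame
from shapely.geometry import Point

toy = GeoDataFrame({'name': ['a', 'b'], 'value': [1, 2]},
                   geometry=[Point(0, 0), Point(1, 1)])  # the geometry column holds Shapely objects
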
In [3]:
# Load the boundaries data
geo_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/states/')
In [4]:
geo_df.head()
Out[4]:
CARPOOL DRVALONE EMPLOYED FAMILIES FEMALE HOUSHOLD LAND_KM MALE MANUAL PERSONS ... SAMP_POP SERVICE STATE_ABBR STATE_FIPS STATE_NAME SUB_REGION UNEMPLOY WATER_KM WORKERS geometry
0 652603.0 3741715.0 5417967.0 2924880.0 5878369.0 4202240.0 143986.610 5552233.0 828906.0 11430602.0 ... 1747776.0 1360159.0 IL 17 Illinois E N Cen 385040.0 1993.335 4199206.0 POLYGON ((-88.071564 37.51099000000001, -88.08...
1 36621.0 106694.0 303994.0 122087.0 323930.0 249634.0 159.055 282970.0 22407.0 606900.0 ... 72696.0 65498.0 DC 11 District of Columbia S Atl 23442.0 17.991 229975.0 POLYGON ((-77.00823200000001 38.96655699999999...
2 42968.0 258087.0 335147.0 175867.0 343200.0 247497.0 5062.456 322968.0 44140.0 666168.0 ... 102776.0 87973.0 DE 10 Delaware S Atl 13945.0 1385.022 247566.0 POLYGON ((-75.70742 38.55747600000001, -75.711...
3 106918.0 493164.0 671085.0 500259.0 931941.0 688557.0 62384.200 861536.0 124172.0 1793477.0 ... 317564.0 205950.0 WV 54 West Virginia S Atl 71142.0 375.199 661702.0 POLYGON ((-79.231903 38.48037299999999, -79.27...
4 376449.0 1732837.0 2481342.0 1245814.0 2462797.0 1748991.0 25316.345 2318671.0 260308.0 4781468.0 ... 684773.0 586994.0 MD 24 Maryland S Atl 111536.0 6188.794 1783061.0 (POLYGON ((-75.71106 38.649551, -75.70742 38.5...

5 rows × 23 columns

Task 1: Find out what the schema for the data is

In [5]:
geo_df.columns
Out[5]:
Index([   u'CARPOOL',   u'DRVALONE',   u'EMPLOYED',   u'FAMILIES',
           u'FEMALE',   u'HOUSHOLD',    u'LAND_KM',       u'MALE',
           u'MANUAL',    u'PERSONS',   u'PUBTRANS',   u'P_FEMALE',
           u'P_MALE',   u'SAMP_POP',    u'SERVICE', u'STATE_ABBR',
       u'STATE_FIPS', u'STATE_NAME', u'SUB_REGION',   u'UNEMPLOY',
         u'WATER_KM',    u'WORKERS',   u'geometry'],
      dtype='object')

Task 2: Plot the geometries

Now we finally plot a real map (or blobs, depending on your aesthetics), from a dataset stored in "geographic" (latitude & longitude) coordinates.

In [61]:
geo_df.plot(column='STATE_ABBR', categorical=True, legend=True)
plt.show()
In [54]:
geo_df.plot(column='SUB_REGION', categorical=True, legend=True)
plt.show()

Task 3: Print out the WKT version of the geometries

In [7]:
wkts = map(lambda g: g.to_wkt(), geo_df.geometry)  # under Python 2, map returns a list of WKT strings
wkts[0]
type(geo_df.geometry)
Out[7]:
geopandas.geoseries.GeoSeries

Task 4: Setting a Projection

Setting a projection is how one tells geopandas how to interpret coordinates. If no CRS is set, geopandas geometry operations will still work, but coordinate transformations will not be possible and exported files may not be interpreted correctly by other software.

By setting the CRS, geopandas knows how to interpret the coordinates.

You can find the codes for most commonly used projections at http://spatialreference.org/.

In [8]:
geo_df.crs = {'init': 'epsg:4326'}
In [9]:
geo_df.crs
Out[9]:
{'init': 'epsg:4326'}

Task 5: Calculate the area of the states we are using

In [49]:
geo_df.geometry.area
Out[49]:
0     15.396467
1      0.017770
2      0.553318
3      6.493195
4      2.625117
5     28.041027
6     10.645120
7     21.981892
8     10.512136
9     18.647593
10    28.858782
11    18.031206
12    12.627803
13    10.876448
14    65.059583
15    30.936483
16    12.897116
17    11.870990
18    14.598796
19     7.794398
20    13.517338
21    11.224646
22    13.347714
23    16.927960
24    45.131676
25     9.570610
26    21.874284
27    22.597693
28    27.965643
29    16.477302
30    24.390738
31     2.794099
32    25.576939
33    28.187028
34     2.677289
35    15.853172
36     2.309183
37    21.606405
38    13.874600
39    12.550054
40     1.392486
41     0.292620
42     2.056683
43     9.931907
44    29.969229
45    22.966968
46    41.533085
47    11.299889
48    20.749828
dtype: float64
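
These values are in square degrees, because the CRS set above is geographic (EPSG:4326). A sketch of getting physical areas, assuming pyproj is available and that an equal-area projection such as US National Atlas Equal Area (EPSG:2163) is acceptable for these states:

albers = geo_df.to_crs({'init': 'epsg:2163'})  # reproject to an equal-area CRS with metre units
(albers.geometry.area / 1e6).head()            # areas in square kilometres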

Task 6: Preparing our data for Spark

Spatial Spark expects a geospatial column as a WKT string; internally it uses this to create OGC geometries via the Java Topology Suite (JTS). So, in order to use Spatial Spark, we will add a WKT column to our data.

In [12]:
geo_df['wkt'] = pd.Series(
        map(lambda geom: str(geom.to_wkt()), geo_df['geometry']),
        index=geo_df.index, dtype='string')
In [13]:
spark_df = spark.createDataFrame(geo_df)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-13-d182222fcbd5> in <module>()
----> 1 spark_df = spark.createDataFrame(geo_df)

/usr/local/Cellar/apache-spark/2.1.1/libexec/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    524             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    525         else:
--> 526             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    527         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    528         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/Cellar/apache-spark/2.1.1/libexec/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    388 
    389         if schema is None or isinstance(schema, (list, tuple)):
--> 390             struct = self._inferSchemaFromList(data)
    391             converter = _create_converter(struct)
    392             data = map(converter, data)

/usr/local/Cellar/apache-spark/2.1.1/libexec/python/pyspark/sql/session.pyc in _inferSchemaFromList(self, data)
    320             warnings.warn("inferring schema from dict is deprecated,"
    321                           "please use pyspark.sql.Row instead")
--> 322         schema = reduce(_merge_type, map(_infer_schema, data))
    323         if _has_nulltype(schema):
    324             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/Cellar/apache-spark/2.1.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
    992         raise TypeError("Can not infer schema for type: %s" % type(row))
    993 
--> 994     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    995     return StructType(fields)
    996 

/usr/local/Cellar/apache-spark/2.1.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
    969             return _infer_schema(obj)
    970         except TypeError:
--> 971             raise TypeError("not supported type: %s" % type(obj))
    972 
    973 

TypeError: not supported type: <class 'shapely.geometry.polygon.Polygon'>
In [14]:
# drop the geometry column because Spark can't infer a schema for it
df = geo_df.drop("geometry", axis=1)
In [15]:
df.columns
Out[15]:
Index([   u'CARPOOL',   u'DRVALONE',   u'EMPLOYED',   u'FAMILIES',
           u'FEMALE',   u'HOUSHOLD',    u'LAND_KM',       u'MALE',
           u'MANUAL',    u'PERSONS',   u'PUBTRANS',   u'P_FEMALE',
           u'P_MALE',   u'SAMP_POP',    u'SERVICE', u'STATE_ABBR',
       u'STATE_FIPS', u'STATE_NAME', u'SUB_REGION',   u'UNEMPLOY',
         u'WATER_KM',    u'WORKERS',        u'wkt'],
      dtype='object')

Task 7: Create a PySpark DataFrame from the GeoPandas data

In [17]:
spark_df = spark.createDataFrame(df).cache()
In [18]:
spark_df.printSchema()
root
 |-- CARPOOL: double (nullable = true)
 |-- DRVALONE: double (nullable = true)
 |-- EMPLOYED: double (nullable = true)
 |-- FAMILIES: double (nullable = true)
 |-- FEMALE: double (nullable = true)
 |-- HOUSHOLD: double (nullable = true)
 |-- LAND_KM: double (nullable = true)
 |-- MALE: double (nullable = true)
 |-- MANUAL: double (nullable = true)
 |-- PERSONS: double (nullable = true)
 |-- PUBTRANS: double (nullable = true)
 |-- P_FEMALE: double (nullable = true)
 |-- P_MALE: double (nullable = true)
 |-- SAMP_POP: double (nullable = true)
 |-- SERVICE: double (nullable = true)
 |-- STATE_ABBR: string (nullable = true)
 |-- STATE_FIPS: string (nullable = true)
 |-- STATE_NAME: string (nullable = true)
 |-- SUB_REGION: string (nullable = true)
 |-- UNEMPLOY: double (nullable = true)
 |-- WATER_KM: double (nullable = true)
 |-- WORKERS: double (nullable = true)
 |-- wkt: string (nullable = true)
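
Because the geometries now travel as plain WKT strings, the workers can parse them back into Shapely objects inside a UDF. A small sketch, assuming Shapely is installed on every worker (the helper name bounds_of is made up for illustration):

def bounds_of(wkt_str):
    # parse the WKT on the worker and return the geometry's bounding box as text
    return str(wkt.loads(wkt_str).bounds)

bounds_udf = func.udf(bounds_of, StringType())
spark_df.select('STATE_NAME', bounds_udf('wkt').alias('bounds')).show(5, truncate=False)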

Task 8: Let's find Colorado

In [22]:
CO_rdd = spark_df.filter(spark_df['STATE_NAME'] == 'Colorado')  # a Spark DataFrame, despite the _rdd suffix
In [23]:
CO_rdd.first()
Out[23]:
Row(CARPOOL=210274.0, DRVALONE=1216639.0, EMPLOYED=1633281.0, FAMILIES=854214.0, FEMALE=1663099.0, HOUSHOLD=1282489.0, LAND_KM=268659.501, MALE=1631295.0, MANUAL=181760.0, PERSONS=3294394.0, PUBTRANS=46983.0, P_FEMALE=0.505, P_MALE=0.495, SAMP_POP=512677.0, SERVICE=421079.0, STATE_ABBR=u'CO', STATE_FIPS=u'08', STATE_NAME=u'Colorado', SUB_REGION=u'Mtn', UNEMPLOY=99438.0, WATER_KM=960.364, WORKERS=1233023.0, wkt=u'POLYGON ((-102.0439989999999995 37.6414599999999950, -102.0415569999999974 37.3862610000000046, -102.0367580000000061 36.9889720000000040, -102.9972230000000053 36.9985049999999944, -103.0773769999999985 36.9997410000000002, -103.9931109999999990 36.9944459999999964, -105.1456150000000065 36.9931830000000019, -105.2125319999999959 36.9925800000000038, -105.7128909999999991 36.9945409999999981, -105.9914250000000067 36.9922750000000065, -106.4715879999999970 36.9914930000000055, -106.8606570000000033 36.9894910000000010, -106.8897780000000068 36.9990729999999957, -107.4102170000000029 36.9975199999999944, -107.4718550000000050 36.9987720000000024, -108.3718340000000069 36.9994740000000064, -109.0478209999999990 36.9966430000000059, -109.0449370000000044 37.6308290000000056, -109.0425419999999974 37.8874279999999999, -109.0428009999999972 38.1529429999999934, -109.0551990000000018 38.2449300000000108, -109.0532840000000050 38.4946630000000027, -109.0507510000000053 39.3609889999999893, -109.0528639999999996 39.5181959999999890, -109.0518880000000053 39.6574099999999987, -109.0505909999999972 40.2105449999999962, -109.0454790000000003 40.6653290000000140, -109.0476380000000063 40.9984739999999874, -107.9180369999999982 41.0034100000000024, -107.3034360000000049 41.0001680000000022, -106.8648380000000060 40.9984890000000064, -106.3285450000000054 41.0013160000000028, -106.2028959999999955 41.0001110000000040, -105.2782590000000056 40.9963649999999973, -104.9339679999999930 40.9943049999999971, -104.0512010000000060 41.0032270000000096, -103.5718229999999949 40.9996639999999957, -103.3824690000000004 41.0003319999999860, -102.6518020000000035 40.9981349999999907, -102.6207890000000020 41.0002250000000004, -102.0472790000000032 40.9980769999999950, -102.0465319999999991 40.7431339999999977, -102.0455699999999979 40.6973230000000115, -102.0471569999999986 40.4310839999999985, -102.0470810000000057 40.3426509999999894, -102.0510709999999932 39.9989280000000065, -102.0489809999999977 39.5686990000000094, -102.0483399999999961 39.5628089999999872, -102.0474169999999958 39.1267510000000129, -102.0485149999999948 39.0369990000000087, -102.0471119999999985 38.6925390000000107, -102.0471270000000032 38.6154860000000042, -102.0450900000000019 38.2633289999999988, -102.0456010000000049 38.2538069999999948, -102.0435180000000059 37.7343860000000006, -102.0439989999999995 37.6414599999999950))')
In [62]:
wkt.loads(CO_rdd.take(1)[0].wkt)
Out[62]: [the Colorado polygon rendered inline]
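
The parsed geometry can also be wrapped back into a one-row GeoDataFrame and plotted; a minimal sketch:

co = GeoDataFrame(geometry=[wkt.loads(CO_rdd.first().wkt)])
co.plot()
plt.show()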

Task 9: Print the states sorted by their unemployed population

In [25]:
states_pop = spark_df.select(
             spark_df['STATE_NAME'],
             spark_df['UNEMPLOY'].cast(IntegerType()))
In [27]:
states_pop = states_pop.sort(states_pop['UNEMPLOY'].desc())
states_pop.show()
+--------------+--------+
|    STATE_NAME|UNEMPLOY|
+--------------+--------+
|    California|  996502|
|      New York|  636280|
|         Texas|  590269|
|      Illinois|  385040|
|      Michigan|  374341|
|       Florida|  356769|
|  Pennsylvania|  344795|
|          Ohio|  324867|
|    New Jersey|  218598|
| Massachusetts|  218000|
|       Georgia|  187390|
|     Louisiana|  175303|
|North Carolina|  163081|
|       Indiana|  160143|
|      Missouri|  155388|
|     Tennessee|  152128|
|      Kentucky|  148125|
|      Virginia|  141926|
|    Washington|  139216|
|     Wisconsin|  129191|
+--------------+--------+
only showing top 20 rows

Task 10: SQL version: Print the states sorted by their unemployed population

In [33]:
states_pop.createOrReplaceTempView("states")
In [34]:
spark.sql("""
          SELECT STATE_NAME, UNEMPLOY
          FROM states 
          ORDER BY UNEMPLOY desc""").show()
+--------------+--------+
|    STATE_NAME|UNEMPLOY|
+--------------+--------+
|    California|  996502|
|      New York|  636280|
|         Texas|  590269|
|      Illinois|  385040|
|      Michigan|  374341|
|       Florida|  356769|
|  Pennsylvania|  344795|
|          Ohio|  324867|
|    New Jersey|  218598|
| Massachusetts|  218000|
|       Georgia|  187390|
|     Louisiana|  175303|
|North Carolina|  163081|
|       Indiana|  160143|
|      Missouri|  155388|
|     Tennessee|  152128|
|      Kentucky|  148125|
|      Virginia|  141926|
|    Washington|  139216|
|     Wisconsin|  129191|
+--------------+--------+
only showing top 20 rows

In [38]:
df = states_pop.toPandas()
In [39]:
df.head()
Out[39]:
STATE_NAME UNEMPLOY
0 California 996502
1 New York 636280
2 Texas 590269
3 Illinois 385040
4 Michigan 374341
In [56]:
df.plot.bar(x='STATE_NAME', y='UNEMPLOY', legend=True)
plt.show()
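
With all 49 states on the x-axis the labels get crowded; a sketch that plots only the ten states with the largest unemployed population (df is already sorted in descending order):

df.head(10).plot.bar(x='STATE_NAME', y='UNEMPLOY', legend=True)
plt.show()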