This notebook was put together by Anderson Banihirwe as part of the 2017 CISL/SIParCS Research Project: PySpark for Big Atmospheric & Oceanic Data Analysis
Introduction¶
The Coastal Marine Zones dataset used in this notebook can be found here.
Abstract:
Coastal marine forecasts and Special Marine Warnings are issued by zone, each zone identified by a text description and a Universal Generic Code (UGC). These forecasts are prepared by the individual Weather Forecast Office responsible for the zone.
Purpose:
To delineate the coastal marine zones for use in creating coastal marine forecasts and warnings.
Import Packages¶
import os.path, json, io
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (16, 20)
import pyspark.sql.functions as func  # reuse as func.coalesce, for example
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, DecimalType
from pyspark.sql import SparkSession
import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame # Loading boundaries Data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data
from ast import literal_eval as make_tuple # used to decode data from java
# Create a SparkSession and get its SparkContext
spark = SparkSession.builder.appName("coastal-marine-zones").getOrCreate()
sc = spark.sparkContext
Task 1: Load Coastal Marine Zones Data¶
# Load the Coastal Marine Zones data
mz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/mz11au16/')
mz_df.columns
mz_df.head()
Detailed Description of the columns:
- ID: Identifier of zone
- WFO: Identifier of organization responsible for forecast to the zone
- NAME: Name of zone
- LON: Longitude of centroid of zone
- LAT: Latitude of centroid of zone
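A quick look at just these attribute columns (a minimal sketch, assuming the column names match the list above):
mz_df[['ID', 'WFO', 'NAME', 'LON', 'LAT']].head()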
mz_df.describe()
mz_df.crs
Task 2: Load Offshore Marine Zones data¶
oz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/oz01ap14b/')
oz_df.head()
oz_df.describe()
oz_df.crs
oz_df.plot()
plt.show()
oz_df.plot(column='WFO', categorical=True, legend=True)
plt.show()
oz_df.plot(column='Location', categorical=True, legend=True)
plt.show()
Task 3: Load High Seas Marine Zones data¶
hz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/hz04jn14/')
hz_df.head()
hz_df.describe()
hz_df.crs
hz_df.plot()
plt.show()
Task 4: Load "Natural Earth" countries dataset, bundled with GeoPandas¶
"Natural Earth is a public domain map dataset available at 1:10m, 1:50m, and 1:110 million scales. Featuring tightly integrated vector and raster data, with Natural Earth you can make a variety of visually pleasing, well-crafted maps with cartography or GIS software." It (a subset?) comes bundled with GeoPandas and is accessible from the gpd.datasets module. We'll use it as a helpful global base layer map.
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
world.head(2)
world.crs
world.plot()
plt.show()
Task 5: Map plot overlays: Plotting multiple spatial layers¶
Here's a compact, quick way of using the GeoDataFrame plot method to overlay two GeoDataFrames. In this case we overlay the world map and the Offshore Marine Zones.
world.plot(ax=oz_df.plot(cmap='Set2', alpha=1), alpha=1)
plt.show()
fig, ax = plt.subplots()
# set aspect to equal. This is done automatically
# when using *geopandas* plot on its own, but not when
# working with pyplot directly.
ax.set_aspect('equal')
world.plot(ax=ax)
oz_df.plot(ax=ax)
hz_df.plot(ax=ax, color='blue')
plt.show()
Task 6: Preparing Data for PySpark¶
The next few tasks will introduce some of the query and aggregation functions available to us in Spark SQL.
Spark expects a geospatial column as a WKT string; spatial Spark libraries use it internally to create OGC geometries via the Java Topology Suite (JTS). So, to use our geometries in Spark, we add a WKT column to our data.
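For reference, WKT ("well-known text") is a plain-text encoding of geometries. A minimal sketch of what the column will hold, using shapely (already imported above); the sample point is made up for illustration:
# A shapely geometry round-trips cleanly through its WKT representation
p = Point(-105.0, 40.0)
print(p.wkt)             # e.g. POINT (-105 40)
print(wkt.loads(p.wkt))  # parses back to an equivalent geometry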
world['wkt'] = world['geometry'].apply(lambda geom: geom.wkt)
oz_df['wkt'] = oz_df['geometry'].apply(lambda geom: geom.wkt)
hz_df['wkt'] = hz_df['geometry'].apply(lambda geom: geom.wkt)
mz_df['wkt'] = mz_df['geometry'].apply(lambda geom: geom.wkt)
# drop the geometry column because Spark can't infer
# a schema for it when it's a nested geometry shape
def drop_geometry_column(dataframe, col_drop="geometry"):
return dataframe.drop(col_drop, axis=1)
Task 7: Create a Spark dataframe from GeoPandas¶
world_spark_df = spark.createDataFrame(drop_geometry_column(world)).cache()
oz_spark_df = spark.createDataFrame(drop_geometry_column(oz_df)).cache()
hz_spark_df = spark.createDataFrame(drop_geometry_column(hz_df)).cache()
mz_spark_df = spark.createDataFrame(drop_geometry_column(mz_df)).cache()
mz_spark_df.printSchema()
world_spark_df.printSchema()
oz_spark_df.printSchema()
Task 8: Get all countries from world_spark_df as Spark DataFrame¶
- Register a temporary table so we can run SQL-like statements in Apache Spark against the Spark SQL DataFrame:
world_spark_df.createOrReplaceTempView("world")
# Select all countries
countries = spark.sql(
"""
SELECT name, wkt as geometry, pop_est, continent
FROM world
ORDER BY continent, name
""")
countries.show()
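As a quick taste of the aggregation functions mentioned earlier, here is a minimal sketch (not in the original notebook) that counts countries per continent against the same "world" temporary table:
# Count countries per continent with a Spark SQL aggregation
continent_counts = spark.sql(
    """
    SELECT continent, COUNT(*) AS n_countries
    FROM world
    GROUP BY continent
    ORDER BY n_countries DESC
    """)
continent_counts.show()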
Task 9: Count how many countries we have in our countries dataset¶
print(countries.count())
Task 10: Select Mexico from world dataframe and get its geographical boundary¶
mexico = spark.sql(
"""
SELECT wkt, name, pop_est, gdp_md_est
FROM world
WHERE name='Mexico'
""")
mexico_boundary = wkt.loads(mexico.take(1)[0].wkt)
mexico_boundary  # display the parsed boundary geometry
Task 11: Filter Offshore Marine Zones that intersect with Mexico Boundaries¶
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
# User-defined function for filtering zones that intersect the region of interest
intersection_udf = udf(lambda row: mexico_boundary.intersects(wkt.loads(row)), BooleanType())
df = oz_spark_df.filter(intersection_udf(oz_spark_df["wkt"]))
df.show()
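Note that the UDF captures mexico_boundary in its closure, so Spark serializes the geometry with every task. For a large boundary, a broadcast variable is a common alternative; a minimal sketch (hypothetical names, not from the original notebook):
# Ship the boundary to each executor once via a broadcast variable
mexico_bc = sc.broadcast(mexico_boundary)
intersection_bc_udf = udf(lambda row: mexico_bc.value.intersects(wkt.loads(row)), BooleanType())
# Usage is identical: oz_spark_df.filter(intersection_bc_udf(oz_spark_df["wkt"]))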
pdf = df.toPandas()
pdf
geometry = pdf['wkt'].map(wkt.loads)
crs = 'EPSG:4326'
gdf = gpd.GeoDataFrame(pdf, crs=crs, geometry=geometry)
gdf
# Set the region of interest color to red
world.plot(ax=gdf.plot(color='red', alpha=1), alpha=1)
plt.show()
Task 12: Select Boundaries for Weather Forecast Office (WFO) Honolulu (HFO)¶
oz_spark_df.createOrReplaceTempView("offshore")
hfo = spark.sql("""
SELECT wkt
FROM offshore
WHERE WFO='HFO'
""")
hfo_boundaries = wkt.loads(hfo.take(1)[0].wkt)
hfo_boundaries
Task 13: Filter High Seas Marine Zones that contain the HFO station boundaries¶
# User-defined function for filtering zones of interest:
# a zone contains the HFO boundaries iff hfo_boundaries.within(zone)
within_udf = udf(lambda row: hfo_boundaries.within(wkt.loads(row)), BooleanType())
df = hz_spark_df.filter(within_udf(hz_spark_df["wkt"]))
df.show()
pdf = df.toPandas()
pdf
geometry = pdf['wkt'].map(wkt.loads)
crs = 'EPSG:4326'
gdf = gpd.GeoDataFrame(pdf, crs=crs, geometry=geometry)
gdf
world.plot(ax=gdf.plot(color='red', alpha=1), alpha=1)
plt.show()