edit 003-Coastal Marine Zones

This notebook was put together by Anderson Banihirwe as part of 2017 CISL/SIParCS Research Project : PySpark for Big Atmospheric & Oceanic Data Analysis

Introduction¶

The Coastal Marine Zones dataset used in this notebook can be found here.

Abstract:

Coastal marine forecasts and Special Marine Warnings are issued by zone, each zone identified by a text description and a Universal Generic Code (UGC). These forecasts are prepared by the individual Weather Forecast Office responsible for the zone.

Purpose:

To deliniate the coastal marine zones for the use in creating coastal marine forecasts and warnings.

Import Packages¶

In [1]:
import os.path, json, io
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (16, 20)

import pyspark.sql.functions as func # resuse as func.coalace for example
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType
from pyspark.sql import SparkSession

import pandas as pd
import geopandas as gpd
from geopandas import GeoDataFrame # Loading boundaries Data
from shapely.geometry import Point, Polygon, shape # creating geospatial data
from shapely import wkb, wkt # creating and parsing geospatial data
from ast import literal_eval as make_tuple # used to decode data from java
In [2]:
# Create SparkSession and attach Sparkcontext to it
spark = SparkSession.builder.appName("coastal-marine-zones").getOrCreate()
sc = spark.sparkContext

Task1 : Load Coastal Marine ZonesData¶

In [3]:
# Load the Coastal Marine Zones data
mz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/mz11au16/')
In [4]:
mz_df.columns
Out[4]:
Index([u'GL_WFO', u'ID', u'LAT', u'LON', u'NAME', u'WFO', u'geometry'], dtype='object')
In [5]:
mz_df.head()
Out[5]:
GL_WFO ID LAT LON NAME WFO geometry
0 PHZ113 21.616939 -158.972420 Kauai Channel HFO POLYGON ((-158.745995314 22.08591056400007, -1...
1 PHZ112 21.664122 -160.231390 Kauai Leeward Waters HFO POLYGON ((-159.62633932 22.19913730500002, -15...
2 GMZ155 26.906845 -97.157908 Coastal waters from Baffin Bay to Port Mansfie... BRO POLYGON ((-96.88227653499996 26.59007835400007...
3 GMZ657 25.453694 -81.389492 Coastal waters from East Cape Sable to Chokolo... MFL POLYGON ((-81.34737199999995 25.82093900000007...
4 GMZ656 25.953719 -81.872501 Coastal waters from Chokoloskee to Bonita Beac... MFL (POLYGON ((-81.35332512199994 25.8227071780000...

Detailed Description of the columns:

  • ID: Identifier of zone
  • WFO: Identifier of organization responsible for forecast to the zone
  • NAME: Name of zone
  • LON: Longitude of centroid zone
  • LAT: Latitude of centroid of zone
In [6]:
mz_df.describe()
Out[6]:
LAT LON
count 497.000000 497.000000
mean 38.750343 -90.627044
std 12.009424 47.737679
min -14.310718 -175.184644
25% 29.924015 -97.349263
50% 40.793045 -85.694244
75% 45.002633 -76.643209
max 71.681322 174.461826
In [7]:
mz_df.crs
Out[7]:
{'init': u'epsg:4269'}

Task 2: Load Offshore Marine Zones data¶

In [8]:
oz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/oz01ap14b/')
In [9]:
oz_df.head()
Out[9]:
ID LAT LON Location Name WFO geometry
0 AMZ031 11.83763 -79.34953 from 11N-15N between 72W-80W Caribbean from 11N to 15N between 72W and 80W ... NH2 POLYGON ((-72.00606649499997 15.03032157200005...
1 AMZ013 18.75795 -81.74130 north of 18N between 76W-85W Caribbean N of 18N between 76W and 85W includi... NH2 POLYGON ((-82.34419558999997 22.48400648500007...
2 AMZ037 14.44086 -59.99007 from 7N-15N between 55W-60W Tropical N Atlantic from 7N and 15N between 55... NH2 POLYGON ((-54.98014895199998 15.00294936700004...
3 AMZ121 24.81218 -72.99113 from 22N-27N between 65W-70W Atlantic from 22N to 27N between 65W and 70W NH2 POLYGON ((-64.99909230599997 22.00331475900003...
4 AMZ011 18.75795 -81.74130 from 18N-22N west of 85W to Yucatan Peninsula ... Caribbean Nof 18N W of 85W including Yucatan B... NH2 POLYGON ((-85.00351240299995 21.65418662700006...
In [10]:
oz_df.describe()
Out[10]:
LAT LON
count 80.000000 80.000000
mean 33.997171 -99.040287
std 14.489604 33.844106
min 11.837630 -184.250000
25% 24.003513 -127.531540
50% 36.102950 -81.741300
75% 41.056950 -72.935130
max 73.283000 -59.990070
In [11]:
oz_df.crs
Out[11]:
{'init': u'epsg:4269'}
In [12]:
oz_df.plot()
plt.show()
In [13]:
oz_df.plot(column='WFO', categorical=True, legend=True)
plt.show()
In [14]:
oz_df.plot(column='Location', categorical=True, legend=True)
plt.show()

Task 3: Load High Seas Marine Zones data¶

In [15]:
hz_df = GeoDataFrame.from_file('/Users/abanihi/Documents/shapefiles/marine-zones/hz04jn14/')
In [16]:
hz_df.head()
Out[16]:
LAT LON NAME WFO geometry id
0 48.95213 -44.40468 North Atlantic Ocean between 31N and 67N latit... OPC (POLYGON ((-81.51807499966134 31.0045729997444...
1 18.96140 -66.43170 Atlantic Ocean West of 35W longitude between 3... NHC (POLYGON ((-80.76558399960174 25.1674449997871...
2 -9.13155 -95.25638 Eastern South Pacific Ocean between 3.4S and ... NHC (POLYGON ((-70.40097017504525 -18.349999999999...
3 -12.71287 -149.99985 Central South Pacific Ocean between the Equato... HPA (POLYGON ((-139.9999999999999 0.00026303200030...
4 15.00008 -108.51445 Eastern North Pacific Ocean between the Equato... NHC POLYGON ((-114.9738617859924 31.92294689918015...
In [17]:
hz_df.describe()
Out[17]:
LAT LON
count 7.000000 7.000000
mean 17.693301 -110.357616
std 24.299462 44.638814
min -12.712870 -160.000000
25% 2.934225 -148.948050
50% 15.000080 -108.514450
75% 33.372660 -80.844040
max 48.952130 -44.404680
In [18]:
hz_df.crs
Out[18]:
{'init': u'epsg:4269'}
In [19]:
hz_df.plot()
plt.show()

Task 4: Load "Natural Earth" countries dataset, bundled with GeoPandas¶

"Natural Earth is a public domain map dataset available at 1:10m, 1:50m, and 1:110 million scales. Featuring tightly integrated vector and raster data, with Natural Earth you can make a variety of visually pleasing, well-crafted maps with cartography or GIS software." It (a subset?) comes bundled with GeoPandas and is accessible from the gpd.datasets module. We'll use it as a helpful global base layer map.

In [20]:
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
world.head(2)
Out[20]:
continent gdp_md_est geometry iso_a3 name pop_est
0 Asia 22270.0 POLYGON ((61.21081709172574 35.65007233330923,... AFG Afghanistan 28400000.0
1 Africa 110300.0 (POLYGON ((16.32652835456705 -5.87747039146621... AGO Angola 12799293.0
In [21]:
world.crs
Out[21]:
{'init': u'epsg:4326'}
In [22]:
world.plot()
plt.show()

Task 5: Map plot overlays: Plotting multiple spatial layers¶

Here's a compact, quick way of Using GeoDataFrame plot method to overlay two GeoDataFrame. In this case we are overlaying the world and the Offshore Marine Zones.

In [23]:
world.plot(ax=oz_df.plot(cmap='Set2', alpha=1), alpha=1)
plt.show()
In [24]:
fig, ax = plt.subplots()
In [25]:
# set aspect to equal. This is done automatically
# when using *geopandas* plot on it's own, but not when
# working with pyplot directly.
ax.set_aspect('equal')
In [26]:
world.plot(ax=ax)
oz_df.plot(ax=ax)
hz_df.plot(ax=ax, color='blue')
plt.show();

Task 6: Preparing Data for PySpark¶

The following cells will introduce some of the aggregation functions available to us in Spark SQL.

Spark expects a geospatial column as a WKT string. Internally it uses this to create OGC Geometries via Java Topology Suite (JTS). So in order to use Spatial Spark we will add the WKT column to our data.

In [27]:
world['wkt'] = pd.Series(
                map(lambda geom: str(geom.to_wkt()), world['geometry']),
                index=world.index, dtype='string')
In [28]:
oz_df['wkt'] = pd.Series(
                map(lambda geom: str(geom.to_wkt()), oz_df['geometry']),
                index=oz_df.index, dtype='string')
hz_df['wkt'] = pd.Series(
                map(lambda geom: str(geom.to_wkt()), hz_df['geometry']),
                index=hz_df.index, dtype='string')
mz_df['wkt'] = pd.Series(
                map(lambda geom: str(geom.to_wkt()), mz_df['geometry']),
                index=mz_df.index, dtype='string')
In [29]:
# drop the geometry column because Spark can't infer 
# a schema for it when it's a nested geometry shape
def drop_geometry_column(dataframe, col_drop="geometry"):
    return dataframe.drop(col_drop, axis=1)

Task 7: Create a Spark dataframe from GeoPandas¶

In [30]:
world_spark_df = spark.createDataFrame(drop_geometry_column(world)).cache()
In [31]:
oz_spark_df = spark.createDataFrame(drop_geometry_column(oz_df)).cache()
In [32]:
hz_spark_df = spark.createDataFrame(drop_geometry_column(hz_df)).cache()
In [33]:
mz_spark_df = spark.createDataFrame(mz_df).cache()
In [34]:
mz_spark_df.printSchema()
root
 |-- GL_WFO: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- WFO: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- __geom__: long (nullable = true)
 |    |-- _is_empty: boolean (nullable = true)
 |    |-- _ndim: long (nullable = true)
 |-- wkt: string (nullable = true)

In [35]:
world_spark_df.printSchema()
root
 |-- continent: string (nullable = true)
 |-- gdp_md_est: double (nullable = true)
 |-- iso_a3: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pop_est: double (nullable = true)
 |-- wkt: string (nullable = true)

In [36]:
oz_spark_df.printSchema()
root
 |-- ID: string (nullable = true)
 |-- LAT: double (nullable = true)
 |-- LON: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- WFO: string (nullable = true)
 |-- wkt: string (nullable = true)

Task 8: Get all countries from world_spark_df as Spark DataFrame¶

  • Register a Temporary Table to allow us to call SQL-Like statements in Apache Spark against the points of Interests in SparkSQL DataFrame:
In [37]:
world_spark_df.createOrReplaceTempView("world")
In [38]:
# Select all countries
countries = spark.sql(
            """
            SELECT name, wkt as geometry, pop_est, continent
            FROM world
            ORDER BY continent, name
            """)
In [39]:
countries.show()
+--------------------+--------------------+-----------+---------+
|                name|            geometry|    pop_est|continent|
+--------------------+--------------------+-----------+---------+
|             Algeria|POLYGON ((11.9995...|3.4178188E7|   Africa|
|              Angola|MULTIPOLYGON (((1...|1.2799293E7|   Africa|
|               Benin|POLYGON ((2.69170...|  8791832.0|   Africa|
|            Botswana|POLYGON ((25.6491...|  1990876.0|   Africa|
|        Burkina Faso|POLYGON ((-2.8274...|1.5746232E7|   Africa|
|             Burundi|POLYGON ((29.3399...|  8988091.0|   Africa|
|            Cameroon|POLYGON ((13.0758...|1.8879301E7|   Africa|
|Central African Rep.|POLYGON ((15.2794...|  4511488.0|   Africa|
|                Chad|POLYGON ((14.4957...|1.0329208E7|   Africa|
|               Congo|POLYGON ((12.9955...|  4012809.0|   Africa|
|       Côte d'Ivoire|POLYGON ((-2.8561...|2.0617068E7|   Africa|
|     Dem. Rep. Congo|POLYGON ((30.8338...|6.8692542E7|   Africa|
|            Djibouti|POLYGON ((43.0812...|   516055.0|   Africa|
|               Egypt|POLYGON ((34.9226...|8.3082869E7|   Africa|
|          Eq. Guinea|POLYGON ((9.49288...|   650702.0|   Africa|
|             Eritrea|POLYGON ((42.3515...|  5647168.0|   Africa|
|            Ethiopia|POLYGON ((37.9060...|8.5237338E7|   Africa|
|               Gabon|POLYGON ((11.0937...|  1514993.0|   Africa|
|              Gambia|POLYGON ((-16.841...|  1782893.0|   Africa|
|               Ghana|POLYGON ((1.06012...|2.3832495E7|   Africa|
+--------------------+--------------------+-----------+---------+
only showing top 20 rows

Task 9: Count how many countries we have in our countries dataset¶

In [40]:
print countries.count()
177

Task 10: Select Mexico from world dataframe and get its geographical boundary¶

In [41]:
mexico = spark.sql(
    """
    SELECT wkt, name, pop_est, gdp_md_est
    FROM world
    WHERE name='Mexico'
    """)
In [42]:
mexico_boundary = wkt.loads(mexico.take(1)[0].wkt)
In [43]:
wkt.loads(mexico.take(1)[0].wkt)
Out[43]:

Task 11: Filter Offshore Marine Zones that intersect with Mexico Boundaries¶

In [44]:
from pyspark.sql.types import *
In [45]:
from pyspark.sql.functions import udf
In [46]:
# User defined funtion for filtering region of interset
intersection_udf = udf(lambda row: True if mexico_boundary.intersects(wkt.loads(row)) else False, BooleanType())
In [48]:
df = oz_spark_df.filter(intersection_udf(oz_spark_df["wkt"]))
df.show()
+------+--------+---------+--------------------+--------------------+---+--------------------+
|    ID|     LAT|      LON|            Location|                Name|WFO|                 wkt|
+------+--------+---------+--------------------+--------------------+---+--------------------+
|AMZ011|18.75795| -81.7413|from 18N-22N west...|Caribbean Nof 18N...|NH2|POLYGON ((-85.003...|
|GMZ025|21.57751|-93.95211|south of 22N betw...|E Bay of Campeche...|NH2|POLYGON ((-87.005...|
|GMZ017|21.57751|-93.95211|between 22N-26N w...|W Central Gulf fr...|NH2|POLYGON ((-93.992...|
|GMZ023|21.57751|-93.95211|south of 22N west...|SW Gulf S of 22N ...|NH2|POLYGON ((-93.999...|
+------+--------+---------+--------------------+--------------------+---+--------------------+

In [49]:
pdf = df.toPandas()
pdf
Out[49]:
ID LAT LON Location Name WFO wkt
0 AMZ011 18.75795 -81.74130 from 18N-22N west of 85W to Yucatan Peninsula ... Caribbean Nof 18N W of 85W including Yucatan B... NH2 POLYGON ((-85.0035124029999452 21.654186627000...
1 GMZ025 21.57751 -93.95211 south of 22N between 87W-94W E Bay of Campeche including Campeche Bank NH2 POLYGON ((-87.0053766059999703 22.014180185000...
2 GMZ017 21.57751 -93.95211 between 22N-26N west of 94W W Central Gulf from 22N to 26N W of 94W NH2 POLYGON ((-93.9921787359999712 26.006293284000...
3 GMZ023 21.57751 -93.95211 south of 22N west of 94W SW Gulf S of 22N W of 94W NH2 POLYGON ((-93.9995355959999870 22.011966694000...
In [51]:
geometry = pdf['wkt'].map(wkt.loads)
crs = {'init': 'epsg:4326'}
gdf = gpd.GeoDataFrame(pdf, crs=crs, geometry=geometry)
In [52]:
gdf
Out[52]:
ID LAT LON Location Name WFO wkt geometry
0 AMZ011 18.75795 -81.74130 from 18N-22N west of 85W to Yucatan Peninsula ... Caribbean Nof 18N W of 85W including Yucatan B... NH2 POLYGON ((-85.0035124029999452 21.654186627000... POLYGON ((-85.00351240299995 21.65418662700006...
1 GMZ025 21.57751 -93.95211 south of 22N between 87W-94W E Bay of Campeche including Campeche Bank NH2 POLYGON ((-87.0053766059999703 22.014180185000... POLYGON ((-87.00537660599997 22.01418018500004...
2 GMZ017 21.57751 -93.95211 between 22N-26N west of 94W W Central Gulf from 22N to 26N W of 94W NH2 POLYGON ((-93.9921787359999712 26.006293284000... POLYGON ((-93.99217873599997 26.00629328400004...
3 GMZ023 21.57751 -93.95211 south of 22N west of 94W SW Gulf S of 22N W of 94W NH2 POLYGON ((-93.9995355959999870 22.011966694000... POLYGON ((-93.99953559599999 22.01196669400002...
In [57]:
# Set the region of interest color to red
world.plot(ax=gdf.plot(cmap='Set2', alpha=1, color='red'), alpha=1)
plt.show()

Task 12: Select Boundaries for Weather Forecast Office (WFO) Honolulu (HFO)¶

In [58]:
oz_spark_df.createOrReplaceTempView("offshore")
In [59]:
hfo = spark.sql("""
        SELECT wkt
        FROM offshore
        WHERE WFO='HFO'
        """)
In [60]:
hfo_boundaries = wkt.loads(hfo.take(1)[0].wkt)
hfo_boundaries
Out[60]:

Task 13: Filtering High Seas Marine Zones that contain HFO station boundaries¶

In [61]:
# User defined funtion for filtering region of interset
within_udf = udf(lambda row: True if hfo_boundaries.within(wkt.loads(row)) else False, BooleanType())
In [62]:
df = hz_spark_df.filter(within_udf(hz_spark_df["wkt"]))
df.show()
+----+------+--------------------+---+---+--------------------+
| LAT|   LON|                NAME|WFO| id|                 wkt|
+----+------+--------------------+---+---+--------------------+
|15.0|-160.0|Central North Pac...|HPA|   |MULTIPOLYGON (((-...|
+----+------+--------------------+---+---+--------------------+

In [63]:
pdf = df.toPandas()
In [64]:
pdf
Out[64]:
LAT LON NAME WFO id wkt
0 15.0 -160.0 Central North Pacific Ocean between the Equato... HPA MULTIPOLYGON (((-139.9999999999998863 25.00000...
In [65]:
geometry = pdf['wkt'].map(wkt.loads)
crs = {'init': 'epsg:4326'}
gdf = gpd.GeoDataFrame(pdf, crs=crs, geometry=geometry)
In [66]:
gdf
Out[66]:
LAT LON NAME WFO id wkt geometry
0 15.0 -160.0 Central North Pacific Ocean between the Equato... HPA MULTIPOLYGON (((-139.9999999999998863 25.00000... (POLYGON ((-139.9999999999999 25.0000000000003...
In [68]:
world.plot(ax=gdf.plot(cmap='Set2', alpha=1, color='red'), alpha=1)
plt.show()