This notebook was put together by Anderson Banihirwe as part of the 2017 CISL/SIParCS Research Project: PySpark for Big Atmospheric & Oceanic Data Analysis
Median¶
- A commonly used robust and resistant measure of central tendency.
- Defined as the middle value when observations are ordered from smallest to largest.
- Divides the dataset into two parts of equal size, with 50% of the values below the median and 50% of the values above the median.
- Also known as the 50th percentile.
- Insensitive to extreme values (illustrated in the short sketch below).
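For example, the definition can be sketched in a few lines of plain Python (a minimal sketch with hypothetical sample values, separate from the Spark workflow below):

def median(values):
    # Sort, then take the middle value (odd count) or average the two middle values (even count)
    s = sorted(values)
    n = len(s)
    mid = n // 2
    return s[mid] if n % 2 else (s[mid - 1] + s[mid]) / 2.0

median([3, 1, 7])     # 3   (sorted: [1, 3, 7])
median([3, 1, 7, 5])  # 4.0 (sorted: [1, 3, 5, 7]; average of 3 and 5)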
Example:¶
- Calculate the spatial median of temperature data over North America for the period 2006-2010.
The dataset can be found on NCAR's Glade:
/glade/p/CMIP/CMIP5/output1/NOAA-GFDL/GFDL-ESM2M/rcp85/mon/atmos/Amon/r1i1p1/v20111228/ta/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc
Step 1: Load the Dataset into a Spark DataFrame¶
from pyspark4climate import read
from pyspark4climate.functions import shift_lon_udf
from pyspark.sql import SparkSession
import geopandas as gpd
import pandas as pd
import seaborn as sns
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (12, 15)
%matplotlib inline
import matplotlib.pyplot as plt
jet = plt.get_cmap('coolwarm')  # 'coolwarm' colormap, used for scatter plots
spark = SparkSession.builder.appName("median").getOrCreate()
sc = spark.sparkContext
!ncdump -h ../data/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc
filename='../data/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc'
var = 'ta'
data = read.DataFrame(sc, (filename, var), mode='single')
The pyspark4climate DataFrame class returns a wrapper object. To access the underlying Spark DataFrame, we use its .df attribute:
type(data)
data_df = data.df
type(data_df)
data_df.show()
# Print the schema of data_df dataframe
data_df.printSchema()
Step 2: Shift longitudes on the grid so that they are in the range [-180, 180]¶
To achieve this we will use the pyspark4climate built-in function shift_lon_udf().
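Conceptually, shifting the grid means mapping longitudes from the [0, 360) convention to [-180, 180). A minimal hand-rolled sketch of such a UDF might look like the following (a hypothetical re-implementation for illustration; the actual pyspark4climate code may differ):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Hypothetical equivalent: wrap longitudes greater than 180 into the negative range
shift_lon_sketch = udf(lambda lon: lon - 360.0 if lon > 180.0 else lon, DoubleType())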
# Shift the longitudes, then replace the original lon column with the shifted values
data_df = data_df.withColumn("shifted_lon", shift_lon_udf(data_df["lon"])).cache()
data_df = data_df.selectExpr("time", "plev", "lat", "shifted_lon as lon", "ta")
data_df.show()
Step 3: Select Temporal and Spatial Domains¶
Select North America: keep only grid points between 130W and 60W, and between 20N and 70N
import pyspark.sql.functions as F
df = data_df.filter((data_df["lon"] <= -60) & (data_df["lon"] >=-130) &\
(data_df["lat"] >=20) & (data_df["lat"] <=70))\
.orderBy(F.col('time'), F.col('lat'), F.col('lon'))
df.show()
df.cache()
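For readers more comfortable with SQL, the same subset can be expressed with Spark SQL (a sketch assuming Spark 2.0+; ta_data is a hypothetical view name):

# Register the DataFrame as a temporary view, then filter with plain SQL
data_df.createOrReplaceTempView("ta_data")
df_sql = spark.sql("""
    SELECT time, plev, lat, lon, ta
    FROM ta_data
    WHERE lon BETWEEN -130 AND -60
      AND lat BETWEEN 20 AND 70
    ORDER BY time, lat, lon
""")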
Step 4: Calculate Spatial Median¶
This operation computes the spatial median of the data: for each time step, the middle temperature value across all grid points (all latitude/longitude/pressure-level combinations) is located.
To compute the median with Spark, we will need a Spark window function. At its core, a window function calculates a return value for every input row of a table based on a group of rows, called the frame. Every input row can have a unique frame associated with it. This characteristic makes window functions more powerful than other functions and lets users concisely express data processing tasks that are hard (if not impossible) to express without them. You can get more details at this link: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
As a rule of thumb, window definitions should always contain a PARTITION BY clause; otherwise Spark will move all data to a single partition. An ORDER BY clause is required for some functions and optional for others (typically aggregates). There are also two optional clauses that can be used to define the window span: ROWS BETWEEN and RANGE BETWEEN.
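To make those clauses concrete, here is how each would appear in a PySpark window definition (illustrative sketches only; Window.unboundedPreceding and Window.currentRow require Spark 2.1+, and only the PARTITION BY + ORDER BY form is used below):

from pyspark.sql.window import Window

# PARTITION BY only: one partition per time step
w = Window.partitionBy("time")

# ... plus ORDER BY: rows ordered by temperature within each partition
w_ordered = Window.partitionBy("time").orderBy("ta")

# ... plus an explicit ROWS BETWEEN frame: partition start up to the current row
w_framed = (Window.partitionBy("time")
                  .orderBy("ta")
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow))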
from pyspark.sql.window import Window
Once a function is marked as a window function, the next key step is to define the Window Specification associated with this function. A window specification defines which rows are included in the frame associated with a given input row. A window specification includes three parts:
- Partitioning Specification: controls which rows will be in the same partition with the given row. In our case, we will make sure all rows having the same value for the time column are collected to the same machine before ordering and calculating the frame. If no partitioning specification is given, then all data must be collected to a single machine.
- Ordering Specification: controls the way that rows in a partition are ordered, determining the position of the given row in its partition.
- Frame Specification: states which rows will be included in the frame for the current input row, based on their relative position to the current row. We don't need this in our case.
windowSpec = Window.partitionBy(df['time']).orderBy(df['ta'])
percent_rank() returns the relative rank (i.e. percentile) of rows within a window partition, computed as (rank - 1) / (number of rows in the partition - 1).
df = df.select("time", "ta", F.percent_rank().over(windowSpec).alias("percent_rank"),
F.row_number().over(windowSpec).alias("row_number"))
df.show()
df.describe('row_number').show()
# For each time step there are 11900 rows.
# Because the ta values are now ordered in ascending order within each time
# step, the median sits at row (row_count + 1) / 2 when row_count is odd.
# When row_count is even (as here), the textbook median is the average of
# rows row_count / 2 and row_count / 2 + 1; this notebook approximates it
# by taking the single row at row_count / 2 + 1.
row_count = 11900
row_num = row_count // 2 + 1  # 5951; integer division keeps this an int
spatial_median = df.select(df.time, df.ta, df.percent_rank,
df.row_number.between(row_num, row_num).alias('median_loc')).cache()
median = spatial_median.filter(spatial_median.median_loc == True).orderBy("time")
median.show()
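Because the row count is even, the textbook median would average the two middle rows instead of taking a single one. A sketch of that exact variant (mid_lo, mid_hi, and exact_median are hypothetical names, reusing df, row_count, and F from above):

# Average the two middle rows per time step for the exact even-count median
mid_lo, mid_hi = row_count // 2, row_count // 2 + 1
exact_median = (df.filter(df.row_number.isin(mid_lo, mid_hi))
                  .groupBy("time")
                  .agg(F.avg("ta").alias("ta_median"))
                  .orderBy("time"))
exact_median.show()

For large datasets, Spark 2.1+ also provides the approximate aggregate percentile_approx (e.g. F.expr("percentile_approx(ta, 0.5)")), which avoids the full per-partition sort.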
Step 5: Plot the spatial median data¶
# convert the spark dataframe to pandas dataframe for visualization
df = median.toPandas()
df.describe()
df.head()
df = df.set_index('time').drop(['percent_rank', 'median_loc'], axis=1)
df.head()
ax = df['ta'].plot(legend=True, figsize=(16, 8))
ax.set_xlabel("Time range [Jan-01-2006; ....; Dec-31-2010]")
ax.set_ylabel("Spatial Median Temperature [K]")
ax.set_title("Spatial Median of Temperature at 60W to 130W, 20N to 70N for Jan 2006 - Dec 2010")
plt.show()
The spatial median time series looks similar to the spatial average time series from the previous example (https://github.com/NCAR/CMIPscripts/blob/master/pyspark/Statistical-Techniques/01-Measures-of-Central-Tendency/01-mean.ipynb). Yet, even though the two curves look alike, they are not identical: in a non-symmetrical dataset such as this one, the median differs from the mean.
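A toy illustration of that difference (hypothetical values, not drawn from this dataset):

import numpy as np

vals = np.array([250.0, 255.0, 260.0, 265.0, 310.0])  # hypothetical skewed sample
vals.mean()       # 268.0 -- pulled toward the single high value
np.median(vals)   # 260.0 -- stays at the middle observation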