
This notebook was put together by Anderson Banihirwe as part of the 2017 CISL/SIParCS Research Project: PySpark for Big Atmospheric & Oceanic Data Analysis.

Trimmed Mean

  • Discards a percentage of the outlying values before calculating the arithmetic average.
  • A measure that incorporates characteristics of the mean and the median.
  • Less affected by outliers than the untrimmed average.
  • An $x\%$ trimmed mean removes the largest $x\%$ and the smallest $x\%$ of the sample before calculating the mean (see the formula below).
  • Typical range for $x\%$ is $5\%$ to $25\%$.

Source: Wilks, Daniel S. Statistical Methods in the Atmospheric Sciences, p. 26.
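
Formally, if $x_{(1)} \le x_{(2)} \le \dots \le x_{(n)}$ is the sorted sample and a fraction $\alpha$ is removed from each tail (with $k = \lfloor \alpha n \rfloor$), the trimmed mean is

$$\bar{x}_{\alpha} = \frac{1}{n - 2k} \sum_{i=k+1}^{n-k} x_{(i)}$$

With $\alpha = 0.2$ and the $n = 60$ monthly values used below, $k = 12$, so roughly the central 36 observations are averaged (the quantile-based filter in Step 5 keeps a very similar subset).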

Example:

  • Calculate the $20\%$ trimmed mean of spatially averaged temperature data.
  • The dataset can be found on NCAR's Glade:

    • /glade/p/CMIP/CMIP5/output1/NOAA-GFDL/GFDL-ESM2M/rcp85/mon/atmos/Amon/r1i1p1/v20111228/ta/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc

Step 1: Load the dataset into a Spark DataFrame

In [1]:
from pyspark4climate import read
from pyspark4climate.functions import shift_lon_udf
from pyspark.sql import SparkSession
import geopandas as gpd
import pandas as pd
import seaborn as sns
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (12, 15)
%matplotlib inline
import matplotlib.pyplot as plt
jet=plt.get_cmap('coolwarm')  # Used for multiple scatter plots
In [2]:
spark = SparkSession.builder.appName("trimmed-mean").getOrCreate()
sc = spark.sparkContext
In [3]:
!ncdump -h ../data/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc
netcdf ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012 {
dimensions:
    plev = 17 ;
    time = UNLIMITED ; // (60 currently)
    lat = 90 ;
    lon = 144 ;
    bnds = 2 ;
variables:
    double plev(plev) ;
        plev:units = "Pa" ;
        plev:long_name = "pressure" ;
        plev:axis = "Z" ;
        plev:positive = "down" ;
        plev:standard_name = "air_pressure" ;
    double average_DT(time) ;
        average_DT:long_name = "Length of average period" ;
        average_DT:units = "days" ;
    double average_T1(time) ;
        average_T1:long_name = "Start time for average period" ;
        average_T1:units = "days since 2006-01-01 00:00:00" ;
    double average_T2(time) ;
        average_T2:long_name = "End time for average period" ;
        average_T2:units = "days since 2006-01-01 00:00:00" ;
    double lat(lat) ;
        lat:long_name = "latitude" ;
        lat:units = "degrees_north" ;
        lat:standard_name = "latitude" ;
        lat:axis = "Y" ;
        lat:bounds = "lat_bnds" ;
    double lon(lon) ;
        lon:long_name = "longitude" ;
        lon:units = "degrees_east" ;
        lon:standard_name = "longitude" ;
        lon:axis = "X" ;
        lon:bounds = "lon_bnds" ;
    double bnds(bnds) ;
        bnds:long_name = "vertex number" ;
        bnds:cartesian_axis = "N" ;
    float ta(time, plev, lat, lon) ;
        ta:long_name = "Air Temperature" ;
        ta:units = "K" ;
        ta:valid_range = 100.f, 350.f ;
        ta:missing_value = 1.e+20f ;
        ta:cell_methods = "time: mean" ;
        ta:_FillValue = 1.e+20f ;
        ta:standard_name = "air_temperature" ;
        ta:original_units = "K" ;
        ta:original_name = "temp" ;
        ta:cell_measures = "area: areacella" ;
        ta:associated_files = "baseURL: http://cmip-pcmdi.llnl.gov/CMIP5/dataLocation areacella: areacella_fx_GFDL-ESM2M_rcp85_r0i0p0.nc" ;
    double time(time) ;
        time:long_name = "time" ;
        time:units = "days since 2006-01-01 00:00:00" ;
        time:cartesian_axis = "T" ;
        time:calendar_type = "noleap" ;
        time:calendar = "noleap" ;
        time:bounds = "time_bnds" ;
        time:standard_name = "time" ;
        time:axis = "T" ;
    double time_bnds(time, bnds) ;
        time_bnds:long_name = "time axis boundaries" ;
        time_bnds:units = "days since 2006-01-01 00:00:00" ;
    double lat_bnds(lat, bnds) ;
    double lon_bnds(lon, bnds) ;

// global attributes:
        :title = "NOAA GFDL GFDL-ESM2M, RCP8.5 (run 1) experiment output for CMIP5 AR5" ;
        :institute_id = "NOAA GFDL" ;
        :source = "GFDL-ESM2M 2010 ocean: MOM4 (MOM4p1_x1_Z50_cCM2M,Tripolar360x200L50); atmosphere: AM2 (AM2p14,M45L24); sea ice: SIS (SISp2,Tripolar360x200L50); land: LM3 (LM3p7_cESM,M45)" ;
        :contact = "gfdl.climate.model.info@noaa.gov" ;
        :project_id = "CMIP5" ;
        :table_id = "Table Amon (31 Jan 2011)" ;
        :experiment_id = "rcp85" ;
        :realization = 1 ;
        :modeling_realm = "atmos" ;
        :tracking_id = "095bd34a-27ca-4f5a-aaf2-ac2edba4317e" ;
        :Conventions = "CF-1.4" ;
        :references = "The GFDL Data Portal (http://nomads.gfdl.noaa.gov/) provides access to NOAA/GFDL\'s publicly available model input and output data sets. From this web site one can view and download data sets and documentation, including those related to the GFDL coupled models experiments run for the IPCC\'s 5th Assessment Report and the US CCSP." ;
        :comment = "GFDL experiment name = ESM2M-HC1_2006-2100_all_rcp85_ZC1. PCMDI experiment name = RCP8.5 (run1). Initial conditions for this experiment were taken from 1 January 2006 of the parent experiment, ESM2M-C1_all_historical_HC1 (historical). Several forcing agents varied during the 95 year duration of the RCP8.5 experiment based upon the MESSAGE integrated assessment model for the 21st century. The time-varying forcing agents include the well-mixed greenhouse gases (CO2, CH4, N2O, halons), tropospheric and stratospheric O3, model-derived aerosol concentrations (sulfate, black and organic carbon, sea salt and dust), and land use transitions. Volcanic aerosols were zero and solar irradiance varied seasonally based upon late 20th century averages but with no interannual variation. The direct effect of tropospheric aerosols is calculated by the model, but not the indirect effects." ;
        :gfdl_experiment_name = "ESM2M-HC1_2006-2100_all_rcp85_ZC1" ;
        :creation_date = "2011-08-12T06:08:37Z" ;
        :model_id = "GFDL-ESM2M" ;
        :branch_time = "52925" ;
        :experiment = "RCP8.5" ;
        :forcing = "GHG,SD,Oz,LU,SS,BC,MD,OC (GHG includes CO2, CH4, N2O, CFC11, CFC12, HCFC22, CFC113)" ;
        :frequency = "mon" ;
        :initialization_method = 1 ;
        :parent_experiment_id = "historical" ;
        :physics_version = 1 ;
        :product = "output1" ;
        :institution = "NOAA GFDL(201 Forrestal Rd, Princeton, NJ, 08540)" ;
        :history = "File was processed by fremetar (GFDL analog of CMOR). TripleID: [exper_id_NzrgHxy3PV,realiz_id_ntKPD70REo,run_id_fBzP3eBa0W]" ;
        :parent_experiment_rip = "r1i1p1" ;
}
In [4]:
filename='../data/ta_Amon_GFDL-ESM2M_rcp85_r1i1p1_200601-201012.nc'
var = 'ta'
In [5]:
data = read.DataFrame(sc, (filename, var), mode='single')

The pyspark4climate DataFrame class returns a wrapper object rather than a Spark DataFrame directly. To access the underlying Spark DataFrame, read its df attribute:

In [6]:
type(data)
Out[6]:
pyspark4climate.read.DataFrame
In [7]:
data_df = data.df
type(data_df)
Out[7]:
pyspark.sql.dataframe.DataFrame
In [8]:
data_df.show()
+-------------------+--------+------------------+------------------+----+
|               time|    plev|               lat|               lon|  ta|
+-------------------+--------+------------------+------------------+----+
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            256.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            258.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            261.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            263.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            266.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            268.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            271.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|273.74999999999994|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            276.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            278.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|236.24999999999997|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            238.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            241.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            243.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            246.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            248.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            251.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            253.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            256.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            258.75|null|
+-------------------+--------+------------------+------------------+----+
only showing top 20 rows

In [9]:
# Print the schema of data_df dataframe
data_df.printSchema()
root
 |-- time: string (nullable = true)
 |-- plev: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- ta: double (nullable = true)

Step 2: Shift longitudes so that they fall in the range [-180, 180]

To achieve this we use the pyspark4climate built-in function shift_lon_udf() (imported above).
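
The exact implementation of shift_lon_udf lives in pyspark4climate, but its effect appears equivalent to wrapping longitudes from [0, 360) into [-180, 180). A minimal sketch of that transformation using only built-in PySpark column expressions (the wrap-around formula is an assumption, chosen to reproduce the output shown below, e.g. 256.25 -> -103.75):

import pyspark.sql.functions as F

# Assumed equivalent of shift_lon_udf: wrap longitudes from [0, 360) into [-180, 180)
shifted_df = data_df.withColumn("shifted_lon", ((F.col("lon") + 180.0) % 360.0) - 180.0)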

In [10]:
# Shift the longitudes, then replace the original lon column with the shifted values
data_df = data_df.withColumn("shifted_lon", shift_lon_udf(data_df["lon"])).cache()
data_df = data_df.selectExpr("time", "plev", "lat", "shifted_lon as lon", "ta")
In [11]:
data_df.show()
+-------------------+--------+------------------+-------------------+----+
|               time|    plev|               lat|                lon|  ta|
+-------------------+--------+------------------+-------------------+----+
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            -103.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|            -101.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -98.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -96.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -93.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -91.25|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -88.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527| -86.25000000000006|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -83.75|null|
|2006-01-16 12:00:00|100000.0|-85.95505617977527|             -81.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|-123.75000000000003|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -121.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -118.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -116.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -113.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -111.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -108.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -106.25|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -103.75|null|
|2006-01-16 12:00:00|100000.0|-71.79775280898876|            -101.25|null|
+-------------------+--------+------------------+-------------------+----+
only showing top 20 rows

Step 3: Select Temporal and Spatial Domains

Select North America: keep only grid points in the region 130W to 60W, 20N to 70N.

In [12]:
import pyspark.sql.functions as F
In [13]:
df = data_df.filter((data_df["lon"] <= -60) & (data_df["lon"] >=-130) &\
                 (data_df["lat"] >=20) & (data_df["lat"] <=70))\
                 .orderBy(F.col('time'), F.col('lat'), F.col('lon'))
df.show()
+-------------------+--------+------------------+-------------------+------------------+
|               time|    plev|               lat|                lon|                ta|
+-------------------+--------+------------------+-------------------+------------------+
|2006-01-16 12:00:00| 85000.0|21.235955056179776|-128.74999999999997|  284.496337890625|
|2006-01-16 12:00:00| 50000.0|21.235955056179776|-128.74999999999997| 261.9516906738281|
|2006-01-16 12:00:00| 70000.0|21.235955056179776|-128.74999999999997|278.41375732421875|
|2006-01-16 12:00:00|100000.0|21.235955056179776|-128.74999999999997|290.64520263671875|
|2006-01-16 12:00:00| 25000.0|21.235955056179776|-128.74999999999997|226.87403869628906|
|2006-01-16 12:00:00| 40000.0|21.235955056179776|-128.74999999999997|250.10055541992188|
|2006-01-16 12:00:00| 60000.0|21.235955056179776|-128.74999999999997|  271.363037109375|
|2006-01-16 12:00:00| 15000.0|21.235955056179776|-128.74999999999997| 208.1819610595703|
|2006-01-16 12:00:00|  5000.0|21.235955056179776|-128.74999999999997|210.75625610351562|
|2006-01-16 12:00:00| 92500.0|21.235955056179776|-128.74999999999997| 284.8731384277344|
|2006-01-16 12:00:00|  3000.0|21.235955056179776|-128.74999999999997|218.32974243164062|
|2006-01-16 12:00:00|  2000.0|21.235955056179776|-128.74999999999997|      224.27734375|
|2006-01-16 12:00:00|  7000.0|21.235955056179776|-128.74999999999997|205.61280822753906|
|2006-01-16 12:00:00| 10000.0|21.235955056179776|-128.74999999999997|200.11734008789062|
|2006-01-16 12:00:00| 20000.0|21.235955056179776|-128.74999999999997|  217.828369140625|
|2006-01-16 12:00:00| 30000.0|21.235955056179776|-128.74999999999997|235.12181091308594|
|2006-01-16 12:00:00|  1000.0|21.235955056179776|-128.74999999999997| 232.8317108154297|
|2006-01-16 12:00:00| 10000.0|21.235955056179776|-126.24999999999997| 200.3651580810547|
|2006-01-16 12:00:00|  7000.0|21.235955056179776|-126.24999999999997|205.86260986328125|
|2006-01-16 12:00:00|  5000.0|21.235955056179776|-126.24999999999997|210.97164916992188|
+-------------------+--------+------------------+-------------------+------------------+
only showing top 20 rows
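
As an aside, the same bounding box can be selected with Column.between, which is inclusive on both ends, just like the <= / >= comparisons above; a sketch equivalent to the filter in In [13]:

df = data_df.filter(F.col("lon").between(-130, -60) & F.col("lat").between(20, 70))\
            .orderBy(F.col('time'), F.col('lat'), F.col('lon'))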

Step 4: Calculate the Spatial Average

This operation computes a spatial mean: for each monthly time step, the temperature values at every grid point and every pressure level in the selected region are averaged into a single value.

In [14]:
spatial_avg = df.groupby('time')\
                .agg(F.avg('ta').alias('mean_ta'))\
                .orderBy(F.col('time')).cache()
spatial_avg.show()
+-------------------+------------------+
|               time|           mean_ta|
+-------------------+------------------+
|2006-01-16 12:00:00|231.59926081445633|
|2006-02-15 00:00:00|232.37893242814513|
|2006-03-16 12:00:00| 234.1541619403338|
|2006-04-16 00:00:00|235.78591857085357|
|2006-05-16 12:00:00|238.01055345426494|
|2006-06-16 00:00:00|240.42150196670758|
|2006-07-16 12:00:00| 241.9528985937708|
|2006-08-16 12:00:00| 241.2648552284146|
|2006-09-16 00:00:00| 239.6473727575904|
|2006-10-16 12:00:00|237.19174075318733|
|2006-11-16 00:00:00|234.90129093010106|
|2006-12-16 12:00:00|232.37434658849983|
|2007-01-16 12:00:00| 233.7475353484458|
|2007-02-15 00:00:00|234.06008903985523|
|2007-03-16 12:00:00|234.13952266820945|
|2007-04-16 00:00:00|236.35817862969358|
|2007-05-16 12:00:00|238.67507688947597|
|2007-06-16 00:00:00| 240.9092952795669|
|2007-07-16 12:00:00|242.37356627412072|
|2007-08-16 12:00:00|241.68422413936645|
+-------------------+------------------+
only showing top 20 rows

In [17]:
spatial_avg.count()
Out[17]:
60

Step 5: Calculate the Trimmed Mean

  • Use Spark's approxQuantile(col, probabilities, relativeError) to compute approximate quantiles of a numerical DataFrame column. Here it returns the 20th and 80th percentiles of mean_ta, which serve as cutoffs separating the lowest $20\%$ and the highest $20\%$ of the values.
In [15]:
lowest_highest_20th_values = spatial_avg.approxQuantile("mean_ta", [0.2, 0.8], 0.001)
lowest_highest_20th_values
Out[15]:
[233.01154807122214, 240.67232534998033]
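
The relativeError argument (0.001 here) controls how closely the returned quantiles approximate the exact ones; per the Spark API, passing 0 computes exact quantiles at extra cost, which is negligible for a 60-row DataFrame. A quick check, if exact cutoffs are preferred:

exact_cutoffs = spatial_avg.approxQuantile("mean_ta", [0.2, 0.8], 0.0)  # relativeError=0 -> exact quantiles
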
  • Keep only the values strictly between these two cutoffs, discarding the lowest $20\%$ and the highest $20\%$ of the spatially averaged values.
In [16]:
trimmed_mean = spatial_avg.where((spatial_avg["mean_ta"] > lowest_highest_20th_values[0])\
                                 & (spatial_avg["mean_ta"] < lowest_highest_20th_values[1])).cache()
trimmed_mean.show()
+-------------------+------------------+
|               time|           mean_ta|
+-------------------+------------------+
|2006-03-16 12:00:00| 234.1541619403338|
|2006-04-16 00:00:00|235.78591857085357|
|2006-05-16 12:00:00|238.01055345426494|
|2006-06-16 00:00:00|240.42150196670758|
|2006-09-16 00:00:00| 239.6473727575904|
|2006-10-16 12:00:00|237.19174075318733|
|2006-11-16 00:00:00|234.90129093010106|
|2007-01-16 12:00:00| 233.7475353484458|
|2007-02-15 00:00:00|234.06008903985523|
|2007-03-16 12:00:00|234.13952266820945|
|2007-04-16 00:00:00|236.35817862969358|
|2007-05-16 12:00:00|238.67507688947597|
|2007-09-16 00:00:00|239.50740400122888|
|2007-10-16 12:00:00|236.77807576356787|
|2007-11-16 00:00:00|234.64059489138126|
|2008-02-15 00:00:00| 233.0450955027923|
|2008-03-15 12:00:00|234.47422504273217|
|2008-04-15 00:00:00|236.04082823775568|
|2008-05-15 12:00:00|238.70110128101425|
|2008-06-15 00:00:00|240.67232534998033|
+-------------------+------------------+
only showing top 20 rows

In [18]:
trimmed_mean.count()
Out[18]:
37
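
The single trimmed-mean value is simply the average of the rows that survive the trimming; a short follow-up (the result should agree with the mean reported by describe() in the next subsection, roughly 236.56 K):

trimmed_mean.agg(F.avg("mean_ta").alias("trimmed_mean_ta")).show()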

5.1 Plot the trimmed-mean temperature values

In [19]:
# convert the spark dataframe to pandas dataframe for visualization
df = trimmed_mean.toPandas()
df.head()
Out[19]:
                  time     mean_ta
0  2006-03-16 12:00:00  234.154162
1  2006-04-16 00:00:00  235.785919
2  2006-05-16 12:00:00  238.010553
3  2006-06-16 00:00:00  240.421502
4  2006-09-16 00:00:00  239.647373
In [20]:
df.describe()
Out[20]:
          mean_ta
count   37.000000
mean   236.560735
std      2.379737
min    233.011548
25%    234.416524
50%    236.358179
75%    238.675077
max    240.672325
In [21]:
df = df.set_index('time')
df.head()
Out[21]:
                        mean_ta
time
2006-03-16 12:00:00  234.154162
2006-04-16 00:00:00  235.785919
2006-05-16 12:00:00  238.010553
2006-06-16 00:00:00  240.421502
2006-09-16 00:00:00  239.647373
In [23]:
ax = df['mean_ta'].plot(legend=True, figsize=(16, 8))
ax.set_xlabel("Time range [Jan-01-2006; ....; Dec-31-2010]")
ax.set_ylabel("Trimmed Mean Temperature [K]]")
ax.set_title("Trimmed Mean of Temperature at 60W to 130W, 20N to 70N for Jan 2006 - Dec 2010")
plt.show()

5.2 Plot the spatial mean temperature values

In [24]:
spatial_avg_df = spatial_avg.toPandas()
spatial_avg_df.describe()
Out[24]:
          mean_ta
count   60.000000
mean   236.757677
std      3.538398
min    231.549194
25%    233.926710
50%    236.512822
75%    239.926692
max    242.373566
In [25]:
spatial_avg_df = spatial_avg_df.set_index('time')
ax = spatial_avg_df['mean_ta'].plot(legend=True, figsize=(16, 8))
ax.set_xlabel("Time range [Jan-01-2006; ....; Dec-31-2010]")
ax.set_ylabel("Spatial Average [K]]")
plt.show()

5.3 Plot of Trimmed Mean and Spatial Mean Temperatures

In [35]:
ax = df['mean_ta'].plot(legend=True, figsize=(16, 8), label='Trimmed Mean')
ax = spatial_avg_df['mean_ta'].plot(legend=True, figsize=(16, 8), label='Spatial Mean')
ax.set_ylabel("Mean Temperature [K]")
ax.set_xlabel("Time range [Jan-01-2006; ....; Dec-31-2010]")
plt.show()
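
To quantify how much the trimming tightens the distribution relative to the plain spatial mean (compare the two describe() outputs above: a standard deviation of about 2.38 K versus 3.54 K), the two pandas series can be summarized side by side; a small sketch:

# Align the 37 retained months against all 60 months on the shared time index;
# the NaNs for the discarded months are ignored by count/mean/std.
comparison = pd.DataFrame({"trimmed_mean": df["mean_ta"], "spatial_mean": spatial_avg_df["mean_ta"]})
comparison.agg(["count", "mean", "std", "min", "max"])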