Benchmarking Performance of History vs. Timeseries Files with ecgtools, Intake-ESM, and Dask#
In this example, we will look at how long it takes to read data from the Community Earth System Model (CESM), apply calculations, and visualize the output using ecgtools, Intake-ESM, and Dask.
We are going to investigate whether it is faster to do these operations on the history files output by the model or on time series files that have been generated from the history files. Our hypothesis is that performance should be substantially better when reading from timeseries files, but let’s take a look…
We use CESM data on the GLADE filesystem, from a case which includes both history and timeseries files on disk.
Imports#
Installing packages via conda-forge#
As of this week, ecgtools is available via conda-forge, which is very exciting! You can install the packages used here using the following:
conda install -c conda-forge ecgtools ncar-jobqueue distributed intake-esm pandas
We will also install hvPlot to help with visualization, installing from the pyviz channel!
conda install -c pyviz hvplot
import ast
import time
import warnings
warnings.filterwarnings("ignore")
import holoviews as hv
import hvplot
import hvplot.pandas
import intake
import pandas as pd
from dask.distributed import performance_report
from distributed import Client
from ecgtools import Builder
from ecgtools.parsers.cesm import parse_cesm_history, parse_cesm_timeseries
from IPython.core.display import HTML
from ncar_jobqueue import NCARCluster
hv.extension('bokeh')
Spin up a Dask Cluster#
In this example, we jump directly into the computation! If you are interested in building the catalogs used in the example, check out the Appendix.
The cluster here uses a set number of workers (19); future tests could investigate scalability by modifying the computational resources used and trying different cluster configurations.
cluster = NCARCluster(memory='25 GB', cores=1, walltime='06:00:00')
cluster.scale(19)
client = Client(cluster)
cluster
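If you do not have access to NCAR's job-queueing systems, you can experiment with a local cluster instead. Here is a minimal sketch; the worker count and memory limit are illustrative values only, not tuned settings.
# Hypothetical alternative: spin up a local Dask cluster instead of NCARCluster.
# The n_workers, threads_per_worker, and memory_limit values are just examples.
from dask.distributed import LocalCluster

local_cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit='4GB')
local_client = Client(local_cluster)
local_client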
Read the Catalogs Using Intake-ESM#
Here, we read in the catalogs built for this example (see the Appendix for how they were generated)! Remember, there are numerous variables in a single file (row) for the history files, so we use ast to help parse those rows!
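As a quick illustration of why the converter is needed: the variables column in the history catalog CSV stores each file's variable list as a string, and ast.literal_eval turns it back into a Python list. The variable names below (other than FG_CO2) are made up purely for illustration.
# Illustration only: the catalog CSV stores the list of variables as a string,
# and ast.literal_eval safely converts it back into a real Python list
ast.literal_eval("['TEMP', 'SALT', 'FG_CO2']")  # -> ['TEMP', 'SALT', 'FG_CO2']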
history_catalog = intake.open_esm_datastore(
'cesm-hist-smyle-fosi.json',
csv_kwargs={"converters": {"variables": ast.literal_eval}},
sep="/",
)
history_catalog
None catalog with 5 dataset(s) from 6015 asset(s):
|  | unique |
|---|---|
| component | 2 |
| stream | 5 |
| date | 5235 |
| case | 1 |
| member_id | 1 |
| frequency | 3 |
| variables | 620 |
| path | 6015 |
The timeseries catalog does not require any additional arguments 👌
timeseries_catalog = intake.open_esm_datastore('cesm-tseries-smyle-fosi.json')
timeseries_catalog
None catalog with 4 dataset(s) from 579 asset(s):
|  | unique |
|---|---|
| component | 2 |
| stream | 4 |
| case | 1 |
| member_id | 1 |
| variable | 579 |
| start_time | 3 |
| end_time | 3 |
| time_range | 3 |
| long_name | 570 |
| units | 67 |
| vertical_levels | 1 |
| frequency | 3 |
| path | 579 |
Search for Just Monthly Ocean Output#
We are only interested in monthly (frequency='month_1') ocean (component='ocn') output in this case.
monthly_ocean_timeseries = timeseries_catalog.search(component='ocn', frequency='month_1')
monthly_ocean_timeseries
None catalog with 1 dataset(s) from 410 asset(s):
|  | unique |
|---|---|
| component | 1 |
| stream | 1 |
| case | 1 |
| member_id | 1 |
| variable | 410 |
| start_time | 1 |
| end_time | 1 |
| time_range | 1 |
| long_name | 409 |
| units | 55 |
| vertical_levels | 1 |
| frequency | 1 |
| path | 410 |
monthly_ocean_history = history_catalog.search(component='ocn', frequency='month_1')
monthly_ocean_history
None catalog with 1 dataset(s) from 756 asset(s):
|  | unique |
|---|---|
| component | 1 |
| stream | 1 |
| date | 756 |
| case | 1 |
| member_id | 1 |
| frequency | 1 |
| variables | 446 |
| path | 756 |
Test File Access Speeds#
Here, we test the time it takes to read in the following time ranges:
1 year
5 years
10 years
20 years
30 years
40 years
50 years
60 years
A useful tool we will use here is the Dask Performance Report, which enables the user to save their Dask dashboard as a static file so they can share it with others. This provides a means of going back to a computation on your cluster to investigate which tasks ran, how the workers were utilized, and more.
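The general pattern is sketched below; the filename is arbitrary, and the body can be any Dask-backed computation (the same pattern appears in the timing cells that follow).
# Minimal sketch of the performance_report pattern; the filename is arbitrary
with performance_report(filename="example-report.html"):
    # any Dask-backed computation goes here, e.g. ds.mean(dim='time').compute()
    pass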
Test out the Timeseries Files#
def test_timeseries_file(
    num_years, chunk_strategy='time', long_term_average=False, monthly_average=False
):
    # Chunk either along time or along the horizontal (nlat/nlon) dimensions
    chunk_dict = {'time': {'time': 240}, 'spatial': {'nlon': 160, 'nlat': 192}}
    subset = monthly_ocean_timeseries.search(variable='FG_CO2')

    # Start the timing when we read in the dataset using .to_dataset_dict()
    start = time.time()
    dsets = subset.to_dataset_dict(
        'FG_CO2',
        cdf_kwargs={'use_cftime': True, 'chunks': chunk_dict[chunk_strategy]},
        progressbar=False,
    )
    keys = list(dsets.keys())
    ds = dsets[keys[0]]

    # Subset to the requested number of years (12 monthly time steps per year)
    ds = ds.isel(time=slice(0, num_years * 12))

    if long_term_average:
        ds.mean(dim='time').compute()

    if monthly_average:
        ds.groupby('time.month').mean(dim='time').compute()
    end = time.time()
    return end - start
Apply the Computation with Timeseries#
We also include the Dask performance report, which can be viewed following the computation cell.
years = [1, 5, 10, 20, 30, 40, 50, 60]
df = pd.DataFrame()
with performance_report(filename="timeseries-computation.html"):
    for year in years:
        print(f'Starting year {year}')

        # Time file access only, without any computation
        wall_time = test_timeseries_file(year)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'file_access',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Time with the long-term mean computation
        wall_time = test_timeseries_file(year, long_term_average=True)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'long_term_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Time with the monthly climatology computation
        wall_time = test_timeseries_file(year, monthly_average=True)
        df = df.append(
            {
                'catalog': 'timeseries',
                'computation': 'monthly_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

df_timeseries = df
Starting year 1
Starting year 5
Starting year 10
Starting year 20
Starting year 30
Starting year 40
Starting year 50
Starting year 60
HTML('timeseries-computation.html')
file_read_in = df.loc[df.computation == 'file_access'].hvplot(
x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='No computation'
)
long_term_mean = df.loc[df.computation == 'long_term_average'].hvplot(
x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Long term mean'
)
monthly_average = df.loc[df.computation == 'monthly_average'].hvplot(
x='num_years', y='wall_time', ylabel='Time (s)', width=400, label='Monthly climatology'
)
# We combine all the plots, and specify we want a single column
(file_read_in + long_term_mean + monthly_average).cols(1)
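If you would like to share the combined figure outside of the notebook, hvplot can save it to a standalone HTML file; the filename below is just an example.
# Optionally save the combined layout to a standalone HTML file (filename is an example)
hvplot.save((file_read_in + long_term_mean + monthly_average).cols(1), 'timeseries-wall-times.html')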
Test out the History Files#
Next, we investigate the time it takes to read the datasets, compute a temporal average over the entire time period, and compute a monthly climatology using CESM history file output.
def test_history_file(
    num_years, chunk_strategy='time', long_term_average=False, monthly_average=False
):
    # Chunk either along time or along the horizontal (nlat/nlon) dimensions
    chunk_dict = {'time': {'time': 240}, 'spatial': {'nlon': 160, 'nlat': 192}}

    # Select the first num_years worth of monthly history files (12 files per year)
    subset = monthly_ocean_history.search(
        date=monthly_ocean_history.df.date[: num_years * 12], variables='FG_CO2'
    )

    # Start the timing when we read in the dataset using .to_dataset_dict()
    start = time.time()
    dsets = subset.to_dataset_dict(
        'FG_CO2',
        cdf_kwargs={'use_cftime': True, 'chunks': chunk_dict[chunk_strategy]},
        progressbar=False,
    )
    keys = list(dsets.keys())
    ds = dsets[keys[0]]

    if long_term_average:
        ds.mean(dim='time').compute()

    if monthly_average:
        ds.groupby('time.month').mean(dim='time').compute()
    end = time.time()
    return end - start
Apply the Computation with History Files#
We also include the Dask performance report, which can be viewed following the computation cell.
df = pd.DataFrame()
with performance_report(filename="history-computation.html"):
    for year in years:
        print(f'Starting year {year}')

        # Time file access only, without any computation
        wall_time = test_history_file(year)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'file_access',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Time with the long-term mean computation
        wall_time = test_history_file(year, long_term_average=True)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'long_term_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

        # Time with the monthly climatology computation
        wall_time = test_history_file(year, monthly_average=True)
        df = df.append(
            {
                'catalog': 'history',
                'computation': 'monthly_average',
                'num_years': year,
                'wall_time': wall_time,
            },
            ignore_index=True,
        )

df_history = df
Starting year 1
Starting year 5
Starting year 10
Starting year 20
Starting year 30
Starting year 40
Starting year 50
Starting year 60
HTML('history-computation.html')