import glob
import re

import matplotlib as plt
import numpy as np
import scipy as sp
import xarray as xr
import intake
import intake_esm
import pandas as pd
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import performance_report

######## File paths ################
# Scratch area on Lustre; the PBS cluster places dask spill files and logs under it.
lustre_scratch = "/lustre/desc1/scratch/harshah"
gdex_data = "/gdex/data/special_projects/pythia_2024"
#########
# HTTPS location of the ERA5 intake catalog (base URL + catalog path).
gdex_url = 'https://data.gdex.ucar.edu/'
era5_catalog = gdex_url + 'special_projects/pythia_2024/pythia_intake_catalogs/era5_catalog.json'
#
##########
print(era5_catalog)
# Output captured in the notebook:
#   https://data.gdex.ucar.edu/special_projects/pythia_2024/pythia_intake_catalogs/era5_catalog.json

# Cluster back-end switches, tested in this order when the cluster is created:
# PBS first, then Dask Gateway; if both are False a LocalCluster is used.
USE_PBS_SCHEDULER = True
# This flag was read by the cluster-selection code but never defined, so turning
# USE_PBS_SCHEDULER off raised NameError instead of falling through.
USE_DASK_GATEWAY = False

# Create a PBS cluster object
def get_pbs_cluster():
    """Build a Dask cluster whose workers run as PBS jobs (via dask_jobqueue).

    Each worker job is single-core, single-process, with 4GiB of memory, and is
    submitted to the 'casper' queue with a 3-hour walltime.  Spill files and
    logs are written beneath the module-level ``lustre_scratch`` directory.

    Returns:
        The configured ``PBSCluster`` instance.
    """
    from dask_jobqueue import PBSCluster

    spill_dir = lustre_scratch + '/dask/spill'
    log_dir = lustre_scratch + '/dask/logs/'

    pbs_options = dict(
        job_name='dask-osdf-24',
        cores=1,
        memory='4GiB',
        processes=1,
        local_directory=spill_dir,
        log_directory=log_dir,
        resource_spec='select=1:ncpus=1:mem=4GB',
        queue='casper',
        walltime='3:00:00',
        # Alternative network interface kept for reference: interface='ib0'
        interface='ext',
    )
    return PBSCluster(**pbs_options)
def get_gateway_cluster():
    """Request a new cluster from Dask Gateway with adaptive scaling.

    Returns:
        The gateway cluster, set to adapt between 2 and 4 workers.
    """
    from dask_gateway import Gateway

    gateway_cluster = Gateway().new_cluster()
    # Adaptive mode: worker count floats between the two bounds.
    gateway_cluster.adapt(minimum=2, maximum=4)
    return gateway_cluster
def get_local_cluster():
    """Create a cluster using the Jupyter server's own resources.

    Returns:
        A ``distributed.LocalCluster`` on which a scale to 6 workers has been
        requested.
    """
    # Only LocalCluster is needed here; the previously co-imported
    # performance_report was never used inside this function.
    from distributed import LocalCluster

    cluster = LocalCluster()
    cluster.scale(6)
    return cluster


# Obtain dask cluster in one of three ways
# Pick the cluster back-end from the module-level switches; the local cluster
# is the fallback when neither switch is set.
if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to cluster
# (Client is already imported from dask.distributed at the top of the file,
# so it is not re-imported here.)
client = Client(cluster)
# Warning emitted when this cell was run in the notebook (kept for reference):
#   /glade/u/home/harshah/venvs/osdf/lib/python3.10/site-packages/distributed/node.py:187: UserWarning: Port 8787 is already in use.
#   Perhaps you already have a cluster running?
#   Hosting the HTTP server on port 40641 instead
#     warnings.warn(
# Scale the cluster and display cluster dashboard URL
n_workers = 5
cluster.scale(n_workers)
# Wait for the requested number of workers before continuing, so later
# cells do not start on a partially provisioned cluster.
client.wait_for_workers(n_workers=n_workers)
# Bare expression: in a notebook this renders the cluster widget (the
# exported page only captured a "Loading..." placeholder); in a plain script
# it has no visible effect.
cluster
# Load data
# Selection of the HRRR analysis field to read: date (YYYYMMDD), hour,
# variable name and vertical level, as they appear in the zarr store path.
date = '20210214'
hour = '12'
var = 'TMP'
level = '2m_above_ground'

# url2 addresses the <level>/<var> group of the analysis store; url1 goes one
# level deeper (<level>/<var>/<level>) and is the path actually opened below.
url2 = f'osdf:///aws-opendata/us-west-1/hrrrzarr/sfc/{date}/{date}_{hour}z_anl.zarr/{level}/{var}'
url1 = f'{url2}/{level}'
print(url1)
# Output captured in the notebook:
#   osdf:///aws-opendata/us-west-1/hrrrzarr/sfc/20210214/20210214_12z_anl.zarr/2m_above_ground/TMP/2m_above_ground
# %%time
# (IPython cell magic, commented out: it is only valid as the first line of a
# notebook cell and is a syntax error in a plain Python file.)
# NOTE(review): opening an 'osdf://' URL presumably relies on an fsspec
# implementation registered for that protocol (e.g. pelicanfs), which this
# file does not import explicitly - confirm it is installed.
temps = xr.open_zarr(url1)
# Bare expression: renders the dataset repr in a notebook (the exported page
# only captured a "Loading..." placeholder).
temps

# %%time
temps.TMP.plot()

# Shut down in order: disconnect the client first so it does not try to
# talk to a scheduler that has already gone away, then release the cluster.
client.close()
cluster.close()