Source code for waterEntropy.utils.dask_clusters
"""
Helpers for setting up dask clusters on HPC.
"""
import os
import subprocess
import sys
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import psutil
[docs]
def check_slurm_env():
"""Check if SLURM_CPU_BIND is set in env and delete if so
to satisfy cpu bind request on some machines."""
if "SLURM_CPU_BIND" in os.environ:
os.environ.pop("SLURM_CPU_BIND")
[docs]
def slurm_directives(args):
"""Process extra directives and directives to be skipped."""
extra = []
if args.hpc_account != "":
extra.append(f'--account="{args.hpc_account}"')
if args.hpc_qos != "":
extra.append(f'--qos="{args.hpc_qos}"')
if args.hpc_constraint != "":
extra.append(f'--constraint="{args.hpc_constraint}"')
# Skip directives.
skip = []
skip = ["--mem"]
return extra, skip
[docs]
def slurm_prologues(args):
"""Process any extra environment variables to be used."""
prologue = []
prologue.append(f'eval "$({args.conda_path} shell.bash hook)"')
if args.conda_exec == "mamba":
prologue.append(f'eval "$({args.conda_exec} shell hook --shell bash)"')
prologue.append(f"{args.conda_exec} activate {args.conda_env}")
prologue.append("export SLURM_CPU_FREQ_REQ=2250000")
return prologue
[docs]
def system_network_interface():
"""Get best candidate for HPC network interface from commonly known ones."""
hpc_nics = ["bond0", "ib0", "hsn0", "eth0"]
interfaces = list(psutil.net_if_addrs().keys())
for iface in hpc_nics:
if iface in interfaces:
break
return iface
[docs]
def slurm_submit_master(args):
"""Submit a master worker process for coordinating dask cluster setup,
orchestration and shutdown."""
# Get original CLI and remove --submit and exec.
cli = sys.argv
cli.pop(0)
cli.remove("--submit")
# Form a submit script
with open("WE-master-submit.sh", "w", encoding="utf-8") as f:
f.write("#!/bin/bash --login\n\n")
f.write("#SBATCH --job-name=waterentropy-master\n")
f.write("#SBATCH --nodes=1\n")
f.write("#SBATCH --ntasks=1\n")
f.write("#SBATCH --cpus-per-task=2\n")
f.write(f"#SBATCH --time={args.hpc_walltime}\n")
if args.hpc_account != "":
f.write(f"#SBATCH --account={args.hpc_account}\n")
f.write(f"#SBATCH --partition={args.hpc_queue}\n")
if args.hpc_qos != "":
f.write(f"#SBATCH --qos={args.hpc_qos}\n")
f.write("\n")
f.write(f'eval "$({args.conda_path} shell.bash hook)"\n')
if args.conda_exec == "mamba":
f.write(f'eval "$({args.conda_exec} shell hook --shell bash)"\n')
f.write(f"{args.conda_exec} activate {args.conda_env}\n\n")
f.write(f"srun waterEntropy {' '.join(cli)}")
# Fix slurm env.
check_slurm_env()
try:
sub = subprocess.check_output(["bash", "-c", "sbatch WE-master-submit.sh"])
print(sub.decode("utf-8"))
except subprocess.CalledProcessError as e:
print(e.output)