Running an existing model on HPC (e.g. Slurm)
This tutorial shows how to run an existing deep-learning-based EDS-NLP model (for example the public pseudonymisation model eds-pseudo-public) efficiently on a cluster. In a Clinical Data Warehouse like AP-HP's, most research projects typically need to:
- first fetch a corpus of documents with PySpark. Depending on your computing setup, this might run on a specific cluster like Hadoop/YARN.
- run the NLP model on these notes. This is often best done on a GPU cluster, for instance one managed by Slurm.
Python inference script
Let's start with the Python NLP inference script. We'll write a script that:
- loads an existing model, e.g. AP-HP/dummy-ner, which annotates entities of the DEFT 2020 dataset on documents
- reads notes from a Parquet dataset (e.g. exported from Spark)
- applies the model on these notes
- writes entities back to a new Parquet dataset (e.g. to be re-imported in Spark)
import logging
import os
from datetime import datetime
from typing import Optional, Union

import confit
import pyarrow.fs

import edsnlp


def make_fs(path: str, endpoint: Optional[str] = None):
    """
    This function can be used to define the filesystem explicitly.
    Otherwise, it's automatically created from the path
    (ex: "s3://", "hdfs://", ...) using default parameters.
    """
    # For instance, if you have a s3 volume (S3 is not necessarily AWS !)
    # you can use the S3 filesystem and provide credentials as env vars.
    if path.startswith("s3://"):
        return pyarrow.fs.S3FileSystem(
            access_key=os.getenv("S3_ACCESS_KEY"),
            secret_key=os.getenv("S3_SECRET_KEY"),
            endpoint_override=endpoint or os.getenv("S3_ENDPOINT"),
        )
    return None


app = confit.Cli()  #(1)!


@app.command("inference")
def main(
    *,
    input_path: str,
    output_path: str,
    model_name: str = "AP-HP/dummy-ner",
    batch_size: str = "32 docs",
    show_progress: bool = False,
    output_file_size: Union[int, str] = 10_000,
):
    """
    Run inference on a corpus of notes stored in Parquet format.

    Parameters
    ----------
    input_path : str
        Input Parquet path (e.g. s3://bucket/notes/ or hdfs path)
    output_path : str
        Output Parquet path (e.g. s3://bucket/note_nlp/ or hdfs path)
    model_name : str
        Model to load: local path, installed model package or EDS-NLP
        compatible Hub repo (e.g. 'AP-HP/eds-pseudo-public')
    batch_size : str
        Batch size expression (e.g. '32 docs', '8000 words')
    show_progress : bool
        Show progress bars
    output_file_size : Union[int, str]
        Size per Parquet file (e.g. '1000 docs', '40000 words')
        in the output dataset
    """
    logging.info("Model loading started")
    nlp = edsnlp.load(model_name)
    # Do anything to the model here
    print(nlp)
    logging.info("Model loading done")

    input_fs = make_fs(input_path)
    output_fs = make_fs(output_path)

    print(f"Job started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Read OMOP-like parquet (note_id, person_id, note_text, ...)
    docs = edsnlp.data.read_parquet(
        path=input_path,
        converter="omop",
        filesystem=input_fs,
        read_in_worker=True,
        doc_attributes=["note_id", "person_id"],  #(2)!
    )

    # Apply the model lazily
    docs = docs.map_pipeline(nlp)

    # Configure multiprocessing with automatic resource detection
    docs = docs.set_processing(
        backend="multiprocessing",
        batch_size=batch_size,
        show_progress=show_progress,
        # You can set num_cpu_workers and num_gpu_workers here,
        # otherwise they are auto-detected
    )

    # Write entities to parquet, with a fallback when no entity
    # Feel free to change the output format here
    def doc_to_rows(doc):
        rows = [
            dict(
                note_id=getattr(doc._, "note_id", None),
                person_id=getattr(doc._, "person_id", None),
                offset_begin=ent.start_char,
                offset_end=ent.end_char,
                label=ent.label_,
                snippet=ent.text,
                # Default to None when the model does not set a date
                date=getattr(ent._, "date", None),
                # You can add other ent attributes here
                # like ent._.certainty, ent._.family, etc.
                nlp_system=model_name,
            )
            for ent in doc.ents
        ]
        return rows or [
            dict(
                note_id=getattr(doc._, "note_id", None),
                person_id=getattr(doc._, "person_id", None),
                offset_begin=0,
                offset_end=0,
                label="EMPTY",
                snippet="",
                date=None,
                # You can add other ent attributes here
                nlp_system=model_name,
            )
        ]

    # We declare here where we want to write the output
    # All writers trigger the execution by default (unless execute=False)
    docs.write_parquet(
        path=output_path,
        overwrite=True,
        batch_size=output_file_size,
        converter=doc_to_rows,
        filesystem=output_fs,
    )

    print(f"Job done: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


if __name__ == "__main__":
    app()
- We use confit to create a CLI application and enforce parameter types.
Converters and schemas
- If your input is not OMOP-like (i.e. with note_text and note_id columns), provide your own reader converter instead of converter="omop" (see the Converters page).
- See the tutorial Processing multiple texts for more about batching expressions (batch_size) and the backend options.
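As a minimal sketch of the first point, assume a hypothetical source table whose columns are named `doc_id`, `patient_id` and `content` (these names are illustrative and not part of EDS-NLP). One simple option is then to rename the fields to OMOP-like names before reading them with `converter="omop"`. The helper below is plain Python and does not use the EDS-NLP converter API; writing a custom reader converter, as described on the Converters page, is the alternative.

```python
from typing import Any, Dict

# Hypothetical mapping from a custom schema to OMOP-like column names.
COLUMN_MAP = {
    "doc_id": "note_id",
    "patient_id": "person_id",
    "content": "note_text",
}


def to_omop_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Rename custom columns to OMOP-like names, keeping other fields as-is."""
    renamed = {COLUMN_MAP.get(key, key): value for key, value in row.items()}
    # note_id and note_text are the columns the tutorial treats as required.
    missing = {"note_id", "note_text"} - renamed.keys()
    if missing:
        raise KeyError(f"Missing required columns: {sorted(missing)}")
    return renamed


row = {"doc_id": 1, "patient_id": 42, "content": "Patient admis le 3 mai."}
print(to_omop_row(row))
```

The same renaming can usually be done upstream, for instance in the Spark query that exports the notes, which avoids touching the inference script at all.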
Accessing computation resources on Slurm
Slurm is a workload manager for HPC clusters. You request resources (CPUs, memory, GPUs, time) and submit jobs with scripts. Below is a Slurm script that activates your environment, shows GPU info, and runs the inference script.
#!/bin/bash
# Name your job clearly to find it in queues and reports.
#SBATCH --job-name=nlp
# Walltime limit. Increase if you hit the time limit.
#SBATCH --time 1:00:00
# For instance here, we request one V100 GPU. Adapt to what your cluster
# provides, and contact your admin if unsure.
#SBATCH --gres=gpu:v100:1
#SBATCH --partition gpuV100
# Single-node job with 10 CPU cores for rule-based pipes, preprocessing,
# collation, postprocessing.
#SBATCH --nodes=1
#SBATCH --cpus-per-task=10
# RAM (!= GPU VRAM !) per node (in MB), adjust if you hit OOM errors.
#SBATCH --mem=50000
# Container config (if your Slurm allows this). Adapt to your cluster
# setup and contact your admin if unsure.
#SBATCH --container-image /scratch/images/sparkhadoop.sqsh --container-mounts=/export/home/$USER:/export/home/$USER,/export/home/share:/export/home/share,/data/scratch/$USER:/data/scratch/$USER --container-mount-home --container-writable --container-workdir=/
# Stdout/stderr file patterns with `%j` expanded to the job ID.
# You can put these in a logs/ directory if you prefer, but MAKE SURE
# THAT THIS DIRECTORY EXISTS BEFORE SUBMITTING !
#SBATCH --output=slurm-%j-stdout.log
#SBATCH --error=slurm-%j-stderr.log
set -euo pipefail
# Setup the env. Simple setup for AP-HP cluster below
# Refer to your HPC documentation for your own setup.
/etc/start.sh
export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob`
export ARROW_LIBHDFS_DIR=/usr/local/hadoop/usr/lib/
source "$HOME/.user_conda/miniconda/etc/profile.d/conda.sh"
# Activate your environment(s), e.g. conda/venv/uv or a mix of these
conda activate your-conda-env
source path/to/your/project/.venv/bin/activate
# You can install packages here. Doing this here can be useful to
# ensure installed versions match the deployment env, for instance
# glibc, CUDA versions, etc. Otherwise, install in your env beforehand.
pip install "edsnlp[ml]" "pyarrow<17"
# Check available GPUs
nvidia-smi
cd path/to/your/project
python inference.py \
--model_name "AP-HP/dummy-ner" \
--input_path "hdfs:///user/USERNAME/notes/" \
--output_path "hdfs:///user/USERNAME/nlp_results/" \
--batch_size "10000 words" \
--output_file_size "1000 docs" \
--show_progress
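The inference script leaves `num_cpu_workers` to auto-detection. If you prefer to set it explicitly, one option is to derive it from the `SLURM_CPUS_PER_TASK` environment variable, which Slurm sets when `--cpus-per-task` is requested. The following is a minimal sketch; the `workers_from_slurm` helper and its `reserve` argument are hypothetical names chosen for illustration.

```python
import os
from typing import Mapping, Optional


def workers_from_slurm(
    env: Optional[Mapping[str, str]] = None,
    default: int = 1,
    reserve: int = 1,
) -> int:
    """Return a CPU worker count based on the Slurm allocation.

    `reserve` cores are kept free for the main process, and at least
    one worker is always returned when the variable is set.
    """
    env = os.environ if env is None else env
    raw = env.get("SLURM_CPUS_PER_TASK")
    if raw is None or not raw.isdigit():
        # Not running under Slurm, or --cpus-per-task was not requested.
        return default
    return max(1, int(raw) - reserve)


print(workers_from_slurm({"SLURM_CPUS_PER_TASK": "10"}))  # 9
```

The returned value could then be passed as `num_cpu_workers` in the `set_processing` call of the inference script.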
Run and monitor the job
- Launch the job and store the job ID in a JOB_ID variable:
    JOB_ID=$(sbatch job.sh | awk '{print $4}') && echo "Job ID: $JOB_ID"
    Job ID: 123456
- See the currently running and pending jobs with squeue:
    squeue
    JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
    123456 gpuV100 nlp USERNAME R 0:10 1 gpu-node-01
- Cancel the job if needed with:
    scancel $JOB_ID
- Follow the logs in real time with the command below. See the comment above the #SBATCH --output directive to put them in a directory if needed.
    tail -f -n+0 slurm-$JOB_ID-std*.log
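The `awk '{print $4}'` step relies on `sbatch` printing a line of the form `Submitted batch job 123456`. If you submit jobs from Python instead, the same extraction can be sketched as follows. `parse_job_id` is a hypothetical helper, and the commented-out `subprocess` call assumes that `sbatch` is available on your PATH.

```python
import re


def parse_job_id(sbatch_output: str) -> str:
    """Extract the job ID from the message printed by `sbatch`."""
    match = re.search(r"Submitted batch job (\d+)", sbatch_output)
    if match is None:
        raise ValueError(f"Unexpected sbatch output: {sbatch_output!r}")
    return match.group(1)


print(parse_job_id("Submitted batch job 123456"))  # 123456

# On a cluster, the output would come from sbatch itself, for instance:
# import subprocess
# out = subprocess.run(
#     ["sbatch", "job.sh"], check=True, capture_output=True, text=True
# ).stdout
# job_id = parse_job_id(out)
```

Alternatively, `sbatch --parsable` prints the job ID without the surrounding text, which removes the need for parsing in simple setups.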
Fetching data with PySpark
The above job requires a Parquet dataset as input. You can use PySpark to extract notes from your CDW and write them to Parquet. In theory, you could run end-to-end with Spark using:
docs = edsnlp.data.from_spark(...)
A more robust pattern is to decouple document selection from inference. In a Spark-enabled notebook or a Spark-submit job:
- Extract your input corpus with Spark and write it to Parquet (HDFS or S3):
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    note = spark.sql("""
        SELECT note_id, person_id, note_text
        FROM your_database.note
        WHERE note_datetime >= '2024-01-01' AND note_text IS NOT NULL
        LIMIT 10000
    """)
    note.write.mode("overwrite").parquet("hdfs:///user/USERNAME/notes/")
- Run the Slurm GPU inference on that Parquet dataset, as in the sections above (point --input_path to the Parquet location and --output_path to a destination Parquet directory).
- Load the inference results back into Spark if needed (aggregation, joins, etc.):
    note_nlp = spark.read.parquet("hdfs:///user/USERNAME/nlp_results/")
    note_nlp.createOrReplaceTempView("note_nlp")

    # Example: count entities per label
    spark.sql("""
        SELECT label, COUNT(*) AS n
        FROM note_nlp
        GROUP BY label
        ORDER BY n DESC
    """).show()
This approach keeps GPU inference scheduling independent of Spark, avoids excessive CPU pressure, and is easier to monitor and reason about.
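Note that the inference script writes a placeholder row with the label `EMPTY` for notes without entities, so aggregations over the results usually need to filter it out. Below is a minimal pure-Python sketch of that filtering, using made-up rows shaped like the output of `doc_to_rows` (the IDs, labels and snippets are illustrative only).

```python
from collections import Counter

# Illustrative rows; real ones would be read from the output Parquet dataset.
rows = [
    {"note_id": 1, "label": "DATE", "snippet": "3 mai"},
    {"note_id": 1, "label": "NOM", "snippet": "Dupont"},
    {"note_id": 2, "label": "EMPTY", "snippet": ""},
    {"note_id": 3, "label": "DATE", "snippet": "12/01/2024"},
]


def count_labels(rows, placeholder="EMPTY"):
    """Count entities per label, skipping the no-entity placeholder rows."""
    return Counter(r["label"] for r in rows if r["label"] != placeholder)


counts = count_labels(rows)
print(counts.most_common())  # [('DATE', 2), ('NOM', 1)]
```

In Spark SQL, the equivalent is to add `WHERE label != 'EMPTY'` to the aggregation query.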