Processing multiple texts
In the previous tutorials, we've seen how to apply a spaCy NLP pipeline to a single text. Once the pipeline is tested and ready to be applied to an entire corpus, we'll want to deploy it efficiently.
In this tutorial, we'll cover a few best practices and some caveats to avoid. Then, we'll explore the methods that EDS-NLP provides to perform inference on multiple texts.
Consider this simple pipeline:
import edsnlp
nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.normalizer")
config = dict(
    terms=dict(patient=["patient", "malade"]),
    attr="NORM",
)
nlp.add_pipe("eds.matcher", config=config)
# Add qualifiers
nlp.add_pipe("eds.negation")
nlp.add_pipe("eds.hypothesis")
nlp.add_pipe("eds.family")
# Add date detection
nlp.add_pipe("eds.dates")
Let's deploy it on a large number of documents.
What about a for loop?
Suppose we have a corpus of text:
text = (
    "Patient admis le 25 septembre 2021 pour suspicion de Covid.\n"
    "Pas de cas de coronavirus dans ce service.\n"
    "Le père du patient est atteint du covid."
)
corpus = [text] * 10000  # a toy corpus of 10,000 identical documents
You could just apply the pipeline document by document.
# ↑ Omitted code above ↑
docs = [nlp(text) for text in corpus]
Next, you might want to convert these documents to a DataFrame for further analysis:
import edsnlp.data
df = edsnlp.data.to_pandas(docs, converter="omop")
There are a few issues with this approach:
- If our model contains deep learning components (which it does not in this tutorial), we don't benefit from optimized batched matrix operations: ideally, we'd like to process multiple documents at once.
- We may have multiple cores available, but we don't use them to apply the pipes of our model to multiple documents at the same time.
- We would also like to perform the conversion step (converter="omop", which extracts the annotations of our Doc objects into dictionaries) in parallel.
Lazy inference and parallelization
To efficiently perform the same operations on multiple documents at once, EDS-NLP uses lazy collections, which record the operations to perform on the documents without executing them right away. This allows EDS-NLP to distribute these operations over multiple cores or machines when it is time to execute them. We can configure how the collection's operations are run (how many jobs/workers, how many GPUs, whether to use the Spark engine) via the lazy collection's .set_processing(...) method.
For instance,
docs = edsnlp.data.from_iterable(corpus)
as well as any edsnlp.data.read_* or edsnlp.data.from_* function, returns a lazy collection that we can iterate over or extend with more operations. To apply the model to our collection of documents, we can simply do:
docs = docs.map_pipeline(nlp)
# or à la spaCy :
# docs = nlp.pipe(docs)
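We can also already attach execution settings to the collection via .set_processing(...), as mentioned above. A minimal sketch, assuming the multiprocessing backend and parameters such as num_cpu_workers and show_progress (check the exact names against your EDS-NLP version):
# Configure how the scheduled operations will be executed
# (num_cpu_workers and show_progress are assumed parameter names)
docs = docs.set_processing(
    backend="multiprocessing",  # run the pipes in parallel worker processes
    num_cpu_workers=4,  # number of parallel workers (assumption)
    show_progress=True,  # display a progress bar during execution (assumption)
)
Nothing runs yet: these settings are only recorded on the lazy collection.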
spaCy vs EDS-NLP
spaCy's nlp.pipe method is not the same as EDS-NLP's nlp.pipe method: it will iterate over anything you pass to it, thereby executing the operations scheduled in our lazy collection.
We recommend you instantiate your models using nlp = edsnlp.blank(...) or nlp = edsnlp.load(...).
Otherwise, use the following to apply a spaCy model to a lazy collection docs without triggering its execution:
docs = docs.map_pipeline(nlp)
Finally, we can convert the documents to a DataFrame (or other formats / files) using the edsnlp.data.to_* or edsnlp.data.write_* methods. This triggers the execution of the operations scheduled in the lazy collection and produces the rows of the DataFrame.
df = docs.to_pandas(converter="omop")
# or equivalently:
# df = edsnlp.data.to_pandas(docs, converter="omop")
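Writing to files instead of a DataFrame follows the same pattern. A sketch, assuming edsnlp.data.write_parquet is available in your version of EDS-NLP:
# Execute the scheduled operations and write the results to Parquet files
# (write_parquet is assumed here; adapt to the writers available in your version)
edsnlp.data.write_parquet(docs, "annotations/", converter="omop")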
We can also iterate over the documents, which likewise triggers the execution of the operations scheduled in the lazy collection.
for doc in docs:
    # do something with the doc
    pass
Processing a DataFrame
Processing text within a pandas DataFrame is a very common use case. In many applications, you'll select a corpus of documents on a distributed cluster, load it into memory and process all the texts.
The OMOP CDM
In every tutorial that mentions distributing EDS-NLP over a corpus of documents, we will expect the data to be organised using a flavour of the OMOP Common Data Model.
The OMOP CDM defines two tables of interest to us:
- the note table contains the clinical notes;
- the note_nlp table holds the results of an NLP pipeline applied to the note table.
To make sure we can follow along, we propose three recipes for getting the DataFrame: using a dummy dataset like before, loading a CSV, or loading a Spark DataFrame into memory.
import pandas as pd
text = (
    "Patient admis le 25 septembre 2021 pour suspicion de Covid.\n"
    "Pas de cas de coronavirus dans ce service.\n"
    "Le père du patient est atteint du covid."
)
corpus = [text] * 1000
data = pd.DataFrame(dict(note_text=corpus))
data["note_id"] = range(len(data))
import pandas as pd
data = pd.read_csv("note.csv")
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT * FROM note")
df = df.select("note_id", "note_text")
data = df.limit(1000).toPandas()  # collect a sample of 1000 notes locally
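Whichever recipe you choose, the omop converter used below expects at least a note_id and a note_text column, so a quick sanity check can save some debugging (a minimal sketch):
# Check that the columns expected by the "omop" converter are present
assert {"note_id", "note_text"}.issubset(data.columns)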
We'll see in what follows how we can efficiently deploy our pipeline on the data object.
Locally without parallelization
# Read from a dataframe & use the omop converter
docs = edsnlp.data.from_pandas(data, converter="omop")
# Add the pipeline to operations that will be run
docs = nlp.pipe(docs)
# Convert each doc to a list of dicts (one per entity)
# and store the result in a pandas DataFrame
note_nlp = edsnlp.data.to_pandas(
    docs,
    converter="ents",
    # Below are the arguments to the converter
    span_getter=["ents", "dates"],
    span_attributes={
        "negation": "negation",
        "hypothesis": "hypothesis",
        "family": "family",
        "date.day": "date_day",  # slugified extension name
        "date.month": "date_month",
        "date.year": "date_year",
    },
)
The result on the first note:
| note_id | start | end | label | lexical_variant | negation | hypothesis | family | key |
|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 7 | patient | Patient | 0 | 0 | 0 | ents |
| 0 | 114 | 121 | patient | patient | 0 | 0 | 1 | ents |
| 0 | 17 | 34 | 2021-09-25 | 25 septembre 2021 | nan | nan | nan | dates |
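Since each row keeps the note_id of its source document, the annotations can be joined back to the note table, for instance with a plain pandas merge (a minimal sketch reusing the data DataFrame built above):
# Attach the original note to each extracted annotation
note_nlp_with_text = note_nlp.merge(data, on="note_id", how="left")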
Locally, using multiple parallel workers
# Read from a dataframe & use the omop converter
docs = edsnlp.data.from_pandas(data, converter="omop")
# Add the pipeline to operations that will be run
docs = nlp.pipe(docs)
# The operations of our lazy collection will be distributed on multiple workers
docs = docs.set_processing(backend="multiprocessing")
# Convert each doc to a list of dicts (one per entity)
# and store the result in a pandas DataFrame
note_nlp = edsnlp.data.to_pandas(
    docs,
    converter="ents",
    span_getter=["ents", "dates"],
    span_attributes={
        "negation": "negation",
        "hypothesis": "hypothesis",
        "family": "family",
        "date.day": "date_day",  # slugify the extension name
        "date.month": "date_month",
        "date.year": "date_year",
    },
)
In a distributed fashion with Spark
To use the Spark engine to distribute the computation, we create our lazy collection directly from the Spark DataFrame and write the result to a new Spark DataFrame. EDS-NLP will automatically distribute the operations on the cluster (setting backend="spark" behind the scenes), but you can change the backend (for instance to multiprocessing to run locally).
# Read from the pyspark dataframe & use the omop converter
docs = edsnlp.data.from_spark(df, converter="omop")
# Add the pipeline to operations that will be run
docs = nlp.pipe(docs)
# Convert each doc to a list of dicts (one per entity)
# and store the result in a pyspark DataFrame
note_nlp = edsnlp.data.to_spark(
    docs,
    converter="ents",
    span_getter=["ents", "dates"],
    span_attributes={
        "negation": "negation",
        "hypothesis": "hypothesis",
        "family": "family",
        "date.day": "date_day",  # slugify the extension name
        "date.month": "date_month",
        "date.year": "date_year",
    },
    dtypes=None,  # output schema (None lets EDS-NLP infer it)
)
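The result is a regular PySpark DataFrame, so it can be persisted with the usual Spark writers, for instance (a sketch, assuming you may create a note_nlp table on your cluster):
# Save the extracted annotations as a table on the cluster
note_nlp.write.mode("overwrite").saveAsTable("note_nlp")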
Using a custom converter
To customize the conversion of a Doc object to dictionaries, you can pass a converter argument. It can be either a string (the name of a converter) or a callable that should return a dictionary or a list of dictionaries.
from spacy.tokens import Doc
from typing import Any, Dict, List


def get_entities(doc: Doc) -> List[Dict[str, Any]]:
    """Return a list of dict representations for the entities"""
    entities = []
    for ent in doc.ents:
        d = dict(
            begin=ent.start_char,
            end=ent.end_char,
            label=ent.label_,
            entity_text=ent.text,
            negation=ent._.negation,
            hypothesis=ent._.hypothesis,
            family=ent._.family,
        )
        entities.append(d)
    for date in doc.spans.get("dates", []):
        d = dict(
            begin=date.start_char,
            end=date.end_char,
            label="date",
            entity_text=date.text,
        )
        entities.append(d)
    return entities
docs = edsnlp.data.from_pandas(data, converter="omop")
# Add the pipeline to operations that will be run
docs = nlp.pipe(docs)
# Convert each doc to a list of dicts (one per entity)
# and store the result in a pandas DataFrame
note_nlp = edsnlp.data.to_pandas(
    docs,
    converter=get_entities,
    # no keyword args here since our converter expects none
)
| begin | end | label | entity_text | negation | hypothesis | family |
|---|---|---|---|---|---|---|
| 0 | 7 | patient | Patient | False | False | False |
| 114 | 121 | patient | patient | False | False | True |
| 17 | 34 | date | 25 sept... | | | |
| 0 | 7 | patient | Patient | False | False | False |
| 114 | 121 | patient | patient | False | False | True |
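As mentioned above, a converter may also return a single dictionary per document, which yields one row per note instead of one row per entity. A minimal sketch, assuming doc._.note_id is populated by the omop reader converter; the flag computed below is purely illustrative:
from typing import Any, Dict

from spacy.tokens import Doc


def doc_to_row(doc: Doc) -> Dict[str, Any]:
    """Return a single dict (i.e. one row) per document"""
    return dict(
        note_id=doc._.note_id,
        # flag notes with at least one non-negated mention of the patient
        has_patient_mention=any(
            ent.label_ == "patient" and not ent._.negation for ent in doc.ents
        ),
        n_entities=len(doc.ents),
    )


docs = edsnlp.data.from_pandas(data, converter="omop")
docs = nlp.pipe(docs)
summary = docs.to_pandas(converter=doc_to_row)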