Spark
TLDR
```python
import edsnlp

iterator = edsnlp.data.from_spark(df, converter="omop")
docs = nlp.pipe(iterator)
res = edsnlp.data.to_spark(docs, converter="omop", dtypes=None)  # providing dtypes (i.e. pyspark's schema) is recommended
```
We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.
As an example, imagine that we have the following OMOP dataframe (we'll name it `note_df`):
| note_id | note_text | note_datetime |
|---|---|---|
| 0 | Le patient est admis pour une pneumopathie... | 2021-10-23 |
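In practice `note_df` typically comes from your data warehouse, but for a quick test such a dataframe can be built by hand (a minimal sketch, assuming an active SparkSession):

```python
from datetime import datetime

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy OMOP-style "note" dataframe matching the table above
note_df = spark.createDataFrame(
    [(0, "Le patient est admis pour une pneumopathie...", datetime(2021, 10, 23))],
    schema=["note_id", "note_text", "note_datetime"],
)
```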
Reading from a Spark DataFrame
The SparkReader (or `edsnlp.data.from_spark`) reads a pyspark (or koalas) DataFrame and yields documents. At the moment, only entities and span attributes are loaded.
Example
```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)

doc_iterator = edsnlp.data.from_spark(note_df, converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
```
Generator vs list
`edsnlp.data.from_spark` returns a LazyCollection. To iterate over the documents multiple times efficiently or to access them by index, you must convert it to a list:
```python
docs = list(edsnlp.data.from_spark(note_df, converter="omop"))
```
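Once materialized, the list supports indexing and repeated iteration, e.g.:

```python
first_doc = docs[0]                          # random access
n_ents = sum(len(doc.ents) for doc in docs)  # iterate as many times as needed
```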
Parameters
| PARAMETER | DESCRIPTION |
|---|---|
| data | The DataFrame to read. |
| converter | Converter to use to convert the rows of the DataFrame to Doc objects. |
| kwargs | Additional keyword arguments passed to the converter. These are documented on the Data schemas page. |

| RETURNS | DESCRIPTION |
|---|---|
| LazyCollection | The read documents, as a lazy collection. |
Writing to a Spark DataFrame
`edsnlp.data.to_spark` converts a list of documents into a Spark DataFrame, usually with one row per document, unless the converter returns a list, in which case each entry of the list is stored in its own row.
Example
```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.covid")

# Assuming an active SparkSession named `spark`
note_df = spark.sql('''
    select note_id, note_text from note
    where note_text is not null
    limit 500
''')

docs = edsnlp.data.from_spark(note_df, converter="omop")
docs = nlp.pipe(docs)
res = edsnlp.data.to_spark(docs, converter="omop")

res.show()
```
Mac OS X
If you are using Mac OS X, you may need to set the following environment variable (see this thread) to run pyspark:
```python
import os

os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
```
Parameters
| PARAMETER | DESCRIPTION |
|---|---|
| data | The data to write (either a list of documents or a LazyCollection). |
| converter | Converter to use to convert the documents to dictionary objects before storing them in the dataframe. |
| dtypes | The schema to use for the DataFrame. |
| show_dtypes | Whether to print the inferred schema (only if `dtypes` is not provided). |
| kwargs | Additional keyword arguments passed to the converter. These are documented on the Data schemas page. |
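As an illustration, a `dtypes` schema could look like the sketch below. The exact columns and nested entity fields are assumptions that depend on the converter and on your pipeline; the simplest way to obtain the right schema is to run `to_spark` once without `dtypes` (see `show_dtypes`) and copy the printed schema:

```python
from pyspark.sql.types import (
    ArrayType, IntegerType, StringType, StructField, StructType, TimestampType,
)

# Hypothetical schema for rows produced by the "omop" converter:
# one row per document, with the extracted entities nested in an array of structs
dtypes = StructType([
    StructField("note_id", StringType()),
    StructField("note_text", StringType()),
    StructField("note_datetime", TimestampType()),
    StructField("entities", ArrayType(StructType([
        StructField("note_nlp_id", IntegerType()),
        StructField("start_char", IntegerType()),
        StructField("end_char", IntegerType()),
        StructField("note_nlp_source_value", StringType()),
        StructField("lexical_variant", StringType()),
    ]))),
])

res = edsnlp.data.to_spark(docs, converter="omop", dtypes=dtypes)
```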
Importing entities from a Spark DataFrame
If you have a dataframe with entities (e.g., `note_nlp` in OMOP), you must join it with the dataframe containing the raw text (e.g., `note` in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entities dataframe, which we will name `note_nlp_df`.
| note_nlp_id | note_id | start_char | end_char | note_nlp_source_value | lexical_variant |
|---|---|---|---|---|---|
| 0 | 0 | 46 | 57 | disease | coronavirus |
| 1 | 0 | 77 | 88 | drug | paracétamol |
```python
import pyspark.sql.functions as F

# Nest the entities of each note into an "entities" array of structs,
# then attach them to the raw text with a left join
df = note_df.join(
    note_nlp_df
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value")
            )
        ).alias("entities")
    ), "note_id", "left")
```