Spark
TLDR

```python
import edsnlp

stream = edsnlp.data.from_spark(df, converter="omop")
stream = stream.map_pipeline(nlp)
res = stream.to_spark(converter="omop")
# or equivalently
edsnlp.data.to_spark(stream, converter="omop")
```
We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.
As an example, imagine that we have the following OMOP dataframe (we'll name it `note_df`):

| note_id | note_text | note_datetime |
|---|---|---|
| 0 | Le patient est admis pour une pneumopathie... | 2021-10-23 |
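If you want to follow along without an existing table, a dataframe matching the one above can be built by hand. This is just a convenience sketch for local testing; in practice `note_df` would come from your data warehouse.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build a one-row OMOP-style note dataframe matching the table above
note_df = spark.createDataFrame(
    [(0, "Le patient est admis pour une pneumopathie...", "2021-10-23")],
    ["note_id", "note_text", "note_datetime"],
)
```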
Reading from a Spark DataFrame
The SparkReader (or `edsnlp.data.from_spark`) reads a pyspark (or koalas) DataFrame and yields documents. At the moment, only entities and span attributes are loaded.
Example

```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)  # add your components here

doc_iterator = edsnlp.data.from_spark(note_df, converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
```
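The stream can then be consumed like any iterable of `Doc` objects. For instance, assuming your pipeline produces entities and that the OMOP converter exposes the note id as `doc._.note_id`:

```python
for doc in annotated_docs:
    print(doc._.note_id, [(ent.text, ent.label_) for ent in doc.ents])
```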
Generator vs list

`edsnlp.data.from_spark` returns a Stream. To iterate over the documents multiple times efficiently, or to access them by index, you must convert it to a list:

```python
docs = list(edsnlp.data.from_spark(note_df, converter="omop"))
```
Parameters

| PARAMETER | DESCRIPTION |
|---|---|
| data | The DataFrame to read. |
| shuffle | Whether to shuffle the data. If "dataset", the whole dataset is shuffled before iteration starts (at the start of every epoch if looping). |
| seed | The seed to use for shuffling. |
| loop | Whether to loop over the data indefinitely. |
| converter | Converter to use to convert the rows of the DataFrame to Doc objects. These are documented on the Converters page. |
| kwargs | Additional keyword arguments to pass to the converter. These are documented on the Converters page. |

| RETURNS | DESCRIPTION |
|---|---|
| Stream | A stream of the documents read from the DataFrame. |
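For instance, when preparing data for a training loop, the shuffling parameters can be combined as follows (a sketch; batching and training steps are omitted):

```python
stream = edsnlp.data.from_spark(
    note_df,
    converter="omop",
    shuffle="dataset",  # shuffle the whole dataset at the start of every epoch
    seed=42,            # make shuffling reproducible
    loop=True,          # iterate over the data indefinitely
)
```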
Writing to a Spark DataFrame
`edsnlp.data.to_spark` converts a list of documents into a Spark DataFrame, usually one row per document, unless the converter returns a list, in which case each entry of the resulting list is stored in its own row.
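For instance, given a stream `docs` of annotated documents, a custom converter that returns one dictionary per entity produces one row per entity instead of one per document. A minimal sketch, assuming entities live in `doc.ents` and the note id in `doc._.note_id`:

```python
def ents_to_rows(doc):
    # Returning a list makes to_spark store each entry in its own row
    return [
        {
            "note_id": doc._.note_id,
            "note_nlp_source_value": ent.label_,
            "lexical_variant": ent.text,
            "start_char": ent.start_char,
            "end_char": ent.end_char,
        }
        for ent in doc.ents
    ]

ents_df = edsnlp.data.to_spark(docs, converter=ents_to_rows)
```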
Example

```python
import edsnlp, edsnlp.pipes as eds

nlp = edsnlp.blank("eds")
nlp.add_pipe(eds.covid())

# `sql` is available in Spark SQL notebook environments;
# otherwise, use spark.sql(...)
note_df = sql('''
    select note_id, note_text from note
    where note_text is not null
    limit 500
''')

docs = edsnlp.data.from_spark(note_df, converter="omop")
docs = nlp.pipe(docs)
res = edsnlp.data.to_spark(docs, converter="omop")

res.show()
```
Mac OS X

If you are using Mac OS X, you may need to set the following environment variable (see this thread) to run pyspark:

```python
import os

os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
```
Parameters

| PARAMETER | DESCRIPTION |
|---|---|
| data | The data to write (either a list of documents or a Stream). |
| dtypes | The schema to use for the DataFrame. |
| show_dtypes | Whether to print the inferred schema (only used when `dtypes` is not provided). |
| execute | Whether to execute the writing operation immediately or to return a stream. |
| converter | Converter to use to convert the documents to dictionary objects before storing them in the dataframe. These are documented on the Converters page. |
| kwargs | Additional keyword arguments to pass to the converter. These are documented on the Converters page. |
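If you prefer to control the schema rather than let it be inferred, you can pass `dtypes` explicitly. A sketch, assuming `dtypes` accepts a pyspark `StructType` and with hypothetical columns (the actual columns depend on the converter):

```python
from pyspark.sql import types as T

schema = T.StructType(
    [
        T.StructField("note_id", T.LongType()),
        T.StructField("note_text", T.StringType()),
    ]
)
res = edsnlp.data.to_spark(docs, converter="omop", dtypes=schema)
```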
Importing entities from a Spark DataFrame
If you have a dataframe with entities (e.g., `note_nlp` in OMOP), you must join it with the dataframe containing the raw text (e.g., `note` in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entities dataframe, which we will name `note_nlp_df`:

| note_nlp_id | note_id | start_char | end_char | note_nlp_source_value | lexical_variant |
|---|---|---|---|---|---|
| 0 | 0 | 46 | 57 | disease | coronavirus |
| 1 | 0 | 77 | 88 | drug | paracétamol |
```python
import pyspark.sql.functions as F

# Collect the entities of each note into a single "entities" column,
# then join them onto the raw text
df = note_df.join(
    note_nlp_df
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value"),
            )
        ).alias("entities")
    ),
    "note_id",
    "left",
)
```