Spark

TLDR
import edsnlp

stream = edsnlp.data.from_spark(df, converter="omop")
stream = stream.map_pipeline(nlp)
res = stream.to_spark(converter="omop")
# or equivalently
edsnlp.data.to_spark(stream, converter="omop")

We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.

As an example, imagine that we have the following OMOP dataframe (we'll name it note_df):

note_id   note_text                                        note_datetime
0         Le patient est admis pour une pneumopathie...    2021-10-23

Reading from a Spark DataFrame

The SparkReader (or edsnlp.data.from_spark) reads a pyspark (or koalas) DataFrame and yields documents. At the moment, only entities and span attributes are loaded.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc_iterator = edsnlp.data.from_spark(note_df, converter="omop")
annotated_docs = nlp.pipe(doc_iterator)

Generator vs list

edsnlp.data.from_spark returns a Stream. To iterate over the documents multiple times efficiently, or to access them by index, you must convert it to a list:

docs = list(edsnlp.data.from_spark(note_df, converter="omop"))

Parameters

PARAMETER   DESCRIPTION

data        The DataFrame to read.

shuffle     Whether to shuffle the data. If "dataset", the whole dataset is
            shuffled before starting to iterate on it (at the start of every
            epoch if looping).
            TYPE: Literal['dataset', False]   DEFAULT: False

seed        The seed to use for shuffling.
            TYPE: Optional[int]   DEFAULT: None

loop        Whether to loop over the data indefinitely.
            TYPE: bool   DEFAULT: False

converter   Converters to use to convert the rows of the DataFrame to Doc
            objects. These are documented on the Converters page.
            TYPE: Optional[AsList[Union[str, Callable]]]   DEFAULT: None

kwargs      Additional keyword arguments to pass to the converter. These are
            documented on the Converters page.
            DEFAULT: {}

RETURNS     Stream
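
For instance, the shuffling and looping options can be combined when preparing training data. A minimal sketch, assuming nlp and note_df are defined as above:

import edsnlp

# Shuffle the whole dataset before each pass (here with a fixed seed)
# and loop over it indefinitely, e.g. when training a model.
stream = edsnlp.data.from_spark(
    note_df,
    converter="omop",
    shuffle="dataset",
    seed=42,
    loop=True,
)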

Writing to a Spark DataFrame

edsnlp.data.to_spark converts a list of documents (or a Stream) into a Spark DataFrame, usually with one row per document. If the converter returns a list for a document, each entry of that list is stored in its own row.

Example

import edsnlp, edsnlp.pipes as eds

nlp = edsnlp.blank("eds")
nlp.add_pipe(eds.covid())

note_df = spark.sql('''
    select note_id, note_text from note
    where note_text is not null
    limit 500
''')

docs = edsnlp.data.from_spark(note_df, converter="omop")

docs = nlp.pipe(docs)

res = edsnlp.data.to_spark(docs, converter="omop")

res.show()
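
If you want one row per entity rather than one per document, you can pass a custom converter that returns a list of dicts, so that each entry gets its own row. A minimal sketch; the helper name and output columns are illustrative, assuming doc._.note_id was set by the omop reader:

def doc_to_entity_rows(doc):
    # One dict per entity; to_spark stores each entry in its own row.
    return [
        {
            "note_id": doc._.note_id,
            "start_char": ent.start_char,
            "end_char": ent.end_char,
            "label": ent.label_,
            "lexical_variant": ent.text,
        }
        for ent in doc.ents
    ]

res = edsnlp.data.to_spark(docs, converter=doc_to_entity_rows)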

Mac OS X

If you are using Mac OS X, you may need to set the following environment variable to run pyspark:

import os
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

Parameters

PARAMETER     DESCRIPTION

data          The data to write (either a list of documents or a Stream).
              TYPE: Union[Any, Stream]

dtypes        The schema to use for the DataFrame.
              TYPE: Any   DEFAULT: None

show_dtypes   Whether to print the inferred schema (only if dtypes is None).
              TYPE: bool   DEFAULT: True

execute       Whether to execute the writing operation immediately or to
              return a stream.
              TYPE: bool   DEFAULT: True

converter     Converter to use to convert the documents to dictionary objects
              before storing them in the dataframe. These are documented on
              the Converters page.
              TYPE: Optional[Union[str, Callable]]   DEFAULT: None

kwargs        Additional keyword arguments to pass to the converter. These
              are documented on the Converters page.
              DEFAULT: {}
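
If you want to control the output schema rather than let it be inferred, you can pass a pyspark schema through dtypes. A minimal sketch; the exact columns depend on the converter you use, so this schema is illustrative:

import pyspark.sql.types as T

schema = T.StructType(
    [
        T.StructField("note_id", T.LongType()),
        T.StructField("note_text", T.StringType()),
    ]
)

# With an explicit schema, nothing is inferred and show_dtypes has no effect.
res = edsnlp.data.to_spark(docs, converter="omop", dtypes=schema)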

Importing entities from a Spark DataFrame

If you have a dataframe with entities (e.g., note_nlp in OMOP), you must join it with the dataframe containing the raw text (e.g., note in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entities dataframe, which we'll name note_nlp_df:

note_nlp_id   note_id   start_char   end_char   note_nlp_source_value   lexical_variant
0             0         46           57         disease                 coronavirus
1             0         77           88         drug                    paracétamol

import pyspark.sql.functions as F

# Nest the entities of each note into a list of structs,
# then attach that list to the corresponding raw text.
df = note_df.join(
    note_nlp_df
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value"),
            )
        ).alias("entities")
    ),
    "note_id",
    "left",
)