Spark

TLDR
import edsnlp


docs = edsnlp.data.from_spark(df, converter="omop")
docs = docs.map_pipeline(nlp)
res = edsnlp.data.to_spark(docs, converter="omop")

We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.

As an example, imagine that we have the following OMOP dataframe (we'll name it note_df):

| note_id | note_text                                     | note_datetime |
|---------|-----------------------------------------------|---------------|
| 0       | Le patient est admis pour une pneumopathie... | 2021-10-23    |
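
To follow along, such a dataframe can be built by hand; a minimal sketch, assuming an active SparkSession named spark (the content is purely illustrative):

import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy OMOP-style "note" dataframe with a single document
note_df = spark.createDataFrame(
    [(0, "Le patient est admis pour une pneumopathie...", datetime.datetime(2021, 10, 23))],
    ["note_id", "note_text", "note_datetime"],
)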

Reading from a Spark DataFrame

The SparkReader (or edsnlp.data.from_spark) reads a pyspark (or koalas) DataFrame and yields documents. At the moment, only entities and span attributes are loaded.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc_iterator = edsnlp.data.from_spark(note_df, converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
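
annotated_docs is a lazy generator: documents are processed as you consume them. For instance, you could iterate over it like this (assuming the omop converter stores the note identifier under doc._.note_id, as described on the Converters page):

for doc in annotated_docs:
    # One spaCy Doc per row of note_df
    print(doc._.note_id, [(ent.text, ent.label_) for ent in doc.ents])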

Generator vs list

edsnlp.data.from_spark returns a LazyCollection. To iterate over the documents multiple times efficiently, or to access them by index, you must convert it to a list:

docs = list(edsnlp.data.from_spark(note_df, converter="omop"))
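
After this conversion, the documents support random access and repeated iteration:

print(len(docs))    # number of notes read from the dataframe
first_doc = docs[0]
for doc in docs:    # the list can be traversed as many times as needed
    pass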

Parameters

| Parameter | Description | Type | Default |
|-----------|-------------|------|---------|
| data      | The DataFrame to read. | | |
| converter | Converter to use to convert the rows of the DataFrame to Doc objects. These are documented on the Converters page. | Optional[Union[str, Callable]] | None |
| kwargs    | Additional keyword arguments to pass to the converter. These are documented on the Converters page. | | {} |

Returns: LazyCollection
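
The converter can also be a custom callable. As a rough sketch, assuming the callable receives each row as a dict and returns a Doc (see the Converters page for the exact contract):

import edsnlp
from spacy.tokens import Doc

nlp = edsnlp.blank("eds")

if not Doc.has_extension("note_id"):
    Doc.set_extension("note_id", default=None)

def my_converter(row: dict):
    # Hypothetical converter: tokenize the raw text and keep the note identifier
    doc = nlp(row["note_text"])
    doc._.note_id = row["note_id"]
    return doc

docs = edsnlp.data.from_spark(note_df, converter=my_converter)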

Writing to a Spark DataFrame

edsnlp.data.to_spark converts a list of documents into a Spark DataFrame, usually one row per document, unless the converter returns a list, in which case each entry of the resulting list is stored in its own row.
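
For instance, a callable converter that emits one dict per entity would yield one row per entity; a rough sketch (doc._.note_id is assumed to have been set, e.g. by the omop reader):

def ents_converter(doc):
    # One dict per entity -> one row per entity in the resulting dataframe
    return [
        {
            "note_id": doc._.note_id,
            "start_char": ent.start_char,
            "end_char": ent.end_char,
            "label": ent.label_,
            "lexical_variant": ent.text,
        }
        for ent in doc.ents
    ]

# docs: any iterable of annotated Doc objects
res = edsnlp.data.to_spark(docs, converter=ents_converter)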

Example

import edsnlp, edsnlp.pipes as eds

nlp = edsnlp.blank("eds")
nlp.add_pipe(eds.covid())

# Assumes an active SparkSession named `spark`
note_df = spark.sql('''
    select note_id, note_text from note
    where note_text is not null
    limit 500
''')

docs = edsnlp.data.from_spark(note_df, converter="omop")

docs = nlp.pipe(docs)

res = edsnlp.data.to_spark(docs, converter="omop")

res.show()
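
res is a regular Spark DataFrame, so the usual Spark writers apply; for example (the output path below is just a placeholder):

res.write.mode("overwrite").parquet("hdfs:///tmp/edsnlp_covid_results.parquet")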

Mac OS X

If you are using Mac OS X, you may need to set the following environment variable (see this thread) to run pyspark:

import os
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

Parameters

| Parameter | Description | Type | Default |
|-------------|-------------|------|---------|
| data        | The data to write (either a list of documents or a LazyCollection). | Union[Any, LazyCollection] | |
| dtypes      | The schema to use for the DataFrame. | Any | None |
| show_dtypes | Whether to print the inferred schema (only if dtypes is None). | bool | True |
| converter   | Converter to use to convert the documents to dictionary objects before storing them in the dataframe. These are documented on the Converters page. | Optional[Union[str, Callable]] | None |
| kwargs      | Additional keyword arguments to pass to the converter. These are documented on the Converters page. | | {} |
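
When you want a fixed schema rather than an inferred one, dtypes can be given a Spark schema. A sketch (the exact columns depend on the converter and your pipeline; run once with show_dtypes=True to see the inferred schema):

import pyspark.sql.types as T

schema = T.StructType([
    T.StructField("note_id", T.LongType()),
    T.StructField("note_text", T.StringType()),
    # ... add the entity columns reported by show_dtypes=True
])

res = edsnlp.data.to_spark(docs, converter="omop", dtypes=schema)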

Importing entities from a Spark DataFrame

If you have a dataframe with entities (e.g., note_nlp in OMOP), you must join it with the dataframe containing the raw text (e.g., note in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entity dataframe, which we will name note_nlp_df:

| note_nlp_id | note_id | start_char | end_char | note_nlp_source_value | lexical_variant |
|-------------|---------|------------|----------|-----------------------|-----------------|
| 0           | 0       | 46         | 57       | disease               | coronavirus     |
| 1           | 0       | 77         | 88       | drug                  | paracétamol     |

import pyspark.sql.functions as F

# Gather all entities of each note into a single array column,
# then join it onto the raw text dataframe.
df = note_df.join(
    note_nlp_df
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value")
            )
        ).alias("entities")
    ), "note_id", "left")