Spark

TLDR
import edsnlp

# df is a pyspark DataFrame, nlp an edsnlp pipeline
iterator = edsnlp.data.from_spark(df, converter="omop")
docs = nlp.pipe(iterator)
# passing dtypes (i.e. a pyspark schema) is recommended
res = edsnlp.data.to_spark(docs, converter="omop", dtypes=None)

We provide methods to read and write documents (raw or annotated) from and to Spark DataFrames.

As an example, imagine that we have the following OMOP dataframe (we'll name it note_df):

note_id  note_text                                       note_datetime
0        Le patient est admis pour une pneumopathie...   2021-10-23

Reading from a Spark DataFrame

The SparkReader (or edsnlp.data.from_spark) reads a pyspark (or koalas) DataFrame and yields documents. At the moment, only entities and span attributes are loaded.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc_iterator = edsnlp.data.from_spark(note_df, converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
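
The pipeline yields standard spaCy Doc objects, so you can, for instance, inspect the detected entities once processing is done. A minimal sketch:

for doc in annotated_docs:
    # doc.ents holds the spans predicted by the pipeline
    print([(ent.text, ent.label_) for ent in doc.ents])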

Generator vs list

edsnlp.data.from_spark returns a LazyCollection. To iterate over the documents multiple times efficiently, or to access them by index, you must convert it to a list:

docs = list(edsnlp.data.from_spark(note_df, converter="omop"))

Parameters

PARAMETER   DESCRIPTION
data        The DataFrame to read.
converter   Converter to use to convert the rows of the DataFrame to Doc objects.
            TYPE: Union[str, Callable]
kwargs      Additional keyword arguments passed to the converter. These are
            documented on the Data schemas page.
            DEFAULT: {}

RETURNS     DESCRIPTION
LazyCollection
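
Since converter also accepts a callable, you can supply your own row-to-Doc function. A minimal sketch, assuming each row is passed to the callable as a dictionary keyed by column name (row_to_doc and the note_id extension are illustrative, not part of the library):

from spacy.tokens import Doc

if not Doc.has_extension("note_id"):
    Doc.set_extension("note_id", default=None)

def row_to_doc(row):
    # build a Doc from the raw text and keep track of the source note_id
    doc = nlp.make_doc(row["note_text"])
    doc._.note_id = row["note_id"]
    return doc

docs = edsnlp.data.from_spark(note_df, converter=row_to_doc)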

Writing to a Spark DataFrame

edsnlp.data.to_spark converts a list of documents into a Spark DataFrame, usually with one row per document; if the converter returns a list, each entry of that list is stored in its own row instead.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.covid")

# `sql` refers to your Spark session's SQL entry point,
# e.g. spark.sql in a standard PySpark session
note_df = sql('''
    select note_id, note_text from note
    where note_text is not null
    limit 500
''')

docs = edsnlp.data.from_spark(note_df, converter="omop")

docs = nlp.pipe(docs)

res = edsnlp.data.to_spark(docs, converter="omop")

res.show()

Mac OS X

If you are using Mac OS X, you may need to set the following environment variable (see this thread) to run pyspark:

import os
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

Parameters

PARAMETER     DESCRIPTION
data          The data to write (either a list of documents or a LazyCollection).
              TYPE: Union[Any, LazyCollection]
converter     Converter to use to convert the documents to dictionary objects
              before storing them in the dataframe.
              TYPE: Optional[Union[str, Callable]]
dtypes        The schema to use for the DataFrame.
              TYPE: Any    DEFAULT: None
show_dtypes   Whether to print the inferred schema (only if dtypes is None).
              TYPE: bool   DEFAULT: True
kwargs        Additional keyword arguments passed to the converter. These are
              documented on the Data schemas page.
              DEFAULT: {}
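
Providing dtypes explicitly avoids inferring the schema from the data. A convenient workflow is to run once with dtypes=None and show_dtypes=True, which prints the inferred schema, and then pass an explicit schema on subsequent runs. A minimal sketch, assuming omop-style output columns (adapt the fields to what your converter actually produces):

import pyspark.sql.types as T

# hypothetical schema matching the columns produced by the omop converter
dtypes = T.StructType(
    [
        T.StructField("note_id", T.LongType()),
        T.StructField("note_text", T.StringType()),
        T.StructField(
            "entities",
            T.ArrayType(
                T.StructType(
                    [
                        T.StructField("note_nlp_id", T.LongType()),
                        T.StructField("start_char", T.IntegerType()),
                        T.StructField("end_char", T.IntegerType()),
                        T.StructField("note_nlp_source_value", T.StringType()),
                        T.StructField("lexical_variant", T.StringType()),
                    ]
                )
            ),
        ),
    ]
)

res = edsnlp.data.to_spark(docs, converter="omop", dtypes=dtypes)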

Importing entities from a Spark DataFrame

If you have a dataframe with entities (e.g., note_nlp in OMOP), you must join it with the dataframe containing the raw text (e.g., note in OMOP) to obtain a single dataframe with the entities next to the raw text. For instance, consider the following entities dataframe, which we will name note_nlp_df:

note_nlp_id  note_id  start_char  end_char  note_nlp_source_value  lexical_variant
0            0        46          57        disease                coronavirus
1            0        77          88        drug                   paracétamol

Grouping the entities by note and joining them onto the notes dataframe yields the expected format:
import pyspark.sql.functions as F

df = note_df.join(
    note_nlp_df
    # gather all the entities of a given note into a single array of structs
    .groupBy("note_id")
    .agg(
        F.collect_list(
            F.struct(
                F.col("note_nlp_id"),
                F.col("start_char"),
                F.col("end_char"),
                F.col("note_nlp_source_value"),
            )
        ).alias("entities")
    ),
    on="note_id",
    how="left",
)