edsnlp.processing.parallel

pipe(note, nlp, context=[], additional_spans=[], extensions=[], results_extractor=None, chunksize=100, n_jobs=-2, progress_bar=True, **pipe_kwargs)

Function to apply a spaCy pipeline to a pandas DataFrame of notes, using multiprocessing

PARAMETER DESCRIPTION
note

A pandas DataFrame with `note_id` and `note_text` columns

TYPE: DataFrame

nlp

A spaCy pipe

TYPE: Language

context

A list of columns to add to the generated spaCy document as extensions. For instance, if `context=["note_datetime"]`, the corresponding value found in the `note_datetime` column will be stored in `doc._.note_datetime`, which can be useful e.g. for the `dates` pipeline.

TYPE: List[str] DEFAULT: []

results_extractor

Arbitrary function that extracts serialisable results from the computed spaCy `Doc` object. The output of the function must be a list of dictionaries containing the extracted spans or entities.

TYPE: Optional[Callable[[Doc], List[Dict[str, Any]]]] DEFAULT: None
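
For instance, a minimal extractor sketch that serialises each detected entity (the function name and output keys are illustrative, not imposed by the API):

from typing import Any, Dict, List

from spacy.tokens import Doc

def extract_entities(doc: Doc) -> List[Dict[str, Any]]:
    # One serialisable dictionary per entity found in the document
    return [
        dict(
            lexical_variant=ent.text,
            label=ent.label_,
            start=ent.start_char,
            end=ent.end_char,
        )
        for ent in doc.ents
    ]

Such a function can then be passed as pipe(..., results_extractor=extract_entities).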

additional_spans

A name (or list of names) of span groups on which to apply the pipe as well: span groups are available as `doc.spans[spangroup_name]` and can be generated by some pipes. For instance, the `dates` pipe populates `doc.spans["dates"]`.

TYPE: Union[List[str], str] DEFAULT: []
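
For example, a minimal sketch that also exports the spans produced by the `eds.dates` component (it assumes EDS-NLP is installed so the `eds` language is registered; the toy note is illustrative):

import pandas as pd
import spacy

from edsnlp.processing.parallel import pipe

nlp = spacy.blank("eds")
nlp.add_pipe("eds.dates")  # populates doc.spans["dates"]

note = pd.DataFrame(dict(note_id=[1], note_text=["Hospitalisé le 12 janvier 2021."]))

df = pipe(note, nlp, additional_spans="dates", progress_bar=False)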

extensions

Span extensions to add to the extracted results: for instance, if `extensions=["score_name"]`, the extracted results will include, for each entity, `ent._.score_name`.

TYPE: List[Tuple[str, T.DataType]] DEFAULT: []

chunksize

Batch size used to split tasks

TYPE: int DEFAULT: 100

n_jobs

Maximum number of parallel jobs, passed to joblib. The default of -2 uses all available cores but one.

TYPE: int DEFAULT: -2

progress_bar

Whether to display a progress bar or not

TYPE: bool DEFAULT: True

**pipe_kwargs

Arguments exposed in `processing.pipe_generator` are also available here

DEFAULT: {}

RETURNS DESCRIPTION
DataFrame

A pandas DataFrame with one row per extraction
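
A typical call might look like the following sketch. It assumes EDS-NLP is installed, so that the `eds` language and components are registered with spaCy; the pipeline components and the toy data are illustrative:

import pandas as pd
import spacy

from edsnlp.processing.parallel import pipe

# Illustrative pipeline: a blank "eds" model with a sentence segmenter
# and a simple term matcher
nlp = spacy.blank("eds")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.matcher", config=dict(terms=dict(diabete=["diabète"])))

# Toy data with the required note_id and note_text columns
note = pd.DataFrame(
    dict(
        note_id=[1, 2],
        note_text=["Patient diabétique.", "Pas d'antécédent notable."],
        note_datetime=["2021-01-01", "2021-01-02"],
    )
)

ents = pipe(
    note,
    nlp,
    context=["note_datetime"],  # available as doc._.note_datetime
    progress_bar=False,
)

The returned ents DataFrame then contains one row per extracted entity.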

Source code in edsnlp/processing/parallel.py
def pipe(
    note: pd.DataFrame,
    nlp: Language,
    context: List[str] = [],
    additional_spans: Union[List[str], str] = [],
    extensions: ExtensionSchema = [],
    results_extractor: Optional[Callable[[Doc], List[Dict[str, Any]]]] = None,
    chunksize: int = 100,
    n_jobs: int = -2,
    progress_bar: bool = True,
    **pipe_kwargs,
):
    """
    Function to apply a spaCy pipeline to a pandas DataFrame of notes, using
    multiprocessing

    Parameters
    ----------
    note : DataFrame
        A pandas DataFrame with a `note_id` and `note_text` column
    nlp : Language
        A spaCy pipe
    context : List[str]
        A list of columns to add to the generated spaCy document as extensions.
        For instance, if `context=["note_datetime"]`, the corresponding value
        found in the `note_datetime` column will be stored in
        `doc._.note_datetime`, which can be useful e.g. for the `dates` pipeline.
    results_extractor : Optional[Callable[[Doc], List[Dict[str, Any]]]]
        Arbitrary function that extracts serialisable results from the computed
        spaCy `Doc` object. The output of the function must be a list of dictionaries
        containing the extracted spans or entities.
    additional_spans : Union[List[str], str], by default [] (empty list)
        A name (or list of names) of span groups on which to apply the pipe
        as well: span groups are available as `doc.spans[spangroup_name]` and
        can be generated by some pipes. For instance, the `dates` pipe
        populates `doc.spans["dates"]`
    extensions : List[Tuple[str, T.DataType]], by default []
        Span extensions to add to the extracted results:
        for instance, if `extensions=["score_name"]`, the extracted results
        will include, for each entity, `ent._.score_name`.
    chunksize: int, by default 100
        Batch size used to split tasks
    n_jobs: int, by default -2
        Maximum number of parallel jobs, passed to joblib.
        The default of -2 uses all available cores but one.
    progress_bar: bool, by default True
        Whether to display a progress bar or not
    **pipe_kwargs:
        Arguments exposed in `processing.pipe_generator` are also available here

    Returns
    -------
    DataFrame
        A pandas DataFrame with one line per extraction
    """

    if context:
        check_spacy_version_for_context()

    # Setting the nlp variable
    _define_nlp(nlp)

    verbose = 10 if progress_bar else 0

    executor = Parallel(
        n_jobs, backend="multiprocessing", prefer="processes", verbose=verbose
    )
    executor.warn(f"Used nlp components: {nlp.component_names}")

    pipe_kwargs["additional_spans"] = additional_spans
    pipe_kwargs["extensions"] = extensions
    pipe_kwargs["results_extractor"] = results_extractor
    pipe_kwargs["context"] = context

    if verbose:
        executor.warn(f"{int(len(note)/chunksize)} tasks to complete")

    do = delayed(_process_chunk)

    tasks = (
        do(chunk, **pipe_kwargs)
        for chunk in _chunker(note, len(note), chunksize=chunksize)
    )
    result = executor(tasks)

    out = _flatten(result)

    return pd.DataFrame(out)