Skip to content

edsnlp.processing.parallel

nlp = spacy.blank('fr') module-attribute

_define_nlp(new_nlp)

Set the global nlp variable. Doing it this way saves a non-negligible amount of time.

Source code in edsnlp/processing/parallel.py
14
15
16
17
18
19
20
def _define_nlp(new_nlp: Language):
    """
    Store the given pipeline in the module-level ``nlp`` variable.

    Sharing the model through this global instead of passing it to each
    task saves a non-negligible amount of time.

    Parameters
    ----------
    new_nlp : Language
        The spaCy pipeline to share with worker functions.
    """
    global nlp
    nlp = new_nlp

_chunker(iterable, total_length, chunksize)

Takes an iterable and chunks it.

Source code in edsnlp/processing/parallel.py
23
24
25
26
27
28
29
30
31
32
33
def _chunker(
    iterable: Iterable,
    total_length: int,
    chunksize: int,
):
    """
    Takes an iterable and chunk it.
    """
    return (
        iterable[pos : pos + chunksize] for pos in range(0, total_length, chunksize)
    )

_process_chunk(note, **pipe_kwargs)

Source code in edsnlp/processing/parallel.py
36
37
38
39
40
41
42
43
def _process_chunk(note: pd.DataFrame, **pipe_kwargs):
    """
    Run the module-level ``nlp`` pipeline over one chunk of notes and
    flatten every batch of extracted rows into a single list.

    Parameters
    ----------
    note : pd.DataFrame
        The chunk of notes to process.
    **pipe_kwargs
        Extra keyword arguments forwarded to ``_pipe_generator``.

    Returns
    -------
    List of extracted results for this chunk.
    """
    return [
        row
        for rows in _pipe_generator(note, nlp, progress_bar=False, **pipe_kwargs)
        for row in rows
    ]

pipe(note, nlp, context=[], additional_spans='discarded', extensions=[], chunksize=100, n_jobs=-2, progress_bar=True, **pipe_kwargs)

Function to apply a spaCy pipe to a pandas DataFrame note by using multiprocessing

PARAMETER DESCRIPTION
note

A pandas DataFrame with a note_id and note_text column

TYPE: DataFrame

nlp

A spaCy pipe

TYPE: Language

context

A list of columns to add to the generated SpaCy document as an extension. For instance, if `context=["note_datetime"]`, the corresponding value found in the `note_datetime` column will be stored in `doc._.note_datetime`, which can be useful e.g. for the `dates` pipeline.

TYPE: List[str] DEFAULT: []

additional_spans

A name (or list of names) of SpanGroup to which to apply the pipe: SpanGroup are available as doc.spans[spangroup_name] and can be generated by some pipes. For instance, the date pipe populates doc.spans['dates']

TYPE: Union[List[str], str], by default "discarded" DEFAULT: 'discarded'

extensions

Spans extensions to add to the extracted results: For instance, if extensions=["score_name"], the extracted result will include, for each entity, ent._.score_name.

TYPE: List[Tuple[str, T.DataType]], by default [] DEFAULT: []

chunksize

Batch size used to split tasks

TYPE: int DEFAULT: 100

n_jobs

Max number of parallel jobs. The default value uses the maximum number of available cores.

TYPE: int DEFAULT: -2

progress_bar

Whether to display a progress bar or not

TYPE: bool DEFAULT: True

**pipe_kwargs

Arguments exposed in processing.pipe_generator are also available here

DEFAULT: {}

RETURNS DESCRIPTION
DataFrame

A pandas DataFrame with one line per extraction

Source code in edsnlp/processing/parallel.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def pipe(
    note: pd.DataFrame,
    nlp: Language,
    context: Union[List[str], None] = None,
    additional_spans: Union[List[str], str] = "discarded",
    extensions: ExtensionSchema = None,
    chunksize: int = 100,
    n_jobs: int = -2,
    progress_bar: bool = True,
    **pipe_kwargs,
):
    """
    Function to apply a spaCy pipe to a pandas DataFrame note by using multiprocessing

    Parameters
    ----------
    note : DataFrame
        A pandas DataFrame with a `note_id` and `note_text` column
    nlp : Language
        A spaCy pipe
    context : List[str]
        A list of columns to add to the generated SpaCy document as an extension.
        For instance, if `context=["note_datetime"]`, the corresponding value found
        in the `note_datetime` column will be stored in `doc._.note_datetime`,
        which can be useful e.g. for the `dates` pipeline.
    additional_spans : Union[List[str], str], by default "discarded"
        A name (or list of names) of SpanGroup to which to apply the pipe:
        SpanGroup are available as `doc.spans[spangroup_name]` and can be generated
        by some pipes. For instance, the `date` pipe populates doc.spans['dates']
    extensions : List[Tuple[str, T.DataType]], by default []
        Spans extensions to add to the extracted results:
        For instance, if `extensions=["score_name"]`, the extracted result
        will include, for each entity, `ent._.score_name`.
    chunksize : int, by default 100
        Batch size used to split tasks
    n_jobs : int, by default -2
        Max number of parallel jobs.
        The default value uses the maximum number of available cores.
    progress_bar : bool, by default True
        Whether to display a progress bar or not
    **pipe_kwargs :
        Arguments exposed in `processing.pipe_generator` are also available here

    Returns
    -------
    DataFrame
        A pandas DataFrame with one line per extraction
    """

    # `None` sentinels avoid the shared-mutable-default-argument pitfall;
    # behavior is identical to the previous `[]` defaults.
    context = [] if context is None else context
    extensions = [] if extensions is None else extensions

    if context:
        check_spacy_version_for_context()

    # Store the model in a module-level global so worker processes can
    # access it without it being re-sent for every task.
    _define_nlp(nlp)

    verbose = 10 if progress_bar else 0

    executor = Parallel(
        n_jobs, backend="multiprocessing", prefer="processes", verbose=verbose
    )
    executor.warn(f"Used nlp components: {nlp.component_names}")

    pipe_kwargs["additional_spans"] = additional_spans
    pipe_kwargs["extensions"] = extensions
    pipe_kwargs["context"] = context

    if verbose:
        # Ceiling division: a trailing partial chunk is a task too
        # (the previous floor division under-counted, e.g. 150/100 -> 1).
        n_tasks = -(-len(note) // chunksize)
        executor.warn(f"{n_tasks} tasks to complete")

    do = delayed(_process_chunk)

    tasks = (
        do(chunk, **pipe_kwargs)
        for chunk in _chunker(note, len(note), chunksize=chunksize)
    )
    result = executor(tasks)

    out = _flatten(result)

    return pd.DataFrame(out)
Back to top