Skip to content

edsnlp.pipelines.core.contextual_matcher.contextual_matcher

ContextualMatcher

Bases: BaseComponent

Allows additional matching in the surrounding context of the main match group, for qualification/filtering.

PARAMETER DESCRIPTION
nlp

spaCy Language object.

TYPE: Language

name

The name of the pipe

TYPE: str

patterns

The configuration dictionary

TYPE: Union[Dict[str, Any], List[Dict[str, Any]]]

assign_as_span

Whether to store any extractions defined via the assign key as Spans or as strings

TYPE: bool DEFAULT: False

attr

Attribute to match on, eg TEXT, NORM, etc.

TYPE: str DEFAULT: 'NORM'

ignore_excluded

Whether to skip excluded tokens during matching.

TYPE: bool DEFAULT: False

ignore_space_tokens

Whether to skip space tokens during matching.

TYPE: bool DEFAULT: False

alignment_mode

Overwrite alignment mode.

TYPE: str DEFAULT: 'expand'

regex_flags

RegExp flags to use when matching, filtering and assigning (see the "Flags" section of the Python re module documentation)

TYPE: Union[re.RegexFlag, int] DEFAULT: 0

include_assigned

Whether to include any assign matches in the final entity

TYPE: bool DEFAULT: False

Source code in edsnlp/pipelines/core/contextual_matcher/contextual_matcher.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
class ContextualMatcher(BaseComponent):
    """
    Allows additional matching in the surrounding context of the main match group,
    for qualification/filtering.

    Anchors are detected with a phrase matcher and a regex matcher built from
    the configuration. Each anchor is then checked against the `exclude`
    patterns of its source, and enriched (or replaced) using the `assign`
    patterns of its source.

    Parameters
    ----------
    nlp : Language
        spaCy `Language` object.
    name : str
        The name of the pipe
    patterns: Union[Dict[str, Any], List[Dict[str, Any]]]
        The configuration dictionary
    assign_as_span : bool
        Whether to store any extractions defined via the `assign` key as Spans
        or as strings
    attr : str
        Attribute to match on, eg `TEXT`, `NORM`, etc.
    ignore_excluded : bool
        Whether to skip excluded tokens during matching.
    ignore_space_tokens: bool
        Whether to skip space tokens during matching.
    alignment_mode : str
        Overwrite alignment mode.
    regex_flags : Union[re.RegexFlag, int]
        RegExp flags to use when matching, filtering and assigning (See
        [here](https://docs.python.org/3/library/re.html#flags))
    include_assigned : bool
        Whether to include any assign matches in the final entity
    """

    def __init__(
        self,
        nlp: Language,
        name: str,
        patterns: Union[Dict[str, Any], List[Dict[str, Any]]],
        assign_as_span: bool = False,
        alignment_mode: str = "expand",
        attr: str = "NORM",
        regex_flags: Union[re.RegexFlag, int] = 0,
        ignore_excluded: bool = False,
        ignore_space_tokens: bool = False,
        include_assigned: bool = False,
    ):
        self.name = name
        self.nlp = nlp
        self.attr = attr
        self.assign_as_span = assign_as_span
        self.ignore_excluded = ignore_excluded
        self.ignore_space_tokens = ignore_space_tokens
        self.alignment_mode = alignment_mode
        self.regex_flags = regex_flags
        self.include_assigned = include_assigned

        # Configuration parsing: validate the raw configuration, then index
        # the parsed patterns by their `source` name
        patterns = models.FullConfig.parse_obj(patterns).__root__
        self.patterns = {pattern.source: pattern for pattern in patterns}

        # Matchers for the anchors
        self.phrase_matcher = EDSPhraseMatcher(
            self.nlp.vocab,
            attr=attr,
            ignore_excluded=ignore_excluded,
            ignore_space_tokens=ignore_space_tokens,
        )
        self.regex_matcher = RegexMatcher(
            attr=attr,
            flags=regex_flags,
            ignore_excluded=ignore_excluded,
            ignore_space_tokens=ignore_space_tokens,
            alignment_mode=alignment_mode,
        )

        # Anchor patterns are registered under their source name; the source
        # is later read back from `span.label_` in `filter_one`/`assign_one`
        self.phrase_matcher.build_patterns(
            nlp=nlp,
            terms={
                source: {
                    "patterns": p.terms,
                }
                for source, p in self.patterns.items()
            },
        )
        self.regex_matcher.build_patterns(
            regex={
                source: {
                    "regex": p.regex,
                    "attr": p.regex_attr,
                    "flags": p.regex_flags,
                }
                for source, p in self.patterns.items()
            }
        )

        self.exclude_matchers = defaultdict(
            list
        )  # Will contain all the exclusion matchers (one list per source)
        self.assign_matchers = defaultdict(list)  # Will contain all the assign matchers

        # Will contain the reduce mode (for each source and assign matcher)
        self.reduce_mode = {}

        # Will contain the name of the assign matcher from which
        # entity will be replaced (for each source)
        self.replace_key = {}

        for source, p in self.patterns.items():
            p = p.dict()

            for exclude in p["exclude"]:
                # Settings fall back from the exclude entry, to the pattern,
                # to the component-level value. Exclusion matchers always use
                # the "expand" alignment mode, whatever `alignment_mode` is.
                exclude_matcher = RegexMatcher(
                    attr=exclude["regex_attr"] or p["regex_attr"] or self.attr,
                    flags=exclude["regex_flags"]
                    or p["regex_flags"]
                    or self.regex_flags,
                    ignore_excluded=ignore_excluded,
                    ignore_space_tokens=ignore_space_tokens,
                    alignment_mode="expand",
                )

                exclude_matcher.build_patterns(regex={"exclude": exclude["regex"]})

                self.exclude_matchers[source].append(
                    dict(
                        matcher=exclude_matcher,
                        window=exclude["window"],
                        limit_to_sentence=exclude["limit_to_sentence"],
                    )
                )

            replace_key = None

            for assign in p["assign"]:
                # Same fallback chain as for the exclusion matchers, but the
                # component-level `alignment_mode` is used here
                assign_matcher = RegexMatcher(
                    attr=assign["regex_attr"] or p["regex_attr"] or self.attr,
                    flags=assign["regex_flags"] or p["regex_flags"] or self.regex_flags,
                    ignore_excluded=ignore_excluded,
                    ignore_space_tokens=ignore_space_tokens,
                    alignment_mode=alignment_mode,
                    span_from_group=True,
                )

                # The assign name is used as the regex key, so spans built
                # for this matcher can carry it as their label
                assign_matcher.build_patterns(
                    regex={assign["name"]: assign["regex"]},
                )

                self.assign_matchers[source].append(
                    dict(
                        name=assign["name"],
                        matcher=assign_matcher,
                        window=assign["window"],
                        limit_to_sentence=assign["limit_to_sentence"],
                        replace_entity=assign["replace_entity"],
                        reduce_mode=assign["reduce_mode"],
                    )
                )

                if assign["replace_entity"]:
                    # We know that there is only one assign name
                    # with `replace_entity==True`
                    # from PyDantic validation
                    replace_key = assign["name"]

            self.replace_key[source] = replace_key

            self.reduce_mode[source] = {
                d["name"]: d["reduce_mode"] for d in self.assign_matchers[source]
            }

        self.set_extensions()

    @classmethod
    def set_extensions(cls) -> None:
        """
        Register the `assigned` and `source` extensions on `Span`,
        unless they are already registered.
        """
        if not Span.has_extension("assigned"):
            Span.set_extension("assigned", default=dict())
        if not Span.has_extension("source"):
            Span.set_extension("source", default=None)

    def filter_one(self, span: Span) -> Union[Span, None]:
        """
        Filter an extracted entity based on the "exclusion filter" mentioned
        in the configuration

        Parameters
        ----------
        span : Span
            Span to filter

        Returns
        -------
        Optional[Span]
            None if the span was filtered out, otherwise the span itself
        """
        # The anchor's label is the name of the source it was matched from
        source = span.label_
        to_keep = True
        for matcher in self.exclude_matchers[source]:
            window = matcher["window"]
            limit_to_sentence = matcher["limit_to_sentence"]
            snippet = get_window(
                doclike=span,
                window=window,
                limit_to_sentence=limit_to_sentence,
            )

            # A single exclusion match is enough: only the first result
            # of the matcher is requested
            if (
                next(
                    matcher["matcher"](snippet, as_spans=True),
                    None,
                )
                is not None
            ):
                to_keep = False
                logger.trace(f"Entity {span} was filtered out")
                break

        # Falls through (returning None) when the span was filtered out
        if to_keep:
            return span

    def assign_one(self, span: Span) -> Span:
        """
        Get additional information in the context
        of each entity. This function will populate two custom attributes:

        - `ent._.source`
        - `ent._.assigned`, a dictionary with all retrieved information

        This method is a generator: it yields zero, one or several spans.
        Nothing is yielded when `span` is None, or when the source declares
        a replacing assign key and that key did not match.

        Parameters
        ----------
        span : Span
            Span to enrich (None is accepted and yields nothing)

        Yields
        ------
        Span
            Span with additional information
        """

        # `filter_one` returns None for filtered entities: nothing to yield
        if span is None:
            yield from []
            return

        source = span.label_
        assigned_dict = models.AssignDict(reduce_mode=self.reduce_mode[source])
        replace_key = None

        for matcher in self.assign_matchers[source]:
            # Attribute used below to render the text of the captured group.
            # NOTE(review): the assign-level `regex_attr` read in `__init__`
            # is not consulted here, only the pattern-level one - confirm
            # this is intended.
            attr = self.patterns[source].regex_attr or matcher["matcher"].default_attr
            window = matcher["window"]
            limit_to_sentence = matcher["limit_to_sentence"]
            replace_entity = matcher["replace_entity"]  # Boolean

            snippet = get_window(
                doclike=span,
                window=window,
                limit_to_sentence=limit_to_sentence,
            )

            # Getting the matches
            assigned_list = list(matcher["matcher"].match(snippet))

            # The `span` loop variable below is local to the comprehension:
            # it is the span returned by the matcher, not the anchor.
            # Without any capturing group, that span is used twice;
            # otherwise the full match is rebuilt from the match offsets.
            # Assumes the matcher's first regex entry is laid out as
            # (key, _, attr, ignore_excluded, ignore_space_tokens,
            # alignment_mode) - TODO confirm against RegexMatcher.
            assigned_list = [
                (span, span)
                if not match.groups()
                else (
                    span,
                    create_span(
                        doclike=snippet,
                        start_char=match.start(0),
                        end_char=match.end(0),
                        key=matcher["matcher"].regex[0][0],
                        attr=matcher["matcher"].regex[0][2],
                        alignment_mode=matcher["matcher"].regex[0][5],
                        ignore_excluded=matcher["matcher"].regex[0][3],
                        ignore_space_tokens=matcher["matcher"].regex[0][4],
                    ),
                )
                for (span, match) in assigned_list
            ]

            # assigned_list now contains tuples with
            # - the first element being the span extracted from the group
            # - the second element being the full match

            if not assigned_list:  # No match was found
                continue

            for assigned in assigned_list:
                # Defensive check: the entries built above are always tuples
                if assigned is None:
                    continue
                if replace_entity:
                    replace_key = assigned[1].label_

                # Using the overridden `__setitem__` method from AssignDict here:
                assigned_dict[assigned[1].label_] = {
                    "span": assigned[1],  # Full span
                    "value_span": assigned[0],  # Span of the group
                    "value_text": get_text(
                        assigned[0],
                        attr=attr,
                        ignore_excluded=self.ignore_excluded,
                    ),  # Text of the group
                }
                logger.trace(f"Assign key {matcher['name']} matched on entity {span}")
        if replace_key is None and self.replace_key[source] is not None:
            # There should have been a replacement, but none was found
            # So we discard the entity
            yield from []
            return

        # Entity replacement
        if replace_key is not None:
            replacables = assigned_dict[replace_key]["span"]
            # Normalise to a list and copy it, so that the removal/append
            # below do not modify the list held by `assigned_dict`
            kept_ents = (
                replacables if isinstance(replacables, list) else [replacables]
            ).copy()

            if self.include_assigned:
                # Pick the replacing span whose start is closest
                # to the start of the anchor
                closest = min(
                    kept_ents,
                    key=lambda e: abs(e.start - span.start),
                )
                kept_ents.remove(closest)

                # That span is merged with the anchor and with the spans of
                # every other assign key
                expandables = flatten(
                    [a["span"] for k, a in assigned_dict.items() if k != replace_key]
                ) + [span, closest]

                # The merged span runs from the smallest start
                # to the largest end
                closest = Span(
                    span.doc,
                    min(expandables, key=attrgetter("start")).start,
                    max(expandables, key=attrgetter("end")).end,
                    span.label_,
                )

                kept_ents.append(closest)
                kept_ents.sort(key=attrgetter("start"))

            for replaced in kept_ents:
                # Propagating attributes from the anchor
                replaced._.source = source
                replaced.label_ = self.name

        else:
            # Entity expansion
            expandables = flatten([a["span"] for a in assigned_dict.values()])

            if self.include_assigned and expandables:
                # Grow the anchor so that it covers every assigned span
                span = Span(
                    span.doc,
                    min(expandables + [span], key=attrgetter("start")).start,
                    max(expandables + [span], key=attrgetter("end")).end,
                    span.label_,
                )

            span._.source = source
            span.label_ = self.name
            kept_ents = [span]

        # Expose either the Span or the text of the captured group
        key = "value_span" if self.assign_as_span else "value_text"

        for idx, e in enumerate(kept_ents):
            # For the replacing key without a reduce mode, the stored value
            # is indexed by the position of the entity in `kept_ents`
            # (presumably one value per replacing match - verify against
            # AssignDict); other keys are copied as they are
            e._.assigned = {
                k: v[key][idx]
                if ((k == replace_key) and self.reduce_mode[source][k] is None)
                else v[key]
                for k, v in assigned_dict.items()
            }

        yield from kept_ents

    def process_one(self, span):
        """
        Filter one anchor span, then yield the entities obtained from it
        (nothing is yielded when the span is filtered out).
        """
        filtered = self.filter_one(span)
        yield from self.assign_one(filtered)

    def process(self, doc: Doc) -> List[Span]:
        """
        Process the document, looking for named entities.

        This method is a generator: spans are yielded one at a time.

        Parameters
        ----------
        doc : Doc
            spaCy Doc object

        Yields
        ------
        Span
            Detected spans.
        """

        matches = self.phrase_matcher(doc, as_spans=True)
        regex_matches = self.regex_matcher(doc, as_spans=True)

        # All the anchors are collected before any of them is processed
        spans = (*matches, *regex_matches)
        for span in spans:
            yield from self.process_one(span)

    def __call__(self, doc: Doc) -> Doc:
        """
        Adds spans to document.

        Parameters
        ----------
        doc:
            spaCy Doc object

        Returns
        -------
        doc:
            spaCy Doc object, annotated for extracted terms.
        """

        ents = list(self.process(doc))

        # Every extracted span is stored in a span group named after the pipe
        doc.spans[self.name] = ents

        # Existing entities and new ones are filtered together before
        # `doc.ents` is overwritten
        ents, discarded = filter_spans(list(doc.ents) + ents, return_discarded=True)

        doc.ents = ents

        # Spans rejected by the filter are accumulated in the "discarded" group
        if "discarded" not in doc.spans:
            doc.spans["discarded"] = []
        doc.spans["discarded"].extend(discarded)

        return doc

filter_one(span)

Filter an extracted entity based on the "exclusion filter" mentioned in the configuration

PARAMETER DESCRIPTION
span

Span to filter

TYPE: Span

RETURNS DESCRIPTION
Optional[Span]

None if the span was filtered out, otherwise the span itself

Source code in edsnlp/pipelines/core/contextual_matcher/contextual_matcher.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def filter_one(self, span: Span) -> Union[Span, None]:
    """
    Filter an extracted entity based on the "exclusion filter" mentioned
    in the configuration

    Each exclusion matcher registered for the source of the span (its
    `label_`) is run on a window around the span. The span is discarded
    as soon as one of them matches.

    Parameters
    ----------
    span : Span
        Span to filter

    Returns
    -------
    Optional[Span]
        None if the span was filtered out, otherwise the span itself
    """
    for exclusion in self.exclude_matchers[span.label_]:
        snippet = get_window(
            doclike=span,
            window=exclusion["window"],
            limit_to_sentence=exclusion["limit_to_sentence"],
        )

        # Only the first result is requested: one match is enough to
        # discard the entity
        first_hit = next(exclusion["matcher"](snippet, as_spans=True), None)
        if first_hit is not None:
            logger.trace(f"Entity {span} was filtered out")
            return None

    return span

assign_one(span)

Get additional information in the context of each entity. This function will populate two custom attributes:

  • ent._.source
  • ent._.assigned, a dictionary with all retrieved information
PARAMETER DESCRIPTION
span

Span to enrich

TYPE: Span

RETURNS DESCRIPTION
Span

Span with additional information

Source code in edsnlp/pipelines/core/contextual_matcher/contextual_matcher.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def assign_one(self, span: Span) -> Span:
    """
    Get additional information in the context
    of each entity. This function will populate two custom attributes:

    - `ent._.source`
    - `ent._.assigned`, a dictionary with all retrieved information

    This function is a generator: it yields zero, one or several spans.
    Nothing is yielded when `span` is None, or when the source declares
    a replacing assign key and that key did not match.

    Parameters
    ----------
    span : Span
        Span to enrich (None is accepted and yields nothing)

    Yields
    ------
    Span
        Span with additional information
    """

    # A None input stands for a filtered entity: nothing to yield
    if span is None:
        yield from []
        return

    source = span.label_
    assigned_dict = models.AssignDict(reduce_mode=self.reduce_mode[source])
    replace_key = None

    for matcher in self.assign_matchers[source]:
        # Attribute used below to render the text of the captured group.
        # NOTE(review): only the pattern-level `regex_attr` is consulted
        # here, not an assign-level one - confirm this is intended.
        attr = self.patterns[source].regex_attr or matcher["matcher"].default_attr
        window = matcher["window"]
        limit_to_sentence = matcher["limit_to_sentence"]
        replace_entity = matcher["replace_entity"]  # Boolean

        snippet = get_window(
            doclike=span,
            window=window,
            limit_to_sentence=limit_to_sentence,
        )

        # Getting the matches
        assigned_list = list(matcher["matcher"].match(snippet))

        # The `span` loop variable below is local to the comprehension:
        # it is the span returned by the matcher, not the anchor.
        # Without any capturing group, that span is used twice;
        # otherwise the full match is rebuilt from the match offsets.
        # Assumes the matcher's first regex entry is laid out as
        # (key, _, attr, ignore_excluded, ignore_space_tokens,
        # alignment_mode) - TODO confirm against RegexMatcher.
        assigned_list = [
            (span, span)
            if not match.groups()
            else (
                span,
                create_span(
                    doclike=snippet,
                    start_char=match.start(0),
                    end_char=match.end(0),
                    key=matcher["matcher"].regex[0][0],
                    attr=matcher["matcher"].regex[0][2],
                    alignment_mode=matcher["matcher"].regex[0][5],
                    ignore_excluded=matcher["matcher"].regex[0][3],
                    ignore_space_tokens=matcher["matcher"].regex[0][4],
                ),
            )
            for (span, match) in assigned_list
        ]

        # assigned_list now contains tuples with
        # - the first element being the span extracted from the group
        # - the second element being the full match

        if not assigned_list:  # No match was found
            continue

        for assigned in assigned_list:
            # Defensive check: the entries built above are always tuples
            if assigned is None:
                continue
            if replace_entity:
                replace_key = assigned[1].label_

            # Using the overridden `__setitem__` method from AssignDict here:
            assigned_dict[assigned[1].label_] = {
                "span": assigned[1],  # Full span
                "value_span": assigned[0],  # Span of the group
                "value_text": get_text(
                    assigned[0],
                    attr=attr,
                    ignore_excluded=self.ignore_excluded,
                ),  # Text of the group
            }
            logger.trace(f"Assign key {matcher['name']} matched on entity {span}")
    if replace_key is None and self.replace_key[source] is not None:
        # There should have been a replacement, but none was found
        # So we discard the entity
        yield from []
        return

    # Entity replacement
    if replace_key is not None:
        replacables = assigned_dict[replace_key]["span"]
        # Normalise to a list and copy it, so that the removal/append
        # below do not modify the list held by `assigned_dict`
        kept_ents = (
            replacables if isinstance(replacables, list) else [replacables]
        ).copy()

        if self.include_assigned:
            # Pick the replacing span whose start is closest
            # to the start of the anchor
            closest = min(
                kept_ents,
                key=lambda e: abs(e.start - span.start),
            )
            kept_ents.remove(closest)

            # That span is merged with the anchor and with the spans of
            # every other assign key
            expandables = flatten(
                [a["span"] for k, a in assigned_dict.items() if k != replace_key]
            ) + [span, closest]

            # The merged span runs from the smallest start
            # to the largest end
            closest = Span(
                span.doc,
                min(expandables, key=attrgetter("start")).start,
                max(expandables, key=attrgetter("end")).end,
                span.label_,
            )

            kept_ents.append(closest)
            kept_ents.sort(key=attrgetter("start"))

        for replaced in kept_ents:
            # Propagating attributes from the anchor
            replaced._.source = source
            replaced.label_ = self.name

    else:
        # Entity expansion
        expandables = flatten([a["span"] for a in assigned_dict.values()])

        if self.include_assigned and expandables:
            # Grow the anchor so that it covers every assigned span
            span = Span(
                span.doc,
                min(expandables + [span], key=attrgetter("start")).start,
                max(expandables + [span], key=attrgetter("end")).end,
                span.label_,
            )

        span._.source = source
        span.label_ = self.name
        kept_ents = [span]

    # Expose either the Span or the text of the captured group
    key = "value_span" if self.assign_as_span else "value_text"

    for idx, e in enumerate(kept_ents):
        # For the replacing key without a reduce mode, the stored value
        # is indexed by the position of the entity in `kept_ents`
        # (presumably one value per replacing match - verify against
        # AssignDict); other keys are copied as they are
        e._.assigned = {
            k: v[key][idx]
            if ((k == replace_key) and self.reduce_mode[source][k] is None)
            else v[key]
            for k, v in assigned_dict.items()
        }

    yield from kept_ents

process(doc)

Process the document, looking for named entities.

PARAMETER DESCRIPTION
doc

spaCy Doc object

TYPE: Doc

RETURNS DESCRIPTION
List[Span]

List of detected spans.

Source code in edsnlp/pipelines/core/contextual_matcher/contextual_matcher.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def process(self, doc: Doc) -> List[Span]:
    """
    Find the anchors of the document and turn them into entities.

    This function is a generator. Anchors from the phrase matcher come
    first, followed by anchors from the regex matcher; each anchor is
    handed to `process_one`, whose results are yielded in order.

    Parameters
    ----------
    doc : Doc
        spaCy Doc object

    Yields
    ------
    Span
        Detected spans.
    """
    phrase_hits = self.phrase_matcher(doc, as_spans=True)
    regex_hits = self.regex_matcher(doc, as_spans=True)

    # Both matchers are exhausted before the first anchor is processed
    for anchor in [*phrase_hits, *regex_hits]:
        yield from self.process_one(anchor)

__call__(doc)

Adds spans to document.

PARAMETER DESCRIPTION
doc

spaCy Doc object

TYPE: Doc

RETURNS DESCRIPTION
doc

spaCy Doc object, annotated for extracted terms.

TYPE: Doc

Source code in edsnlp/pipelines/core/contextual_matcher/contextual_matcher.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def __call__(self, doc: Doc) -> Doc:
    """
    Run the matcher on a document and store the results on it.

    The extracted spans are written to `doc.spans[self.name]`. They are
    then filtered together with the existing entities: the kept spans
    become `doc.ents` and the rejected ones are added to
    `doc.spans["discarded"]`.

    Parameters
    ----------
    doc:
        spaCy Doc object

    Returns
    -------
    doc:
        spaCy Doc object, annotated for extracted terms.
    """
    extracted = list(self.process(doc))
    doc.spans[self.name] = extracted

    candidates = list(doc.ents) + extracted
    kept, rejected = filter_spans(candidates, return_discarded=True)
    doc.ents = kept

    # Create the "discarded" group on first use, then accumulate into it
    if "discarded" not in doc.spans:
        doc.spans["discarded"] = []
    doc.spans["discarded"].extend(rejected)

    return doc