
edsnlp.matchers.simstring

SimstringWriter

Source code in edsnlp/matchers/simstring.py
class SimstringWriter:
    def __init__(self, path: Union[str, Path]):
        """
        A context manager used to write a simstring database

        Parameters
        ----------
        path: Union[str, Path]
            Path to database
        """
        os.makedirs(path, exist_ok=True)
        self.path = path

    def __enter__(self):
        path = os.path.join(self.path, "terms.simstring")
        self.db = simstring.writer(path, 3, False, True)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.db.close()

    def insert(self, term):
        self.db.insert(term)
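
As a minimal usage sketch (the directory path and the terms below are made up for illustration), the writer is meant to be used as a context manager so that the underlying simstring database is closed on exit:

from edsnlp.matchers.simstring import SimstringWriter

# Write a couple of padded, normalized terms to a new database directory.
with SimstringWriter("./my_simstring_db") as writer:
    writer.insert("##diabete##")
    writer.insert("##diabete de type 2##")
# On exit, ./my_simstring_db/terms.simstring is flushed and closed.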

__init__(path)

A context manager used to write a simstring database

PARAMETERS

path : Union[str, Path]
    Path to database

Source code in edsnlp/matchers/simstring.py
def __init__(self, path: Union[str, Path]):
    """
    A context manager used to write a simstring database

    Parameters
    ----------
    path: Union[str, Path]
        Path to database
    """
    os.makedirs(path, exist_ok=True)
    self.path = path

SimstringMatcher

Source code in edsnlp/matchers/simstring.py
class SimstringMatcher:
    def __init__(
        self,
        vocab: Vocab,
        path: Optional[Union[Path, str]] = None,
        measure: SimilarityMeasure = SimilarityMeasure.dice,
        threshold: float = 0.75,
        windows: int = 5,
        ignore_excluded: bool = False,
        ignore_space_tokens: bool = False,
        attr: str = "NORM",
    ):
        """
        PhraseMatcher that allows skipping excluded tokens.
        Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

        Parameters
        ----------
        vocab : Vocab
            spaCy vocabulary to match on.
        path: Optional[Union[Path, str]]
            Path where we will store the precomputed patterns
        measure: SimilarityMeasure
            Name of the similarity measure.
            One of [jaccard, dice, overlap, cosine]
        windows: int
            Maximum number of words in a candidate span
        threshold: float
            Minimum similarity value to match a concept's synonym
        ignore_excluded : Optional[bool]
            Whether to exclude tokens that have an EXCLUDED tag, by default False
        ignore_space_tokens : Optional[bool]
            Whether to exclude tokens that have a "SPACE" tag, by default False
        attr : str
            Default attribute to match on, by default "TEXT".
            Can be overridden in the `add` method.
            To match on a custom attribute, prepend the attribute name with `_`.
        """

        assert measure in (
            SimilarityMeasure.jaccard,
            SimilarityMeasure.dice,
            SimilarityMeasure.overlap,
            SimilarityMeasure.cosine,
        )

        self.vocab = vocab
        self.windows = windows
        self.measure = measure
        self.threshold = threshold
        self.ignore_excluded = ignore_excluded
        self.ignore_space_tokens = ignore_space_tokens
        self.attr = attr

        if path is None:
            path = tempfile.mkdtemp()
        self.path = Path(path)

        self.ss_reader = None
        self.syn2cuis = None

    def build_patterns(
        self, nlp: Language, terms: Dict[str, Iterable[str]], progress: bool = False
    ):
        """
        Build patterns and add them for matching.

        Parameters
        ----------
        nlp : Language
            The instance of the spaCy language class.
        terms : Dict[str, Iterable[str]]
            Dictionary mapping each label to an iterable of synonym terms.
        progress: bool
            Whether to track progress when preprocessing terms
        """

        self.ss_reader = None
        self.syn2cuis = None

        syn2cuis = defaultdict(lambda: [])
        token_pipelines = [
            name
            for name, pipe in nlp.pipeline
            if any(
                "token" in assign and not assign == "token.is_sent_start"
                for assign in nlp.get_pipe_meta(name).assigns
            )
        ]
        with nlp.select_pipes(enable=token_pipelines):
            with SimstringWriter(self.path) as ss_db:
                for cui, synset in tqdm(terms.items()) if progress else terms.items():
                    for term in nlp.pipe(synset):
                        norm_text = get_text(
                            term,
                            self.attr,
                            ignore_excluded=self.ignore_excluded,
                            ignore_space_tokens=self.ignore_space_tokens,
                        )
                        term = "##" + norm_text + "##"
                        ss_db.insert(term)
                        syn2cuis[term].append(cui)
        syn2cuis = {term: tuple(sorted(set(cuis))) for term, cuis in syn2cuis.items()}
        with open(self.path / "cui-db.pkl", "wb") as f:
            pickle.dump(syn2cuis, f)

    def load(self):
        if self.ss_reader is None:
            self.ss_reader = simstring.reader(
                os.path.join(self.path, "terms.simstring")
            )
            self.ss_reader.measure = getattr(simstring, self.measure)
            self.ss_reader.threshold = self.threshold

            with open(os.path.join(self.path, "cui-db.pkl"), "rb") as f:
                self.syn2cuis = pickle.load(f)

    def __call__(self, doc, as_spans=False):
        self.load()

        root = getattr(doc, "doc", doc)
        if root.has_annotation("IS_SENT_START"):
            sents = tuple(doc.sents)
        else:
            sents = (doc,)

        ents: List[Tuple[str, int, int, float]] = []

        for sent in sents:
            text, offsets = get_text_and_offsets(
                doclike=sent,
                attr=self.attr,
                ignore_excluded=self.ignore_excluded,
            )
            sent_start = getattr(sent, "start", 0)
            for size in range(1, self.windows):
                for i in range(0, len(offsets) - size):
                    begin_char, _, begin_i, _ = offsets[i]
                    _, end_char, _, end_i = offsets[i + size]
                    span_text = "##" + text[begin_char:end_char] + "##"
                    matches = self.ss_reader.retrieve(span_text)
                    for res in matches:
                        sim = _similarity(span_text, res, measure=self.measure)
                        for cui in self.syn2cuis[res]:
                            ents.append(
                                (cui, begin_i + sent_start, end_i + sent_start, sim)
                            )

        sorted_spans = sorted(ents, key=simstring_sort_key, reverse=True)
        results = []
        seen_tokens = set()
        for span in sorted_spans:
            # Greedily keep the best-scoring spans that do not overlap already kept tokens
            span_tokens = set(range(span[1], span[2]))
            if not (span_tokens & seen_tokens):
                results.append(span)
                seen_tokens.update(span_tokens)
        results = sorted(results, key=lambda span: span[1])
        if as_spans:
            spans = [
                Span(root, span_data[1], span_data[2], span_data[0])
                for span_data in results
            ]
            return spans
        else:
            return [(self.vocab.strings[span[0]], span[1], span[2]) for span in results]
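
A rough end-to-end sketch of how the matcher can be used (the pipeline components, labels and synonyms below are illustrative and assume an EDS-NLP installation; adapt them to your own project):

import spacy
from edsnlp.matchers.simstring import SimstringMatcher

nlp = spacy.blank("eds")
nlp.add_pipe("eds.normalizer")  # fills the NORM attribute used by attr="NORM"

matcher = SimstringMatcher(nlp.vocab, threshold=0.75, windows=5)

# Map each label (here, illustrative concept identifiers) to an iterable of synonyms.
matcher.build_patterns(nlp, {"C0011849": ["diabete", "diabete de type 2"]})

doc = nlp("Patient atteint d'un diabète de type 2.")
for span in matcher(doc, as_spans=True):
    print(span.label_, span.text)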

__init__(vocab, path=None, measure=SimilarityMeasure.dice, threshold=0.75, windows=5, ignore_excluded=False, ignore_space_tokens=False, attr='NORM')

PhraseMatcher that allows skipping excluded tokens. Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

PARAMETERS

vocab : Vocab
    spaCy vocabulary to match on.

path : Optional[Union[Path, str]], default: None
    Path where the precomputed patterns will be stored.

measure : SimilarityMeasure, default: SimilarityMeasure.dice
    Name of the similarity measure. One of [jaccard, dice, overlap, cosine].

windows : int, default: 5
    Maximum number of words in a candidate span.

threshold : float, default: 0.75
    Minimum similarity value to match a concept's synonym.

ignore_excluded : bool, default: False
    Whether to exclude tokens that have an EXCLUDED tag.

ignore_space_tokens : bool, default: False
    Whether to exclude tokens that have a "SPACE" tag.

attr : str, default: 'NORM'
    Default attribute to match on. Can be overridden in the `add` method.
    To match on a custom attribute, prepend the attribute name with `_`.
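
To give an intuition for the measure and threshold parameters: the similarity measures operate on sets of character n-grams (the writer above opens the database with an n-gram size of 3, on terms padded with "##"). A simplified sketch of the Dice coefficient, assuming plain character trigrams (simstring's exact n-gram extraction may differ slightly):

def trigrams(term):
    # Character 3-grams of an already padded term, e.g. "##diabete##".
    return {term[i:i + 3] for i in range(len(term) - 2)}

def dice(a, b):
    x, y = trigrams(a), trigrams(b)
    return 2 * len(x & y) / (len(x) + len(y))

print(dice("##pneumopathie##", "##pneumopathies##"))  # ~0.83, above a 0.75 threshold
print(dice("##diabete##", "##diabetes##"))            # ~0.74, just below 0.75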

Source code in edsnlp/matchers/simstring.py
def __init__(
    self,
    vocab: Vocab,
    path: Optional[Union[Path, str]] = None,
    measure: SimilarityMeasure = SimilarityMeasure.dice,
    threshold: float = 0.75,
    windows: int = 5,
    ignore_excluded: bool = False,
    ignore_space_tokens: bool = False,
    attr: str = "NORM",
):
    """
    PhraseMatcher that allows skipping excluded tokens.
    Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

    Parameters
    ----------
    vocab : Vocab
        spaCy vocabulary to match on.
    path: Optional[Union[Path, str]]
        Path where we will store the precomputed patterns
    measure: SimilarityMeasure
        Name of the similarity measure.
        One of [jaccard, dice, overlap, cosine]
    windows: int
        Maximum number of words in a candidate span
    threshold: float
        Minimum similarity value to match a concept's synonym
    ignore_excluded : Optional[bool]
        Whether to exclude tokens that have an EXCLUDED tag, by default False
    ignore_space_tokens : Optional[bool]
        Whether to exclude tokens that have a "SPACE" tag, by default False
    attr : str
        Default attribute to match on, by default "TEXT".
        Can be overridden in the `add` method.
        To match on a custom attribute, prepend the attribute name with `_`.
    """

    assert measure in (
        SimilarityMeasure.jaccard,
        SimilarityMeasure.dice,
        SimilarityMeasure.overlap,
        SimilarityMeasure.cosine,
    )

    self.vocab = vocab
    self.windows = windows
    self.measure = measure
    self.threshold = threshold
    self.ignore_excluded = ignore_excluded
    self.ignore_space_tokens = ignore_space_tokens
    self.attr = attr

    if path is None:
        path = tempfile.mkdtemp()
    self.path = Path(path)

    self.ss_reader = None
    self.syn2cuis = None

build_patterns(nlp, terms, progress=False)

Build patterns and add them for matching.

PARAMETERS

nlp : Language
    The instance of the spaCy language class.

terms : Dict[str, Iterable[str]]
    Dictionary mapping each label to an iterable of synonym terms.

progress : bool, default: False
    Whether to track progress when preprocessing terms.
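
For illustration, the terms argument could look like the following (the labels and synonyms are made up; any string labels, such as UMLS CUIs, will work):

terms = {
    "C0011849": ["diabete", "diabete sucre", "diabetes mellitus"],
    "C0020538": ["hypertension", "hypertension arterielle"],
}
matcher.build_patterns(nlp, terms=terms, progress=True)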

Source code in edsnlp/matchers/simstring.py
def build_patterns(
    self, nlp: Language, terms: Dict[str, Iterable[str]], progress: bool = False
):
    """
    Build patterns and add them for matching.

    Parameters
    ----------
    nlp : Language
        The instance of the spaCy language class.
    terms : Dict[str, Iterable[str]]
        Dictionary mapping each label to an iterable of synonym terms.
    progress: bool
        Whether to track progress when preprocessing terms
    """

    self.ss_reader = None
    self.syn2cuis = None

    syn2cuis = defaultdict(lambda: [])
    token_pipelines = [
        name
        for name, pipe in nlp.pipeline
        if any(
            "token" in assign and not assign == "token.is_sent_start"
            for assign in nlp.get_pipe_meta(name).assigns
        )
    ]
    with nlp.select_pipes(enable=token_pipelines):
        with SimstringWriter(self.path) as ss_db:
            for cui, synset in tqdm(terms.items()) if progress else terms.items():
                for term in nlp.pipe(synset):
                    norm_text = get_text(
                        term,
                        self.attr,
                        ignore_excluded=self.ignore_excluded,
                        ignore_space_tokens=self.ignore_space_tokens,
                    )
                    term = "##" + norm_text + "##"
                    ss_db.insert(term)
                    syn2cuis[term].append(cui)
    syn2cuis = {term: tuple(sorted(set(cuis))) for term, cuis in syn2cuis.items()}
    with open(self.path / "cui-db.pkl", "wb") as f:
        pickle.dump(syn2cuis, f)

get_text_and_offsets(doclike, attr='TEXT', ignore_excluded=True, ignore_space_tokens=True) cached

Align different representations of a Doc or Span object.

PARAMETERS

doclike : Union[Doc, Span]
    spaCy `Doc` or `Span` object.

attr : str, default: 'TEXT'
    Attribute to use.

ignore_excluded : bool, default: True
    Whether to remove excluded tokens.

ignore_space_tokens : bool, default: True
    Whether to remove space tokens.

RETURNS

Tuple[str, List[Tuple[int, int, int, int]]]
    The clean text and, for each word, an offset tuple giving the begin char index of the word in the clean text, the end char index of the preceding word, the word's token index in the original document, and the (exclusive) end token index of the preceding word.
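
To make the offset tuples concrete, here is a rough hand-worked sketch (the exact values depend on the tokenizer and the chosen attribute; this assumes a simple three-token document):

import spacy
from edsnlp.matchers.simstring import get_text_and_offsets

nlp = spacy.blank("en")
doc = nlp("pain in chest")

text, offsets = get_text_and_offsets(doc, attr="TEXT")
# text    == "pain in chest"
# offsets == [(0, 0, 0, 1), (5, 4, 1, 1), (8, 7, 2, 2), (13, 13, 3, 3)]
# (the last tuple is a sentinel appended after the final word)

# A candidate span from offsets[i] to offsets[i + size] is rebuilt in the clean
# text and mapped back to token indices, as in SimstringMatcher.__call__:
begin_char, _, begin_i, _ = offsets[0]
_, end_char, _, end_i = offsets[2]
print(text[begin_char:end_char])  # "pain in"
print(doc[begin_i:end_i])         # pain in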

Source code in edsnlp/matchers/simstring.py
@lru_cache(maxsize=128)
def get_text_and_offsets(
    doclike: Union[Span, Doc],
    attr: str = "TEXT",
    ignore_excluded: bool = True,
    ignore_space_tokens: bool = True,
) -> Tuple[str, List[Tuple[int, int, int, int]]]:
    """
    Align different representations of a `Doc` or `Span` object.

    Parameters
    ----------
    doclike : Union[Doc, Span]
        spaCy `Doc` or `Span` object
    attr : str, optional
        Attribute to use, by default `"TEXT"`
    ignore_excluded : bool
        Whether to remove excluded tokens, by default True
    ignore_space_tokens : bool
        Whether to remove space tokens, by default True


    Returns
    -------
    Tuple[str, List[Tuple[int, int, int, int]]]
        The clean text and, for each word, an offset tuple giving the begin char
        index of the word in the clean text, the end char index of the preceding
        word, the word's token index in the original document, and the (exclusive)
        end token index of the preceding word
    """
    attr = attr.upper()
    attr = ATTRIBUTES.get(attr, attr)

    custom = attr.startswith("_")

    if custom:
        attr = attr[1:].lower()

    offsets = []

    cursor = 0

    text = []

    last = cursor
    last_i = 0
    for i, token in enumerate(doclike):

        if (not ignore_excluded or token.tag_ != "EXCLUDED") and (
            not ignore_space_tokens or token.tag_ != "SPACE"
        ):
            if custom:
                token_text = getattr(token._, attr)
            else:
                token_text = getattr(token, attr)

            # We add the cursor
            end = cursor + len(token_text)
            offsets.append((cursor, last, i, last_i + 1))

            cursor = end
            last = end
            last_i = i

            text.append(token_text)

            if token.whitespace_:
                cursor += 1
                text.append(" ")

    offsets.append((cursor, last, len(doclike), last_i + 1))

    return "".join(text), offsets