edsnlp.matchers.simstring

SimstringWriter

Source code in edsnlp/matchers/simstring.py
class SimstringWriter:
    def __init__(self, path: Union[str, Path]):
        """
        A context manager to write a simstring database

        Parameters
        ----------
        path: Union[str, Path]
            Path to database
        """
        os.makedirs(path, exist_ok=True)
        self.path = path

    def __enter__(self):
        path = os.path.join(self.path, "terms.simstring")
        self.db = simstring.writer(path, 3, False, True)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.db.close()

    def insert(self, term):
        self.db.insert(term)

path = path instance-attribute

__init__(path)

A context manager to write a simstring database

PARAMETER DESCRIPTION
path

Path to database

TYPE: Union[str, Path]

Source code in edsnlp/matchers/simstring.py
def __init__(self, path: Union[str, Path]):
    """
    A context manager to write a simstring database

    Parameters
    ----------
    path: Union[str, Path]
        Path to database
    """
    os.makedirs(path, exist_ok=True)
    self.path = path

__enter__()

Source code in edsnlp/matchers/simstring.py
def __enter__(self):
    path = os.path.join(self.path, "terms.simstring")
    self.db = simstring.writer(path, 3, False, True)
    return self

__exit__(exc_type, exc_val, exc_tb)

Source code in edsnlp/matchers/simstring.py
def __exit__(self, exc_type, exc_val, exc_tb):
    self.db.close()

insert(term)

Source code in edsnlp/matchers/simstring.py
def insert(self, term):
    self.db.insert(term)
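
A minimal usage sketch (the output directory below is hypothetical); using the writer as a context manager guarantees the underlying database is closed:

from edsnlp.matchers.simstring import SimstringWriter

# Hypothetical output directory; it is created if it does not exist
with SimstringWriter("./simstring-db") as writer:
    # SimstringMatcher.build_patterns pads every term with "##",
    # so standalone inserts should follow the same convention
    writer.insert("##diabete##")
    writer.insert("##diabete sucre##")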

SimilarityMeasure

Bases: str, Enum

Source code in edsnlp/matchers/simstring.py
class SimilarityMeasure(str, Enum):
    jaccard = "jaccard"
    dice = "dice"
    overlap = "overlap"
    cosine = "cosine"

jaccard = 'jaccard' class-attribute

dice = 'dice' class-attribute

overlap = 'overlap' class-attribute

cosine = 'cosine' class-attribute
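
Since the enum subclasses str, its members compare equal to their plain string values, so either form can be used where a measure is expected:

from edsnlp.matchers.simstring import SimilarityMeasure

# String-valued enum: members and raw strings are interchangeable
assert SimilarityMeasure.dice == "dice"
assert SimilarityMeasure("cosine") is SimilarityMeasure.cosine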

SimstringMatcher

Source code in edsnlp/matchers/simstring.py
class SimstringMatcher:
    def __init__(
        self,
        vocab: Vocab,
        path: Optional[Union[Path, str]] = None,
        measure: SimilarityMeasure = SimilarityMeasure.dice,
        threshold: float = 0.75,
        windows: int = 5,
        ignore_excluded: bool = False,
        attr: str = "NORM",
    ):
        """
        PhraseMatcher that allows skipping excluded tokens.
        Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

        Parameters
        ----------
        vocab : Vocab
            spaCy vocabulary to match on.
        path: Optional[Union[Path, str]]
            Path where we will store the precomputed patterns
        measure: SimilarityMeasure
            Name of the similarity measure.
            One of [jaccard, dice, overlap, cosine]
        windows: int
            Maximum number of words in a candidate span
        threshold: float
            Minimum similarity value to match a concept's synonym
        ignore_excluded : bool, optional
            Whether to exclude tokens that have a "SPACE" tag, by default False
        attr : str
            Default attribute to match on, by default "NORM".
            Can be overridden in the `add` method.
            To match on a custom attribute, prepend the attribute name with `_`.
        """

        assert measure in (
            SimilarityMeasure.jaccard,
            SimilarityMeasure.dice,
            SimilarityMeasure.overlap,
            SimilarityMeasure.cosine,
        )

        self.vocab = vocab
        self.windows = windows
        self.measure = measure
        self.threshold = threshold
        self.ignore_excluded = ignore_excluded
        self.attr = attr

        if path is None:
            path = tempfile.mkdtemp()
        self.path = Path(path)

        self.ss_reader = None
        self.syn2cuis = None

    def build_patterns(self, nlp: Language, terms: Dict[str, Iterable[str]]):
        """
        Build patterns and add them for matching.

        Parameters
        ----------
        nlp : Language
            The instance of the spaCy language class.
        terms : Patterns
            Dictionary of label/terms, or label/dictionary of terms/attribute.
        """

        self.ss_reader = None
        self.syn2cuis = None

        syn2cuis = defaultdict(lambda: [])
        token_pipelines = [
            name
            for name, pipe in nlp.pipeline
            if any(
                "token" in assign and not assign == "token.is_sent_start"
                for assign in nlp.get_pipe_meta(name).assigns
            )
        ]
        with nlp.select_pipes(enable=token_pipelines):
            with SimstringWriter(self.path) as ss_db:
                for cui, synset in tqdm(terms.items()):
                    for term in nlp.pipe(synset):
                        norm_text = get_text(
                            term, self.attr, ignore_excluded=self.ignore_excluded
                        )
                        term = "##" + norm_text + "##"
                        ss_db.insert(term)
                        syn2cuis[term].append(cui)
        syn2cuis = {term: tuple(sorted(set(cuis))) for term, cuis in syn2cuis.items()}
        with open(self.path / "cui-db.pkl", "wb") as f:
            pickle.dump(syn2cuis, f)

    def load(self):
        if self.ss_reader is None:
            self.ss_reader = simstring.reader(
                os.path.join(self.path, "terms.simstring")
            )
            self.ss_reader.measure = getattr(simstring, self.measure)
            self.ss_reader.threshold = self.threshold

            with open(os.path.join(self.path, "cui-db.pkl"), "rb") as f:
                self.syn2cuis = pickle.load(f)

    def __call__(self, doc, as_spans=False):
        self.load()

        root = getattr(doc, "doc", doc)
        if root.has_annotation("IS_SENT_START"):
            sents = tuple(doc.sents)
        else:
            sents = (doc,)

        ents: List[Tuple[str, int, int, float]] = []

        for sent in sents:
            text, offsets = get_text_and_offsets(
                doclike=sent,
                attr=self.attr,
                ignore_excluded=self.ignore_excluded,
            )
            sent_start = getattr(sent, "start", 0)
            for size in range(1, self.windows):
                for i in range(0, len(offsets) - size):
                    begin_char, _, begin_i = offsets[i]
                    _, end_char, end_i = offsets[i + size]
                    span_text = "##" + text[begin_char:end_char] + "##"
                    matches = self.ss_reader.retrieve(span_text)
                    for res in matches:
                        sim = similarity(span_text, res, measure=self.measure)
                        for cui in self.syn2cuis[res]:
                            ents.append(
                                (cui, begin_i + sent_start, end_i + sent_start, sim)
                            )

        sorted_spans = sorted(ents, key=simstring_sort_key, reverse=True)
        results = []
        seen_tokens = set()
        for span in sorted_spans:
            # Boundaries are exclusive on the right: range(start, end) covers the span's tokens
            span_tokens = set(range(span[1], span[2]))
            if not (span_tokens & seen_tokens):
                results.append(span)
                seen_tokens.update(span_tokens)
        results = sorted(results, key=lambda span: span[1])
        if as_spans:
            spans = [
                Span(root, span_data[1], span_data[2], span_data[0])
                for span_data in results
            ]
            return spans
        else:
            return [(self.vocab.strings[span[0]], span[1], span[2]) for span in results]
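
A minimal end-to-end sketch, assuming a blank French pipeline and a made-up terminology (the label "C0011849" and its synonyms are illustrative only):

import spacy
from edsnlp.matchers.simstring import SimilarityMeasure, SimstringMatcher

nlp = spacy.blank("fr")
matcher = SimstringMatcher(
    nlp.vocab,
    measure=SimilarityMeasure.dice,
    threshold=0.75,
)

# Hypothetical terminology: one label mapped to an iterable of synonyms
matcher.build_patterns(nlp, {"C0011849": ["diabète", "diabète sucré"]})

doc = nlp("Le patient présente un diabète sucré.")
for span in matcher(doc, as_spans=True):
    print(span.text, span.label_)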

vocab = vocab instance-attribute

windows = windows instance-attribute

measure = measure instance-attribute

threshold = threshold instance-attribute

ignore_excluded = ignore_excluded instance-attribute

attr = attr instance-attribute

path = Path(path) instance-attribute

ss_reader = None instance-attribute

syn2cuis = None instance-attribute

__init__(vocab, path=None, measure=SimilarityMeasure.dice, threshold=0.75, windows=5, ignore_excluded=False, attr='NORM')

PhraseMatcher that allows skipping excluded tokens. Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

PARAMETER DESCRIPTION
vocab

spaCy vocabulary to match on.

TYPE: Vocab

path

Path where we will store the precomputed patterns

TYPE: Optional[Union[Path, str]] DEFAULT: None

measure

Name of the similarity measure. One of [jaccard, dice, overlap, cosine]

TYPE: SimilarityMeasure DEFAULT: SimilarityMeasure.dice

windows

Maximum number of words in a candidate span

TYPE: int DEFAULT: 5

threshold

Minimum similarity value to match a concept's synonym

TYPE: float DEFAULT: 0.75

ignore_excluded

Whether to exclude tokens that have a "SPACE" tag, by default False

TYPE: bool, optional DEFAULT: False

attr

Default attribute to match on, by default "NORM". Can be overridden in the add method. To match on a custom attribute, prepend the attribute name with _.

TYPE: str DEFAULT: 'NORM'

Source code in edsnlp/matchers/simstring.py
def __init__(
    self,
    vocab: Vocab,
    path: Optional[Union[Path, str]] = None,
    measure: SimilarityMeasure = SimilarityMeasure.dice,
    threshold: float = 0.75,
    windows: int = 5,
    ignore_excluded: bool = False,
    attr: str = "NORM",
):
    """
    PhraseMatcher that allows skipping excluded tokens.
    Heavily inspired by https://github.com/Georgetown-IR-Lab/QuickUMLS

    Parameters
    ----------
    vocab : Vocab
        spaCy vocabulary to match on.
    path: Optional[Union[Path, str]]
        Path where we will store the precomputed patterns
    measure: SimilarityMeasure
        Name of the similarity measure.
        One of [jaccard, dice, overlap, cosine]
    windows: int
        Maximum number of words in a candidate span
    threshold: float
        Minimum similarity value to match a concept's synonym
    ignore_excluded : bool, optional
        Whether to exclude tokens that have a "SPACE" tag, by default False
    attr : str
        Default attribute to match on, by default "NORM".
        Can be overridden in the `add` method.
        To match on a custom attribute, prepend the attribute name with `_`.
    """

    assert measure in (
        SimilarityMeasure.jaccard,
        SimilarityMeasure.dice,
        SimilarityMeasure.overlap,
        SimilarityMeasure.cosine,
    )

    self.vocab = vocab
    self.windows = windows
    self.measure = measure
    self.threshold = threshold
    self.ignore_excluded = ignore_excluded
    self.attr = attr

    if path is None:
        path = tempfile.mkdtemp()
    self.path = Path(path)

    self.ss_reader = None
    self.syn2cuis = None

build_patterns(nlp, terms)

Build patterns and add them for matching.

PARAMETER DESCRIPTION
nlp

The instance of the spaCy language class.

TYPE: Language

terms

Dictionary of label/terms, or label/dictionary of terms/attribute.

TYPE: Patterns

Source code in edsnlp/matchers/simstring.py
def build_patterns(self, nlp: Language, terms: Dict[str, Iterable[str]]):
    """
    Build patterns and add them for matching.

    Parameters
    ----------
    nlp : Language
        The instance of the spaCy language class.
    terms : Patterns
        Dictionary of label/terms, or label/dictionary of terms/attribute.
    """

    self.ss_reader = None
    self.syn2cuis = None

    syn2cuis = defaultdict(lambda: [])
    token_pipelines = [
        name
        for name, pipe in nlp.pipeline
        if any(
            "token" in assign and not assign == "token.is_sent_start"
            for assign in nlp.get_pipe_meta(name).assigns
        )
    ]
    with nlp.select_pipes(enable=token_pipelines):
        with SimstringWriter(self.path) as ss_db:
            for cui, synset in tqdm(terms.items()):
                for term in nlp.pipe(synset):
                    norm_text = get_text(
                        term, self.attr, ignore_excluded=self.ignore_excluded
                    )
                    term = "##" + norm_text + "##"
                    ss_db.insert(term)
                    syn2cuis[term].append(cui)
    syn2cuis = {term: tuple(sorted(set(cuis))) for term, cuis in syn2cuis.items()}
    with open(self.path / "cui-db.pkl", "wb") as f:
        pickle.dump(syn2cuis, f)
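
The patterns are persisted on disk (terms.simstring and cui-db.pkl), so a matcher created later with the same path can skip pattern building entirely. A sketch, with a hypothetical directory:

import spacy
from edsnlp.matchers.simstring import SimstringMatcher

nlp = spacy.blank("fr")

# Build once, writing the database to an explicit (hypothetical) directory
matcher = SimstringMatcher(nlp.vocab, path="./simstring-db")
matcher.build_patterns(nlp, {"C0011849": ["diabète"]})

# A second matcher pointing at the same path only needs load(),
# which reads terms.simstring and cui-db.pkl back from disk
reloaded = SimstringMatcher(nlp.vocab, path="./simstring-db")
reloaded.load()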

load()

Source code in edsnlp/matchers/simstring.py
def load(self):
    if self.ss_reader is None:
        self.ss_reader = simstring.reader(
            os.path.join(self.path, "terms.simstring")
        )
        self.ss_reader.measure = getattr(simstring, self.measure)
        self.ss_reader.threshold = self.threshold

        with open(os.path.join(self.path, "cui-db.pkl"), "rb") as f:
            self.syn2cuis = pickle.load(f)

__call__(doc, as_spans=False)

Source code in edsnlp/matchers/simstring.py
def __call__(self, doc, as_spans=False):
    self.load()

    root = getattr(doc, "doc", doc)
    if root.has_annotation("IS_SENT_START"):
        sents = tuple(doc.sents)
    else:
        sents = (doc,)

    ents: List[Tuple[str, int, int, float]] = []

    for sent in sents:
        text, offsets = get_text_and_offsets(
            doclike=sent,
            attr=self.attr,
            ignore_excluded=self.ignore_excluded,
        )
        sent_start = getattr(sent, "start", 0)
        for size in range(1, self.windows):
            for i in range(0, len(offsets) - size):
                begin_char, _, begin_i = offsets[i]
                _, end_char, end_i = offsets[i + size]
                span_text = "##" + text[begin_char:end_char] + "##"
                matches = self.ss_reader.retrieve(span_text)
                for res in matches:
                    sim = similarity(span_text, res, measure=self.measure)
                    for cui in self.syn2cuis[res]:
                        ents.append(
                            (cui, begin_i + sent_start, end_i + sent_start, sim)
                        )

    sorted_spans = sorted(ents, key=simstring_sort_key, reverse=True)
    results = []
    seen_tokens = set()
    for span in sorted_spans:
        # Boundaries are exclusive on the right: range(start, end) covers the span's tokens
        span_tokens = set(range(span[1], span[2]))
        if not (span_tokens & seen_tokens):
            results.append(span)
            seen_tokens.update(span_tokens)
    results = sorted(results, key=lambda span: span[1])
    if as_spans:
        spans = [
            Span(root, span_data[1], span_data[2], span_data[0])
            for span_data in results
        ]
        return spans
    else:
        return [(self.vocab.strings[span[0]], span[1], span[2]) for span in results]
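
By default the matcher returns (key, start, end) tuples, where key is the hash of the label in vocab.strings; with as_spans=True it returns spaCy Span objects instead. A sketch, assuming nlp and a built matcher as in the earlier examples:

# `nlp` and `matcher` are assumed to be set up as in the sketches above
doc = nlp("Le patient présente un diabète sucré.")

# Tuple form: (hash of the label in vocab.strings, token start, token end)
matches = matcher(doc)

# Span form, ready to be assigned to doc.ents or doc.spans
for span in matcher(doc, as_spans=True):
    print(span.text, span.label_)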

similarity(x, y, measure=SimilarityMeasure.dice)

Source code in edsnlp/matchers/simstring.py
def similarity(x: str, y: str, measure: SimilarityMeasure = SimilarityMeasure.dice):

    x_ngrams = {x[i : i + 3] for i in range(0, len(x) - 3)}
    y_ngrams = {y[i : i + 3] for i in range(0, len(y) - 3)}

    if measure == SimilarityMeasure.jaccard:
        return len(x_ngrams & y_ngrams) / (len(x_ngrams | y_ngrams))

    if measure == SimilarityMeasure.dice:
        return 2 * len(x_ngrams & y_ngrams) / (len(x_ngrams) + len(y_ngrams))

    if measure == SimilarityMeasure.cosine:
        return len(x_ngrams & y_ngrams) / sqrt(len(x_ngrams) * len(y_ngrams))

    if measure == SimilarityMeasure.overlap:
        return len(x_ngrams & y_ngrams)

    raise ValueError("Cannot compute similarity {}".format(repr(measure)))
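
A worked example with this function's trigram definition and the "##" padding that the matcher puts around terms: "##diabete##" yields 8 distinct trigrams and "##diabette##" yields 9, of which 7 are shared.

from edsnlp.matchers.simstring import SimilarityMeasure, similarity

x, y = "##diabete##", "##diabette##"

# dice: 2 * |X & Y| / (|X| + |Y|) = 14 / 17 ≈ 0.82
print(similarity(x, y, measure=SimilarityMeasure.dice))

# jaccard: |X & Y| / |X | Y| = 7 / 10 = 0.7
print(similarity(x, y, measure=SimilarityMeasure.jaccard))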

simstring_sort_key(span_data)

Source code in edsnlp/matchers/simstring.py
def simstring_sort_key(span_data: Tuple[str, int, int, float]):
    return span_data[3], span_data[2] - span_data[1], -span_data[1]
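
The key orders candidate spans by similarity, then by length, then by earliest start (the negated start index makes earlier spans sort higher under reverse=True), which drives the greedy overlap filtering in __call__. A small check:

from edsnlp.matchers.simstring import simstring_sort_key

# Equal similarity: the longer span sorts higher; the negated start index
# breaks remaining ties in favour of earlier spans
a = ("C1", 2, 4, 0.8)  # 2 tokens
b = ("C2", 2, 5, 0.8)  # 3 tokens
assert simstring_sort_key(b) > simstring_sort_key(a)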

get_text_and_offsets(doclike, attr='TEXT', ignore_excluded=True)

Align different representations of a Doc or Span object.

PARAMETER DESCRIPTION
doclike

spaCy Doc or Span object

TYPE: Doc

attr

Attribute to use, by default "TEXT"

TYPE: str, optional DEFAULT: 'TEXT'

ignore_excluded

Whether to remove excluded tokens, by default True

TYPE: bool, optional DEFAULT: True

RETURNS DESCRIPTION
Tuple[str, List[Tuple[int, int, int]]]

The new clean text, plus one offset tuple per word giving the begin char index of the word in the new text, the end char index of its preceding word, and the index of the word in the original document.

Source code in edsnlp/matchers/simstring.py
@lru_cache(maxsize=128)
def get_text_and_offsets(
    doclike: Union[Span, Doc],
    attr: str = "TEXT",
    ignore_excluded: bool = True,
) -> Tuple[str, List[Tuple[int, int, int]]]:
    """
    Align different representations of a `Doc` or `Span` object.

    Parameters
    ----------
    doclike : Doc
        spaCy `Doc` or `Span` object
    attr : str, optional
        Attribute to use, by default `"TEXT"`
    ignore_excluded : bool, optional
        Whether to remove excluded tokens, by default True

    Returns
    -------
    Tuple[str, List[Tuple[int, int, int]]]
        The new clean text and offset tuples for each word giving the begin char index
        of the word in the new text, the end char index of its preceding word and the
        index of the word in the original document
    """
    attr = attr.upper()
    attr = ATTRIBUTES.get(attr, attr)

    custom = attr.startswith("_")

    if custom:
        attr = attr[1:].lower()

    offsets = []

    cursor = 0

    text = []

    last = cursor
    for i, token in enumerate(doclike):

        if not ignore_excluded or not token._.excluded:
            if custom:
                token_text = getattr(token._, attr)
            else:
                token_text = getattr(token, attr)

            # We add the cursor
            end = cursor + len(token_text)
            offsets.append((cursor, last, i))

            cursor = end
            last = end

            text.append(token_text)

            if token.whitespace_:
                cursor += 1
                text.append(" ")

    offsets.append((cursor, last, len(doclike)))

    return "".join(text), offsets