Skip to content

Parquet

TLDR
import edsnlp

stream = edsnlp.data.read_parquet(path, converter="omop")
stream = stream.map_pipeline(nlp)
res = stream.to_parquet(path, converter="omop")
# or equivalently
edsnlp.data.to_parquet(stream, path, converter="omop")

We provide methods to read and write documents (raw or annotated) from and to parquet files.

As an example, imagine that we have the following document that uses the OMOP schema (parquet files are not actually stored as human-readable text, but this is for the sake of the example):

data.pq
{ "note_id": 0, "note_text": "Le patient ...", "note_datetime": "2021-10-23", "entities": [...] }
{ "note_id": 1, "note_text": "Autre doc ...", "note_datetime": "2022-12-24", "entities": [] }
...

You could also have multiple parquet files in a directory, the reader will read them all.

Reading Parquet files[source]

The ParquetReader (or edsnlp.data.read_parquet) reads a directory of parquet files (or a single file) and yields documents.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)
doc_iterator = edsnlp.data.read_parquet("path/to/parquet", converter="omop")
annotated_docs = nlp.pipe(doc_iterator)

Generator vs list

edsnlp.data.read_parquet returns a Stream. To iterate over the documents multiple times efficiently or to access them by index, you must convert it to a list

docs = list(edsnlp.data.read_parquet("path/to/parquet", converter="omop"))

Parameters

PARAMETER DESCRIPTION
path

Path to the directory containing the parquet files (will recursively look for files in subdirectories). Supports any filesystem supported by pyarrow.

TYPE: Union[str, Path]

filesystem

The filesystem to use to write the files. If None, the filesystem will be inferred from the path (e.g. s3:// will use S3).

TYPE: Optional[FileSystem] DEFAULT: None

shuffle

Whether to shuffle the data. If "dataset", the whole dataset will be shuffled before starting iterating on it (at the start of every epoch if looping). If "fragment", shuffling will occur between and inside the parquet files, but not across them.

Dataset shuffling

Shuffling the dataset can be expensive, especially for large datasets, since it requires reading the entire dataset into memory. If you have a large dataset, consider shuffling at the "fragment" level.

TYPE: Literal['dataset', 'fragment', False] DEFAULT: False

seed: Optional[int] The seed to use for shuffling. loop: bool Whether to loop over the data indefinitely. work_unit: Literal["record", "fragment"] Only affects the multiprocessing mode. If "record", every worker will start to read the same parquet file and yield each every num_workers-th record, starting at an offset each. For instance, if num_workers=2, the first worker will read the 1st, 3rd, 5th, ... records, while the second worker will read the 2nd, 4th, 6th, ... records of the first parquet file.

If "fragment", each worker will read a different parquet file. For instance, the
first worker will every record of the 1st parquet file, the second worker will
read every record of the 2nd parquet file, and so on. This way, no record is
"wasted" and every record loaded in memory is yielded.

converter: Optional[AsList[Union[str, Callable]]] Converters to use to convert the parquet rows of the data source to Doc objects These are documented on the Converters page. kwargs: Additional keyword arguments to pass to the converter. These are documented on the Converters page.

RETURNS DESCRIPTION
Stream

Writing Parquet files[source]

edsnlp.data.write_parquet writes a list of documents as a parquet dataset.

Example

import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)

doc = nlp("My document with entities")

edsnlp.data.write_parquet([doc], "path/to/parquet")

Overwriting files

By default, write_parquet will raise an error if the directory already exists and contains parquet files. This is to avoid overwriting existing annotations. To allow overwriting existing files, use overwrite=True.

Parameters

PARAMETER DESCRIPTION
data

The data to write (either a list of documents or a Stream).

TYPE: Union[Any, Stream]

path

Path to the directory containing the parquet files (will recursively look for files in subdirectories). Supports any filesystem supported by pyarrow.

TYPE: Union[str, Path]

batch_size

The maximum number of documents to write in each parquet file.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

The method to batch the documents. If "docs", the batch size is the number of documents. If "fragment", each batch corresponds to a parquet file fragment from the input data.

TYPE: BatchBy DEFAULT: None

write_in_worker

In multiprocessing or spark mode, whether to batch and write the documents in the workers or in the main process.

For instance, a worker may read the 1st, 3rd, 5th, ... documents, while another reads the 2nd, 4th, 6th, ... documents.

If write_in_worker is False, deterministic is True (default) and no operation adds or remove document from the stream (e.g., no map_batches), the original order of the documents will be recovered in the main process, and batching there can produce fragments that respect the original order.

TYPE: bool DEFAULT: True

overwrite

Whether to overwrite existing directories.

TYPE: bool DEFAULT: False

filesystem

The filesystem to use to write the files. If None, the filesystem will be inferred from the path (e.g. s3:// will use S3).

TYPE: Optional[FileSystem] DEFAULT: None

execute

Whether to execute the writing operation immediately or to return a stream

TYPE: bool DEFAULT: True

converter

Converter to use to convert the documents to dictionary objects before writing them as Parquet rows. These are documented on the Converters page.

TYPE: Optional[Union[str, Callable]] DEFAULT: None

kwargs

Additional keyword arguments to pass to the converter. These are documented on the Converters page.

DEFAULT: {}