Parquet
TLDR
```python
import edsnlp

stream = edsnlp.data.read_parquet(path, converter="omop")
stream = stream.map_pipeline(nlp)
res = stream.to_parquet(path, converter="omop")
# or equivalently
edsnlp.data.to_parquet(stream, path, converter="omop")
```
We provide methods to read and write documents (raw or annotated) from and to parquet files.
As an example, imagine that we have the following documents, which use the OMOP schema (parquet files are not actually stored as human-readable text, but this is for the sake of the example):

```
{ "note_id": 0, "note_text": "Le patient ...", "note_datetime": "2021-10-23", "entities": [...] }
{ "note_id": 1, "note_text": "Autre doc ...", "note_datetime": "2022-12-24", "entities": [] }
...
```
You could also have multiple parquet files in a directory; the reader will read them all.
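If you want a small dataset with this schema to try the reader on, you could build one with pyarrow, as in the sketch below (the output directory is only an illustration, and the `entities` column is omitted for brevity; add it as in the rows above if your converter options need it):

```python
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

out_dir = Path("path/to/parquet")  # hypothetical dataset directory
out_dir.mkdir(parents=True, exist_ok=True)

# Two OMOP-style rows, mirroring the example above (entities omitted for brevity)
rows = [
    {"note_id": 0, "note_text": "Le patient ...", "note_datetime": "2021-10-23"},
    {"note_id": 1, "note_text": "Autre doc ...", "note_datetime": "2022-12-24"},
]

# A dataset can be a single parquet file or several files in the same directory
pq.write_table(pa.Table.from_pylist(rows), out_dir / "notes.parquet")
```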
Reading Parquet files
The ParquetReader (or `edsnlp.data.read_parquet`) reads a directory of parquet files (or a single file) and yields documents.
Example
```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)

doc_iterator = edsnlp.data.read_parquet("path/to/parquet", converter="omop")
annotated_docs = nlp.pipe(doc_iterator)
```
Generator vs list

`edsnlp.data.read_parquet` returns a Stream. To iterate over the documents multiple times efficiently or to access them by index, you must convert it to a list:

```python
docs = list(edsnlp.data.read_parquet("path/to/parquet", converter="omop"))
```
Parameters
| PARAMETER | DESCRIPTION |
|---|---|
| `path` | Path to the directory containing the parquet files (will recursively look for files in subdirectories). Supports any filesystem supported by pyarrow. |
| `filesystem` | The filesystem to use to read the files. If None, the filesystem will be inferred from the path. |
| `shuffle` | Whether to shuffle the data. If "dataset", the whole dataset is shuffled before iteration starts (at the start of every epoch if looping). If "fragment", the order of the parquet files and the order of the rows within each file are shuffled, but rows are not mixed across files. Note that shuffling the whole dataset can be expensive, especially for large datasets, since it requires reading the entire dataset into memory; for large datasets, consider shuffling at the "fragment" level instead. |
| `seed` | The seed to use for shuffling. TYPE: `Optional[int]` |
| `loop` | Whether to loop over the data indefinitely. TYPE: `bool` |
| `work_unit` | Only affects the multiprocessing mode. If "record", every worker starts reading the same parquet file and yields every num_workers-th record, each worker starting at a different offset: with num_workers=2, the first worker reads the 1st, 3rd, 5th, ... records while the second worker reads the 2nd, 4th, 6th, ... records of the first parquet file. If "fragment", each worker reads a different parquet file: the first worker reads every record of the 1st file, the second worker every record of the 2nd file, and so on. This way, no record is "wasted" and every record loaded in memory is yielded. TYPE: `Literal["record", "fragment"]` |
| `converter` | Converters to use to convert the parquet rows of the data source to Doc objects. These are documented on the Converters page. TYPE: `Optional[AsList[Union[str, Callable]]]` |
| `kwargs` | Additional keyword arguments to pass to the converter. These are documented on the Converters page. |
| RETURNS | DESCRIPTION |
|---|---|
| `Stream` | A Stream of documents, built lazily (see the Generator vs list note above). |
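For instance, when iterating over a large dataset with several workers, you might shuffle at the fragment level and assign one parquet file per worker, as in the sketch below (the path, seed, pipeline component and worker count are merely illustrative):

```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.sentences")  # any pipeline components

stream = edsnlp.data.read_parquet(
    "path/to/parquet",
    converter="omop",
    shuffle="fragment",    # shuffle file order and rows within files, not across files
    seed=42,
    work_unit="fragment",  # in multiprocessing mode, one parquet file per worker
)
stream = stream.map_pipeline(nlp)
stream = stream.set_processing(backend="multiprocessing", num_cpu_workers=2)

for doc in stream:
    ...  # consume the annotated documents
```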
Writing Parquet files
`edsnlp.data.write_parquet` writes a list of documents as a parquet dataset.
Example
```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe(...)

doc = nlp("My document with entities")
edsnlp.data.write_parquet([doc], "path/to/parquet")
```
Overwriting files

By default, `write_parquet` will raise an error if the directory already exists and contains parquet files. This is to avoid overwriting existing annotations. To allow overwriting existing files, use `overwrite=True`.
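For example, to replace the contents of a directory that was written in a previous run (a minimal sketch reusing `doc` from the example above):

```python
# Without overwrite=True, this raises if "path/to/parquet" already contains parquet files
edsnlp.data.write_parquet([doc], "path/to/parquet", overwrite=True)
```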
Parameters
| PARAMETER | DESCRIPTION |
|---|---|
| `data` | The data to write (either a list of documents or a Stream). |
| `path` | Path to the directory where the parquet files will be written. Supports any filesystem supported by pyarrow. |
| `batch_size` | The maximum number of documents to write in each parquet file. |
| `batch_by` | The method used to batch the documents. If "docs", the batch size is the number of documents. If "fragment", each batch corresponds to a parquet file fragment from the input data. |
| `write_in_worker` | In multiprocessing or spark mode, whether to batch and write the documents in the workers or in the main process (for instance, one worker may read the 1st, 3rd, 5th, ... documents while another reads the 2nd, 4th, 6th, ... documents). |
| `overwrite` | Whether to overwrite existing directories. |
| `filesystem` | The filesystem to use to write the files. If None, the filesystem will be inferred from the path. |
| `execute` | Whether to execute the writing operation immediately or to return a stream. |
| `converter` | Converter to use to convert the documents to dictionary objects before writing them as Parquet rows. These are documented on the Converters page. |
| `kwargs` | Additional keyword arguments to pass to the converter. These are documented on the Converters page. |
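Putting it together, a common pattern is to read a parquet dataset, run a pipeline over it and write the annotated documents back, as in the sketch below (the paths, the pipeline component and the batch size are only illustrative):

```python
import edsnlp

nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.sentences")  # any pipeline components

stream = edsnlp.data.read_parquet("path/to/parquet", converter="omop")
stream = stream.map_pipeline(nlp)

edsnlp.data.write_parquet(
    stream,
    "path/to/output",
    converter="omop",
    batch_size=1024,  # at most 1024 documents per parquet file
    overwrite=True,
)
```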