edsnlp.core.stream

Stream [source]

set_processing [source]

Parameters

PARAMETER DESCRIPTION
batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: BatchBy DEFAULT: None

num_cpu_workers

Number of CPU workers. A CPU worker handles the non-deep-learning components and the preprocessing, collating and postprocessing of deep-learning components. If no GPU workers are used, the CPU workers also handle the forward call of the deep-learning components.

TYPE: Optional[int] DEFAULT: None

num_gpu_workers

Number of GPU workers. A GPU worker handles the forward call of the deep-learning components. Only used with "multiprocessing" backend.

TYPE: Optional[int] DEFAULT: None

disable_implicit_parallelism

Whether to disable OpenMP and Hugging Face tokenizers' implicit parallelism in multiprocessing mode. Defaults to True.

TYPE: bool DEFAULT: True

backend

The backend to use for parallel processing. If not set, the backend is automatically selected based on the input data and the number of workers.

  • "simple" is the default backend and is used when num_cpu_workers is 1 and num_gpu_workers is 0.
  • "multiprocessing" is used when num_cpu_workers is greater than 1 or num_gpu_workers is greater than 0.
  • "spark" is used when the input data is a Spark dataframe and the output writer is a Spark writer.

TYPE: Optional[Literal['simple', 'multiprocessing', 'mp', 'spark']] DEFAULT: None

autocast

Whether to use automatic mixed precision (AMP) for the forward pass of the deep-learning components. If True, AMP is used with its default settings; if False, it is disabled. If a dtype is provided, it is passed to the torch.autocast context manager.

TYPE: Union[bool, Any] DEFAULT: None

show_progress

Whether to show progress bars (only applicable with "simple" and "multiprocessing" backends).

TYPE: bool DEFAULT: False

gpu_pipe_names

List of pipe names to accelerate on a GPUWorker, defaults to all pipes that inherit from TorchComponent. Only used with "multiprocessing" backend. Inferred from the pipeline if not set.

TYPE: Optional[List[str]] DEFAULT: None

process_start_method

Whether to use "fork" or "spawn" as the start method for the multiprocessing backend. The default is "fork" on Unix systems and "spawn" on Windows.

  • "fork" is the default start method on Unix systems and is the fastest start method, but it is not available on Windows, can cause issues with CUDA and is not safe when using multiple threads.
  • "spawn" is the default start method on Windows and is the safest start method, but it is not available on Unix systems and is slower than "fork".

TYPE: Optional[Literal['fork', 'spawn']] DEFAULT: None

gpu_worker_devices

List of GPU devices to use for the GPU workers. Defaults to all available devices, one worker per device. Only used with "multiprocessing" backend.

TYPE: Optional[List[str]] DEFAULT: None

cpu_worker_devices

List of devices to use for the CPU workers. Used for debugging purposes.

TYPE: Optional[List[str]] DEFAULT: None

deterministic

Whether to try and preserve the order of the documents in "multiprocessing" mode. If set to False, workers process documents as soon as they become available, in a dynamic fashion, which may result in out-of-order but usually faster processing. If set to True, tasks are distributed to the workers in a static, round-robin fashion. Defaults to True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Stream
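
The batching expressions accepted by batch_size pair a size with a unit. As a rough, hypothetical illustration of how such expressions decompose (this is not the library's actual parser, and parse_batch_expr is an invented name), consider:

```python
import re

def parse_batch_expr(expr):
    # Hypothetical parser sketch: split a batching expression into
    # a size and a unit, e.g. "32 docs" -> (32, "docs").
    # Unit-only expressions like "dataset" or "fragment" have no size.
    m = re.fullmatch(r"(\d+)\s*(\w+)", expr)
    if m:
        return int(m.group(1)), m.group(2)
    return None, expr

print(parse_batch_expr("32 docs"))     # (32, 'docs')
print(parse_batch_expr("1024 words"))  # (1024, 'words')
print(parse_batch_expr("dataset"))     # (None, 'dataset')
```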

map [source]

Maps a callable to the documents. It takes the callable and an optional dictionary of keyword arguments, and applies the callable to each element of the stream. If the callable is a generator function, each yielded element is added to the stream individually.

Parameters

PARAMETER DESCRIPTION
pipe

The callable to map to the documents.

kwargs

The keyword arguments to pass to the callable.

DEFAULT: {}

RETURNS DESCRIPTION
Stream
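
The generator behaviour described above can be mimicked with plain Python. In the sketch below, mimic_map is a hypothetical stand-in for Stream.map (not the library's implementation): a generator function fans one element out into several stream elements, while a regular callable maps one to one.

```python
import inspect

def split_sentences(doc):
    # Generator function: each yielded piece becomes its own
    # element in the stream, so one input may produce many outputs.
    for sent in doc.split(". "):
        yield sent

def upper(doc, suffix=""):
    # Plain callable: one input -> exactly one output.
    return doc.upper() + suffix

def mimic_map(items, pipe, kwargs={}):
    # Hypothetical stand-in for Stream.map over a plain iterable.
    for item in items:
        out = pipe(item, **kwargs)
        if inspect.isgenerator(out):
            yield from out
        else:
            yield out

docs = ["A b. C d", "E f"]
print(list(mimic_map(docs, split_sentences)))        # ['A b', 'C d', 'E f']
print(list(mimic_map(docs, upper, {"suffix": "!"}))) # ['A B. C D!', 'E F!']
```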

flatten [source]

Flattens the stream.

RETURNS DESCRIPTION
Stream
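
As a sketch of the semantics (mimic_flatten is an illustrative stand-in, not the actual implementation), flattening removes one level of nesting, e.g. to undo a batchify step:

```python
def mimic_flatten(stream):
    # Hypothetical stand-in: remove one level of nesting,
    # e.g. to turn batches produced by batchify back into items.
    for batch in stream:
        yield from batch

print(list(mimic_flatten([[1, 2], [3], [4, 5]])))  # [1, 2, 3, 4, 5]
```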

map_batches [source]

Maps a callable to a batch of documents. The callable should take a list of inputs. The output of the callable will be flattened if it is a list or a generator, or yielded to the stream as is if it is a single output (tuple or any other type).

Parameters

PARAMETER DESCRIPTION
pipe

The callable to map to the documents.

kwargs

The keyword arguments to pass to the callable.

DEFAULT: {}

batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: BatchBy DEFAULT: None

RETURNS DESCRIPTION
Stream
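
The flattening rule above can be sketched with plain Python (mimic_map_batches and _emit are hypothetical stand-ins, not the library's implementation): list or generator outputs are flattened back into the stream, while a tuple stays a single element.

```python
import inspect

def _emit(output):
    # A list or generator output is flattened back into the stream;
    # any other output (e.g. a tuple) is yielded as a single element.
    if isinstance(output, list) or inspect.isgenerator(output):
        yield from output
    else:
        yield output

def mimic_map_batches(items, pipe, batch_size):
    # Hypothetical stand-in for Stream.map_batches with "docs" batching.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield from _emit(pipe(batch))
            batch = []
    if batch:
        yield from _emit(pipe(batch))

double = lambda xs: [2 * x for x in xs]    # list output -> flattened
summarize = lambda xs: (len(xs), sum(xs))  # tuple output -> one element
print(list(mimic_map_batches([1, 2, 3, 4, 5], double, 2)))
# [2, 4, 6, 8, 10]
print(list(mimic_map_batches([1, 2, 3, 4, 5], summarize, 2)))
# [(2, 3), (2, 7), (1, 5)]
```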

batchify [source]

Accumulates the documents into batches and yields each batch to the stream.

Parameters

PARAMETER DESCRIPTION
batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: BatchBy DEFAULT: None

RETURNS DESCRIPTION
Stream
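
The predefined "docs" and "words" batching modes can be sketched as follows (batchify_docs and batchify_words are illustrative functions, not the library's actual implementations, and here "word count" is approximated by whitespace splitting):

```python
def batchify_docs(items, n):
    # "N docs": fixed-size batches of N items.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == n:
            yield batch
            batch = []
    if batch:
        yield batch

def batchify_words(docs, max_words):
    # "N words": cap the total word count per batch.
    batch, count = [], 0
    for doc in docs:
        n = len(doc.split())
        if batch and count + n > max_words:
            yield batch
            batch, count = [], 0
        batch.append(doc)
        count += n
    if batch:
        yield batch

docs = ["a b c", "d e", "f g h i", "j"]
print(list(batchify_docs(docs, 2)))
# [['a b c', 'd e'], ['f g h i', 'j']]
print(list(batchify_words(docs, 5)))
# [['a b c', 'd e'], ['f g h i', 'j']]
```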

map_gpu [source]

Maps a deep learning operation to a batch of documents, on a GPU worker.

Parameters

PARAMETER DESCRIPTION
prepare_batch

A callable that takes a list of documents and a device and returns a batch of tensors (or anything that can be passed to the forward callable). This will be called on a CPU-bound worker, and may be parallelized.

TYPE: Callable[[List, Union[str, device]], Any]

forward

A callable that takes the output of prepare_batch and returns the output of the deep learning operation. This will be called on a GPU-bound worker.

TYPE: Callable[[Any], Any]

postprocess

An optional callable that takes the list of documents and the output of the deep learning operation, and returns the final output. This will be called on the same CPU-bound worker that called the prepare_batch function.

TYPE: Optional[Callable[[List, Any], Any]] DEFAULT: None

batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: BatchBy DEFAULT: None

RETURNS DESCRIPTION
Stream
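
The three callables form a CPU → GPU → CPU contract. The sketch below mimics that contract with plain Python objects instead of torch tensors (all functions and the "score" field are hypothetical stand-ins; in real use prepare_batch would build tensors on the given device):

```python
def prepare_batch(docs, device):
    # CPU-bound worker: turn documents into a model-ready batch.
    return {"lengths": [len(d) for d in docs], "device": device}

def forward(batch):
    # GPU-bound worker: run the deep-learning operation.
    return [length * 2 for length in batch["lengths"]]

def postprocess(docs, output):
    # Back on the same CPU-bound worker: attach results to documents.
    return [{"doc": d, "score": s} for d, s in zip(docs, output)]

docs = ["ab", "cdef"]
out = postprocess(docs, forward(prepare_batch(docs, "cuda:0")))
print(out)
# [{'doc': 'ab', 'score': 4}, {'doc': 'cdef', 'score': 8}]
```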

map_pipeline [source]

Maps a pipeline to the documents, i.e. adds each component of the pipeline to the stream operations. This function is called under the hood by nlp.pipe().

Parameters

PARAMETER DESCRIPTION
model

The pipeline to map to the documents.

TYPE: Pipeline

batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[int, str]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: BatchBy DEFAULT: None

RETURNS DESCRIPTION
Stream

shuffle [source]

Shuffles the stream by accumulating the documents into batches and shuffling the batches. We try to optimize and avoid the accumulation by shuffling items directly in the reader, but if some upstream operations are not elementwise or if the reader is not compatible with the batching mode, we have to accumulate the documents into batches and shuffle the batches.

For instance, imagine reading from a list of two very large documents and applying an operation that splits the documents into sentences. Shuffling only in the reader and then applying the split operation would not shuffle the sentences across documents, which may lead to a lack of randomness when training a model. Think of this as having lumps after mixing your data. In this case, we detect that the split op is not elementwise and accumulate the sentences into batches after their generation, before shuffling the batches.

Parameters

PARAMETER DESCRIPTION
batch_size

The batch size. Can also be a batching expression like "32 docs", "1024 words", "dataset", "fragment", etc.

TYPE: Optional[Union[str, int]] DEFAULT: None

batch_by

Function to compute the batches. If set, it should take an iterable of documents and return an iterable of batches. You can also set it to "docs", "words" or "padded_words" to use predefined batching functions. Defaults to "docs".

TYPE: Optional[Union[str, BatchFn]] DEFAULT: None

seed

The seed to use for shuffling.

TYPE: Optional[int] DEFAULT: None

shuffle_reader

Whether to shuffle the reader. Defaults to True if the reader is compatible with the batch_by mode, False otherwise.

TYPE: Optional[Union[bool, str]] DEFAULT: None

RETURNS DESCRIPTION
Stream
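
The batch-wise accumulate-and-shuffle fallback can be sketched as follows (mimic_shuffle is an illustrative stand-in; the real implementation also handles reader-side shuffling when possible). With a fixed seed the order is reproducible, and no item is dropped or duplicated:

```python
import random

def mimic_shuffle(items, batch_size, seed=None):
    # Accumulate items into batches, shuffle within each batch,
    # then yield the items back to the stream.
    rng = random.Random(seed)
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            rng.shuffle(batch)
            yield from batch
            batch = []
    if batch:
        rng.shuffle(batch)
        yield from batch

out1 = list(mimic_shuffle(range(10), 5, seed=42))
out2 = list(mimic_shuffle(range(10), 5, seed=42))
assert out1 == out2                      # same seed -> same order
assert sorted(out1) == list(range(10))   # no items lost or duplicated
```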

loop [source]

Loops over the stream indefinitely.

Note that we cycle over the items produced by the reader, not the items produced by the stream operations. This means that the stream operations are applied to the same items multiple times, and may produce different results if they are non-deterministic. It also means that calling this function has the same effect regardless of the operations applied to the stream before calling it, i.e.:

stream.loop().map(...)
# is equivalent to
stream.map(...).loop()
RETURNS DESCRIPTION
Stream
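
The equivalence above holds because cycling happens at the reader, before any operation runs. With itertools, the behaviour can be sketched as:

```python
from itertools import cycle, islice

reader = [1, 2, 3]
square = lambda x: x * x

# Both stream.loop().map(square) and stream.map(square).loop()
# behave like applying the operation over a cycled reader:
looped = (square(x) for x in cycle(reader))
print(list(islice(looped, 7)))  # [1, 4, 9, 1, 4, 9, 1]
```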

torch_components [source]

Yields components that are PyTorch modules.

RETURNS DESCRIPTION
Iterable['edsnlp.core.torch_component.TorchComponent']

train [source]

Enables training mode on PyTorch modules.

Parameters

PARAMETER DESCRIPTION
mode

Whether to enable training or not

DEFAULT: True

eval [source]

Enables evaluation mode on PyTorch modules.