Commands

In this section you will learn how exploit the full potential of pipelime commands in your project. Pipelime commands can be seen as packaged tasks that can be run individually or as part of an execution graph. You can delegate to pydantic the tedious task of parsing and validating the arguments, while pipelime takes care of the rest, namely:

  • merging arguments from different sources (e.g., command line and config file)

  • advanced configuration management through Choixe

  • easy multi-processing

  • automatic cli and help generation for your project

We assume a basic understanding of pydantic, which is key to get the most out of pipelime commands. However, as you will see common use cases are already covered by pipelime, so you don’t need to worry if you are not yet a pydantic power user!

Relevant Modules

All commands derive from PipelimeCommand, which is defined in the pipelime.piper subpackage. In the same subpackage you can find the enum PiperPortType, which we will introduce later. Some commands’ arguments are often the same, e.g., the input and output datasets, therefore pipelime provides some general interfaces defined in pipelime.commands.interfaces.

Note that the pipelime.commands subpackage just imports the standard pipelime commands, so it is usually not necessary when you are writing you own.

A Naive Implementation

Let’s say we want to write a command that perform the following task:

  1. loads a dataset of images

  2. computes mean and standard deviation of the images

  3. applies such values to all the images in the dataset

  4. saves the result to a new folder

This might be a first attempt to implement it:

from pydantic import Field
import numpy as np
from pipelime.piper import PipelimeCommand, PiperPortType
from pipelime.sequences import SamplesSequence
from pipelime.stages import StageLambda

class StandardizationCommand(PipelimeCommand, title="std-img"):
    """Standardize images by subtracting the mean and dividing
    by the standard deviation.
    """

    input: str = Field(
        ..., description="The input folder.", piper_port=PiperPortType.INPUT
    )
    output: str = Field(
        ..., description="The output folder.", piper_port=PiperPortType.OUTPUT
    )
    image_key: str = Field("image", description="The key of the input image.")
    out_std_image_key: str = Field(
        "std_image", description="The key of the output standardized image."
    )

    def run(self):
        # load the dataset
        seq = SamplesSequence.from_underfolder(self.input)

        # add a stage to get the images in the range [0, 1]
        seq = seq.map(
            StageLambda(
                lambda x: x.set_value_as(
                    self.out_std_image_key,
                    self.image_key,
                    x[self.image_key]().astype(float) / 255.0,
                )
            )
        )

        # cache the values, so that we don't need to recompute it every time
        seq = seq.cache()

        # compute global mean and std
        mean = 0
        counter = 0
        for x in seq:
            image = x[self.out_std_image_key]()
            mean += image.sum()
            counter += image.size
        mean /= counter

        stddev = 0
        for x in seq:
            image = x[self.out_std_image_key]()
            stddev += ((image - mean) ** 2).sum()
        stddev = np.sqrt(stddev / (counter-1))

        # apply the standardization and save the result
        seq = seq.map(
            StageLambda(
                lambda x: x.set_value(
                    self.out_std_image_key,
                    (
                        ((x[self.out_std_image_key]() - mean) / stddev) * 128.0 + 128.0
                    ).clip(0, 255).astype(np.uint8),
                ),
            )
        )

        seq = seq.to_underfolder(self.output)
        seq.run()

First, note that all parameters are defined as pydantic Fields with a description and, optionally, a default value. Moreover, the input and output fields are marked as PiperPortType.INPUT and PiperPortType.OUTPUT, respectively: this is needed to find dependencies between commands when building an execution graph.

Though this is a working implementation, it has some drawbacks, namely:

  • there is no option to run the command in parallel

  • there is no progress bar

  • input and output cannot be validated

All these issues can be addressed by using built-in interfaces from pipelime.

Using The Interfaces

Common options, such as the input and output datasets, can be easily deployed in a standard way by using built-in interfaces from pipelime. For example, the previous command can be rewritten as follows:

from pydantic import Field
import numpy as np
from pipelime.piper import PipelimeCommand, PiperPortType
from pipelime.sequences import SamplesSequence
from pipelime.stages import StageLambda
import pipelime.commands.interfaces as plint

class StandardizationCommand(PipelimeCommand, title="std-img"):
    """Standardize images by subtracting the mean and dividing
    by the standard deviation.
    """

    input: plint.InputDatasetInterface = (
        plint.InputDatasetInterface.pyd_field(alias="i", piper_port=PiperPortType.INPUT)
    )
    output: plint.OutputDatasetInterface = (
        plint.OutputDatasetInterface.pyd_field(
            alias="o", piper_port=PiperPortType.OUTPUT
        )
    )
    image_key: str = Field("image", alias="i", description="The key of the input image.")
    out_std_image_key: str = Field(
        "std_image", alias="s", description="The key of the output standardized image."
    )
    grabber: plint.GrabberInterface = plint.GrabberInterface.pyd_field(alias="g")

    def run(self):
        # load the dataset
        seq = self.input.create_reader()

        # add a stage to get the images in the range [0, 1]
        seq = seq.map(
            StageLambda(
                lambda x: x.set_value_as(
                    self.out_std_image_key,
                    self.image_key,
                    x[self.image_key]().astype(float) / 255.0,
                )
            )
        )

        # cache the values, so that we don't need to recompute it every time
        seq = seq.cache()

        # compute global mean and std
        mean = 0
        counter = 0
        for x in seq:
            image = x[self.out_std_image_key]()
            mean += image.sum()
            counter += image.size
        mean /= counter

        stddev = 0
        for x in seq:
            image = x[self.out_std_image_key]()
            stddev += ((image - mean) ** 2).sum()
        stddev = np.sqrt(stddev / (counter-1))

        # apply the standardization
        seq = seq.map(
            StageLambda(
                lambda x: x.set_value(
                    self.out_std_image_key,
                    (
                        ((x[self.out_std_image_key]() - mean) / stddev) * 128.0 + 128.0
                    ).clip(0, 255).astype(np.uint8),
                ),
            )
        )

        # write to disk
        seq = self.output.append_writer(seq)
        self.grabber.grab_all(
            seq,
            grab_context_manager=self.output.serialization_cm(),
            keep_order=False,
            parent_cmd=self,
            track_message=f"Saving standardized images ({len(seq)} samples)",
        )

In the new implementation above, a number of improvements and best practices are adopted:

  1. Fields include an alias to add an alternative (short) name. For example, the input field can be specified as i or input.

  2. input and output are now pipelime interfaces. This gives the user a standard way to specify more than just the folder, including a validation schema (see section CLI for more details).

  3. The interfaces provide utility methods to easily create a reader (self.input.create_reader()) and append a writer to a sequence (self.output.append_writer(seq)). Also, the grabber interface allows to run on multiple processes and to show a progress bar.

  4. Note how the serialization options are given as a context manager to the grabber interface (self.output.serialization_cm()). This ensure that they are correctly applied even when multiple processes are involved.

Common Built-in Interfaces

When writing a new command, you should always use the built-in interfaces for common options. This way, users feel comfortable with your CLI and new features are automatically available to your command, without the need to change the code. Moreover, most interfaces are able to parse inputs in the so-called compact form, i.e., instead of a key-value mapping, just a single string that contains all the information needed to create the object.

In the following a short list of the most common interfaces available.

GrabberInterface

It exposes options to setup a multiprocessing run. Relevant methods:

  • pyd_field: a class method to ease the creation of the corresponding pydantic field.

  • grab_all, grab_all_wrk_init: methods to run a sequence on multiple processes. It takes care of showing a progress bar and of correctly handling the serialization options.

Compact form: num_workers[,prefetch]. Examples:

  • pipelime clone +g 4: run on 4 processes with default prefetch

  • pipelime clone +g 6,12: run on 6 processes with 12 samples prefetched by each process

InputDatasetInterface

A general interface to load a dataset from disk. Relevant methods:

  • pyd_field: a class method to ease the creation of the corresponding pydantic field.

  • create_reader: a method to create a reader for the dataset.

Input data can be validated against a schema, see Validation for more details.

Compact form: folder[,skip_empty]. Examples:

  • pipelime clone +i path: read all samples from the folder path

  • pipelime clone +i path,true: read only non-empty samples from the folder path

OutputDatasetInterface

A general interface to save a dataset to disk. Relevant methods:

  • pyd_field: a class method to ease the creation of the corresponding pydantic field.

  • append_writer: a method to append a writer to a sequence.

  • serialization_cm: a method to create a context manager to handle the serialization options in a multiprocessing run.

  • as_pipe: a method to get a dictionary to add to a pipe list, useful, e.g., when working with data streams.