Stages¶
Let’s focus some more on pipelime stages. As you may already know, stages are a special kind of operation that transforms individual samples of a sequence. When a stage is applied on a sequence, all of its samples are transformed independently.
If possible, you should always implement your operations as a stage, for the following reasons:
They minimize the code you have to write.
You get parallelization for free.
Your operation will become a reusable node for many different pipelines.
You will be able to manually run your operation from a command line interface.
Entities And Actions¶
Previously, we have seen how to get and modify samples from a dataset. Now, consider again the operations we implemented:
Keeping only the samples with even index
Inverting the color of the images
Adding a new item called
colorwith the average image colorDeleting the
maskinvitem
Points 2, 3 and 4 can be implemented as stages, while point 1 requires to remove samples from a sequence, thus violating one of the conditions to be a stage, i.e., input and outputs should have the same length.
Though you can implement your stage as a class derived from pipelime.stages.SampleStage, for most applications we recommend to use a more powerful mechanism based on auto-validated entities. You need to define three components:
An input entity defining the items that your stage expects to find in the input sample
An output entity defining the items that your stage will put into the output sample
An action callable accepting the input entity and returning the output entity
The input/output entities are classes derived from pipelime.stages.BaseEntity, which in turn is a pydantic models.
Hint
Pydantic models are dataclasses on steroids, they provide automatic de/serialization, validation, constructor, property-like fields generation and tons of interesting features aimed at reducing the amount of boilerplate code for plain python classes.
The entity model fields define the expected item keys and their types. Considering the first operation “Inverting the color of the images”, they can be implemented as follows:
from pipelime.stages.entities import BaseEntity
import pipelime.items as pli
class InvertInput(BaseEntity):
image: pli.ImageItem
class InvertOutput(BaseEntity):
image: pli.ImageItem
Then, the action can just be a function that takes an InvertInput and returns an InvertOutput:
def invert_action(x: InvertInput) -> InvertOutput:
return InvertOutput.merge(x, image=(255 - x.image()))
Note how the unrelated items are forwarded to the output using merge. Also, since we are overwriting the existing image key and we want to use the same item type, we just pass the raw numpy array.
The benefit of this approach are multiple:
The input and output entities are automatically validated, raising informative errors if the input sample is missing some items or if the items have the wrong type
By defining full pydantic
Fields, you can specify default values, constraints, help strings, etc.You can skip much of the boilerplate code and focus on the actual logic of your operation
To apply this action on a sequence, you have to wrap it inside a StageEntity and call the map method:
from pipelime.stages import StageEntity
new_seq = seq.map(StageEntity(invert_action))
Similarly, when running the map command from a configuration file:
map:
input: ...
output: ...
stage:
entity: class.path.to.invert_action
Where class.path.to.invert_action can be a path/to/module.py:invert_action as well. As you can see you don’t have to specify the input and output entities: they are automatically inferred from the action signature.
The next operation computes the average color value and adds a new item to the sample:
from pipelime.stages import BaseEntity
import pipelime.items as pli
import numpy as np
class AverageColorInput(BaseEntity):
image: pli.ImageItem
class AverageColorOutput(BaseEntity):
color: pli.NumpyItem
def avg_color_action(x: AverageColorInput) -> AverageColorOutput:
avg_color = np.mean(x.image(), axis=(0, 1))
return AverageColorOutput.merge(x, color=avg_color)
The value assigned to color is a raw numpy array, which is silently converted to the type stated in AverageColorOutput. In general, the actual concrete type is chosen according to the following rules:
if the input entity has an item with the same key and a compatible (covariant) type, such type is used
if the output entity is a concrete class, its type is used
if the output entity is an abstract class, its default type is used
In the above example, since we expect not color item in the input, the default type for NumpyItem, ie, NpyNumpyItem, will be used.
Finally, the last operation deletes the maskinv item:
class RemoveMaskinvInput(BaseEntity):
maskinv: pli.ImageItem
def remove_maskinv_action(x: RemoveMaskinvInput) -> BaseEntity:
out_dict = {k: v for k, v in x.dict().items() if k != "maskinv"}
return BaseEntity(**out_dict)
Here we used the dict method of the underlying pydantic model to get a dictionary representation of the input entity.
Parametrized Actions¶
In the examples above we have seen how to use any free function as an action. However, if you need to keep some internal state, you can define a class with the __call__ method instead.
For instance, let’s say we want to saturate the image value to a given threshold:
class SaturateInputOutput(BaseEntity):
image: pli.ImageItem
class SaturateAction:
def __init__(self, threshold: int):
self._threshold = threshold
def __call__(self, x: SaturateInputOutput) -> SaturateInputOutput:
image = x.image().copy() # make a copy to avoid modifying the original image
image[image > self._threshold] = self._threshold
return SaturateInputOutput.merge(x, image=image)
Then we can use it as follows:
from pipelime.stages import StageEntity
new_seq = seq.map(StageEntity(SaturateAction(111)))
Optional Items¶
Sometimes you may want to define an entity that can contain some optional items. For instance, we may expect a label in the input, but if not found, we just set it to 0:
class OptionalLabelInput(BaseEntity):
image: pli.ImageItem
label: pli.NumpyItem = pli.TxtNumpyItem(0)
Also, if you want to exclude some items from the output when a condition is met, just set it as optional and use the special None value:
from typing import Optional
class DebuggableOutput(BaseEntity):
image: pli.ImageItem
debug: Optional[pli.ImageItem]
class DebuggableAction:
def __init__(self, debug: bool):
self._debug = debug
def __call__(self, x):
# do some cool processing
# ...
if self._debug:
debug_image = ...
else:
debug_image = None
return DebuggableOutput.merge(x, debug=debug_image)
Entity Value Parsing¶
Besides high-level validation, item values can also be parsed and converted to a more convenient type. The most common use case is to parse a JSON/YAML metadata file into a pydantic model:
from typing import Sequence
from pydantic import BaseModel
from pipelime.stages.entities import ParsedItem
class MetadataModel(BaseModel):
keypoints: t.Sequence[t.Tuple[float, float]]
label: int
description: str
class EInput(BaseEntity):
image: pli.ImageItem
metadata: ParsedItem[pli.MetadataItem, MetadataModel]
The metadata has been defined as a ParsedItem class with two type parameters:
the first one is the type of the item to parse
the second one is the type of the parsed value
So, when the EInput entity is created from a sample, the metadata item type must be covariant with pli.MetadataItem and a MetadataModel instance is created from the raw metadata value. Then, calling metadata() will return the parsed value instead the raw item value.
Another use case is to parse and convert a file into a structured object. For instance, you may want to access a RGB image as 3 separate numpy arrays, one for each channel. In this case, you can define a custom class to validate and split an input image:
import numpy as np
class PlanarRGBImage:
def __init__(self, image: np.ndarray):
if image.ndim != 3 or image.shape[2] != 3:
raise ValueError("Expected a 3-channel RGB image")
self._image = image
@classmethod
def create_from_planes(cls, r: np.ndarray, g: np.ndarray, b: np.ndarray) -> "PlanarRGBImage":
return cls(np.stack([r, g, b], axis=-1))
@property
def r(self) -> np.ndarray:
return self._image[:, :, 0]
@property
def g(self) -> np.ndarray:
return self._image[:, :, 1]
@property
def b(self) -> np.ndarray:
return self._image[:, :, 2]
def to_item_data(self) -> np.ndarray:
return self._image
Note that the special to_item_data method is required to convert a parsed value back to a raw item value. Then, you can use it as follows:
import pipelime.stages.entities as ple
import pipelime.items as pli
class PlanarIO(ple.BaseEntity):
image: ple.ParsedItem[pli.ImageItem, PlanarRGBImage]
def my_action(x: PlanarIO):
r = x.image().r
g = x.image().g
b = x.image().b
# ... do some processing
return PlanarIO(image=PlanarRGBImage.create_from_planes(r, g, b))
As shown in the last example, ParsedItem can be used in output entities as well and can be built from a pipelime item, its raw value or the parsed value. In any case, both the item and the parsed value must be constructable from the raw item value, so a full validation is always performed. Also, if you don’t care about the actual item type, you can just use ParsedData[ParsedType] instead of ParsedItem[ItemType, ParsedType].
ParsedItem and ParsedData are fully compatible with the Optional and DynamicKey (see below) features.
Dynamic Key Names¶
To make your actions more flexible and reusable, the actual item key names they work on can be defined at runtime. For instance, let’s say each input sample is composed of two images image_1 and image_2 and we’d want to apply the SaturateAction on both. Instead of expecting an item called image, we can define a private field with a special DynamicKey value:
from pipelime.stages.entities import DynamicKey
class SaturateDynamicInput(BaseEntity):
_image = DynamicKey(pli.ImageItem)
Note that the _image field will not considered when parsing and validating an input sample.
Then, the new action expects the actual image key name as a parameter and explicitly calls a validation on it:
class SaturateDynamicAction:
def __init__(self, threshold: int, image_key: str = "image"):
self._threshold = threshold
self._image_key = image_key
def __call__(self, x: SaturateDynamicInput):
image = x._image.validate(self._image_key)
image = image().copy() # make a copy to avoid modifying the original image
image[image > self._threshold] = self._threshold
return BaseEntity.merge(x, **{self._image_key: image})
The validate method checks that the given key is present in the input sample with the required type, then returns the corresponding item. Note how the kwargs notation has been used to overwrite the actual _image field with the new value.
DynamicKey is fully compatible with the usual field definitions and other features:
the full signature is
DynamicKey(item_type, default_value, default_factory, **field_kwargs), wherefield_kwargsis any other keyword argument accepted bypydantic.FieldDynamicKey(ParsedItem[ItemType, ParsedType])is also supported
Action Registration¶
Action functions and classes can be registered to ease their use in a configuration file. To this end, add the @pipelime.stages.entities.register_action decorator:
import pipelime.stages.entities as ple
@ple.register_action("FnAction", description="My function action help")
def my_fn_action(x):
...
return x
@ple.register_action("ClassAction", description="My class action help")
class MyAction:
def __init__(self, param1, param2):
...
def __call__(self, x):
...
return x
@ple.register_action("ModelAction", description="My model action help")
class MyModelAction(pydantic.BaseModel):
param1: int
param2: str
def __call__(self, x):
...
return x
Note that when no annotation is provided, the input type defaults to BaseEntity.
Once an action is registered, it can be recalled in a configuration file by its name, eg:
- map:
entity: FnAction
- map:
entity:
ClassAction: [..., ...] # positional arguments
- map:
entity:
MyModelAction:
param1: ...
param2: ...
Finally, the registered actions can be listed together with the Sample Stages with $ pipelime list.
Advanced Entity Features¶
Leveraging the power of pydantic models, you can define more complex entities that fit your needs. First, BaseEntity forwards any item key that is not listed as field. Though, you can drop them or even raise an error:
# removes from the sample all the items except "image"
class DropExtraEntity(BaseEntity, extra="ignore"):
image: pli.ImageItem
# raise an error if the sample contains items other than "image"
class StrictEntity(BaseEntity, extra="forbid"):
image: pli.ImageItem
Also, you can add custom validation logic to your entities:
class ColorImageEntity(BaseEntity):
image: pli.ImageItem
@validator("image")
def check_color(cls, v):
if v().shape[2] != 3:
raise ValueError("The image must be a color image")
return v
You can even perform advanced transformations on the input items, checkout pydantic for more details:
class MaskedGrayImageEntity(BaseEntity):
"""A gray image with a mask.
The grayscale image is computed from the RGB image if not provided.
"""
image: pli.ImageItem = None
grayscale: pli.ImageItem = None
mask: pli.ImageItem
@validator("image")
def check_color_image(cls, v):
if v is not None and v().shape[2] != 3:
raise ValueError("The `image` must be a color image")
return v
@validator("grayscale", always=True)
def check_gray_image(cls, v, values):
if v is None:
if "image" not in values or values["image"] is None:
raise ValueError("Either `image` or `grayscale` must be provided")
v = values["image"].make_new(values["image"]().mean(axis=2))
return v
@validator("mask")
def check_mask(cls, v, values):
if v().shape[2] != 1:
raise ValueError("The `mask` must be single channel")
if v().shape[:2] != values["grayscale"]().shape[:2]:
raise ValueError("The `mask` and the `grayscale` image must have the same size")
return v
Finally, you can even use lambda functions as actions and define your input type through the input_type parameter:
from pipelime.stages import StageEntity
new_seq = seq.map(StageEntity(action=lambda x: ..., input_type=YourInputEntity))
# or, equivalently
new_seq = seq.map(StageEntity(action="class.path.to.your_action", input_type="class.path.to.YourInputEntity"))
where YourInputEntity must be a subclass of BaseEntity.
Full-fledged Stages¶
Though the previous approach is very powerful, sometimes it might not fit all your needs and you want to write a full-fledged stage class. The invert_action can be converted to a stage class as follows:
from pydantic import Field
from pipelime.sequences import Sample
from pipelime.stages import SampleStage
class InvertStage(SampleStage, title="invert"):
"""Inverts the colors of an image."""
key: str = Field("image", description="The key of the image to invert.")
def __call__(self, x: Sample) -> Sample:
return x.set_value(self.key, 255 - x[self.key]()) # type: ignore
Though the items are not automatically parsed and validated, we can easily support dynamic key names as well as inject the stage into a pipeline, both programmatically:
stage = InvertStage(key=...)
new_seq = seq.map(stage)
or from a configuration file using the title field given in the class definition:
map:
input: ...
output: ...
stage:
invert:
key: ...
All stages must implement the __call__ method, accepting and returning a single sample. The call method here simply reads the image item, inverts the colors and returns the new sample. The dynamic key name is given through the key field, that defaults to the string “image”, and includes a description. Though not essential, you should always set fields’ descriptions because they are automatically used by pipelime to display a help message in the CLI. Likewise, the class docstring is extracted and used as a description of the stage.
Using a stage to implement the average color computation is pretty easy as well:
from pydantic import Field
from pipelime.sequences import Sample
from pipelime.stages import SampleStage
class AverageColor(SampleStage, title="avg_color"):
"""Averages the color of an image."""
image_key: str = Field("image", description="The key of the image to average.")
avg_key: str = Field("color", description="The key of the average color.")
def __call__(self, x: Sample) -> Sample:
return x.set_value(self.avg_key, np.mean(x[self.image_key](), axis=(0, 1))) # type: ignore
Instead, the removal of the maskinv item is achieved with the built-in StageKeysFilter.
Stage Composition¶
To sequentially apply all the three stages, you can just combine them with the >> and << operators:
from pipelime.stages import StageKeysFilter
new_seq = seq.map(InvertStage() >> AverageColor() >> StageKeysFilter(key_list=["maskinv"], negate=True))
or, if you prefer, flippling the shift operator:
from pipelime.stages import StageKeysFilter
new_seq = seq.map(StageKeysFilter(key_list=["maskinv"], negate=True) << AverageColor() << InvertStage())
Indeed, the left/right shift operators are just shorcuts for the StageCompose stage:
from pipelime.stages import StageKeysFilter
new_seq = seq.map(StageCompose([InvertStage(), AverageColor(), StageKeysFilter(key_list=["maskinv"], negate=True)]))
Tip
When using built-in stages, you can use their titles as well:
new_seq = seq.map({"compose": ["invert", "avg_color", {"filter-keys": {"key_list": ["maskinv"], "negate": True}}]}})