ai_graph.step.video.basic module

Video Processing Steps Module.

This module provides basic video processing steps for a pipeline: opening a video capture device or file, reading frames from it, releasing the current frame, reading a frame from an image file path, and releasing the capture device.

class ai_graph.step.video.basic.OpenVideoCaptureStep(source, name=None)

Bases: BasePipelineStep

Pipeline step to open a video capture device or file.

__init__(source, name=None)

Initialize the video capture step.

Parameters:
  • source (int or str) – Video source (device index or file path).

  • name (str, optional) – Name of this pipeline step.
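Because `source` distinguishes a device index (int) from a file path (str), a value that arrives as text, for example from a command-line argument, needs converting before it is passed in. The helper below is a minimal sketch of that conversion; `parse_video_source` is a hypothetical name and is not part of `ai_graph`.

```python
def parse_video_source(raw):
    """Turn a text argument into a value suitable for the `source` parameter.

    Hypothetical helper, not part of ai_graph: strings made only of decimal
    digits become a device index (int); anything else is kept as a path (str).
    """
    text = raw.strip()
    if text.isdecimal():
        return int(text)
    return text


camera = parse_video_source("0")           # device index
video_file = parse_video_source("clip.mp4")  # file path
```

The result could then be passed as `OpenVideoCaptureStep(camera)` or `OpenVideoCaptureStep(video_file)`.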

class ai_graph.step.video.basic.ReadVideoFrameStep(name=None)

Bases: BasePipelineStep

Pipeline step to read a frame from the video capture device.

This step reads the next frame from an opened video capture device or file. By default it reads frames sequentially; when the input data includes a 'frame_num' key, it seeks to that frame number before reading.

Examples

>>> # Reading sequential frames
>>> step = ReadVideoFrameStep()
>>> data = {'video_capture': {'capture': cap, 'metadata': metadata}}
>>> result = step.process(data)
>>> frame = result['frame']

>>> # Reading a specific frame
>>> data = {'video_capture': {'capture': cap, 'metadata': metadata}, 'frame_num': 100}
>>> result = step.process(data)

__init__(name=None)

Initialize the read step.

Parameters:

name (str, optional) – Name of this pipeline step.
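The difference between sequential reading and seeking can be sketched without OpenCV installed. The code below is an illustration under stated assumptions, not the step's actual implementation: `FakeCapture` and `read_frame` are hypothetical stand-ins, the seek is assumed to go through the capture's `set()` method with the frame-position property (as `cv2.VideoCapture` does), and returning `None` for a failed read is a choice made for this sketch only.

```python
CAP_PROP_POS_FRAMES = 1  # same numeric value as cv2.CAP_PROP_POS_FRAMES


class FakeCapture:
    """Hypothetical stand-in for the read()/set() subset of cv2.VideoCapture."""

    def __init__(self, frames):
        self._frames = list(frames)
        self._pos = 0

    def set(self, prop_id, value):
        # Only the frame-position property is supported in this sketch.
        if prop_id != CAP_PROP_POS_FRAMES:
            return False
        self._pos = int(value)
        return True

    def read(self):
        # Mirrors OpenCV's (success_flag, frame) return convention.
        if 0 <= self._pos < len(self._frames):
            frame = self._frames[self._pos]
            self._pos += 1
            return True, frame
        return False, None


def read_frame(data):
    """Hypothetical equivalent of process(): seek if asked, then read."""
    cap = data['video_capture']['capture']
    if 'frame_num' in data:
        cap.set(CAP_PROP_POS_FRAMES, data['frame_num'])
    ok, frame = cap.read()
    result = dict(data)
    result['frame'] = frame if ok else None  # assumption: None on failure
    return result


cap = FakeCapture(['f0', 'f1', 'f2', 'f3'])
state = {'video_capture': {'capture': cap, 'metadata': {}}}
first = read_frame(state)['frame']                        # sequential read
jumped = read_frame({**state, 'frame_num': 3})['frame']   # seek, then read
past_end = read_frame(state)['frame']                     # nothing left
```

Note that a seek moves the read position, so a sequential read issued after a seek continues from the frame following the one that was requested.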

class ai_graph.step.video.basic.ReleaseVideoFrameStep(name=None)

Bases: BasePipelineStep

Pipeline step to release the current video frame.

__init__(name=None)

Initialize the release step.

Parameters:

name (str, optional) – Name of this pipeline step.

class ai_graph.step.video.basic.ReadFrameFromFileStep(name=None)

Bases: BasePipelineStep

Pipeline step to read a frame from a file path.

This step reads an image file from the filesystem using OpenCV’s imread function, so it accepts any image format that OpenCV can decode, such as JPEG, PNG, BMP, and TIFF.

Examples

>>> step = ReadFrameFromFileStep()
>>> data = {'frame_path': '/path/to/image.jpg'}
>>> result = step.process(data)
>>> frame = result['frame']  # numpy array containing the image

__init__(name=None)

Initialize the read frame step.

Parameters:

name (str, optional) – Name of this pipeline step.
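OpenCV's `imread` returns `None` instead of raising when a file is missing or cannot be decoded, so code that consumes the frame may want an explicit check. Whether `ReadFrameFromFileStep` performs such a check itself is not stated here; the sketch below is a hypothetical wrapper (`load_frame` is not part of `ai_graph`) that takes the reader function as an argument so it runs without OpenCV installed.

```python
def load_frame(path, imread):
    """Read an image with `imread` and fail loudly if nothing was decoded.

    Hypothetical helper: `imread` is any callable with the cv2.imread
    convention of returning None when the file is missing or unreadable.
    """
    frame = imread(path)
    if frame is None:
        raise OSError(f"missing or unreadable image: {path}")
    return frame


def fake_imread(path):
    # Stand-in reader for the demo: only '.png' paths "decode" successfully.
    return [[0, 0], [0, 0]] if path.endswith('.png') else None


frame = load_frame('ok.png', fake_imread)
try:
    load_frame('missing.jpg', fake_imread)
    failed = False
except OSError:
    failed = True
```

With OpenCV available, the same helper would be called as `load_frame(path, cv2.imread)`.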

class ai_graph.step.video.basic.ReleaseVideoCaptureStep(name=None)

Bases: BasePipelineStep

Pipeline step to release the video capture device.

__init__(name=None)

Initialize the release step.

Parameters:

name (str, optional) – Name of this pipeline step.
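A capture device that is never released can stay locked for other processes, so the release should happen even when an earlier stage fails. How `ai_graph` pipelines handle errors is not described here; the sketch below only illustrates the general try/finally pattern, with `FakeCapture` and `run_with_capture` as hypothetical names standing in for a real capture object and pipeline runner.

```python
class FakeCapture:
    """Hypothetical stand-in exposing only release(), as cv2.VideoCapture does."""

    def __init__(self):
        self.released = False

    def release(self):
        self.released = True


def run_with_capture(cap, work):
    """Run `work(cap)` and release the capture whether or not it raises."""
    try:
        return work(cap)
    finally:
        cap.release()


def failing_work(cap):
    # Simulates a processing stage that fails partway through.
    raise RuntimeError("decode failed")


cap_ok = FakeCapture()
outcome = run_with_capture(cap_ok, lambda cap: 'done')

cap_err = FakeCapture()
try:
    run_with_capture(cap_err, failing_work)
except RuntimeError:
    pass  # the error still propagates, but the capture was released first
```

In both runs the capture ends up released, which is the property a final `ReleaseVideoCaptureStep` is meant to provide on the normal path.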