Quick Start Guide

This guide walks you through AI-Graph in a few minutes. AI-Graph is a framework for building and managing AI workflows as pipelines of steps, following the Chain of Responsibility pattern.

What You’ll Learn

  • How to create custom pipeline steps

  • How to build and run pipelines

  • How to chain multiple steps together

  • How to work with collections using ForEach steps

  • Error handling best practices

  • Real-world examples

Setup and Imports

First, let’s import the necessary components from the AI-Graph framework:

[1]:
# Import the core components from AI-Graph
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep

# Additional imports for our examples
import json
import time
import re
from typing import Dict, Any

print("AI-Graph framework imported successfully!")
AI-Graph framework imported successfully!

Your First Pipeline

Let’s create a simple pipeline step that processes numerical data. In AI-Graph, all steps work with dictionary data structures and inherit from BasePipelineStep:

[2]:
class DoubleStep(BasePipelineStep):
    """A step that doubles the input value."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Double the 'value' key in the input data."""
        if 'value' in data:
            data['value'] = data['value'] * 2
        return data

# Create and run the pipeline
pipeline = Pipeline(name="DoubleValuePipeline")
pipeline.add_step(DoubleStep())

# Input data as a dictionary
input_data = {"value": 5}
# Steps modify the dictionary in place, so pass a copy to keep the input intact
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")
Input: {'value': 5}
Result: {'value': 10}
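The steps in this guide modify the dictionary they are given and return that same object, so a caller that keeps a reference to its input sees the changes too. The sketch below shows the effect with a plain function; `double_in_place` is a hypothetical stand-in for DoubleStep and does not use AI-Graph.

```python
from typing import Any, Dict


def double_in_place(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for DoubleStep: mutates and returns the same dict."""
    if "value" in data:
        data["value"] = data["value"] * 2
    return data


# Passing the dictionary directly: input and result are one object.
original = {"value": 5}
result = double_in_place(original)
same_object = result is original
print(original, result, same_object)

# Passing a shallow copy: the caller's dictionary is left untouched.
original = {"value": 5}
result = double_in_place(original.copy())
print(original, result)
```

A shallow copy is enough here because the values are plain numbers. Nested lists or dictionaries would need `copy.deepcopy` to be fully isolated.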

Multiple Steps in a Pipeline

You can chain multiple steps together. Each step processes the data and passes it to the next step:

[3]:
class AddStep(BasePipelineStep):
    def __init__(self, add_value: int, name: str = None):
        super().__init__(name or f"AddStep_{add_value}")
        self.add_value = add_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] + self.add_value
        return data

class MultiplyStep(BasePipelineStep):
    def __init__(self, multiply_value: int, name: str = None):
        super().__init__(name or f"MultiplyStep_{multiply_value}")
        self.multiply_value = multiply_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] * self.multiply_value
        return data

# Create pipeline with multiple steps
pipeline = Pipeline(name="MathPipeline")
pipeline.add_step(AddStep(10))       # Add 10
pipeline.add_step(MultiplyStep(2))   # Multiply by 2

input_data = {"value": 5}
# Pass a copy so the printed input still shows the original value
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")
print(f"Calculation: (5 + 10) * 2 = {result['value']}")
Input: {'value': 5}
Result: {'value': 30}
Calculation: (5 + 10) * 2 = 30
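The hand-off from one step to the next can be pictured with a small plain-Python Chain of Responsibility. This is only a sketch of the pattern: `SketchStep`, `set_next`, `add_ten`, and `times_two` are hypothetical names, and nothing here claims to mirror how AI-Graph links steps internally.

```python
from typing import Any, Callable, Dict, Optional

Data = Dict[str, Any]


class SketchStep:
    """Hypothetical chain link: apply a function, then hand off to the next link."""

    def __init__(self, func: Callable[[Data], Data]) -> None:
        self.func = func
        self.next_step: Optional["SketchStep"] = None

    def set_next(self, step: "SketchStep") -> "SketchStep":
        self.next_step = step
        return step  # returning the new link allows a.set_next(b).set_next(c)

    def process(self, data: Data) -> Data:
        data = self.func(data)
        if self.next_step is not None:
            return self.next_step.process(data)
        return data


def add_ten(data: Data) -> Data:
    return {**data, "value": data["value"] + 10}


def times_two(data: Data) -> Data:
    return {**data, "value": data["value"] * 2}


# Same order as the pipeline above: add first, then multiply.
head = SketchStep(add_ten)
head.set_next(SketchStep(times_two))
chained = head.process({"value": 5})
print(chained)

# Reversed order: multiply first, then add.
reversed_head = SketchStep(times_two)
reversed_head.set_next(SketchStep(add_ten))
reordered = reversed_head.process({"value": 5})
print(reordered)
```

The two runs give 30 and 20 respectively, which shows that the order in which steps are added determines the result.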

Using Built-in Steps

AI-Graph provides several built-in steps for common operations. Let’s use AddKeyStep and DelKeyStep:

[4]:
# Create a pipeline that adds metadata and processes data
pipeline = Pipeline(name="MetadataPipeline")
pipeline.add_step(AddKeyStep("timestamp", "2025-07-16T10:00:00Z"))
pipeline.add_step(AddKeyStep("processed_by", "AI-Graph"))
pipeline.add_step(DoubleStep())  # Our custom step from before
pipeline.add_step(DelKeyStep("timestamp"))  # Remove timestamp after processing

input_data = {"value": 42, "user_id": "user123"}
# Pass a copy so the printed input does not include keys added by the steps
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")
Input: {'value': 42, 'user_id': 'user123'}
Result: {'value': 84, 'user_id': 'user123', 'processed_by': 'AI-Graph'}
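In plain-dictionary terms, the four steps above amount to the operations below. This is a sketch that assumes AddKeyStep sets a key and DelKeyStep removes one, as their names and the printed result suggest. It does not cover how DelKeyStep behaves when the key is missing.

```python
data = {"value": 42, "user_id": "user123"}

data["timestamp"] = "2025-07-16T10:00:00Z"  # like AddKeyStep("timestamp", ...)
data["processed_by"] = "AI-Graph"           # like AddKeyStep("processed_by", ...)
data["value"] = data["value"] * 2           # like DoubleStep()
del data["timestamp"]                       # like DelKeyStep("timestamp")

print(data)
```

The final dictionary has the same keys and values as the pipeline result: the timestamp exists only while the pipeline runs and is gone from the output.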

Working with Collections using ForEach

Use ForEach steps to process collections of items. The ForEach step creates a sub-pipeline that processes each item:

[5]:
class SquareStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Square the current item in a ForEach iteration."""
        if '_current_item' in data:
            # ForEach puts the current item in '_current_item'
            current_value = data['_current_item']
            data['_current_item'] = current_value ** 2
        return data

# Create main pipeline with ForEach
pipeline = Pipeline(name="SquareNumbersPipeline")

# Create ForEach step that will process each number in the 'numbers' key
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="squared_numbers"
)
foreach_step.add_sub_step(SquareStep())

pipeline.add_step(foreach_step)

input_data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(input_data)

print(f"Input numbers: {input_data['numbers']}")
print(f"Squared numbers: {[item['_current_item'] for item in result['squared_numbers']]}")
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 50051.36item/s]
Input numbers: [1, 2, 3, 4, 5]
Squared numbers: [1, 4, 9, 16, 25]
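The shape of the results (a list of dictionaries, each holding its value under `_current_item`) can be modelled in a few lines of plain Python. The sketch below is a hypothetical model consistent with the output above, not AI-Graph's implementation. In particular, whether the real ForEachStep copies other keys from the parent dictionary into each item is not shown in this guide, so the model does not.

```python
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]


def for_each_sketch(
    data: Data,
    items_key: str,
    results_key: str,
    sub_steps: List[Callable[[Data], Data]],
) -> Data:
    """Hypothetical model of a for-each step over data[items_key]."""
    results = []
    for item in data[items_key]:
        item_data: Data = {"_current_item": item}  # fresh dict per item
        for step in sub_steps:
            item_data = step(item_data)
        results.append(item_data)
    data[results_key] = results
    return data


def square(item_data: Data) -> Data:
    item_data["_current_item"] = item_data["_current_item"] ** 2
    return item_data


out = for_each_sketch({"numbers": [1, 2, 3]}, "numbers", "squared", [square])
squares = [entry["_current_item"] for entry in out["squared"]]
print(out["numbers"], squares)
```

Because each item gets its own dictionary, the source list stays unchanged, which matches the unchanged `numbers` list printed above.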

Error Handling

A step can fail at runtime. In the example below, a ValueError raised inside a step propagates out of pipeline.process(), where a standard try/except block catches it:

[6]:
class DivideStep(BasePipelineStep):
    def __init__(self, divisor: float, name: str = None):
        super().__init__(name or f"DivideStep_{divisor}")
        self.divisor = divisor

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if self.divisor == 0:
            raise ValueError("Cannot divide by zero")

        if 'value' in data:
            data['value'] = data['value'] / self.divisor
        return data

# Test with a safe divisor first
pipeline = Pipeline(name="SafeDivisionPipeline")
pipeline.add_step(DivideStep(2))

input_data = {"value": 10}
# Pass a copy so the printed input is not overwritten by the step
result = pipeline.process(input_data.copy())
print(f"Safe division - Input: {input_data}, Result: {result}")

# Now test with zero (this will raise an error)
pipeline_with_error = Pipeline(name="ErrorPipeline")
pipeline_with_error.add_step(DivideStep(0))

try:
    input_data = {"value": 10}
    result = pipeline_with_error.process(input_data)
except ValueError as e:
    print(f"Caught expected error: {e}")
Safe division - Input: {'value': 10}, Result: {'value': 5.0}
Caught expected error: Cannot divide by zero
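When one failure should not stop a whole batch, an alternative to try/except around the pipeline is to record the error in the data and carry on. The sketch below illustrates that idea in plain Python; `run_safely` and `divide_by` are hypothetical helpers, not part of AI-Graph.

```python
from typing import Any, Callable, Dict

Data = Dict[str, Any]


def run_safely(step: Callable[[Data], Data], data: Data) -> Data:
    """Hypothetical helper: run one step and record a failure instead of raising."""
    try:
        return step(data)
    except (ValueError, ZeroDivisionError) as exc:
        failed = dict(data)  # leave the caller's dictionary untouched
        failed["error"] = f"{type(exc).__name__}: {exc}"
        return failed


def divide_by(divisor: float) -> Callable[[Data], Data]:
    """Build a step function that divides data['value'] by a fixed divisor."""
    def step(data: Data) -> Data:
        if divisor == 0:
            raise ValueError("Cannot divide by zero")
        return {**data, "value": data["value"] / divisor}
    return step


ok = run_safely(divide_by(2), {"value": 10})
bad = run_safely(divide_by(0), {"value": 10})
print(ok)
print(bad)
```

The failing call returns the original value plus an `error` key, so downstream code can decide whether to skip, retry, or report the item.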

Real-World Example: Text Processing Pipeline

Here’s a more practical example for text processing that demonstrates multiple concepts:

[7]:
class CleanTextStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean text by removing extra whitespace and converting to lowercase."""
        if '_current_item' in data:
            text = data['_current_item']
            # Remove extra whitespace and convert to lowercase
            cleaned = re.sub(r'\s+', ' ', text.strip().lower())
            data['_current_item'] = cleaned
        return data

class CountWordsStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Count words in the text."""
        if '_current_item' in data:
            text = data['_current_item']
            word_count = len(text.split())
            data['word_count'] = word_count
            data['text'] = text  # Keep the cleaned text
        return data

class FilterLongTextsStep(BasePipelineStep):
    def __init__(self, min_words: int = 5, name: str = None):
        super().__init__(name or f"FilterLongTexts_{min_words}")
        self.min_words = min_words

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Flag texts with fewer than min_words words as filtered out."""
        if 'word_count' in data and data['word_count'] < self.min_words:
            data['filtered_out'] = True
        else:
            data['filtered_out'] = False
        return data

# Create the text processing sub-pipeline
foreach_step = ForEachStep(
    items_key="texts",
    results_key="processed_texts"
)
foreach_step.add_sub_step(CleanTextStep())
foreach_step.add_sub_step(CountWordsStep())
foreach_step.add_sub_step(FilterLongTextsStep(min_words=3))

# Create main pipeline
main_pipeline = Pipeline(name="TextProcessingPipeline")
main_pipeline.add_step(foreach_step)

# Test data
input_data = {
    "texts": [
        "Hello World!",
        "This is a longer text with many words",
        "Short",
        "AI-Graph makes pipeline processing easy and efficient"
    ]
}

result = main_pipeline.process(input_data)

print("Text Processing Results:")
print("=" * 40)
for i, processed_item in enumerate(result['processed_texts']):
    original_text = input_data['texts'][i]
    cleaned_text = processed_item.get('text', 'N/A')
    word_count = processed_item.get('word_count', 0)
    filtered = processed_item.get('filtered_out', False)

    print(f"Original: '{original_text}'")
    print(f"Cleaned:  '{cleaned_text}'")
    print(f"Words:    {word_count}")
    print(f"Filtered: {'Yes' if filtered else 'No'}")
    print("-" * 40)

# Extract only non-filtered results
valid_results = [
    item for item in result['processed_texts']
    if not item.get('filtered_out', False)
]
print(f"\nValid texts (>= 3 words): {len(valid_results)} out of {len(input_data['texts'])}")
Processing ForEachStep: 100%|██████████| 4/4 [00:00<00:00, 39290.90item/s]
Text Processing Results:
========================================
Original: 'Hello World!'
Cleaned:  'hello world!'
Words:    2
Filtered: Yes
----------------------------------------
Original: 'This is a longer text with many words'
Cleaned:  'this is a longer text with many words'
Words:    8
Filtered: No
----------------------------------------
Original: 'Short'
Cleaned:  'short'
Words:    1
Filtered: Yes
----------------------------------------
Original: 'AI-Graph makes pipeline processing easy and efficient'
Cleaned:  'ai-graph makes pipeline processing easy and efficient'
Words:    7
Filtered: No
----------------------------------------

Valid texts (>= 3 words): 2 out of 4
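None of the sample texts above contains extra whitespace, so the effect of the cleaning expression is easy to miss. The snippet below applies the same expression used in CleanTextStep to a messier string; `clean_text` and `messy` are names made up for this illustration.

```python
import re


def clean_text(text: str) -> str:
    """Same expression as CleanTextStep: trim, lowercase, collapse whitespace runs."""
    return re.sub(r"\s+", " ", text.strip().lower())


messy = "  Hello   World!\n\tThis  is\tAI-Graph  "
cleaned = clean_text(messy)
word_count = len(cleaned.split())
print(repr(cleaned), word_count)
```

This prints `'hello world! this is ai-graph' 5`. Punctuation stays attached to its word, so `world!` counts as a single word, just as `Hello World!` counts as two words in the results above.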

Progress Tracking with ForEach

The ForEach step displays a progress bar while it iterates, which is most useful for long-running operations. Let’s see this in action:

[8]:
class SlowProcessingStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate slow processing."""
        time.sleep(0.1)  # Simulate slow processing
        if '_current_item' in data:
            data['_current_item'] = data['_current_item'] * 2
        return data

# Create pipeline with progress tracking
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers"
)
foreach_step.add_sub_step(SlowProcessingStep())

pipeline = Pipeline(name="SlowProcessingPipeline")
pipeline.add_step(foreach_step)

# Process a list of numbers with progress tracking
input_data = {"numbers": list(range(10))}
print("Processing numbers with progress tracking:")
result = pipeline.process(input_data)

processed_numbers = [item['_current_item'] for item in result['processed_numbers']]
print(f"\nOriginal: {input_data['numbers']}")
print(f"Doubled:  {processed_numbers}")
Processing numbers with progress tracking:
Processing ForEachStep: 100%|██████████| 10/10 [00:01<00:00,  9.90item/s]

Original: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Doubled:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Advanced Example: Data Validation Pipeline

Let’s create a more complex example that validates and processes user data:

[9]:
class ValidateEmailStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate email format."""
        if 'email' in data:
            email = data['email']
            # Simple email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            data['email_valid'] = bool(re.match(email_pattern, email))
        return data

class ValidateAgeStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate age is reasonable."""
        if 'age' in data:
            age = data['age']
            data['age_valid'] = isinstance(age, int) and 0 <= age <= 150
        return data

class SummarizeValidationStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize validation results."""
        email_valid = data.get('email_valid', False)
        age_valid = data.get('age_valid', False)

        data['is_valid'] = email_valid and age_valid
        data['validation_errors'] = []

        if not email_valid:
            data['validation_errors'].append('Invalid email format')
        if not age_valid:
            data['validation_errors'].append('Invalid age')

        return data

# Create validation pipeline
validation_pipeline = Pipeline(name="UserValidationPipeline")
validation_pipeline.add_step(ValidateEmailStep())
validation_pipeline.add_step(ValidateAgeStep())
validation_pipeline.add_step(SummarizeValidationStep())

# Test with various user data
test_users = [
    {"name": "John Doe", "email": "john@example.com", "age": 25},
    {"name": "Jane Smith", "email": "invalid-email", "age": 30},
    {"name": "Bob Johnson", "email": "bob@test.com", "age": -5},
    {"name": "Alice Brown", "email": "alice@company.org", "age": 200}
]

print("User Validation Results:")
print("=" * 50)

for user in test_users:
    result = validation_pipeline.process(user.copy())

    print(f"Name: {result['name']}")
    print(f"Email: {result['email']} ({'✓' if result['email_valid'] else '✗'})")
    print(f"Age: {result['age']} ({'✓' if result['age_valid'] else '✗'})")
    print(f"Valid: {'✓' if result['is_valid'] else '✗'}")

    if result['validation_errors']:
        print(f"Errors: {', '.join(result['validation_errors'])}")

    print("-" * 50)
User Validation Results:
==================================================
Name: John Doe
Email: john@example.com (✓)
Age: 25 (✓)
Valid: ✓
--------------------------------------------------
Name: Jane Smith
Email: invalid-email (✗)
Age: 30 (✓)
Valid: ✗
Errors: Invalid email format
--------------------------------------------------
Name: Bob Johnson
Email: bob@test.com (✓)
Age: -5 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------
Name: Alice Brown
Email: alice@company.org (✓)
Age: 200 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------
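The checks above are deliberately simple, and a few edge cases are worth knowing about. The sketch below reuses the same email pattern and shows one possible stricter age check; `email_ok`, `email_ok_full`, and `age_ok_strict` are hypothetical helper names, and the stricter variants are suggestions rather than anything AI-Graph provides.

```python
import re

EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'


def email_ok(email: str) -> bool:
    """Same check as ValidateEmailStep (re.match with ^...$)."""
    return bool(re.match(EMAIL_PATTERN, email))


def email_ok_full(email: str) -> bool:
    """Variant using re.fullmatch, which rejects a trailing newline."""
    return bool(re.fullmatch(EMAIL_PATTERN, email))


def age_ok_strict(age: object) -> bool:
    """Like ValidateAgeStep's check, but rejects bool (a subclass of int)."""
    return isinstance(age, int) and not isinstance(age, bool) and 0 <= age <= 150


short_tld = email_ok("a@b.c")                     # one-letter top-level domain
double_dot = email_ok("a..b@x.org")               # pattern allows repeated dots
newline_match = email_ok("john@example.com\n")    # $ matches before a final newline
newline_full = email_ok_full("john@example.com\n")
print(short_tld, double_dot, newline_match, newline_full)
print(isinstance(True, int), age_ok_strict(True), age_ok_strict(25), age_ok_strict("25"))
```

The first line prints `False True True False` and the second prints `True False True False`. In other words, the simple pattern accepts some malformed addresses, and `isinstance(age, int)` alone would treat `True` as a valid age of 1.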

Next Steps

Now that you’ve learned the basics of AI-Graph, here are some next steps to explore:

Documentation

  • Concepts: Learn about the core concepts in detail

  • API Reference: Full API documentation for all classes and methods

Best Practices

  1. Keep steps small and focused - Each step should do one thing well

  2. Use meaningful names - Name your steps clearly to improve readability

  3. Handle errors gracefully - Always consider what might go wrong

  4. Test your pipelines - Write unit tests for your custom steps

  5. Use type hints - AI-Graph supports full type checking with mypy
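As a sketch of the testing advice above, the step logic can be exercised directly with the standard `unittest` module. To stay runnable without AI-Graph installed, the example uses `DoubleLogic`, a hypothetical stand-in that carries the same `_process_step` body as DoubleStep. With the framework available, the same assertions would target the real step class instead.

```python
import unittest
from typing import Any, Dict


class DoubleLogic:
    """Hypothetical stand-in holding the same logic as DoubleStep._process_step."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if "value" in data:
            data["value"] = data["value"] * 2
        return data


class DoubleLogicTest(unittest.TestCase):
    def test_doubles_value(self) -> None:
        self.assertEqual(DoubleLogic()._process_step({"value": 5}), {"value": 10})

    def test_missing_key_is_left_alone(self) -> None:
        self.assertEqual(DoubleLogic()._process_step({"other": 1}), {"other": 1})


# Run the tests programmatically so the snippet also works inside a notebook.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DoubleLogicTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
print(outcome.wasSuccessful())
```

Testing both the normal case and the missing-key case documents the step's contract: it changes `value` when present and otherwise returns the data unchanged.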

Advanced Features

  • Custom ForEach implementations - Create specialized iteration patterns

  • Pipeline composition - Combine multiple pipelines

  • Performance optimization - Tips for handling large datasets

  • Integration patterns - How to integrate with other frameworks

Summary

In this quick start guide, you learned:

✅ How to create custom pipeline steps by extending BasePipelineStep
✅ How to build pipelines and chain multiple steps together
✅ How to use built-in steps like AddKeyStep and DelKeyStep
✅ How to process collections with ForEachStep
✅ Error handling patterns and best practices
✅ Real-world examples for text processing and data validation

The AI-Graph framework provides a powerful and flexible way to build processing pipelines that are easy to understand, test, and maintain. Happy coding! 🚀