Core Concepts

Understanding the fundamental concepts of AI-Graph will help you build more effective processing pipelines. This interactive guide will walk you through each concept with practical examples you can run and experiment with.

What You’ll Learn

  • Chain of Responsibility Pattern - How AI-Graph orchestrates data flow

  • Steps - The building blocks of processing pipelines

  • Pipelines - How to orchestrate multiple steps

  • ForEach Processing - Working with collections and iterations

  • Data Flow Patterns - Common patterns for data transformation

  • Best Practices - Guidelines for effective pipeline design

Setup and Imports

Let’s start by importing the necessary components:

[1]:
# Core AI-Graph imports
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep

# Additional imports for examples
import json
import time
import re
from datetime import datetime
from typing import Dict, Any, List, Optional

print("✅ AI-Graph framework loaded successfully!")
print("📚 Ready to explore core concepts...")
✅ AI-Graph framework loaded successfully!
📚 Ready to explore core concepts...

The Chain of Responsibility Pattern

AI-Graph is built on the Chain of Responsibility design pattern, where:

  • Each Step is a handler that processes data

  • Pipelines chain these handlers together

  • Data flows through the chain sequentially

  • Each step can transform, filter, or enrich the data

Input → Step 1 → Step 2 → Step 3 → Output
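Here is a framework-free sketch of a classic linked chain, which makes the pattern concrete before we look at AI-Graph itself. In this version each handler holds a reference to the next one. The Handler class and its set_next and handle methods are hypothetical names chosen for illustration. AI-Graph's Pipeline may wire its steps differently, for example by iterating over a list.

```python
from typing import Any, Callable, Dict, Optional

Data = Dict[str, Any]


class Handler:
    """Hypothetical chain link: process the data, then hand it to the next link."""

    def __init__(self, fn: Callable[[Data], Data]) -> None:
        self.fn = fn
        self.next: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        self.next = handler
        return handler  # returning the new link allows a.set_next(b).set_next(c)

    def handle(self, data: Data) -> Data:
        data = self.fn(data)
        # Pass the result down the chain, or return it if this is the last link
        return self.next.handle(data) if self.next else data


def mark_step1(d: Data) -> Data:
    return {**d, "processed_by_step1": True}


def double_value(d: Data) -> Data:
    return {**d, "value": d["value"] * 2} if "value" in d else d


def finalize(d: Data) -> Data:
    return {**d, "completed": True}


head = Handler(mark_step1)
head.set_next(Handler(double_value)).set_next(Handler(finalize))

result = head.handle({"value": 10})
print(result)  # {'value': 20, 'processed_by_step1': True, 'completed': True}
```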

Let’s see this in action:

[2]:
# Demonstration of the Chain of Responsibility pattern

class Step1(BasePipelineStep):
    """First step: Add metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"🔗 Step 1: Processing {data}")
        data['processed_by_step1'] = True
        data['step1_timestamp'] = datetime.now().isoformat()
        return data

class Step2(BasePipelineStep):
    """Second step: Transform data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"🔗 Step 2: Processing {data}")
        if 'value' in data:
            data['value'] = data['value'] * 2
            data['transformed_by_step2'] = True
        return data

class Step3(BasePipelineStep):
    """Third step: Add final metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"🔗 Step 3: Processing {data}")
        data['final_result'] = f"Processed value: {data.get('value', 'N/A')}"
        data['completed'] = True
        return data

# Create the pipeline and chain the steps
chain_demo = Pipeline(name="ChainOfResponsibilityDemo")
chain_demo.add_step(Step1())
chain_demo.add_step(Step2())
chain_demo.add_step(Step3())

# Test the chain
input_data = {"value": 10, "user_id": "demo_user"}
print("🚀 Starting chain processing...\n")

result = chain_demo.process(input_data)

print("\n📊 Final Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
🚀 Starting chain processing...

🔗 Step 1: Processing {'value': 10, 'user_id': 'demo_user'}
🔗 Step 2: Processing {'value': 10, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-08-14T19:07:48.147529'}
🔗 Step 3: Processing {'value': 20, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-08-14T19:07:48.147529', 'transformed_by_step2': True}

📊 Final Result:
  value: 20
  user_id: demo_user
  processed_by_step1: True
  step1_timestamp: 2025-08-14T19:07:48.147529
  transformed_by_step2: True
  final_result: Processed value: 20
  completed: True

Steps: The Building Blocks

Steps are the fundamental processing units in AI-Graph. Every step inherits from BasePipelineStep and implements the _process_step method.

Step Lifecycle

Each step goes through this lifecycle:

  1. Initialization - Create the step instance

  2. Processing - Execute the _process_step method

  3. Result - Return the processed data
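This lifecycle maps onto the template-method pattern. A public entry point on the base class calls the _process_step hook that each subclass overrides. The sketch below assumes this general shape and is not AI-Graph's source. SketchStep and its process wrapper are hypothetical, and the real BasePipelineStep may do more around the hook call, such as logging, chaining, or error handling.

```python
from typing import Any, Dict, Optional

Data = Dict[str, Any]


class SketchStep:
    """Hypothetical stand-in for a base step class (not AI-Graph's code)."""

    def __init__(self, name: Optional[str] = None) -> None:
        # 1. Initialization: store configuration once, when the step is built
        self.name = name or type(self).__name__

    def process(self, data: Data) -> Data:
        # 2. Processing: the public entry point delegates to the subclass hook
        result = self._process_step(data)
        # 3. Result: hand the processed dict back to the caller
        return result

    def _process_step(self, data: Data) -> Data:
        raise NotImplementedError("subclasses implement _process_step")


class AddGreeting(SketchStep):
    def _process_step(self, data: Data) -> Data:
        data["greeting"] = f"hello, {data.get('user', 'anonymous')}"
        return data


step = AddGreeting()
print(step.name, step.process({"user": "ada"}))
# AddGreeting {'user': 'ada', 'greeting': 'hello, ada'}
```

Because the entry point lives on the base class, a subclass only decides what happens to the data, not when it is called.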

Let’s explore different types of steps:

[3]:
# Example 1: Logging Step
class LoggingStep(BasePipelineStep):
    def __init__(self, message: str, name: Optional[str] = None):
        super().__init__(name or f"LoggingStep_{message}")
        self.message = message

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"📝 {self.message}: {data}")
        return data  # Pass data through unchanged

# Test the logging step
logging_pipeline = Pipeline(name="LoggingDemo")
logging_pipeline.add_step(LoggingStep("Before processing"))
logging_pipeline.add_step(AddKeyStep("processed", True))
logging_pipeline.add_step(LoggingStep("After processing"))

test_data = {"id": 123, "name": "test"}
result = logging_pipeline.process(test_data)
print(f"\n✅ Final result: {result}")
📝 Before processing: {'id': 123, 'name': 'test'}
📝 After processing: {'id': 123, 'name': 'test', 'processed': True}

✅ Final result: {'id': 123, 'name': 'test', 'processed': True}

Data Transformation Patterns

Steps can change data in four common ways: transform, filter, enrich, and aggregate. Let’s implement and test each pattern:

[4]:
# Pattern 1: Transform - Change the data structure or values
class UppercaseStep(BasePipelineStep):
    """Transforms text to uppercase"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = data['text'].upper()
            data['transformed'] = True
        return data

# Test transform pattern
transform_pipeline = Pipeline(name="TransformDemo")
transform_pipeline.add_step(UppercaseStep())

test_data = {"text": "hello world", "id": 1}
result = transform_pipeline.process(test_data)
print(f"🔄 Transform result: {result}")
🔄 Transform result: {'text': 'HELLO WORLD', 'id': 1, 'transformed': True}
[5]:
# Pattern 2: Filter - Flag data that should not continue
class FilterPositiveStep(BasePipelineStep):
    """Flags non-positive numbers as filtered out; the record itself still passes through"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data:
            if data['number'] <= 0:
                data['filtered_out'] = True
                data['reason'] = "Number is not positive"
            else:
                data['filtered_out'] = False
        return data

# Test filter pattern with positive and negative numbers
filter_pipeline = Pipeline(name="FilterDemo")
filter_pipeline.add_step(FilterPositiveStep())

test_cases = [
    {"number": 5, "id": "positive"},
    {"number": -3, "id": "negative"},
    {"number": 0, "id": "zero"}
]

print("🔍 Filter Pattern Results:")
for test_data in test_cases:
    result = filter_pipeline.process(test_data.copy())
    status = "❌ Filtered" if result.get('filtered_out') else "✅ Passed"
    print(f"  {test_data['id']}: {status} - {result}")
🔍 Filter Pattern Results:
  positive: ✅ Passed - {'number': 5, 'id': 'positive', 'filtered_out': False}
  negative: ❌ Filtered - {'number': -3, 'id': 'negative', 'filtered_out': True, 'reason': 'Number is not positive'}
  zero: ❌ Filtered - {'number': 0, 'id': 'zero', 'filtered_out': True, 'reason': 'Number is not positive'}
[6]:
# Pattern 3: Enrich - Add additional information
class AddTimestampStep(BasePipelineStep):
    """Enriches data with timestamp information"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        now = datetime.now()
        data['enrichment'] = {
            'timestamp': now.isoformat(),
            'day_of_week': now.strftime('%A'),
            'processing_time': now.strftime('%H:%M:%S')
        }
        return data

# Test enrich pattern
enrich_pipeline = Pipeline(name="EnrichDemo")
enrich_pipeline.add_step(AddTimestampStep())

test_data = {"user_id": "user123", "action": "login"}
result = enrich_pipeline.process(test_data)
print("📈 Enrich Pattern Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
📈 Enrich Pattern Result:
  user_id: user123
  action: login
  enrichment: {'timestamp': '2025-08-14T19:07:48.173542', 'day_of_week': 'Thursday', 'processing_time': '19:07:48'}
[7]:
# Pattern 4: Aggregate - Combine multiple pieces of data
class SumStep(BasePipelineStep):
    """Aggregates numeric data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'numbers' in data and isinstance(data['numbers'], list):
            data['sum'] = sum(data['numbers'])
            data['count'] = len(data['numbers'])
            data['average'] = data['sum'] / data['count'] if data['count'] > 0 else 0
            data['aggregated'] = True
        return data

# Test aggregate pattern
aggregate_pipeline = Pipeline(name="AggregateDemo")
aggregate_pipeline.add_step(SumStep())

test_data = {"numbers": [1, 2, 3, 4, 5], "dataset": "sample"}
result = aggregate_pipeline.process(test_data)
print("📊 Aggregate Pattern Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
📊 Aggregate Pattern Result:
  numbers: [1, 2, 3, 4, 5]
  dataset: sample
  sum: 15
  count: 5
  average: 3.0
  aggregated: True

Pipelines: Orchestrating the Flow

Pipelines coordinate the execution of multiple steps in sequence, passing each step’s output to the next step as its input. Let’s trace that flow through a three-step pipeline:

[8]:
# Pipeline Execution Model Demonstration
class Step1Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = data['text'].upper()
            print(f"  Step 1: Uppercase → '{data['text']}'")
        return data

class Step2Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = f"PREFIX: {data['text']}"
            print(f"  Step 2: Add prefix → '{data['text']}'")
        return data

class Step3Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            char_count = len(data['text'])
            data['char_count'] = char_count
            print(f"  Step 3: Count chars → {char_count}")
        return data

# Create and test the pipeline
execution_demo = Pipeline(name="ExecutionFlowDemo")
execution_demo.add_step(Step1Process())
execution_demo.add_step(Step2Process())
execution_demo.add_step(Step3Process())

print("🔄 Pipeline Execution Flow:")
test_data = {"text": "hello", "id": "demo"}
print(f"  Input: {test_data}")

result = execution_demo.process(test_data)
print(f"  Final Result: {result}")
🔄 Pipeline Execution Flow:
  Input: {'text': 'hello', 'id': 'demo'}
  Step 1: Uppercase → 'HELLO'
  Step 2: Add prefix → 'PREFIX: HELLO'
  Step 3: Count chars → 13
  Final Result: {'text': 'PREFIX: HELLO', 'id': 'demo', 'char_count': 13}
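Conceptually, the execution model above is a left fold over the step list: the dict returned by step n becomes the input to step n + 1. The framework-free sketch below reproduces the same three transformations using plain functions and functools.reduce. run_pipeline is a hypothetical helper, not part of AI-Graph. Each function returns a new dict, so the caller's input is left unchanged.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]
StepFn = Callable[[Data], Data]


def run_pipeline(steps: List[StepFn], data: Data) -> Data:
    """Feed the output of each step into the next, left to right."""
    return reduce(lambda acc, step: step(acc), steps, data)


steps: List[StepFn] = [
    lambda d: {**d, "text": d["text"].upper()},           # uppercase
    lambda d: {**d, "text": f"PREFIX: {d['text']}"},      # add prefix
    lambda d: {**d, "char_count": len(d["text"])},        # count characters
]

print(run_pipeline(steps, {"text": "hello", "id": "demo"}))
# {'text': 'PREFIX: HELLO', 'id': 'demo', 'char_count': 13}
```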

Error Handling in Pipelines

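Pipelines do not silently swallow step failures. When a step raises an exception, it propagates out of process() to the caller, which can then catch and handle it. Let’s see how this works: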

[9]:
class RiskyStep(BasePipelineStep):
    """A step that might fail under certain conditions"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data and data['number'] < 0:
            raise ValueError(f"Negative values not allowed: {data['number']}")

        if 'number' in data:
            data['number'] = data['number'] * 2
            data['processed_successfully'] = True
        return data

class SafeStep(BasePipelineStep):
    """A step that always succeeds"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data['safe_step_completed'] = True
        return data

# Create pipeline with risky step
error_demo = Pipeline(name="ErrorHandlingDemo")
error_demo.add_step(SafeStep())
error_demo.add_step(RiskyStep())
error_demo.add_step(SafeStep())

# Test with safe data
print("✅ Testing with safe data:")
safe_data = {"number": 5, "id": "safe_test"}
try:
    result = error_demo.process(safe_data.copy())
    print(f"  Success: {result}")
except Exception as e:
    print(f"  Error: {e}")

# Test with risky data
print("\n❌ Testing with risky data:")
risky_data = {"number": -5, "id": "risky_test"}
try:
    result = error_demo.process(risky_data.copy())
    print(f"  Success: {result}")
except ValueError as e:
    print(f"  Caught ValueError: {e}")
except Exception as e:
    print(f"  Unexpected error: {e}")
✅ Testing with safe data:
  Success: {'number': 10, 'id': 'safe_test', 'safe_step_completed': True, 'processed_successfully': True}

❌ Testing with risky data:
  Caught ValueError: Negative values not allowed: -5
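In the run above, the ValueError escaped process() and the caller had to catch it. If you would rather have failures reported in the output, one option is to catch exceptions around each step. The sketch below is a hypothetical policy written in plain Python and is not built-in AI-Graph behavior. Its run_collecting_errors function stops at the first failing step and returns the original input together with an errors list.

```python
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]
StepFn = Callable[[Data], Data]


def run_collecting_errors(steps: List[StepFn], data: Data) -> Data:
    """Run steps in order; on failure, stop and report the error instead of raising.

    Hypothetical policy: on failure the caller gets its original input back,
    plus an "errors" list describing which step failed and why.
    """
    current = dict(data)  # shallow copy so steps never touch the caller's dict
    for index, step in enumerate(steps):
        try:
            current = step(current)
        except Exception as exc:  # record any step failure
            return {**data, "errors": [f"step {index}: {exc}"]}
    return {**current, "errors": []}


def reject_negative(d: Data) -> Data:
    if d["number"] < 0:
        raise ValueError(f"Negative values not allowed: {d['number']}")
    return {**d, "number": d["number"] * 2}


print(run_collecting_errors([reject_negative], {"number": 5}))
# {'number': 10, 'errors': []}
print(run_collecting_errors([reject_negative], {"number": -5}))
# {'number': -5, 'errors': ['step 0: Negative values not allowed: -5']}
```

Whether to raise or to collect is a design choice. Raising keeps failures loud, while collecting lets a batch continue past bad records.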

ForEach: Processing Collections

The ForEachStep enables processing collections of data by applying a sub-pipeline to each item. Let’s explore different ForEach patterns:

[10]:
# Basic ForEach Usage
class DoubleItemStep(BasePipelineStep):
    """Doubles the current item in a ForEach iteration"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if '_current_item' in data:
            original = data['_current_item']
            data['_current_item'] = original * 2
            print(f"  Doubling: {original} → {data['_current_item']}")
        return data

# Create ForEach pipeline
foreach_demo = Pipeline(name="ForEachDemo")
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="doubled_numbers"
)
foreach_step.add_sub_step(DoubleItemStep())
foreach_demo.add_step(foreach_step)

# Test ForEach
print("🔄 ForEach Processing:")
test_data = {"numbers": [1, 2, 3, 4, 5], "operation": "double"}
result = foreach_demo.process(test_data)

original_numbers = test_data['numbers']
doubled_numbers = [item['_current_item'] for item in result['doubled_numbers']]
print("\n📊 Results:")
print(f"  Original: {original_numbers}")
print(f"  Doubled:  {doubled_numbers}")
🔄 ForEach Processing:
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 19099.74item/s]
  Doubling: 1 → 2
  Doubling: 2 → 4
  Doubling: 3 → 6
  Doubling: 4 → 8
  Doubling: 5 → 10

📊 Results:
  Original: [1, 2, 3, 4, 5]
  Doubled:  [2, 4, 6, 8, 10]

[11]:
# ForEach with Fixed Iterations
class CounterStep(BasePipelineStep):
    """Adds iteration count to data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        iteration = data.get('_iteration_index', 0)
        data['iteration_result'] = f"Iteration {iteration + 1} completed"
        print(f"  {data['iteration_result']}")
        return data

# Create fixed iteration pipeline
iteration_demo = Pipeline(name="IterationDemo")
iteration_foreach = ForEachStep(
    iterations=5,
    results_key="iteration_results"
)
iteration_foreach.add_sub_step(CounterStep())
iteration_demo.add_step(iteration_foreach)

print("🔒 Fixed Iterations Processing:")
test_data = {"task": "repeated_operation"}
result = iteration_demo.process(test_data)

print(f"\n📊 Completed {len(result['iteration_results'])} iterations")
for i, iteration_result in enumerate(result['iteration_results']):
    print(f"  Result {i+1}: {iteration_result.get('iteration_result')}")
🔒 Fixed Iterations Processing:
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 20681.97item/s]
  Iteration 1 completed
  Iteration 2 completed
  Iteration 3 completed
  Iteration 4 completed
  Iteration 5 completed

📊 Completed 5 iterations
  Result 1: Iteration 1 completed
  Result 2: Iteration 2 completed
  Result 3: Iteration 3 completed
  Result 4: Iteration 4 completed
  Result 5: Iteration 5 completed
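Both ForEach cells follow the same shape. Each item (or iteration) gets its own context dict, the sub-steps run on that context, and the resulting dicts are collected under results_key. The plain-Python sketch below reproduces that shape. The for_each function is hypothetical. The context layout (_current_item plus _iteration_index, without the parent's keys) is an assumption inferred from the two examples and does not describe ForEachStep's internals.

```python
from typing import Any, Callable, Dict, List, Optional

Data = Dict[str, Any]
StepFn = Callable[[Data], Data]


def for_each(
    data: Data,
    sub_steps: List[StepFn],
    results_key: str,
    items_key: Optional[str] = None,
    iterations: Optional[int] = None,
) -> Data:
    """Hypothetical ForEach: run sub_steps once per item, or a fixed number of times."""
    if items_key is not None:
        items: List[Any] = list(data[items_key])
    elif iterations is not None:
        items = [None] * iterations
    else:
        raise ValueError("provide items_key or iterations")

    results: List[Data] = []
    for index, item in enumerate(items):
        # Each iteration gets a fresh context dict, so iterations cannot clobber each other
        context: Data = {"_current_item": item, "_iteration_index": index}
        for step in sub_steps:
            context = step(context)
        results.append(context)
    # Return a new top-level dict; the input dict is left unchanged
    return {**data, results_key: results}


def double_item(ctx: Data) -> Data:
    return {**ctx, "_current_item": ctx["_current_item"] * 2}


def label_iteration(ctx: Data) -> Data:
    return {**ctx, "iteration_result": f"Iteration {ctx['_iteration_index'] + 1} completed"}


doubled = for_each({"numbers": [1, 2, 3]}, [double_item], "doubled", items_key="numbers")
print([r["_current_item"] for r in doubled["doubled"]])  # [2, 4, 6]

repeated = for_each({}, [label_iteration], "runs", iterations=2)
print([r["iteration_result"] for r in repeated["runs"]])
# ['Iteration 1 completed', 'Iteration 2 completed']
```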

Data Flow Patterns

Understanding how data flows through your pipeline is crucial for design. Let’s implement and test common data flow patterns:

[12]:
# Linear Flow Pattern
# Input β†’ Clean β†’ Validate β†’ Transform β†’ Output

class CleanDataStep(BasePipelineStep):
    """Cleans input data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            # Remove extra whitespace and convert to lowercase
            data['text'] = re.sub(r'\s+', ' ', data['text'].strip().lower())
            data['cleaned'] = True
            print(f"  🧹 Cleaned: '{data['text']}'")
        return data

class ValidateDataStep(BasePipelineStep):
    """Validates data quality"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            is_valid = 0 < len(data['text']) < 100
            data['is_valid'] = is_valid
            data['validation_passed'] = is_valid
            status = "✅ Valid" if is_valid else "❌ Invalid"
            print(f"  🔍 Validation: {status}")
        return data

class TransformDataStep(BasePipelineStep):
    """Transforms validated data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('is_valid', False) and 'text' in data:
            # Add word count and create summary
            words = data['text'].split()
            data['word_count'] = len(words)
            data['summary'] = f"Text with {len(words)} words: '{data['text'][:50]}...'"
            print(f"  🔄 Transformed: {data['word_count']} words")
        else:
            print("  ⚠️  Skipped transformation (invalid data)")
        return data

# Create linear flow pipeline
linear_flow = Pipeline(name="LinearFlowDemo")
linear_flow.add_step(CleanDataStep())
linear_flow.add_step(ValidateDataStep())
linear_flow.add_step(TransformDataStep())

# Test linear flow
print("📈 Linear Flow Pattern:")
test_cases = [
    {"text": "  Hello    World!  ", "id": "valid_case"},
    {"text": "", "id": "empty_case"},
    {"text": "A" * 150, "id": "too_long_case"}
]

for i, test_data in enumerate(test_cases, 1):
    print(f"\nTest {i} ({test_data['id']}):")
    result = linear_flow.process(test_data.copy())
    print(f"  Result: Valid={result.get('is_valid')}, Words={result.get('word_count', 'N/A')}")
📈 Linear Flow Pattern:

Test 1 (valid_case):
  🧹 Cleaned: 'hello world!'
  🔍 Validation: ✅ Valid
  🔄 Transformed: 2 words
  Result: Valid=True, Words=2

Test 2 (empty_case):
  🧹 Cleaned: ''
  🔍 Validation: ❌ Invalid
  ⚠️  Skipped transformation (invalid data)
  Result: Valid=False, Words=N/A

Test 3 (too_long_case):
  🧹 Cleaned: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
  🔍 Validation: ❌ Invalid
  ⚠️  Skipped transformation (invalid data)
  Result: Valid=False, Words=N/A
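In this linear flow, every step still runs after a failed validation. That is why TransformDataStep re-checks is_valid and prints the skip message. An alternative design stops the flow as soon as a record is rejected. The framework-free sketch below assumes a hypothetical convention in which a step returns None to reject a record. It is an illustration only and not an AI-Graph feature.

```python
from typing import Any, Callable, Dict, List, Optional

Data = Dict[str, Any]
# A step returns a (possibly new) dict, or None to reject the record
StepFn = Callable[[Data], Optional[Data]]


def run_until_rejected(steps: List[StepFn], data: Data) -> Optional[Data]:
    """Run steps in order and stop at the first step that returns None."""
    current: Data = dict(data)
    for step in steps:
        result = step(current)
        if result is None:
            return None  # later steps never run for a rejected record
        current = result
    return current


def clean(d: Data) -> Optional[Data]:
    # split()/join trims and collapses whitespace, like strip() plus re.sub(r'\s+', ' ', ...)
    return {**d, "text": " ".join(d["text"].split()).lower()}


def validate(d: Data) -> Optional[Data]:
    return d if 0 < len(d["text"]) < 100 else None


def transform(d: Data) -> Optional[Data]:
    return {**d, "word_count": len(d["text"].split())}


linear = [clean, validate, transform]
print(run_until_rejected(linear, {"text": "  Hello    World!  "}))
# {'text': 'hello world!', 'word_count': 2}
print(run_until_rejected(linear, {"text": ""}))  # None
```

Short-circuiting spares later steps from defensive checks. The trade-off is that a rejected record leaves no trace in the output unless the reason is recorded somewhere else.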
[13]:
# Branching Flow Pattern
# Use conditional steps to create branching logic

class ConditionalStep(BasePipelineStep):
    """Processes data differently based on conditions"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data:
            number = data['number']

            if number > 100:
                # Path A: Large numbers
                data['category'] = 'large'
                data['processed_value'] = number // 10  # Divide by 10
                data['processing_path'] = 'A'
                print(f"  🔀 Path A (Large): {number} → {data['processed_value']}")
            elif number > 0:
                # Path B: Small positive numbers
                data['category'] = 'small_positive'
                data['processed_value'] = number * 2  # Double
                data['processing_path'] = 'B'
                print(f"  🔀 Path B (Small+): {number} → {data['processed_value']}")
            else:
                # Path C: Zero or negative
                data['category'] = 'zero_or_negative'
                data['processed_value'] = 0  # Set to zero
                data['processing_path'] = 'C'
                print(f"  🔀 Path C (Zero/Neg): {number} → {data['processed_value']}")

        return data

# Create branching flow pipeline
branching_flow = Pipeline(name="BranchingFlowDemo")
branching_flow.add_step(ConditionalStep())

# Test branching flow, keeping each result so the summary does not re-run the pipeline
print("🌳 Branching Flow Pattern:")
test_numbers = [150, 25, 0, -10, 1000, 5]

results = []
for number in test_numbers:
    test_data = {"number": number, "id": f"test_{number}"}
    results.append(branching_flow.process(test_data))

print("\n📊 Summary of branching results:")
path_counts = {'A': 0, 'B': 0, 'C': 0}
for result in results:
    path = result.get('processing_path', 'Unknown')
    if path in path_counts:
        path_counts[path] += 1

for path, count in path_counts.items():
    print(f"  Path {path}: {count} items")
🌳 Branching Flow Pattern:
  🔀 Path A (Large): 150 → 15
  🔀 Path B (Small+): 25 → 50
  🔀 Path C (Zero/Neg): 0 → 0
  🔀 Path C (Zero/Neg): -10 → 0
  🔀 Path A (Large): 1000 → 100
  🔀 Path B (Small+): 5 → 10

📊 Summary of branching results:
  Path A: 2 items
  Path B: 2 items
  Path C: 2 items
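The if/elif chain in ConditionalStep is clear for three paths. When branches multiply, a common alternative is an ordered routing table of (path, predicate, action) entries in which the first matching predicate wins. This is a plain-Python sketch, and ROUTES and route are hypothetical names, not AI-Graph APIs.

```python
from typing import Any, Callable, Dict, List, Tuple

Data = Dict[str, Any]
Route = Tuple[str, Callable[[int], bool], Callable[[int], int]]

# Ordered (path, predicate, action) routes; the first matching predicate wins
ROUTES: List[Route] = [
    ("A", lambda n: n > 100, lambda n: n // 10),
    ("B", lambda n: n > 0, lambda n: n * 2),
    ("C", lambda n: True, lambda n: 0),  # catch-all for zero or negative
]


def route(data: Data) -> Data:
    number = data["number"]
    for path, matches, action in ROUTES:
        if matches(number):
            return {**data, "processing_path": path, "processed_value": action(number)}
    raise ValueError("no route matched")  # unreachable while a catch-all row exists


print([route({"number": n})["processing_path"] for n in [150, 25, 0, -10, 1000, 5]])
# ['A', 'B', 'C', 'C', 'A', 'B']
```

With this layout, adding a branch means adding a row rather than editing control flow. The catch-all row plays the role of the final else.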

Complete Example: Email Processing Pipeline

Let’s put all concepts together in a comprehensive example that demonstrates real-world usage:

[14]:
# Complete Email Processing Pipeline Example

class ValidateEmailStep(BasePipelineStep):
    """Validates email format using regex"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if '_current_item' in data:
            email = data['_current_item']
            # Simple email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            is_valid = bool(re.match(email_pattern, email))

            if is_valid:
                data['email_valid'] = True
                print(f"  ✅ Valid: {email}")
            else:
                data['email_valid'] = False
                data['_current_item'] = None  # Filter out invalid emails
                print(f"  ❌ Invalid: {email}")
        return data

class NormalizeEmailStep(BasePipelineStep):
    """Normalizes email to lowercase and trims whitespace"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('email_valid') and '_current_item' in data and data['_current_item']:
            original = data['_current_item']
            normalized = original.lower().strip()
            data['_current_item'] = normalized
            data['normalized'] = True
            if original != normalized:
                print(f"  🔄 Normalized: {original} → {normalized}")
            else:
                print(f"  ➡️  Already normalized: {normalized}")
        return data

class ExtractDomainStep(BasePipelineStep):
    """Extracts domain from email and adds metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('email_valid') and '_current_item' in data and data['_current_item']:
            email = data['_current_item']
            domain = email.split('@')[1]
            data['domain'] = domain
            data['email'] = email

            # Add domain type classification
            common_domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'hotmail.com']
            data['domain_type'] = 'common' if domain in common_domains else 'other'

            print(f"  🏷️  Domain: {domain} ({data['domain_type']})")
        return data

class EmailStatisticsStep(BasePipelineStep):
    """Aggregates statistics about processed emails"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'processed_emails' in data:
            results = data['processed_emails']

            # Keep only results that ValidateEmailStep flagged as valid
            valid_results = [r for r in results if r.get('email_valid', False)]

            # Calculate statistics
            total_processed = len(results)
            total_valid = len(valid_results)
            total_invalid = total_processed - total_valid

            # Domain statistics
            domains = [r.get('domain') for r in valid_results if r.get('domain')]
            domain_counts = {}
            for domain in domains:
                domain_counts[domain] = domain_counts.get(domain, 0) + 1

            # Type statistics
            common_count = sum(1 for r in valid_results if r.get('domain_type') == 'common')
            other_count = total_valid - common_count

            data['statistics'] = {
                'total_processed': total_processed,
                'valid_emails': total_valid,
                'invalid_emails': total_invalid,
                'success_rate': f"{(total_valid/total_processed*100):.1f}%" if total_processed > 0 else "0%",
                'domain_distribution': domain_counts,
                'common_domains': common_count,
                'other_domains': other_count
            }

            print("\n📊 Email Processing Statistics:")
            print(f"  Total processed: {total_processed}")
            print(f"  Valid emails: {total_valid} ({data['statistics']['success_rate']})")
            print(f"  Invalid emails: {total_invalid}")
            print(f"  Common domains: {common_count}")
            print(f"  Other domains: {other_count}")

        return data

# Build the complete email processing pipeline
print("📧 Building Email Processing Pipeline...\n")

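# Create the sub-pipeline for individual email processing.
# Note: validation runs before normalization, so addresses with surrounding
# whitespace (e.g. "  JANE@yahoo.com  ") are rejected by the anchored regex.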
email_foreach = ForEachStep(
    items_key="emails",
    results_key="processed_emails"
)
email_foreach.add_sub_step(ValidateEmailStep())
email_foreach.add_sub_step(NormalizeEmailStep())
email_foreach.add_sub_step(ExtractDomainStep())

# Create the main pipeline
email_pipeline = Pipeline(name="EmailProcessingPipeline")
email_pipeline.add_step(email_foreach)
email_pipeline.add_step(EmailStatisticsStep())

# Test with sample emails
test_emails = {
    "emails": [
        "John.Doe@GMAIL.COM",
        "invalid-email",
        "alice@example.org",
        "bob@company.com",
        "  JANE@yahoo.com  ",
        "test@outlook.com",
        "@invalid.com",
        "user@hotmail.com",
        "another@custom-domain.net"
    ],
    "batch_id": "demo_batch_001"
}

print(f"🚀 Processing {len(test_emails['emails'])} emails...\n")
result = email_pipeline.process(test_emails)

# Display results
print("\n📋 Valid Email Results:")
valid_emails = [r for r in result['processed_emails'] if r.get('email_valid', False)]
for i, email_result in enumerate(valid_emails, 1):
    email = email_result.get('email', 'N/A')
    domain = email_result.get('domain', 'N/A')
    domain_type = email_result.get('domain_type', 'N/A')
    print(f"  {i}. {email} → {domain} ({domain_type})")

print("\n🎯 Final Statistics:")
stats = result.get('statistics', {})
for key, value in stats.items():
    if key == 'domain_distribution':
        print(f"  {key}:")
        for domain, count in value.items():
            print(f"    {domain}: {count}")
    else:
        print(f"  {key}: {value}")
📧 Building Email Processing Pipeline...

🚀 Processing 9 emails...

Processing ForEachStep: 100%|██████████| 9/9 [00:00<00:00, 11397.57item/s]
  ✅ Valid: John.Doe@GMAIL.COM
  🔄 Normalized: John.Doe@GMAIL.COM → john.doe@gmail.com
  🏷️  Domain: gmail.com (common)
  ❌ Invalid: invalid-email
  ✅ Valid: alice@example.org
  ➡️  Already normalized: alice@example.org
  🏷️  Domain: example.org (other)
  ✅ Valid: bob@company.com
  ➡️  Already normalized: bob@company.com
  🏷️  Domain: company.com (other)
  ❌ Invalid:   JANE@yahoo.com
  ✅ Valid: test@outlook.com
  ➡️  Already normalized: test@outlook.com
  🏷️  Domain: outlook.com (common)
  ❌ Invalid: @invalid.com
  ✅ Valid: user@hotmail.com
  ➡️  Already normalized: user@hotmail.com
  🏷️  Domain: hotmail.com (common)
  ✅ Valid: another@custom-domain.net
  ➡️  Already normalized: another@custom-domain.net
  🏷️  Domain: custom-domain.net (other)

📊 Email Processing Statistics:
  Total processed: 9
  Valid emails: 6 (66.7%)
  Invalid emails: 3
  Common domains: 3
  Other domains: 3

📋 Valid Email Results:
  1. john.doe@gmail.com → gmail.com (common)
  2. alice@example.org → example.org (other)
  3. bob@company.com → company.com (other)
  4. test@outlook.com → outlook.com (common)
  5. user@hotmail.com → hotmail.com (common)
  6. another@custom-domain.net → custom-domain.net (other)

🎯 Final Statistics:
  total_processed: 9
  valid_emails: 6
  invalid_emails: 3
  success_rate: 66.7%
  domain_distribution:
    gmail.com: 1
    example.org: 1
    company.com: 1
    outlook.com: 1
    hotmail.com: 1
    custom-domain.net: 1
  common_domains: 3
  other_domains: 3
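Note that "  JANE@yahoo.com  " was counted as invalid. ValidateEmailStep runs before NormalizeEmailStep, and the anchored regex rejects the surrounding spaces. If padded addresses should be accepted, normalize first. The helper below is a hypothetical plain-Python sketch of that ordering, reusing the same simple pattern. In the pipeline, it would correspond to moving normalization ahead of validation, which would also require dropping NormalizeEmailStep's email_valid guard.

```python
import re
from typing import Optional

# Same simple pattern as ValidateEmailStep; not a full RFC 5322 validator
EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")


def normalize_then_validate(raw: str) -> Optional[str]:
    """Strip and lowercase first, then validate; return None if invalid."""
    email = raw.strip().lower()
    return email if EMAIL_PATTERN.match(email) else None


for raw in ["  JANE@yahoo.com  ", "John.Doe@GMAIL.COM", "@invalid.com"]:
    print(repr(raw), "->", normalize_then_validate(raw))
# '  JANE@yahoo.com  ' -> jane@yahoo.com
# 'John.Doe@GMAIL.COM' -> john.doe@gmail.com
# '@invalid.com' -> None
```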

Best Practices Summary

Based on the examples we’ve explored, here are the key best practices for using AI-Graph effectively:

[15]:
# Best Practices Demonstration

print("🎯 AI-Graph Best Practices:")
print("\n1. 🎯 Single Responsibility")
print("   Each step should have one clear purpose.")
print("   ✅ Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep")
print("   ❌ Bad: ProcessAllEmailDataStep")

print("\n2. 🔒 Immutable Data")
print("   Avoid modifying input data; return new data instead.")
print("   ✅ Good: data['new_field'] = processed_value")
print("   ❌ Bad: Modifying data['existing_field'] in place without copying")
# Note: for brevity, most example steps in this guide write to `data` in place.
# Pass a copy (as the filter and error-handling cells do with .copy()) when the
# caller still needs the original dict.

print("\n3. 🛑 Error Handling")
print("   Always consider what can go wrong and handle it gracefully.")
print("   ✅ Good: Check for required fields, validate data types")
print("   ❌ Bad: Assume data is always in expected format")

print("\n4. 🏷️ Type Safety")
print("   Use type hints to make your code more robust.")
print("   ✅ Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]")
print("   ❌ Bad: def _process_step(self, data) -> dict")

print("\n5. 🧪 Testing")
print("   Write unit tests for each step individually.")
print("   ✅ Good: Test each step with various input scenarios")
print("   ❌ Bad: Only test the complete pipeline")

print("\n6. 📚 Documentation")
print("   Document what each step does and its expected input/output.")
print("   ✅ Good: Clear docstrings and parameter documentation")
print("   ❌ Bad: No documentation or unclear naming")

# Example of a well-designed step following all best practices
class BestPracticeStep(BasePipelineStep):
    """
    Example step following all best practices.

    This step validates and normalizes user names in the input data.

    Expected input:
    - data['name']: str - The user name to process

    Output:
    - data['name']: str - The normalized name (if valid)
    - data['name_valid']: bool - Whether the name is valid
    - data['name_length']: int - Length of the normalized name
    """

    def __init__(self, min_length: int = 2, max_length: int = 50, name: Optional[str] = None):
        super().__init__(name or f"NameValidator_{min_length}_{max_length}")
        self.min_length = min_length
        self.max_length = max_length

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process and validate user name."""
        try:
            # Check if name field exists and is string
            if 'name' not in data:
                data['name_valid'] = False
                data['validation_error'] = "Name field missing"
                return data

            raw_name = data['name']
            if not isinstance(raw_name, str):
                data['name_valid'] = False
                data['validation_error'] = "Name must be string"
                return data

            # Normalize into a local variable; data['name'] is only overwritten once validation passes
            normalized_name = raw_name.strip().title()

            # Validate length
            if len(normalized_name) < self.min_length:
                data['name_valid'] = False
                data['validation_error'] = f"Name too short (min {self.min_length})"
                return data

            if len(normalized_name) > self.max_length:
                data['name_valid'] = False
                data['validation_error'] = f"Name too long (max {self.max_length})"
                return data

            # Success case
            data['name'] = normalized_name
            data['name_valid'] = True
            data['name_length'] = len(normalized_name)
            data['validation_error'] = None

        except Exception as e:
            # Graceful error handling
            data['name_valid'] = False
            data['validation_error'] = f"Unexpected error: {str(e)}"

        return data

print("\n🎉 Example of Best Practice Step Created!")
print("See the BestPracticeStep class above for a complete example.")
🎯 AI-Graph Best Practices:

1. 🎯 Single Responsibility
   Each step should have one clear purpose.
   βœ… Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep
   ❌ Bad: ProcessAllEmailDataStep

2. πŸ”’ Immutable Data
   Avoid modifying input data; return new data instead.
   βœ… Good: data['new_field'] = processed_value
   ❌ Bad: Modifying data['existing_field'] in place without copying

3. πŸ›‘οΈ Error Handling
   Always consider what can go wrong and handle it gracefully.
   βœ… Good: Check for required fields, validate data types
   ❌ Bad: Assume data is always in expected format

4. 🏷️ Type Safety
   Use type hints to make your code more robust.
   βœ… Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]
   ❌ Bad: def _process_step(self, data) -> dict

5. πŸ§ͺ Testing
   Write unit tests for each step individually.
   βœ… Good: Test each step with various input scenarios
   ❌ Bad: Only test the complete pipeline

6. πŸ“š Documentation
   Document what each step does and its expected input/output.
   βœ… Good: Clear docstrings and parameter documentation
   ❌ Bad: No documentation or unclear naming

πŸŽ‰ Example of Best Practice Step Created!
See the BestPracticeStep class above for a complete example.
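The Immutable Data and Testing practices reinforce each other. A step that copies its input before writing can be tested in isolation, including a check that the caller's dict is unchanged. The function below is a hypothetical, framework-free variant of BestPracticeStep._process_step that merges the two length errors into one message. It assumes a shallow dict(data) copy is enough, which holds here because only top-level keys are written.

```python
from typing import Any, Dict

Data = Dict[str, Any]


def validate_name(data: Data, min_length: int = 2, max_length: int = 50) -> Data:
    """Return a new dict with name-validation fields; never mutate the input."""
    result = dict(data)  # shallow copy: enough because only top-level keys are written
    if "name" not in data:
        result.update(name_valid=False, validation_error="Name field missing")
        return result
    if not isinstance(data["name"], str):
        result.update(name_valid=False, validation_error="Name must be string")
        return result

    name = data["name"].strip().title()
    if not min_length <= len(name) <= max_length:
        result.update(name_valid=False, validation_error="Name length out of range")
        return result

    result.update(name=name, name_valid=True, name_length=len(name), validation_error=None)
    return result


# Unit-test style checks for the step logic in isolation
original = {"name": "  ada lovelace "}
output = validate_name(original)
assert output["name"] == "Ada Lovelace" and output["name_length"] == 12
assert original == {"name": "  ada lovelace "}  # caller's dict is untouched
assert validate_name({})["validation_error"] == "Name field missing"
assert validate_name({"name": "x"})["name_valid"] is False
print("all checks passed")
```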

Summary

In this comprehensive concepts guide, you’ve learned:

✅ Chain of Responsibility Pattern - How data flows through connected steps
✅ Step Design Patterns - Transform, Filter, Enrich, and Aggregate patterns
✅ Pipeline Orchestration - Managing step execution and error handling
✅ ForEach Processing - Handling collections and iterations effectively
✅ Data Flow Patterns - Linear and branching flows
✅ Best Practices - Guidelines for robust, maintainable pipeline code
✅ Real-world Example - Complete email processing pipeline with statistics

Key Takeaways

  1. Keep steps focused - Each step should do one thing well

  2. Plan your data flow - Understand how data transforms through your pipeline

  3. Handle errors gracefully - Always consider edge cases and failures

  4. Use type hints - Make your code more robust and maintainable

  5. Test thoroughly - Validate each step and the complete pipeline

  6. Document clearly - Help others (and future you) understand your design

Next Steps

Now that you understand the core concepts, you can:

  • Build custom pipelines for your specific use cases

  • Combine patterns to create sophisticated data processing workflows

  • Optimize performance for large-scale data processing

  • Integrate with other systems using AI-Graph as a processing engine

Happy pipeline building! 🚀