Core Concepts
Understanding the fundamental concepts of AI-Graph will help you build more effective processing pipelines. This interactive guide will walk you through each concept with practical examples you can run and experiment with.
What You'll Learn
Chain of Responsibility Pattern - How AI-Graph orchestrates data flow
Steps - The building blocks of processing pipelines
Pipelines - How to orchestrate multiple steps
ForEach Processing - Working with collections and iterations
Data Flow Patterns - Common patterns for data transformation
Best Practices - Guidelines for effective pipeline design
Setup and Imports
Let's start by importing the necessary components:
[1]:
# Core AI-Graph imports
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep
# Additional imports for examples
import json
import time
import re
from datetime import datetime
from typing import Dict, Any, List, Optional
print("✅ AI-Graph framework loaded successfully!")
print("Ready to explore core concepts...")
✅ AI-Graph framework loaded successfully!
Ready to explore core concepts...
The Chain of Responsibility Pattern
AI-Graph is built on the Chain of Responsibility design pattern, where:
Each Step is a handler that processes data
Pipelines chain these handlers together
Data flows through the chain sequentially
Each step can transform, filter, or enrich the data
Input → Step 1 → Step 2 → Step 3 → Output
Let's see this in action:
[2]:
# Demonstration of the Chain of Responsibility pattern
class Step1(BasePipelineStep):
    """First step: Add metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"Step 1: Processing {data}")
        data['processed_by_step1'] = True
        data['step1_timestamp'] = datetime.now().isoformat()
        return data
class Step2(BasePipelineStep):
    """Second step: Transform data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"Step 2: Processing {data}")
        if 'value' in data:
            data['value'] = data['value'] * 2
            data['transformed_by_step2'] = True
        return data
class Step3(BasePipelineStep):
    """Third step: Add final metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"Step 3: Processing {data}")
        data['final_result'] = f"Processed value: {data.get('value', 'N/A')}"
        data['completed'] = True
        return data
# Create the pipeline and chain the steps
chain_demo = Pipeline(name="ChainOfResponsibilityDemo")
chain_demo.add_step(Step1())
chain_demo.add_step(Step2())
chain_demo.add_step(Step3())
# Test the chain
input_data = {"value": 10, "user_id": "demo_user"}
print("Starting chain processing...\n")
result = chain_demo.process(input_data)
print("\nFinal Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
Starting chain processing...
Step 1: Processing {'value': 10, 'user_id': 'demo_user'}
Step 2: Processing {'value': 10, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-08-14T19:07:48.147529'}
Step 3: Processing {'value': 20, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-08-14T19:07:48.147529', 'transformed_by_step2': True}
Final Result:
  value: 20
  user_id: demo_user
  processed_by_step1: True
  step1_timestamp: 2025-08-14T19:07:48.147529
  transformed_by_step2: True
  final_result: Processed value: 20
  completed: True
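Stripped of framework details, the chain above amounts to passing one dict through an ordered list of handlers, each returning the (possibly modified) dict for the next one. The sketch below is a simplified stand-in written only for illustration: `MiniPipeline` and the three step functions are hypothetical names, and this is not AI-Graph's actual `Pipeline` implementation.

```python
from typing import Any, Callable, Dict, List

# A handler takes the current data dict and returns the data for the next handler.
Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


class MiniPipeline:
    """Hypothetical stand-in for a pipeline; not AI-Graph's Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.steps: List[Handler] = []

    def add_step(self, step: Handler) -> "MiniPipeline":
        self.steps.append(step)
        return self  # allow chained add_step calls

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Each handler receives whatever the previous handler returned.
        for step in self.steps:
            data = step(data)
        return data


def mark_step1(data: Dict[str, Any]) -> Dict[str, Any]:
    data["processed_by_step1"] = True
    return data


def double_value(data: Dict[str, Any]) -> Dict[str, Any]:
    if "value" in data:
        data["value"] = data["value"] * 2
    return data


def summarize(data: Dict[str, Any]) -> Dict[str, Any]:
    data["final_result"] = f"Processed value: {data.get('value', 'N/A')}"
    return data


demo = MiniPipeline("MiniChain").add_step(mark_step1).add_step(double_value).add_step(summarize)
print(demo.process({"value": 10}))
# {'value': 20, 'processed_by_step1': True, 'final_result': 'Processed value: 20'}
```

Because every handler shares the same signature, steps can be reordered or reused across pipelines without changing their code.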
Steps: The Building Blocks
Steps are the fundamental processing units in AI-Graph. Every step inherits from BasePipelineStep and implements the _process_step method.
Step Lifecycle
Each step goes through this lifecycle:
Initialization - Create the step instance
Processing - Execute the _process_step method
Result - Return the processed data
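One common way to implement this lifecycle is the template method pattern: a public `process` method wraps the subclass hook `_process_step`. The sketch below assumes that design purely for illustration; `MiniStep`, its `process` wrapper and `TagStep` are hypothetical and may differ from how `BasePipelineStep` is actually written.

```python
from typing import Any, Dict, Optional


class MiniStep:
    """Hypothetical base class illustrating the step lifecycle."""

    def __init__(self, name: Optional[str] = None) -> None:
        # 1. Initialization: configure the instance once, before any data arrives.
        self.name = name or type(self).__name__

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # 2. Processing: delegate the actual work to the subclass hook.
        result = self._process_step(data)
        # 3. Result: hand the processed data back to the caller.
        return result

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError("Subclasses must implement _process_step")


class TagStep(MiniStep):
    """Appends a configured tag to data['tags']."""

    def __init__(self, tag: str, name: Optional[str] = None) -> None:
        super().__init__(name)
        self.tag = tag

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data.setdefault("tags", []).append(self.tag)
        return data


step = TagStep("reviewed")
print(step.name, step.process({"id": 1}))
# TagStep {'id': 1, 'tags': ['reviewed']}
```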
Let's explore different types of steps:
[3]:
# Example 1: Logging Step
class LoggingStep(BasePipelineStep):
    """Prints the data with a message and passes it through unchanged"""
    def __init__(self, message: str, name: Optional[str] = None):
        super().__init__(name or f"LoggingStep_{message}")
        self.message = message
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        print(f"{self.message}: {data}")
        return data  # Pass data through unchanged
# Test the logging step
logging_pipeline = Pipeline(name="LoggingDemo")
logging_pipeline.add_step(LoggingStep("Before processing"))
logging_pipeline.add_step(AddKeyStep("processed", True))
logging_pipeline.add_step(LoggingStep("After processing"))
test_data = {"id": 123, "name": "test"}
result = logging_pipeline.process(test_data)
print(f"\n✅ Final result: {result}")
Before processing: {'id': 123, 'name': 'test'}
After processing: {'id': 123, 'name': 'test', 'processed': True}
✅ Final result: {'id': 123, 'name': 'test', 'processed': True}
Data Transformation Patterns
Steps commonly change data in four ways: transform, filter, enrich, and aggregate. Let's implement and test each pattern:
[4]:
# Pattern 1: Transform - Change the data structure or values
class UppercaseStep(BasePipelineStep):
    """Transforms text to uppercase"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = data['text'].upper()
            data['transformed'] = True
        return data
# Test transform pattern
transform_pipeline = Pipeline(name="TransformDemo")
transform_pipeline.add_step(UppercaseStep())
test_data = {"text": "hello world", "id": 1}
result = transform_pipeline.process(test_data)
print(f"Transform result: {result}")
Transform result: {'text': 'HELLO WORLD', 'id': 1, 'transformed': True}
[5]:
# Pattern 2: Filter - Decide whether data should continue (here, by flagging it)
class FilterPositiveStep(BasePipelineStep):
    """Flags non-positive numbers as filtered out"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data:
            if data['number'] <= 0:
                data['filtered_out'] = True
                data['reason'] = "Number is not positive"
            else:
                data['filtered_out'] = False
        return data
# Test filter pattern with positive, negative, and zero values
filter_pipeline = Pipeline(name="FilterDemo")
filter_pipeline.add_step(FilterPositiveStep())
test_cases = [
    {"number": 5, "id": "positive"},
    {"number": -3, "id": "negative"},
    {"number": 0, "id": "zero"}
]
print("Filter Pattern Results:")
for test_data in test_cases:
    result = filter_pipeline.process(test_data.copy())
    status = "❌ Filtered" if result.get('filtered_out') else "✅ Passed"
    print(f"  {test_data['id']}: {status} - {result}")
Filter Pattern Results:
  positive: ✅ Passed - {'number': 5, 'id': 'positive', 'filtered_out': False}
  negative: ❌ Filtered - {'number': -3, 'id': 'negative', 'filtered_out': True, 'reason': 'Number is not positive'}
  zero: ❌ Filtered - {'number': 0, 'id': 'zero', 'filtered_out': True, 'reason': 'Number is not positive'}
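FilterPositiveStep only flags records; every record still reaches the end of the pipeline. When the goal is to actually drop records from a batch, one option is to keep the flagging step and filter the flagged results afterwards. A minimal sketch, assuming the batch is a plain list of dicts; `flag_positive` and `keep_passed` are hypothetical helpers, not AI-Graph functions.

```python
from typing import Any, Dict, List


def flag_positive(record: Dict[str, Any]) -> Dict[str, Any]:
    # Mirrors FilterPositiveStep: mark the record instead of discarding it.
    if "number" in record:
        record["filtered_out"] = record["number"] <= 0
    return record


def keep_passed(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Drop every flagged record; records without the flag are kept.
    return [r for r in records if not r.get("filtered_out", False)]


batch = [{"number": 5}, {"number": -3}, {"number": 0}, {"id": "no-number"}]
flagged = [flag_positive(dict(r)) for r in batch]  # dict(r): leave batch untouched
print(keep_passed(flagged))
# [{'number': 5, 'filtered_out': False}, {'id': 'no-number'}]
```

Keeping the flag separate from the removal makes it easy to report why records were dropped before discarding them.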
[6]:
# Pattern 3: Enrich - Add additional information
class AddTimestampStep(BasePipelineStep):
    """Enriches data with timestamp information"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        now = datetime.now()
        data['enrichment'] = {
            'timestamp': now.isoformat(),
            'day_of_week': now.strftime('%A'),
            'processing_time': now.strftime('%H:%M:%S')
        }
        return data
# Test enrich pattern
enrich_pipeline = Pipeline(name="EnrichDemo")
enrich_pipeline.add_step(AddTimestampStep())
test_data = {"user_id": "user123", "action": "login"}
result = enrich_pipeline.process(test_data)
print("Enrich Pattern Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
Enrich Pattern Result:
  user_id: user123
  action: login
  enrichment: {'timestamp': '2025-08-14T19:07:48.173542', 'day_of_week': 'Thursday', 'processing_time': '19:07:48'}
[7]:
# Pattern 4: Aggregate - Combine multiple pieces of data
class SumStep(BasePipelineStep):
    """Aggregates numeric data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'numbers' in data and isinstance(data['numbers'], list):
            data['sum'] = sum(data['numbers'])
            data['count'] = len(data['numbers'])
            data['average'] = data['sum'] / data['count'] if data['count'] > 0 else 0
            data['aggregated'] = True
        return data
# Test aggregate pattern
aggregate_pipeline = Pipeline(name="AggregateDemo")
aggregate_pipeline.add_step(SumStep())
test_data = {"numbers": [1, 2, 3, 4, 5], "dataset": "sample"}
result = aggregate_pipeline.process(test_data)
print("Aggregate Pattern Result:")
for key, value in result.items():
    print(f"  {key}: {value}")
Aggregate Pattern Result:
  numbers: [1, 2, 3, 4, 5]
  dataset: sample
  sum: 15
  count: 5
  average: 3.0
  aggregated: True
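SumStep aggregates a list stored inside a single record. The same idea also applies across records, for example summarizing a whole batch after each record has been processed. A minimal sketch using only the standard library; `summarize_batch` is a hypothetical helper, and the `value`/`category` keys are assumptions chosen for the example.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_batch(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    # Only numeric values contribute to sum/average.
    values = [r["value"] for r in records if isinstance(r.get("value"), (int, float))]
    categories = Counter(r.get("category", "unknown") for r in records)
    return {
        "count": len(values),
        "sum": sum(values),
        # Guard against an empty batch, as SumStep does for an empty list.
        "average": sum(values) / len(values) if values else 0,
        "by_category": dict(categories),
    }


records = [
    {"value": 4, "category": "a"},
    {"value": 6, "category": "b"},
    {"value": 5, "category": "a"},
    {"category": "b"},  # no numeric value: counted by category only
]
print(summarize_batch(records))
# {'count': 3, 'sum': 15, 'average': 5.0, 'by_category': {'a': 2, 'b': 2}}
```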
Pipelines: Orchestrating the Flow
Pipelines run their steps in the order they were added, passing each step's output to the next step. Let's trace that execution order with a three-step text pipeline:
[8]:
# Pipeline Execution Model Demonstration
class Step1Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = data['text'].upper()
            print(f"  Step 1: Uppercase → '{data['text']}'")
        return data
class Step2Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            data['text'] = f"PREFIX: {data['text']}"
            print(f"  Step 2: Add prefix → '{data['text']}'")
        return data
class Step3Process(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            char_count = len(data['text'])
            data['char_count'] = char_count
            print(f"  Step 3: Count chars → {char_count}")
        return data
# Create and test the pipeline
execution_demo = Pipeline(name="ExecutionFlowDemo")
execution_demo.add_step(Step1Process())
execution_demo.add_step(Step2Process())
execution_demo.add_step(Step3Process())
print("Pipeline Execution Flow:")
test_data = {"text": "hello", "id": "demo"}
print(f"  Input: {test_data}")
result = execution_demo.process(test_data)
print(f"  Final Result: {result}")
Pipeline Execution Flow:
  Input: {'text': 'hello', 'id': 'demo'}
  Step 1: Uppercase → 'HELLO'
  Step 2: Add prefix → 'PREFIX: HELLO'
  Step 3: Count chars → 13
  Final Result: {'text': 'PREFIX: HELLO', 'id': 'demo', 'char_count': 13}
Error Handling in Pipelines
If a step raises an exception, the exception propagates out of process() to the caller, who decides how to handle it. Let's see how this works:
[9]:
class RiskyStep(BasePipelineStep):
    """A step that might fail under certain conditions"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data and data['number'] < 0:
            raise ValueError(f"Negative values not allowed: {data['number']}")
        if 'number' in data:
            data['number'] = data['number'] * 2
        data['processed_successfully'] = True
        return data
class SafeStep(BasePipelineStep):
    """A step that always succeeds"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data['safe_step_completed'] = True
        return data
# Create pipeline with risky step
error_demo = Pipeline(name="ErrorHandlingDemo")
error_demo.add_step(SafeStep())
error_demo.add_step(RiskyStep())
error_demo.add_step(SafeStep())
# Test with safe data
print("✅ Testing with safe data:")
safe_data = {"number": 5, "id": "safe_test"}
try:
    result = error_demo.process(safe_data.copy())
    print(f"  Success: {result}")
except Exception as e:
    print(f"  Error: {e}")
# Test with risky data
print("\n❌ Testing with risky data:")
risky_data = {"number": -5, "id": "risky_test"}
try:
    result = error_demo.process(risky_data.copy())
    print(f"  Success: {result}")
except ValueError as e:
    print(f"  Caught ValueError: {e}")
except Exception as e:
    print(f"  Unexpected error: {e}")
✅ Testing with safe data:
  Success: {'number': 10, 'id': 'safe_test', 'safe_step_completed': True, 'processed_successfully': True}
❌ Testing with risky data:
  Caught ValueError: Negative values not allowed: -5
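Because the exception escapes process(), the risky record produces no result at all and the caller must catch the error. If you would rather record failures in the data and keep going, one option is to wrap a step so that exceptions are stored under an error key. The sketch uses plain functions; `capture_errors` is a hypothetical helper, not an AI-Graph API.

```python
from typing import Any, Callable, Dict

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def capture_errors(step: Step, error_key: str = "error") -> Step:
    """Return a step that records exceptions in the data instead of raising."""
    def wrapped(data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            return step(data)
        except Exception as exc:  # deliberately broad: this wrapper is the error boundary
            data[error_key] = f"{type(exc).__name__}: {exc}"
            return data
    return wrapped


def risky(data: Dict[str, Any]) -> Dict[str, Any]:
    # Same rule as RiskyStep: reject negatives, double everything else.
    if "number" in data and data["number"] < 0:
        raise ValueError(f"Negative values not allowed: {data['number']}")
    if "number" in data:
        data["number"] = data["number"] * 2
    return data


safe_risky = capture_errors(risky)
print(safe_risky({"number": 5}))
print(safe_risky({"number": -5}))
# {'number': 10}
# {'number': -5, 'error': 'ValueError: Negative values not allowed: -5'}
```

One caveat of this design: a step that modifies the data before raising leaves those partial changes in place, so the wrapped step should either validate first (as `risky` does) or work on a copy.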
ForEach: Processing Collections
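The ForEachStep processes a collection by applying a sub-pipeline to each item in a list (items_key) or a fixed number of times (iterations). Inside the sub-pipeline, the current element is available under the _current_item key, and the data produced for each item is collected into a list under results_key. Let's explore different ForEach patterns: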
[10]:
# Basic ForEach Usage
class DoubleItemStep(BasePipelineStep):
    """Doubles the current item in a ForEach iteration"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if '_current_item' in data:
            original = data['_current_item']
            data['_current_item'] = original * 2
            print(f"  Doubling: {original} → {data['_current_item']}")
        return data
# Create ForEach pipeline
foreach_demo = Pipeline(name="ForEachDemo")
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="doubled_numbers"
)
foreach_step.add_sub_step(DoubleItemStep())
foreach_demo.add_step(foreach_step)
# Test ForEach
print("ForEach Processing:")
test_data = {"numbers": [1, 2, 3, 4, 5], "operation": "double"}
result = foreach_demo.process(test_data)
original_numbers = test_data['numbers']
doubled_numbers = [item['_current_item'] for item in result['doubled_numbers']]
print("\nResults:")
print(f"  Original: {original_numbers}")
print(f"  Doubled: {doubled_numbers}")
ForEach Processing:
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 19099.74item/s]
  Doubling: 1 → 2
  Doubling: 2 → 4
  Doubling: 3 → 6
  Doubling: 4 → 8
  Doubling: 5 → 10
Results:
  Original: [1, 2, 3, 4, 5]
  Doubled: [2, 4, 6, 8, 10]
[11]:
# ForEach with Fixed Iterations
class CounterStep(BasePipelineStep):
    """Adds iteration count to data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        iteration = data.get('_iteration_index', 0)
        data['iteration_result'] = f"Iteration {iteration + 1} completed"
        print(f"  {data['iteration_result']}")
        return data
# Create fixed iteration pipeline
iteration_demo = Pipeline(name="IterationDemo")
iteration_foreach = ForEachStep(
    iterations=5,
    results_key="iteration_results"
)
iteration_foreach.add_sub_step(CounterStep())
iteration_demo.add_step(iteration_foreach)
print("Fixed Iterations Processing:")
test_data = {"task": "repeated_operation"}
result = iteration_demo.process(test_data)
print(f"\nCompleted {len(result['iteration_results'])} iterations")
for i, iteration_result in enumerate(result['iteration_results']):
    print(f"  Result {i+1}: {iteration_result.get('iteration_result')}")
Fixed Iterations Processing:
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 20681.97item/s]
  Iteration 1 completed
  Iteration 2 completed
  Iteration 3 completed
  Iteration 4 completed
  Iteration 5 completed
Completed 5 iterations
  Result 1: Iteration 1 completed
  Result 2: Iteration 2 completed
  Result 3: Iteration 3 completed
  Result 4: Iteration 4 completed
  Result 5: Iteration 5 completed
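Conceptually, ForEachStep runs its sub-steps once per element (or once per iteration) and collects each run's data under results_key. The sketch below reproduces the behaviour visible in the two examples above, with the element exposed as `_current_item` and the index as `_iteration_index`. It is a hypothetical re-implementation for illustration only: `for_each` is not an AI-Graph function, and the choices that each run starts from a shallow copy of the parent data and that `_iteration_index` is set in both modes are assumptions.

```python
from typing import Any, Callable, Dict, List, Optional

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def for_each(
    data: Dict[str, Any],
    sub_steps: List[Step],
    results_key: str,
    items_key: Optional[str] = None,
    iterations: Optional[int] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch of ForEach semantics; not AI-Graph's implementation."""
    if items_key is not None:
        items: List[Any] = list(data[items_key])
    elif iterations is not None:
        items = [None] * iterations  # fixed-iteration mode has no real items
    else:
        raise ValueError("Provide items_key or iterations")

    results = []
    for index, item in enumerate(items):
        # Assumption: each run starts from a shallow copy of the parent data.
        item_data = dict(data)
        item_data["_current_item"] = item
        item_data["_iteration_index"] = index
        for step in sub_steps:
            item_data = step(item_data)
        results.append(item_data)
    data[results_key] = results
    return data


def double(d: Dict[str, Any]) -> Dict[str, Any]:
    d["_current_item"] = d["_current_item"] * 2
    return d


out = for_each({"numbers": [1, 2, 3]}, [double], "doubled", items_key="numbers")
print([r["_current_item"] for r in out["doubled"]])
# [2, 4, 6]
```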
Data Flow Patterns
Understanding how data flows through your pipeline is crucial for design. Let's implement and test common data flow patterns:
[12]:
# Linear Flow Pattern
# Input β Clean β Validate β Transform β Output
class CleanDataStep(BasePipelineStep):
    """Cleans input data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            # Trim, lowercase, and collapse runs of whitespace into one space
            data['text'] = re.sub(r'\s+', ' ', data['text'].strip().lower())
            data['cleaned'] = True
            print(f"  Cleaned: '{data['text']}'")
        return data
class ValidateDataStep(BasePipelineStep):
    """Validates data quality"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'text' in data:
            is_valid = len(data['text']) > 0 and len(data['text']) < 100
            data['is_valid'] = is_valid
            data['validation_passed'] = is_valid
            status = "✅ Valid" if is_valid else "❌ Invalid"
            print(f"  Validation: {status}")
        return data
class TransformDataStep(BasePipelineStep):
    """Transforms validated data"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('is_valid', False) and 'text' in data:
            # Add word count and create summary
            words = data['text'].split()
            data['word_count'] = len(words)
            data['summary'] = f"Text with {len(words)} words: '{data['text'][:50]}'..."
            print(f"  Transformed: {data['word_count']} words")
        else:
            print("  ⚠️ Skipped transformation (invalid data)")
        return data
# Create linear flow pipeline
linear_flow = Pipeline(name="LinearFlowDemo")
linear_flow.add_step(CleanDataStep())
linear_flow.add_step(ValidateDataStep())
linear_flow.add_step(TransformDataStep())
# Test linear flow
print("Linear Flow Pattern:")
test_cases = [
    {"text": " Hello World! ", "id": "valid_case"},
    {"text": "", "id": "empty_case"},
    {"text": "A" * 150, "id": "too_long_case"}
]
for i, test_data in enumerate(test_cases, 1):
    print(f"\nTest {i} ({test_data['id']}):")
    result = linear_flow.process(test_data.copy())
    print(f"  Result: Valid={result.get('is_valid')}, Words={result.get('word_count', 'N/A')}")
Linear Flow Pattern:
Test 1 (valid_case):
  Cleaned: 'hello world!'
  Validation: ✅ Valid
  Transformed: 2 words
  Result: Valid=True, Words=2
Test 2 (empty_case):
  Cleaned: ''
  Validation: ❌ Invalid
  ⚠️ Skipped transformation (invalid data)
  Result: Valid=False, Words=N/A
Test 3 (too_long_case):
  Cleaned: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
  Validation: ❌ Invalid
  ⚠️ Skipped transformation (invalid data)
  Result: Valid=False, Words=N/A
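In the linear flow above, TransformDataStep checks is_valid itself, so every downstream step has to repeat that guard. An alternative is to let the runner stop the chain as soon as a record is marked invalid. A minimal sketch with plain functions; `run_until_invalid` and the convention of stopping on `is_valid is False` are assumptions for illustration, not an AI-Graph feature.

```python
import re
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def clean(data: Dict[str, Any]) -> Dict[str, Any]:
    # Same rule as CleanDataStep: trim, lowercase, collapse whitespace.
    data["text"] = re.sub(r"\s+", " ", data.get("text", "").strip().lower())
    return data


def validate(data: Dict[str, Any]) -> Dict[str, Any]:
    data["is_valid"] = 0 < len(data["text"]) < 100
    return data


def transform(data: Dict[str, Any]) -> Dict[str, Any]:
    # No is_valid guard needed: the runner never calls this for invalid data.
    data["word_count"] = len(data["text"].split())
    return data


def run_until_invalid(data: Dict[str, Any], steps: List[Step]) -> Dict[str, Any]:
    for step in steps:
        data = step(data)
        if data.get("is_valid") is False:
            break  # short-circuit: skip all remaining steps
    return data


steps: List[Step] = [clean, validate, transform]
print(run_until_invalid({"text": "  Hello   World!  "}, steps))
print(run_until_invalid({"text": ""}, steps))
# {'text': 'hello world!', 'is_valid': True, 'word_count': 2}
# {'text': '', 'is_valid': False}
```

The check uses `is False` rather than a falsy test so that steps running before validation (where `is_valid` is still missing) are not mistaken for failures.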
[13]:
# Branching Flow Pattern
# Use conditional steps to create branching logic
class ConditionalStep(BasePipelineStep):
    """Processes data differently based on conditions"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'number' in data:
            number = data['number']
            if number > 100:
                # Path A: Large numbers
                data['category'] = 'large'
                data['processed_value'] = number // 10  # Integer-divide by 10
                data['processing_path'] = 'A'
                print(f"  Path A (Large): {number} → {data['processed_value']}")
            elif number > 0:
                # Path B: Small positive numbers
                data['category'] = 'small_positive'
                data['processed_value'] = number * 2  # Double
                data['processing_path'] = 'B'
                print(f"  Path B (Small+): {number} → {data['processed_value']}")
            else:
                # Path C: Zero or negative
                data['category'] = 'zero_or_negative'
                data['processed_value'] = 0  # Set to zero
                data['processing_path'] = 'C'
                print(f"  Path C (Zero/Neg): {number} → {data['processed_value']}")
        return data
# Create branching flow pipeline
branching_flow = Pipeline(name="BranchingFlowDemo")
branching_flow.add_step(ConditionalStep())
# Test branching flow, keeping each result for the summary
print("Branching Flow Pattern:")
test_numbers = [150, 25, 0, -10, 1000, 5]
results = []
for number in test_numbers:
    test_data = {"number": number, "id": f"test_{number}"}
    results.append(branching_flow.process(test_data))
# Count paths from the stored results instead of processing everything twice
print("\nSummary of branching results:")
path_counts = {'A': 0, 'B': 0, 'C': 0}
for result in results:
    path = result.get('processing_path', 'Unknown')
    if path in path_counts:
        path_counts[path] += 1
for path, count in path_counts.items():
    print(f"  Path {path}: {count} items")
Branching Flow Pattern:
  Path A (Large): 150 → 15
  Path B (Small+): 25 → 50
  Path C (Zero/Neg): 0 → 0
  Path C (Zero/Neg): -10 → 0
  Path A (Large): 1000 → 100
  Path B (Small+): 5 → 10
Summary of branching results:
  Path A: 2 items
  Path B: 2 items
  Path C: 2 items
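ConditionalStep keeps all three branches in one if/elif chain. When branches multiply, one option is an ordered table of (predicate, path, handler) entries where the first matching predicate wins. A sketch of that alternative, reproducing the same three paths; all names here are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], Dict[str, Any]]


def handle_large(data: Dict[str, Any]) -> Dict[str, Any]:
    data["processed_value"] = data["number"] // 10
    return data


def handle_small_positive(data: Dict[str, Any]) -> Dict[str, Any]:
    data["processed_value"] = data["number"] * 2
    return data


def handle_zero_or_negative(data: Dict[str, Any]) -> Dict[str, Any]:
    data["processed_value"] = 0
    return data


# Ordered routing table: the first predicate that matches wins.
ROUTES: List[Tuple[Callable[[int], bool], str, Handler]] = [
    (lambda n: n > 100, "A", handle_large),
    (lambda n: n > 0, "B", handle_small_positive),
    (lambda n: True, "C", handle_zero_or_negative),  # catch-all
]


def route(data: Dict[str, Any]) -> Dict[str, Any]:
    for predicate, path, handler in ROUTES:
        if predicate(data["number"]):
            data["processing_path"] = path
            return handler(data)
    return data


print([route({"number": n})["processed_value"] for n in [150, 25, 0, -10, 1000, 5]])
# [15, 50, 0, 0, 100, 10]
```

Adding a path then means adding a table row rather than editing a branch; as with if/elif, the order of the rows matters.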
Complete Example: Email Processing Pipeline
Let's put all of these concepts together in a more realistic example: validating, normalizing, and summarizing a batch of email addresses.
[14]:
# Complete Email Processing Pipeline Example
class ValidateEmailStep(BasePipelineStep):
    """Validates email format using regex"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if '_current_item' in data:
            email = data['_current_item']
            # Simple email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            is_valid = bool(re.match(email_pattern, email))
            if is_valid:
                data['email_valid'] = True
                print(f"  ✅ Valid: {email}")
            else:
                data['email_valid'] = False
                data['_current_item'] = None  # Filter out invalid emails
                print(f"  ❌ Invalid: {email}")
        return data
class NormalizeEmailStep(BasePipelineStep):
    """Normalizes email to lowercase and trims whitespace"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('email_valid') and '_current_item' in data and data['_current_item']:
            original = data['_current_item']
            normalized = original.lower().strip()
            data['_current_item'] = normalized
            data['normalized'] = True
            if original != normalized:
                print(f"  Normalized: {original} → {normalized}")
            else:
                print(f"  Already normalized: {normalized}")
        return data
class ExtractDomainStep(BasePipelineStep):
    """Extracts domain from email and adds metadata"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if data.get('email_valid') and '_current_item' in data and data['_current_item']:
            email = data['_current_item']
            domain = email.split('@')[1]
            data['domain'] = domain
            data['email'] = email
            # Add domain type classification
            common_domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'hotmail.com']
            data['domain_type'] = 'common' if domain in common_domains else 'other'
            print(f"  Domain: {domain} ({data['domain_type']})")
        return data
class EmailStatisticsStep(BasePipelineStep):
    """Aggregates statistics about processed emails"""
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'processed_emails' in data:
            results = data['processed_emails']
            # Keep only results that passed validation
            valid_results = [r for r in results if r.get('email_valid', False)]
            # Calculate statistics
            total_processed = len(results)
            total_valid = len(valid_results)
            total_invalid = total_processed - total_valid
            # Domain statistics
            domains = [r.get('domain') for r in valid_results if r.get('domain')]
            domain_counts = {}
            for domain in domains:
                domain_counts[domain] = domain_counts.get(domain, 0) + 1
            # Type statistics
            common_count = sum(1 for r in valid_results if r.get('domain_type') == 'common')
            other_count = total_valid - common_count
            data['statistics'] = {
                'total_processed': total_processed,
                'valid_emails': total_valid,
                'invalid_emails': total_invalid,
                'success_rate': f"{(total_valid/total_processed*100):.1f}%" if total_processed > 0 else "0%",
                'domain_distribution': domain_counts,
                'common_domains': common_count,
                'other_domains': other_count
            }
            print("\nEmail Processing Statistics:")
            print(f"  Total processed: {total_processed}")
            print(f"  Valid emails: {total_valid} ({data['statistics']['success_rate']})")
            print(f"  Invalid emails: {total_invalid}")
            print(f"  Common domains: {common_count}")
            print(f"  Other domains: {other_count}")
        return data
# Build the complete email processing pipeline
print("Building Email Processing Pipeline...\n")
# Create the sub-pipeline for individual email processing
email_foreach = ForEachStep(
    items_key="emails",
    results_key="processed_emails"
)
email_foreach.add_sub_step(ValidateEmailStep())
email_foreach.add_sub_step(NormalizeEmailStep())
email_foreach.add_sub_step(ExtractDomainStep())
# Create the main pipeline
email_pipeline = Pipeline(name="EmailProcessingPipeline")
email_pipeline.add_step(email_foreach)
email_pipeline.add_step(EmailStatisticsStep())
# Test with sample emails
test_emails = {
    "emails": [
        "John.Doe@GMAIL.COM",
        "invalid-email",
        "alice@example.org",
        "bob@company.com",
        " JANE@yahoo.com ",
        "test@outlook.com",
        "@invalid.com",
        "user@hotmail.com",
        "another@custom-domain.net"
    ],
    "batch_id": "demo_batch_001"
}
print(f"Processing {len(test_emails['emails'])} emails...\n")
result = email_pipeline.process(test_emails)
# Display results
print("\nValid Email Results:")
valid_emails = [r for r in result['processed_emails'] if r.get('email_valid', False)]
for i, email_result in enumerate(valid_emails, 1):
    email = email_result.get('email', 'N/A')
    domain = email_result.get('domain', 'N/A')
    domain_type = email_result.get('domain_type', 'N/A')
    print(f"  {i}. {email} → {domain} ({domain_type})")
print("\nFinal Statistics:")
stats = result.get('statistics', {})
for key, value in stats.items():
    if key == 'domain_distribution':
        print(f"  {key}:")
        for domain, count in value.items():
            print(f"    {domain}: {count}")
    else:
        print(f"  {key}: {value}")
Building Email Processing Pipeline...
Processing 9 emails...
Processing ForEachStep: 100%|██████████| 9/9 [00:00<00:00, 11397.57item/s]
  ✅ Valid: John.Doe@GMAIL.COM
  Normalized: John.Doe@GMAIL.COM → john.doe@gmail.com
  Domain: gmail.com (common)
  ❌ Invalid: invalid-email
  ✅ Valid: alice@example.org
  Already normalized: alice@example.org
  Domain: example.org (other)
  ✅ Valid: bob@company.com
  Already normalized: bob@company.com
  Domain: company.com (other)
  ❌ Invalid:  JANE@yahoo.com
  ✅ Valid: test@outlook.com
  Already normalized: test@outlook.com
  Domain: outlook.com (common)
  ❌ Invalid: @invalid.com
  ✅ Valid: user@hotmail.com
  Already normalized: user@hotmail.com
  Domain: hotmail.com (common)
  ✅ Valid: another@custom-domain.net
  Already normalized: another@custom-domain.net
  Domain: custom-domain.net (other)
Email Processing Statistics:
  Total processed: 9
  Valid emails: 6 (66.7%)
  Invalid emails: 3
  Common domains: 3
  Other domains: 3
Valid Email Results:
  1. john.doe@gmail.com → gmail.com (common)
  2. alice@example.org → example.org (other)
  3. bob@company.com → company.com (other)
  4. test@outlook.com → outlook.com (common)
  5. user@hotmail.com → hotmail.com (common)
  6. another@custom-domain.net → custom-domain.net (other)
Final Statistics:
  total_processed: 9
  valid_emails: 6
  invalid_emails: 3
  success_rate: 66.7%
  domain_distribution:
    gmail.com: 1
    example.org: 1
    company.com: 1
    outlook.com: 1
    hotmail.com: 1
    custom-domain.net: 1
  common_domains: 3
  other_domains: 3
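In this run, " JANE@yahoo.com " is reported as invalid even though NormalizeEmailStep would trim it: ValidateEmailStep runs first, and the regex rejects the surrounding spaces. If such addresses should be accepted, one fix is to normalize before validating. A standalone sketch of that ordering, reusing the same regex and domain list; `process_email` is a hypothetical helper, not part of the pipeline above.

```python
import re
from typing import Any, Dict, Optional

EMAIL_PATTERN = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
COMMON_DOMAINS = {"gmail.com", "yahoo.com", "outlook.com", "hotmail.com"}


def process_email(raw: str) -> Optional[Dict[str, Any]]:
    """Normalize first, then validate; return None for invalid addresses."""
    email = raw.strip().lower()
    if not EMAIL_PATTERN.match(email):
        return None
    domain = email.split("@", 1)[1]
    return {
        "email": email,
        "domain": domain,
        "domain_type": "common" if domain in COMMON_DOMAINS else "other",
    }


sample = [
    "John.Doe@GMAIL.COM", "invalid-email", "alice@example.org",
    "bob@company.com", " JANE@yahoo.com ", "test@outlook.com",
    "@invalid.com", "user@hotmail.com", "another@custom-domain.net",
]
valid = [r for r in map(process_email, sample) if r is not None]
print(process_email(" JANE@yahoo.com "))
print(len(valid))
# {'email': 'jane@yahoo.com', 'domain': 'yahoo.com', 'domain_type': 'common'}
# 7
```

With this ordering the sample batch yields seven valid addresses instead of six; in the pipeline version, the same effect would come from placing NormalizeEmailStep before ValidateEmailStep and adjusting its `email_valid` guard.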
Best Practices Summary
Based on the examples we've explored, here are the key best practices for using AI-Graph effectively:
[15]:
# Best Practices Demonstration
print("AI-Graph Best Practices:")
print("\n1. Single Responsibility")
print("   Each step should have one clear purpose.")
print("   ✅ Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep")
print("   ❌ Bad: ProcessAllEmailDataStep")
print("\n2. Preserve Input Data")
print("   Don't change data unexpectedly: add new keys instead of overwriting existing ones, and copy the input when the original must stay intact.")
print("   ✅ Good: data['new_field'] = processed_value")
print("   ❌ Bad: Overwriting data['existing_field'] while other steps or the caller still need the original value")
print("\n3. Error Handling")
print("   Always consider what can go wrong and handle it gracefully.")
print("   ✅ Good: Check for required fields, validate data types")
print("   ❌ Bad: Assume data is always in expected format")
print("\n4. Type Safety")
print("   Use type hints to document interfaces and let static checkers such as mypy catch mistakes.")
print("   ✅ Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]")
print("   ❌ Bad: def _process_step(self, data) -> dict")
print("\n5. Testing")
print("   Write unit tests for each step individually.")
print("   ✅ Good: Test each step with various input scenarios")
print("   ❌ Bad: Only test the complete pipeline")
print("\n6. Documentation")
print("   Document what each step does and its expected input/output.")
print("   ✅ Good: Clear docstrings and parameter documentation")
print("   ❌ Bad: No documentation or unclear naming")
# Example of a well-designed step following all best practices
class BestPracticeStep(BasePipelineStep):
    """
    Example step following all best practices.
    This step validates and normalizes user names in the input data.
    Expected input:
        - data['name']: str - The user name to process
    Output:
        - data['name']: str - The normalized name (only if valid)
        - data['name_valid']: bool - Whether the name is valid
        - data['name_length']: int - Length of the normalized name (only if valid)
        - data['validation_error']: Optional[str] - Why validation failed, or None
    """
    def __init__(self, min_length: int = 2, max_length: int = 50, name: Optional[str] = None):
        super().__init__(name or f"NameValidator_{min_length}_{max_length}")
        self.min_length = min_length
        self.max_length = max_length
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process and validate user name."""
        try:
            # Check that the name field exists and is a string
            if 'name' not in data:
                data['name_valid'] = False
                data['validation_error'] = "Name field missing"
                return data
            raw_name = data['name']
            if not isinstance(raw_name, str):
                data['name_valid'] = False
                data['validation_error'] = "Name must be string"
                return data
            # Normalize into a local variable; data['name'] is overwritten only on success
            normalized_name = raw_name.strip().title()
            # Validate length
            if len(normalized_name) < self.min_length:
                data['name_valid'] = False
                data['validation_error'] = f"Name too short (min {self.min_length})"
                return data
            if len(normalized_name) > self.max_length:
                data['name_valid'] = False
                data['validation_error'] = f"Name too long (max {self.max_length})"
                return data
            # Success case
            data['name'] = normalized_name
            data['name_valid'] = True
            data['name_length'] = len(normalized_name)
            data['validation_error'] = None
        except Exception as e:
            # Graceful error handling
            data['name_valid'] = False
            data['validation_error'] = f"Unexpected error: {e}"
        return data
print("\nExample best practice step defined!")
print("See the BestPracticeStep class above for a complete example.")
AI-Graph Best Practices:
1. Single Responsibility
   Each step should have one clear purpose.
   ✅ Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep
   ❌ Bad: ProcessAllEmailDataStep
2. Preserve Input Data
   Don't change data unexpectedly: add new keys instead of overwriting existing ones, and copy the input when the original must stay intact.
   ✅ Good: data['new_field'] = processed_value
   ❌ Bad: Overwriting data['existing_field'] while other steps or the caller still need the original value
3. Error Handling
   Always consider what can go wrong and handle it gracefully.
   ✅ Good: Check for required fields, validate data types
   ❌ Bad: Assume data is always in expected format
4. Type Safety
   Use type hints to document interfaces and let static checkers such as mypy catch mistakes.
   ✅ Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]
   ❌ Bad: def _process_step(self, data) -> dict
5. Testing
   Write unit tests for each step individually.
   ✅ Good: Test each step with various input scenarios
   ❌ Bad: Only test the complete pipeline
6. Documentation
   Document what each step does and its expected input/output.
   ✅ Good: Clear docstrings and parameter documentation
   ❌ Bad: No documentation or unclear naming
Example best practice step defined!
See the BestPracticeStep class above for a complete example.
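The data-preservation and testing practices combine naturally: a step that works on a copy of its input is easy to test in isolation, because the test can check both the returned data and that the original dict was left untouched. A minimal sketch with a plain function standing in for a step's `_process_step` logic; `normalize_name` and `test_normalize_name` are hypothetical names.

```python
from typing import Any, Dict


def normalize_name(data: Dict[str, Any], min_length: int = 2) -> Dict[str, Any]:
    # Shallow copy: the caller's dict is not modified (nested objects are still shared).
    result = dict(data)
    raw = result.get("name")
    if not isinstance(raw, str):
        result["name_valid"] = False
        return result
    normalized = raw.strip().title()
    result["name_valid"] = len(normalized) >= min_length
    if result["name_valid"]:
        result["name"] = normalized
    return result


def test_normalize_name() -> None:
    original = {"name": "  ada lovelace "}
    out = normalize_name(original)
    assert out == {"name": "Ada Lovelace", "name_valid": True}
    assert original == {"name": "  ada lovelace "}  # input left untouched
    assert normalize_name({"name": 42})["name_valid"] is False
    assert normalize_name({"name": " x "}) == {"name": " x ", "name_valid": False}


test_normalize_name()
print("all checks passed")
```

In a real project the test function would live in a test module and run under a test runner such as pytest, which collects functions named `test_*`.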
Summary
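In this guide, you've learned how the Chain of Responsibility pattern, steps, pipelines, ForEach processing, and common data flow patterns fit together in AI-Graph.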
Key Takeaways
Keep steps focused - Each step should do one thing well
Plan your data flow - Understand how data transforms through your pipeline
Handle errors gracefully - Always consider edge cases and failures
Use type hints - Make your code more robust and maintainable
Test thoroughly - Validate each step and the complete pipeline
Document clearly - Help others (and future you) understand your design
Next Steps
Now that you understand the core concepts, you can:
Build custom pipelines for your specific use cases
Combine patterns to create sophisticated data processing workflows
Optimize performance for large-scale data processing
Integrate with other systems using AI-Graph as a processing engine
Happy pipeline building!