Quick Start Guide
This guide will help you get started with AI-Graph in just a few minutes. AI-Graph is a framework for building and managing AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.
What You'll Learn
How to create custom pipeline steps
How to build and run pipelines
How to chain multiple steps together
How to work with collections using ForEach steps
Error handling best practices
Real-world examples
Setup and Imports
First, let's import the necessary components from the AI-Graph framework:
[1]:
# Import the core components from AI-Graph
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep
# Additional imports for our examples
import json
import time
import re
from typing import Dict, Any
print("AI-Graph framework imported successfully!")
AI-Graph framework imported successfully!
Your First Pipeline
Let's create a simple pipeline step that processes numerical data. In AI-Graph, all steps work with dictionary data structures and inherit from BasePipelineStep:
[2]:
class DoubleStep(BasePipelineStep):
    """A step that doubles the input value."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Double the 'value' key in the input data."""
        if 'value' in data:
            data['value'] = data['value'] * 2
        return data

# Create and run the pipeline
pipeline = Pipeline(name="DoubleValuePipeline")
pipeline.add_step(DoubleStep())
# Input data as a dictionary
input_data = {"value": 5}
# The step updates the dictionary in place, so pass a copy to keep input_data intact
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")

Input: {'value': 5}
Result: {'value': 10}
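One detail worth noting: DoubleStep assigns into the dictionary it receives rather than building a new one. The sketch below uses plain Python only to show how in-place updates reach the caller's dictionary and how passing a copy avoids that. The function double_in_place is a hypothetical stand-in for a step, not part of AI-Graph, and whether Pipeline.process hands the very same dictionary object to each step is an assumption here.

```python
import copy
from typing import Any, Dict


def double_in_place(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for a step: updates the dict it receives."""
    if "value" in data:
        data["value"] = data["value"] * 2
    return data


original = {"value": 5}
same_object = double_in_place(original)
# The caller's dictionary was modified; the return value is that same object.
print(original, same_object is original)

untouched = {"value": 5}
result = double_in_place(untouched.copy())
# A shallow copy protects the top-level keys of the caller's dictionary.
print(untouched, result)

nested = {"values": [1, 2]}
deep = copy.deepcopy(nested)
deep["values"].append(3)
# Nested containers need a deep copy; a shallow copy would share the list.
print(nested, deep)
```

A shallow copy is enough for the flat dictionaries in this guide; copy.deepcopy is the safer choice once values are lists or nested dictionaries.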
Multiple Steps in a Pipeline
You can chain multiple steps together. Each step processes the data and passes it to the next step:
[3]:
class AddStep(BasePipelineStep):
    def __init__(self, add_value: int, name: str = None):
        super().__init__(name or f"AddStep_{add_value}")
        self.add_value = add_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] + self.add_value
        return data

class MultiplyStep(BasePipelineStep):
    def __init__(self, multiply_value: int, name: str = None):
        super().__init__(name or f"MultiplyStep_{multiply_value}")
        self.multiply_value = multiply_value

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if 'value' in data:
            data['value'] = data['value'] * self.multiply_value
        return data

# Create pipeline with multiple steps
pipeline = Pipeline(name="MathPipeline")
pipeline.add_step(AddStep(10))  # Add 10
pipeline.add_step(MultiplyStep(2))  # Multiply by 2
input_data = {"value": 5}
# Pass a copy so input_data still holds the original value when printed
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")
print(f"Calculation: (5 + 10) * 2 = {result['value']}")

Input: {'value': 5}
Result: {'value': 30}
Calculation: (5 + 10) * 2 = 30
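To picture the hand-off between steps, here is a minimal plain-Python sketch of the same calculation. The helpers run_steps, add and multiply are hypothetical names written for this illustration; they are not AI-Graph APIs and say nothing about how Pipeline is implemented internally.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def add(amount: int) -> Step:
    """Build a step that adds `amount` to data['value']."""
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        return {**data, "value": data["value"] + amount}
    return step


def multiply(factor: int) -> Step:
    """Build a step that multiplies data['value'] by `factor`."""
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        return {**data, "value": data["value"] * factor}
    return step


def run_steps(steps: List[Step], data: Dict[str, Any]) -> Dict[str, Any]:
    """Feed the output of each step into the next one, in order."""
    for step in steps:
        data = step(data)
    return data


add_then_multiply = run_steps([add(10), multiply(2)], {"value": 5})
multiply_then_add = run_steps([multiply(2), add(10)], {"value": 5})
print(add_then_multiply)   # (5 + 10) * 2
print(multiply_then_add)   # (5 * 2) + 10
```

Swapping the two steps changes the result from 30 to 20, which is why the order of add_step calls matters.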
Using Built-in Steps
AI-Graph provides several built-in steps for common operations. Let's use AddKeyStep and DelKeyStep:
[4]:
# Create a pipeline that adds metadata and processes data
pipeline = Pipeline(name="MetadataPipeline")
pipeline.add_step(AddKeyStep("timestamp", "2025-07-16T10:00:00Z"))
pipeline.add_step(AddKeyStep("processed_by", "AI-Graph"))
pipeline.add_step(DoubleStep()) # Our custom step from before
pipeline.add_step(DelKeyStep("timestamp")) # Remove timestamp after processing
input_data = {"value": 42, "user_id": "user123"}
# Pass a copy so input_data still shows the original dictionary when printed
result = pipeline.process(input_data.copy())
print(f"Input: {input_data}")
print(f"Result: {result}")

Input: {'value': 42, 'user_id': 'user123'}
Result: {'value': 84, 'user_id': 'user123', 'processed_by': 'AI-Graph'}
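The net effect of this pipeline on the dictionary can be reproduced with ordinary dict operations. This is only a sketch of the observable result shown above; how AddKeyStep and DelKeyStep are implemented, and how they treat existing or missing keys, is not covered by this guide.

```python
data = {"value": 42, "user_id": "user123"}

data["timestamp"] = "2025-07-16T10:00:00Z"  # effect of AddKeyStep("timestamp", ...)
data["processed_by"] = "AI-Graph"           # effect of AddKeyStep("processed_by", ...)
data["value"] = data["value"] * 2           # effect of DoubleStep
del data["timestamp"]                       # effect of DelKeyStep("timestamp")

print(data)
```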
Working with Collections using ForEach
Use ForEach steps to process collections of items. The ForEach step creates a sub-pipeline that processes each item:
[5]:
class SquareStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Square the current item in a ForEach iteration."""
        if '_current_item' in data:
            # ForEach puts the current item in '_current_item'
            current_value = data['_current_item']
            data['_current_item'] = current_value ** 2
        return data

# Create main pipeline with ForEach
pipeline = Pipeline(name="SquareNumbersPipeline")
# Create ForEach step that will process each number in the 'numbers' key
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="squared_numbers"
)
foreach_step.add_sub_step(SquareStep())
pipeline.add_step(foreach_step)
input_data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(input_data)
print(f"Input numbers: {input_data['numbers']}")
print(f"Squared numbers: {[item['_current_item'] for item in result['squared_numbers']]}")
Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 50051.36item/s]
Input numbers: [1, 2, 3, 4, 5]
Squared numbers: [1, 4, 9, 16, 25]
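Judging from the example above, each element of numbers reaches the sub-steps wrapped in a dictionary under '_current_item', and the per-item dictionaries are collected under results_key. The following plain-Python sketch mimics that observable behavior. The helper for_each is hypothetical, and details such as which other keys the real ForEachStep copies into each item dictionary are assumptions.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def square(item_data: Dict[str, Any]) -> Dict[str, Any]:
    """Square the value stored under '_current_item', if present."""
    if "_current_item" in item_data:
        item_data["_current_item"] = item_data["_current_item"] ** 2
    return item_data


def for_each(data: Dict[str, Any], items_key: str, results_key: str,
             sub_steps: List[Step]) -> Dict[str, Any]:
    """Run every sub-step on each item and collect the per-item dicts."""
    results = []
    for item in data[items_key]:
        item_data = {"_current_item": item}  # assumption: one fresh dict per item
        for step in sub_steps:
            item_data = step(item_data)
        results.append(item_data)
    return {**data, results_key: results}


result = for_each({"numbers": [1, 2, 3, 4, 5]}, "numbers", "squared_numbers", [square])
squared = [item["_current_item"] for item in result["squared_numbers"]]
print(squared)
```

Because each result is a dictionary rather than a bare value, the caller unwraps '_current_item' when printing, exactly as the notebook cell does.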
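Error Handling
AI-Graph provides built-in error handling. Let's create a step that might fail and see how to handle it. In this example the ValueError raised inside the step reaches the caller of pipeline.process(), where a regular try/except catches it: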
[6]:
class DivideStep(BasePipelineStep):
    def __init__(self, divisor: float, name: str = None):
        super().__init__(name or f"DivideStep_{divisor}")
        self.divisor = divisor

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if self.divisor == 0:
            raise ValueError("Cannot divide by zero")
        if 'value' in data:
            data['value'] = data['value'] / self.divisor
        return data

# Test with a safe divisor first
pipeline = Pipeline(name="SafeDivisionPipeline")
pipeline.add_step(DivideStep(2))
input_data = {"value": 10}
# Pass a copy so input_data still holds the original value when printed
result = pipeline.process(input_data.copy())
print(f"Safe division - Input: {input_data}, Result: {result}")
# Now test with zero (this will raise an error)
pipeline_with_error = Pipeline(name="ErrorPipeline")
pipeline_with_error.add_step(DivideStep(0))
try:
    input_data = {"value": 10}
    result = pipeline_with_error.process(input_data)
except ValueError as e:
    print(f"Caught expected error: {e}")

Safe division - Input: {'value': 10}, Result: {'value': 5.0}
Caught expected error: Cannot divide by zero
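When a failure should not abort the caller, one option is to catch the exception at the call site and return it alongside the last good data. The helper below is a hypothetical plain-Python sketch (run_safely and divide_by are not AI-Graph functions); it assumes only that a failing step raises an ordinary exception, as DivideStep does above.

```python
from typing import Any, Callable, Dict, Optional, Tuple

Process = Callable[[Dict[str, Any]], Dict[str, Any]]


def divide_by(divisor: float) -> Process:
    """Build a processing function that divides data['value'] by `divisor`."""
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        if divisor == 0:
            raise ValueError("Cannot divide by zero")
        return {**data, "value": data["value"] / divisor}
    return step


def run_safely(process: Process,
               data: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[str]]:
    """Run `process` on a copy of `data`; return (result, error message)."""
    try:
        return process(dict(data)), None
    except ValueError as exc:
        # Hand back the unmodified input plus a description of the failure.
        return data, f"{type(exc).__name__}: {exc}"


ok_result, ok_error = run_safely(divide_by(2), {"value": 10})
bad_result, bad_error = run_safely(divide_by(0), {"value": 10})
print(ok_result, ok_error)
print(bad_result, bad_error)
```

Catching a specific exception type rather than a bare Exception keeps unexpected bugs visible instead of silently turning them into error strings.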
Real-World Example: Text Processing Pipeline
Here's a more practical example for text processing that demonstrates multiple concepts:
[7]:
class CleanTextStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Clean text by removing extra whitespace and converting to lowercase."""
        if '_current_item' in data:
            text = data['_current_item']
            # Remove extra whitespace and convert to lowercase
            cleaned = re.sub(r'\s+', ' ', text.strip().lower())
            data['_current_item'] = cleaned
        return data

class CountWordsStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Count words in the text."""
        if '_current_item' in data:
            text = data['_current_item']
            word_count = len(text.split())
            data['word_count'] = word_count
            data['text'] = text  # Keep the cleaned text
        return data

class FilterLongTextsStep(BasePipelineStep):
    def __init__(self, min_words: int = 5, name: str = None):
        super().__init__(name or f"FilterLongTexts_{min_words}")
        self.min_words = min_words

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Flag texts with fewer than min_words words as filtered out."""
        if 'word_count' in data and data['word_count'] < self.min_words:
            data['filtered_out'] = True
        else:
            data['filtered_out'] = False
        return data

# Create the text processing sub-pipeline
foreach_step = ForEachStep(
    items_key="texts",
    results_key="processed_texts"
)
foreach_step.add_sub_step(CleanTextStep())
foreach_step.add_sub_step(CountWordsStep())
foreach_step.add_sub_step(FilterLongTextsStep(min_words=3))
# Create main pipeline
main_pipeline = Pipeline(name="TextProcessingPipeline")
main_pipeline.add_step(foreach_step)
# Test data
input_data = {
    "texts": [
        "Hello World!",
        "This is a longer text with many words",
        "Short",
        "AI-Graph makes pipeline processing easy and efficient"
    ]
}
result = main_pipeline.process(input_data)
print("Text Processing Results:")
print("=" * 40)
for i, processed_item in enumerate(result['processed_texts']):
    original_text = input_data['texts'][i]
    cleaned_text = processed_item.get('text', 'N/A')
    word_count = processed_item.get('word_count', 0)
    filtered = processed_item.get('filtered_out', False)
    print(f"Original: '{original_text}'")
    print(f"Cleaned: '{cleaned_text}'")
    print(f"Words: {word_count}")
    print(f"Filtered: {'Yes' if filtered else 'No'}")
    print("-" * 40)
# Extract only non-filtered results
valid_results = [
    item for item in result['processed_texts']
    if not item.get('filtered_out', False)
]
print(f"\nValid texts (>= 3 words): {len(valid_results)} out of {len(input_data['texts'])}")

Processing ForEachStep: 100%|██████████| 4/4 [00:00<00:00, 39290.90item/s]
Text Processing Results:
========================================
Original: 'Hello World!'
Cleaned: 'hello world!'
Words: 2
Filtered: Yes
----------------------------------------
Original: 'This is a longer text with many words'
Cleaned: 'this is a longer text with many words'
Words: 8
Filtered: No
----------------------------------------
Original: 'Short'
Cleaned: 'short'
Words: 1
Filtered: Yes
----------------------------------------
Original: 'AI-Graph makes pipeline processing easy and efficient'
Cleaned: 'ai-graph makes pipeline processing easy and efficient'
Words: 7
Filtered: No
----------------------------------------
Valid texts (>= 3 words): 2 out of 4
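The cleaning and counting logic is easy to check in isolation. This sketch applies the same expressions used in CleanTextStep and CountWordsStep to a deliberately messy string; clean_text and count_words are hypothetical helper names introduced for the illustration.

```python
import re


def clean_text(text: str) -> str:
    """Trim, lowercase, and collapse runs of whitespace into single spaces."""
    return re.sub(r"\s+", " ", text.strip().lower())


def count_words(text: str) -> int:
    """Count whitespace-separated tokens; punctuation stays attached to words."""
    return len(text.split())


messy = "  Hello\t  WORLD!\n\nNice   to meet you  "
cleaned = clean_text(messy)
word_total = count_words(cleaned)
print(repr(cleaned))
print(word_total)
```

Since splitting happens on whitespace only, a token such as 'world!' counts as one word, which matches the count of 2 reported for 'hello world!' above.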
Progress Tracking with ForEach
The ForEach step automatically shows progress bars for long-running operations. Let's see this in action:
[8]:
class SlowProcessingStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate slow processing."""
        time.sleep(0.1)  # Simulate slow processing
        if '_current_item' in data:
            data['_current_item'] = data['_current_item'] * 2
        return data

# Create pipeline with progress tracking
foreach_step = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers"
)
foreach_step.add_sub_step(SlowProcessingStep())
pipeline = Pipeline(name="SlowProcessingPipeline")
pipeline.add_step(foreach_step)
# Process a list of numbers with progress tracking
input_data = {"numbers": list(range(10))}
print("Processing numbers with progress tracking:")
result = pipeline.process(input_data)
processed_numbers = [item['_current_item'] for item in result['processed_numbers']]
print(f"\nOriginal: {input_data['numbers']}")
print(f"Doubled: {processed_numbers}")
Processing numbers with progress tracking:
Processing ForEachStep: 100%|██████████| 10/10 [00:01<00:00, 9.90item/s]
Original: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Doubled: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Advanced Example: Data Validation Pipeline
Let's create a more complex example that validates and processes user data:
[9]:
class ValidateEmailStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate email format."""
        if 'email' in data:
            email = data['email']
            # Simple email validation
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            data['email_valid'] = bool(re.match(email_pattern, email))
        return data

class ValidateAgeStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate age is reasonable."""
        if 'age' in data:
            age = data['age']
            data['age_valid'] = isinstance(age, int) and 0 <= age <= 150
        return data

class SummarizeValidationStep(BasePipelineStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Summarize validation results."""
        email_valid = data.get('email_valid', False)
        age_valid = data.get('age_valid', False)
        data['is_valid'] = email_valid and age_valid
        data['validation_errors'] = []
        if not email_valid:
            data['validation_errors'].append('Invalid email format')
        if not age_valid:
            data['validation_errors'].append('Invalid age')
        return data

# Create validation pipeline
validation_pipeline = Pipeline(name="UserValidationPipeline")
validation_pipeline.add_step(ValidateEmailStep())
validation_pipeline.add_step(ValidateAgeStep())
validation_pipeline.add_step(SummarizeValidationStep())
# Test with various user data
test_users = [
    {"name": "John Doe", "email": "john@example.com", "age": 25},
    {"name": "Jane Smith", "email": "invalid-email", "age": 30},
    {"name": "Bob Johnson", "email": "bob@test.com", "age": -5},
    {"name": "Alice Brown", "email": "alice@company.org", "age": 200}
]
print("User Validation Results:")
print("=" * 50)
for user in test_users:
    result = validation_pipeline.process(user.copy())
    print(f"Name: {result['name']}")
    print(f"Email: {result['email']} ({'✓' if result['email_valid'] else '✗'})")
    print(f"Age: {result['age']} ({'✓' if result['age_valid'] else '✗'})")
    print(f"Valid: {'✓' if result['is_valid'] else '✗'}")
    if result['validation_errors']:
        print(f"Errors: {', '.join(result['validation_errors'])}")
    print("-" * 50)

User Validation Results:
==================================================
Name: John Doe
Email: john@example.com (✓)
Age: 25 (✓)
Valid: ✓
--------------------------------------------------
Name: Jane Smith
Email: invalid-email (✗)
Age: 30 (✓)
Valid: ✗
Errors: Invalid email format
--------------------------------------------------
Name: Bob Johnson
Email: bob@test.com (✓)
Age: -5 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------
Name: Alice Brown
Email: alice@company.org (✓)
Age: 200 (✗)
Valid: ✗
Errors: Invalid age
--------------------------------------------------
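Two edge cases are worth keeping in mind with these checks, both of which follow from standard Python behavior rather than anything specific to AI-Graph. The simple pattern accepts some strings that are not valid addresses, such as a domain with consecutive dots, and isinstance(age, int) is also true for booleans because bool is a subclass of int. The stricter is_valid_age below is a hypothetical variant, not part of the guide's pipeline.

```python
import re

EMAIL_PATTERN = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'


def is_valid_email(email: str) -> bool:
    """Same simple format check as ValidateEmailStep."""
    return bool(re.match(EMAIL_PATTERN, email))


def is_valid_age(age: object) -> bool:
    """Accept integers from 0 to 150, rejecting booleans explicitly."""
    return isinstance(age, int) and not isinstance(age, bool) and 0 <= age <= 150


# The pattern is a format check only: consecutive dots in the domain slip through.
double_dot_accepted = is_valid_email("a@b..com")
# The original age check would treat True as the integer 1.
original_check_accepts_bool = isinstance(True, int) and 0 <= True <= 150

print(is_valid_email("john@example.com"), is_valid_email("invalid-email"))
print(double_dot_accepted, original_check_accepts_bool)
print(is_valid_age(25), is_valid_age(True), is_valid_age(200))
```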
Next Steps
Now that you've learned the basics of AI-Graph, here are some next steps to explore:
Documentation
Concepts: Learn about the core concepts in detail
API Reference: Full API documentation for all classes and methods
Best Practices
Keep steps small and focused - Each step should do one thing well
Use meaningful names - Name your steps clearly to improve readability
Handle errors gracefully - Always consider what might go wrong
Test your pipelines - Write unit tests for your custom steps
Use type hints - AI-Graph supports full type checking with mypy
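As a sketch of the testing advice, the following uses the standard unittest module. To keep the example runnable without AI-Graph installed, FakeBaseStep is a hypothetical stand-in for BasePipelineStep; in a real project the step would subclass the framework class, and the test assumes that _process_step can be called directly on an instance.

```python
import unittest
from typing import Any, Dict


class FakeBaseStep:
    """Hypothetical stand-in for BasePipelineStep (not the real class)."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError


class DoubleStep(FakeBaseStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if "value" in data:
            data["value"] = data["value"] * 2
        return data


class DoubleStepTest(unittest.TestCase):
    def test_doubles_value(self) -> None:
        self.assertEqual(DoubleStep()._process_step({"value": 5}), {"value": 10})

    def test_missing_key_is_left_alone(self) -> None:
        self.assertEqual(DoubleStep()._process_step({"other": 1}), {"other": 1})


# Run the tests programmatically so the snippet works in a script or notebook.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(DoubleStepTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
print(outcome.wasSuccessful())
```

Testing the step on its own, separate from any pipeline, keeps failures easy to localize.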
Advanced Features
Custom ForEach implementations - Create specialized iteration patterns
Pipeline composition - Combine multiple pipelines
Performance optimization - Tips for handling large datasets
Integration patterns - How to integrate with other frameworks
Summary
In this quick start guide, you learned:
How to create custom pipeline steps with BasePipelineStep
How to use the built-in AddKeyStep and DelKeyStep
How to work with collections using ForEachStep
The AI-Graph framework provides a powerful and flexible way to build processing pipelines that are easy to understand, test, and maintain. Happy coding!