AI-Graph Framework Example
This notebook demonstrates how to use the AI-Graph framework to build and manage AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.
Overview
The AI-Graph framework provides:
Pipeline: A container for managing sequential processing steps
BasePipelineStep: Abstract base class for creating custom processing steps
Built-in Steps: Ready-to-use steps such as AddKeyStep, DelKeyStep, and ForEachStep
Chain of Responsibility: Steps can be chained together for complex workflows
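To make the Chain of Responsibility idea concrete, here is a minimal, framework-independent sketch: each handler does its own work on a dict and then hands the result to the next handler. The classes `Handler`, `AddKey`, and `Upper` and the `set_next`/`handle` methods are hypothetical stand-ins, not AI-Graph's API. How `Pipeline.add_step` links steps internally is not shown in this notebook and may differ.

```python
from typing import Any, Dict, Optional


class Handler:
    """Hypothetical stand-in for a pipeline step (not the AI-Graph API)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._next: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        # Link this handler to the next one and return it so links can be chained.
        self._next = handler
        return handler

    def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Do this handler's work, then pass the result along the chain.
        data = self.process(data)
        if self._next is not None:
            return self._next.handle(data)
        return data

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError


class AddKey(Handler):
    """Sets one key/value pair, similar in spirit to AddKeyStep."""

    def __init__(self, key: str, value: Any, name: str) -> None:
        super().__init__(name)
        self.key, self.value = key, value

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data[self.key] = self.value
        return data


class Upper(Handler):
    """Upper-cases data["text"] when it is present."""

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if "text" in data:
            data["text"] = data["text"].upper()
        return data


head = AddKey("timestamp", "2025-07-09", "AddTimestamp")
head.set_next(Upper("ToUpperCase")).set_next(
    AddKey("processed_by", "AI-Graph Framework", "AddProcessedBy")
)
result = head.handle({"text": "hello"})
print(result)
# {'text': 'HELLO', 'timestamp': '2025-07-09', 'processed_by': 'AI-Graph Framework'}
```

The caller only talks to the head of the chain; adding or reordering handlers changes the links, not the calling code.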
1. Basic Setup and Imports
First, let's import the necessary components from the AI-Graph framework.
[1]:
# Import the core components from AI-Graph
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep
# Additional imports for our examples
import json
from typing import Dict, Any
print("AI-Graph framework imported successfully!")
AI-Graph framework imported successfully!
2. Creating a Custom Pipeline Step
Let's create a custom step that processes text data. This demonstrates how to extend the BasePipelineStep class by overriding its _process_step method.
[2]:
from typing import Any, Dict, Optional


class TextProcessingStep(BasePipelineStep):
    """A custom step that processes text data."""

    def __init__(self, operation: str = "upper", name: Optional[str] = None):
        super().__init__(name or f"TextProcessing_{operation}")
        self.operation = operation

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process text data based on the specified operation."""
        if "text" in data:
            text = data["text"]
            if self.operation == "upper":
                data["text"] = text.upper()
            elif self.operation == "lower":
                data["text"] = text.lower()
            elif self.operation == "reverse":
                data["text"] = text[::-1]
            elif self.operation == "word_count":
                data["word_count"] = len(text.split())
            # Add processing metadata (only when a "text" key was present)
            if "processing_history" not in data:
                data["processing_history"] = []
            data["processing_history"].append(f"Applied {self.operation} operation")
        return data
print("Custom TextProcessingStep created!")
Custom TextProcessingStep created!
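One property of the if/elif chain above: an unrecognized operation (for example "title") leaves the text unchanged but still appends "Applied title operation" to processing_history. A dispatch table is one common way to reject unknown names up front. The sketch below is a plain function rather than a BasePipelineStep subclass; `TEXT_OPERATIONS` and `apply_text_operation` are hypothetical names.

```python
from typing import Any, Callable, Dict

# Hypothetical table: operation name -> function(text) -> keys to merge into data.
TEXT_OPERATIONS: Dict[str, Callable[[str], Dict[str, Any]]] = {
    "upper": lambda text: {"text": text.upper()},
    "lower": lambda text: {"text": text.lower()},
    "reverse": lambda text: {"text": text[::-1]},
    "word_count": lambda text: {"word_count": len(text.split())},
}


def apply_text_operation(data: Dict[str, Any], operation: str) -> Dict[str, Any]:
    """Apply one named operation to data["text"], failing fast on unknown names."""
    if operation not in TEXT_OPERATIONS:
        raise ValueError(f"Unknown operation: {operation!r}")
    if "text" in data:
        data.update(TEXT_OPERATIONS[operation](data["text"]))
        data.setdefault("processing_history", []).append(f"Applied {operation} operation")
    return data


print(apply_text_operation({"text": "Hello World"}, "reverse"))
# {'text': 'dlroW olleH', 'processing_history': ['Applied reverse operation']}
```

In a step class, the same membership check could run in `__init__`, so a typo surfaces when the pipeline is built rather than when data flows through it.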
3. Basic Pipeline Example
Now let's create a simple pipeline that processes text data through multiple steps.
[3]:
# Create a pipeline with multiple steps
pipeline = Pipeline(name="TextProcessingPipeline")
# Add steps to the pipeline
pipeline.add_step(AddKeyStep("timestamp", "2025-07-09", "AddTimestamp"))
pipeline.add_step(TextProcessingStep("word_count", "CountWords"))
pipeline.add_step(TextProcessingStep("upper", "ToUpperCase"))
pipeline.add_step(AddKeyStep("processed_by", "AI-Graph Framework", "AddProcessedBy"))
# Test data
input_data = {
"text": "Hello, World! This is a test of the AI-Graph framework.",
"source": "example_notebook"
}
print("Input data:")
print(json.dumps(input_data, indent=2))
# Process the data through the pipeline
result = pipeline.process(input_data)
print("\nProcessed result:")
print(json.dumps(result, indent=2))
Input data:
{
  "text": "Hello, World! This is a test of the AI-Graph framework.",
  "source": "example_notebook"
}

Processed result:
{
  "text": "HELLO, WORLD! THIS IS A TEST OF THE AI-GRAPH FRAMEWORK.",
  "source": "example_notebook",
  "timestamp": "2025-07-09",
  "word_count": 10,
  "processing_history": [
    "Applied word_count operation",
    "Applied upper operation"
  ],
  "processed_by": "AI-Graph Framework"
}
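Conceptually, a sequential pipeline is a left fold over its steps: the output of one step is the input of the next. The sketch below reproduces the printed result without the framework by folding four plain functions with `functools.reduce`. `add_key` and `text_op` are hypothetical helpers that mirror AddKeyStep and TextProcessingStep for this input. Unlike the class above, each function returns a new dict instead of mutating its input; whether AI-Graph's `Pipeline.process` mutates `input_data` is not shown in this notebook.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def add_key(key: str, value: Any) -> Step:
    # Returns a step that sets one key, like AddKeyStep above.
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        return {**data, key: value}
    return step


def text_op(operation: str) -> Step:
    # Returns a step mirroring TextProcessingStep for "word_count" and "upper".
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        out = dict(data)
        if operation == "word_count":
            out["word_count"] = len(out["text"].split())
        elif operation == "upper":
            out["text"] = out["text"].upper()
        history = out.get("processing_history", [])
        out["processing_history"] = history + [f"Applied {operation} operation"]
        return out
    return step


steps: List[Step] = [
    add_key("timestamp", "2025-07-09"),
    text_op("word_count"),
    text_op("upper"),
    add_key("processed_by", "AI-Graph Framework"),
]
input_data = {
    "text": "Hello, World! This is a test of the AI-Graph framework.",
    "source": "example_notebook",
}
result = reduce(lambda data, step: step(data), steps, input_data)
print(result["word_count"], result["processing_history"])
# 10 ['Applied word_count operation', 'Applied upper operation']
```

The word count of 10 comes from splitting on whitespace, so "Hello," and "AI-Graph" each count as one word.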
4. Using ForEach Step
The ForEachStep lets you either process each item of a collection (items_key) or run a fixed number of iterations (iterations). Let's demonstrate both approaches.
[4]:
# Example 1: Processing a list of items
foreach_pipeline = Pipeline(name="ForEachTextProcessing")

# Create a ForEach step that processes each text item
foreach_step = ForEachStep(
    items_key="texts",
    results_key="processed_texts",
    name="ProcessEachText"
)

# Add sub-steps to the ForEach step
foreach_step.add_sub_step(TextProcessingStep("lower", "ToLowerCase"))
foreach_step.add_sub_step(TextProcessingStep("reverse", "ReverseText"))
foreach_step.add_sub_step(AddKeyStep("processed", True, "MarkAsProcessed"))

# Add the ForEach step to the pipeline
foreach_pipeline.add_step(foreach_step)

# Test data with multiple text items
foreach_input = {
    "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
    ],
    "batch_id": "example_batch_001"
}

print("ForEach input data:")
print(json.dumps(foreach_input, indent=2))

# Process the data
foreach_result = foreach_pipeline.process(foreach_input)
print("\nForEach processing result:")
print(json.dumps(foreach_result, indent=2))
ForEach input data:
{
  "texts": [
    "Hello World",
    "AI-Graph Framework",
    "Pipeline Processing",
    "Chain of Responsibility"
  ],
  "batch_id": "example_batch_001"
}
Processing ProcessEachText: 100%|██████████| 4/4 [00:00<00:00, 35246.25item/s]

ForEach processing result:
{
  "texts": [
    "Hello World",
    "AI-Graph Framework",
    "Pipeline Processing",
    "Chain of Responsibility"
  ],
  "batch_id": "example_batch_001",
  "processed_texts": [
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Hello World",
      "_iteration_index": 0,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "AI-Graph Framework",
      "_iteration_index": 1,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Pipeline Processing",
      "_iteration_index": 2,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Chain of Responsibility",
      "_iteration_index": 3,
      "processed": true
    }
  ]
}
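Notice that processed_texts contains no lower-cased or reversed strings. Judging by the printed output, ForEachStep gives each iteration a copy of the parent data with the item stored under `_current_item`, while TextProcessingStep only acts on a `"text"` key. As a result, the two text sub-steps find nothing to transform, and only MarkAsProcessed has a visible effect. One fix is a sub-step that reads `_current_item`. The sketch below mimics this behavior in plain Python. `for_each` and `reverse_lower_item` are hypothetical helpers, and the exact copying semantics of the real ForEachStep are an assumption inferred from the output.

```python
import copy
from typing import Any, Callable, Dict, List


def for_each(data: Dict[str, Any], items_key: str, results_key: str,
             sub_step: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Dict[str, Any]:
    """Run sub_step once per item, producing the result shape printed above."""
    results: List[Dict[str, Any]] = []
    for index, item in enumerate(data[items_key]):
        item_data = copy.deepcopy(data)  # each iteration sees a copy of the parent data
        item_data["_current_item"] = item
        item_data["_iteration_index"] = index
        results.append(sub_step(item_data))
    data[results_key] = results
    return data


def reverse_lower_item(item_data: Dict[str, Any]) -> Dict[str, Any]:
    # Reads the current item instead of data["text"], so the work is visible.
    item_data["processed_text"] = item_data["_current_item"].lower()[::-1]
    item_data["processed"] = True
    return item_data


out = for_each({"texts": ["Hello World", "AI-Graph Framework"]},
               "texts", "processed_texts", reverse_lower_item)
print([r["processed_text"] for r in out["processed_texts"]])
# ['dlrow olleh', 'krowemarf hparg-ia']
```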
5. Fixed Iterations Example
Now let's use ForEachStep with a fixed number of iterations instead of processing a collection.
[5]:
class CounterStep(BasePipelineStep):
    """A step that increments a counter."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        current_count = data.get("counter", 0)
        data["counter"] = current_count + 1
        data["iteration_info"] = f"Iteration {data['counter']} completed"
        return data


# Create a pipeline with fixed iterations
iteration_pipeline = Pipeline(name="IterationExample")

# Create a ForEach step with fixed iterations
iteration_step = ForEachStep(
    iterations=5,  # Run 5 iterations
    results_key="iteration_results",
    name="RunIterations"
)

# Add sub-steps
iteration_step.add_sub_step(CounterStep("IncrementCounter"))
iteration_step.add_sub_step(AddKeyStep("status", "completed", "MarkCompleted"))

# Add to pipeline
iteration_pipeline.add_step(iteration_step)

# Test data
iteration_input = {
    "counter": 0,
    "experiment_name": "counter_test"
}

print("Iteration input data:")
print(json.dumps(iteration_input, indent=2))

# Process the data
iteration_result = iteration_pipeline.process(iteration_input)
print("\nIteration processing result:")
print(json.dumps(iteration_result, indent=2))
Iteration input data:
{
  "counter": 0,
  "experiment_name": "counter_test"
}
Processing RunIterations: 100%|██████████| 5/5 [00:00<00:00, 58908.76item/s]

Iteration processing result:
{
  "counter": 0,
  "experiment_name": "counter_test",
  "iteration_results": [
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 0,
      "_iteration_index": 0,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 1,
      "_iteration_index": 1,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 2,
      "_iteration_index": 2,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 3,
      "_iteration_index": 3,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 4,
      "_iteration_index": 4,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    }
  ]
}
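Every entry reports counter 1 and "Iteration 1 completed", and the top-level counter is still 0. This is consistent with each iteration starting from its own copy of the original input, so CounterStep's increment does not carry over from one iteration to the next. If a running count is what you want, state has to be threaded from each iteration into the next (or derived from `_iteration_index`). The sketch below contrasts the two behaviors in plain Python. `run_isolated` and `run_threaded` are hypothetical helpers, not part of AI-Graph.

```python
import copy
from typing import Any, Dict, List


def increment(data: Dict[str, Any]) -> Dict[str, Any]:
    # Same logic as CounterStep._process_step above.
    data["counter"] = data.get("counter", 0) + 1
    data["iteration_info"] = f"Iteration {data['counter']} completed"
    return data


def run_isolated(data: Dict[str, Any], iterations: int) -> List[int]:
    # Each iteration starts from a fresh copy, as the printed output suggests.
    return [increment(copy.deepcopy(data))["counter"] for _ in range(iterations)]


def run_threaded(data: Dict[str, Any], iterations: int) -> List[int]:
    # Each iteration starts from the previous iteration's result.
    counts: List[int] = []
    state = copy.deepcopy(data)
    for _ in range(iterations):
        state = increment(state)
        counts.append(state["counter"])
    return counts


start = {"counter": 0, "experiment_name": "counter_test"}
print(run_isolated(start, 5))  # [1, 1, 1, 1, 1]
print(run_threaded(start, 5))  # [1, 2, 3, 4, 5]
```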
6. Complex Pipeline Example
Let's create a more complex pipeline that combines input validation, metadata, per-item processing, a summary, and cleanup.
[6]:
from typing import Any, Dict, List, Optional


class DataValidationStep(BasePipelineStep):
    """A step that validates required fields in the data."""

    def __init__(self, required_fields: List[str], name: Optional[str] = None):
        super().__init__(name or "DataValidation")
        self.required_fields = required_fields

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        missing_fields = [field for field in self.required_fields if field not in data]
        data["validation_result"] = {
            "is_valid": len(missing_fields) == 0,
            "missing_fields": missing_fields,
            "required_fields": self.required_fields
        }
        if missing_fields:
            data["validation_errors"] = f"Missing required fields: {', '.join(missing_fields)}"
        return data


class SummaryStep(BasePipelineStep):
    """A step that creates a summary of the processed data."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        summary = {
            "total_keys": len(data.keys()),
            "has_processing_history": "processing_history" in data,
            "processing_steps": len(data.get("processing_history", [])),
            "validation_passed": data.get("validation_result", {}).get("is_valid", False)
        }
        data["summary"] = summary
        return data


# Create a complex pipeline
complex_pipeline = Pipeline(name="ComplexDataProcessing")

# Step 1: Validate input data
complex_pipeline.add_step(DataValidationStep(["name", "data"], "ValidateInput"))

# Step 2: Add metadata
complex_pipeline.add_step(AddKeyStep("pipeline_version", "1.0", "AddVersion"))
complex_pipeline.add_step(AddKeyStep("processing_date", "2025-07-09", "AddDate"))

# Step 3: Process data items if they exist
data_foreach = ForEachStep(
    items_key="data",
    results_key="processed_data",
    name="ProcessDataItems"
)
data_foreach.add_sub_step(TextProcessingStep("upper", "ToUpper"))
data_foreach.add_sub_step(TextProcessingStep("word_count", "CountWords"))
complex_pipeline.add_step(data_foreach)

# Step 4: Create summary
complex_pipeline.add_step(SummaryStep("CreateSummary"))

# Step 5: Clean up temporary data
complex_pipeline.add_step(DelKeyStep("validation_result", "CleanupValidation"))

# Test the complex pipeline
complex_input = {
    "name": "ComplexDataProcessingExample",
    "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
    ],
    "metadata": {
        "source": "example_notebook",
        "type": "demo"
    }
}

print("Complex pipeline input:")
print(json.dumps(complex_input, indent=2))

# Process the data
complex_result = complex_pipeline.process(complex_input)
print("\nComplex pipeline result:")
print(json.dumps(complex_result, indent=2))
Complex pipeline input:
{
  "name": "ComplexDataProcessingExample",
  "data": [
    "artificial intelligence",
    "machine learning pipeline",
    "data processing framework",
    "chain of responsibility pattern"
  ],
  "metadata": {
    "source": "example_notebook",
    "type": "demo"
  }
}
Processing ProcessDataItems: 100%|██████████| 4/4 [00:00<00:00, 57065.36item/s]

Complex pipeline result:
{
  "name": "ComplexDataProcessingExample",
  "data": [
    "artificial intelligence",
    "machine learning pipeline",
    "data processing framework",
    "chain of responsibility pattern"
  ],
  "metadata": {
    "source": "example_notebook",
    "type": "demo"
  },
  "pipeline_version": "1.0",
  "processing_date": "2025-07-09",
  "processed_data": [
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "artificial intelligence",
      "_iteration_index": 0
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "machine learning pipeline",
      "_iteration_index": 1
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "data processing framework",
      "_iteration_index": 2
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "chain of responsibility pattern",
      "_iteration_index": 3
    }
  ],
  "summary": {
    "total_keys": 7,
    "has_processing_history": false,
    "processing_steps": 0,
    "validation_passed": true
  }
}
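A few details of this result are worth noting. Each entry in processed_data carries its own copy of validation_result, because the per-item copies were made before CleanupValidation ran, and DelKeyStep removed only the top-level key. summary.total_keys (7) also counts validation_result, since SummaryStep runs before the cleanup step. The per-item entries lack word_count and upper-cased text for the same reason as in section 4. Finally, DataValidationStep only records missing fields, so the pipeline continues even for invalid input. The sketch below extracts the validation logic into a plain function that can be checked in isolation. `validate_required` and its `strict` flag are hypothetical additions. With `strict=True`, missing fields raise instead of only being recorded.

```python
from typing import Any, Dict, List


def validate_required(data: Dict[str, Any], required_fields: List[str],
                      strict: bool = False) -> Dict[str, Any]:
    """Same checks as DataValidationStep._process_step, as a pure function.

    strict=True is a hypothetical option not present in the original step:
    it raises ValueError when fields are missing instead of only reporting them.
    """
    missing = [field for field in required_fields if field not in data]
    if missing and strict:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    report: Dict[str, Any] = {
        "validation_result": {
            "is_valid": not missing,
            "missing_fields": missing,
            "required_fields": required_fields,
        }
    }
    if missing:
        report["validation_errors"] = f"Missing required fields: {', '.join(missing)}"
    return report


print(validate_required({"name": "demo", "data": []}, ["name", "data"])["validation_result"]["is_valid"])
# True
print(validate_required({"metadata": {}}, ["name", "data"])["validation_errors"])
# Missing required fields: name, data
```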
7. Error Handling Example
Let's look at how a step can handle its own errors so that the rest of the pipeline keeps running.
[7]:
class SafeProcessingStep(BasePipelineStep):
    """A step that demonstrates error handling."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Simulate some processing that might fail
            if "error_trigger" in data:
                raise ValueError("Simulated processing error")
            # Normal processing
            data["safe_processing_completed"] = True
        except Exception as e:
            # Handle the error gracefully: record it instead of re-raising
            data["error_occurred"] = True
            data["error_message"] = str(e)
            data["error_step"] = self.name
        return data


# Create a pipeline with error handling
error_pipeline = Pipeline(name="ErrorHandlingExample")
error_pipeline.add_step(SafeProcessingStep("SafeProcessor"))
error_pipeline.add_step(AddKeyStep("final_step", "reached", "FinalStep"))

# Test with normal data
normal_data = {"message": "This should work fine"}
print("Testing with normal data:")
normal_result = error_pipeline.process(normal_data)
print(json.dumps(normal_result, indent=2))

print("\n" + "=" * 50 + "\n")

# Test with error-triggering data
error_data = {"message": "This will trigger an error", "error_trigger": True}
print("Testing with error-triggering data:")
error_result = error_pipeline.process(error_data)
print(json.dumps(error_result, indent=2))
Testing with normal data:
{
  "message": "This should work fine",
  "safe_processing_completed": true,
  "final_step": "reached"
}

==================================================

Testing with error-triggering data:
{
  "message": "This will trigger an error",
  "error_trigger": true,
  "error_occurred": true,
  "error_message": "Simulated processing error",
  "error_step": "SafeProcessor",
  "final_step": "reached"
}
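Because SafeProcessingStep catches its own exception, FinalStep still runs and the error is recorded on the data. If several steps need the same behavior, the try/except can be factored out instead of repeated in every `_process_step`. The sketch below does this with a decorator on plain functions. `capture_errors` and `safe_processor` are hypothetical names, not AI-Graph features. One caveat applies to both this sketch and the step above: any changes a step made to `data` before raising are kept.

```python
import functools
from typing import Any, Callable, Dict

StepFn = Callable[[Dict[str, Any]], Dict[str, Any]]


def capture_errors(step_name: str) -> Callable[[StepFn], StepFn]:
    """Hypothetical decorator: record an exception on the data instead of raising."""
    def decorator(fn: StepFn) -> StepFn:
        @functools.wraps(fn)
        def wrapper(data: Dict[str, Any]) -> Dict[str, Any]:
            try:
                return fn(data)
            except Exception as exc:  # broad on purpose, mirroring SafeProcessingStep
                data["error_occurred"] = True
                data["error_message"] = str(exc)
                data["error_step"] = step_name
                return data
        return wrapper
    return decorator


@capture_errors("SafeProcessor")
def safe_processor(data: Dict[str, Any]) -> Dict[str, Any]:
    if "error_trigger" in data:
        raise ValueError("Simulated processing error")
    data["safe_processing_completed"] = True
    return data


print(safe_processor({"message": "ok"}))
# {'message': 'ok', 'safe_processing_completed': True}
print(safe_processor({"error_trigger": True})["error_message"])
# Simulated processing error
```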
8. Performance Monitoring Example
Let's create a step that records timing metrics each time it runs, plus a step that summarizes those metrics.
[8]:
import time
from typing import Dict, Any


class PerformanceMonitoringStep(BasePipelineStep):
    """A step that monitors performance metrics."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        start_time = time.time()
        # Simulate some processing work
        time.sleep(0.1)  # Simulate 100ms of work
        # Count the items this step treats as processed
        if "items" in data:
            processed_items = len(data["items"])
        else:
            processed_items = 1
        end_time = time.time()
        processing_time = end_time - start_time
        # Add performance metrics
        if "performance_metrics" not in data:
            data["performance_metrics"] = []
        data["performance_metrics"].append({
            "step_name": self.name,
            "processing_time_seconds": round(processing_time, 4),
            "items_processed": processed_items,
            "items_per_second": round(processed_items / processing_time, 2) if processing_time > 0 else 0
        })
        return data


# Create a performance monitoring pipeline
perf_pipeline = Pipeline(name="PerformanceMonitoring")

# Add multiple performance-monitored steps
perf_pipeline.add_step(PerformanceMonitoringStep("DataPreprocessing"))
perf_pipeline.add_step(PerformanceMonitoringStep("MainProcessing"))
perf_pipeline.add_step(PerformanceMonitoringStep("PostProcessing"))


# Add a summary step
class PerformanceSummaryStep(BasePipelineStep):
    """Summarize performance metrics."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if "performance_metrics" in data:
            metrics = data["performance_metrics"]
            total_time = sum(m["processing_time_seconds"] for m in metrics)
            total_items = sum(m["items_processed"] for m in metrics)
            data["performance_summary"] = {
                "total_processing_time_seconds": round(total_time, 4),
                "total_items_processed": total_items,
                "average_items_per_second": round(total_items / total_time, 2) if total_time > 0 else 0,
                "steps_executed": len(metrics)
            }
        return data


perf_pipeline.add_step(PerformanceSummaryStep("CreatePerformanceSummary"))

# Test the performance monitoring
perf_input = {
    "items": ["item1", "item2", "item3", "item4", "item5"],
    "operation": "performance_test"
}

print("Running performance monitoring pipeline...")
perf_result = perf_pipeline.process(perf_input)
print("\nPerformance monitoring result:")
print(json.dumps(perf_result, indent=2))
Running performance monitoring pipeline...

Performance monitoring result:
{
  "items": [
    "item1",
    "item2",
    "item3",
    "item4",
    "item5"
  ],
  "operation": "performance_test",
  "performance_metrics": [
    {
      "step_name": "DataPreprocessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.95
    },
    {
      "step_name": "MainProcessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.95
    },
    {
      "step_name": "PostProcessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.96
    }
  ],
  "performance_summary": {
    "total_processing_time_seconds": 0.3003,
    "total_items_processed": 15,
    "average_items_per_second": 49.95,
    "steps_executed": 3
  }
}
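Two remarks on these numbers. First, total_items_processed is 15 because the same five items are counted once per step, so it measures step-item work rather than distinct items. Second, `time.time()` reads the wall clock, which can be adjusted while a program runs. `time.perf_counter()` is the standard-library clock intended for measuring short intervals. The sketch below is a reusable timer built on `perf_counter`. The `timed` context manager is a hypothetical helper, not an AI-Graph feature, and it reuses the metric field names from the step above.

```python
import time
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


@contextmanager
def timed(metrics: List[Dict[str, Any]], step_name: str, items: int) -> Iterator[None]:
    """Append one metrics record for the code run inside the with-block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so every timed section leaves a record.
        elapsed = time.perf_counter() - start
        metrics.append({
            "step_name": step_name,
            "processing_time_seconds": round(elapsed, 4),
            "items_processed": items,
            "items_per_second": round(items / elapsed, 2) if elapsed > 0 else 0,
        })


metrics: List[Dict[str, Any]] = []
for name in ["DataPreprocessing", "MainProcessing", "PostProcessing"]:
    with timed(metrics, name, items=5):
        time.sleep(0.01)  # stand-in for real work

print([m["step_name"] for m in metrics])
```

Keeping the timing in a context manager separates measurement from the work being measured, so any step body can be wrapped without being rewritten.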
9. Conclusion
This notebook has demonstrated the key features of the AI-Graph framework:
Key Features Demonstrated:
Pipeline Creation: Building sequential processing workflows
Custom Steps: Creating custom processing steps by extending BasePipelineStep
Built-in Steps: Using AddKeyStep, DelKeyStep, and ForEachStep
Chain of Responsibility: Steps automatically chain together
ForEach Processing: Handling collections and fixed iterations
Error Handling: Graceful error handling within pipeline steps
Performance Monitoring: Tracking processing metrics
Data Validation: Validating input data structure
Benefits of the AI-Graph Framework:
Modular Design: Easy to add, remove, or modify processing steps
Reusable Components: Steps can be reused across different pipelines
Error Resilience: Steps can catch and record their own errors so that later steps still run
Performance Monitoring: Timing metrics can be collected by ordinary pipeline steps
Flexible Data Flow: Data is passed through the chain automatically
Extensible: Easy to create custom steps for specific needs
Use Cases:
Data Processing Pipelines: ETL operations, data transformation
AI/ML Workflows: Model training, inference, and post-processing
Content Processing: Text processing, image processing
Business Logic: Sequential business rule processing
API Request Processing: Multi-step API request handling
The AI-Graph framework provides a foundation for building modular, maintainable AI workflows with clear separation of concerns and easy extensibility.