AI-Graph Framework Example

This notebook demonstrates how to use the AI-Graph framework to build and manage AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.

Overview

The AI-Graph framework provides:

  • Pipeline: A container for managing sequential processing steps

  • BasePipelineStep: Abstract base class for creating custom processing steps

  • Built-in Steps: Ready-to-use steps like AddKeyStep, DelKeyStep, and ForEachStep

  • Chain of Responsibility: Steps can be chained together for complex workflows
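
To make the Chain of Responsibility idea concrete, here is a minimal, self-contained sketch of the pattern in plain Python. It is not AI-Graph's implementation: the Step class and its set_next and handle methods are hypothetical names chosen for illustration. Each link transforms the data and then forwards it to the next link, if there is one.

```python
from typing import Any, Callable, Dict, Optional

Data = Dict[str, Any]


class Step:
    """Hypothetical chain link: apply a function, then forward to the next link."""

    def __init__(self, name: str, fn: Callable[[Data], Data]) -> None:
        self.name = name
        self.fn = fn
        self.next: Optional["Step"] = None

    def set_next(self, step: "Step") -> "Step":
        # Return the new link so calls can be chained fluently
        self.next = step
        return step

    def handle(self, data: Data) -> Data:
        data = self.fn(data)
        # Forward to the next link; the last link returns the final result
        return self.next.handle(data) if self.next else data


def add_key(key: str, value: Any) -> Callable[[Data], Data]:
    def fn(data: Data) -> Data:
        data[key] = value
        return data

    return fn


head = Step("AddA", add_key("a", 1))
head.set_next(Step("AddB", add_key("b", 2))).set_next(Step("AddC", add_key("c", 3)))

print(head.handle({}))  # {'a': 1, 'b': 2, 'c': 3}
```

Each link only knows about its successor, which is what lets steps be added or reordered without changing the steps themselves.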

1. Basic Setup and Imports

First, let’s import the necessary components from the AI-Graph framework.

[1]:
# Import the core components from AI-Graph
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep
from ai_graph.step.foreach import ForEachStep

# Additional imports for our examples
import json
from typing import Dict, Any

print("AI-Graph framework imported successfully!")
AI-Graph framework imported successfully!

2. Creating a Custom Pipeline Step

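Let’s create a custom step that processes text data. This demonstrates how to extend the BasePipelineStep class: the subclass implements _process_step, which receives the data dictionary and returns it (modified in place here) so it can be passed to the next step.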

[2]:
class TextProcessingStep(BasePipelineStep):
    """A custom step that processes text data."""

    def __init__(self, operation: str = "upper", name: str = None):
        super().__init__(name or f"TextProcessing_{operation}")
        self.operation = operation

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process text data based on the specified operation."""
        if "text" in data:
            text = data["text"]

            if self.operation == "upper":
                data["text"] = text.upper()
            elif self.operation == "lower":
                data["text"] = text.lower()
            elif self.operation == "reverse":
                data["text"] = text[::-1]
            elif self.operation == "word_count":
                data["word_count"] = len(text.split())

            # Add processing metadata
            if "processing_history" not in data:
                data["processing_history"] = []
            data["processing_history"].append(f"Applied {self.operation} operation")

        return data

print("Custom TextProcessingStep created!")
Custom TextProcessingStep created!
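
The if/elif chain above works, but it also appends "Applied ... operation" to processing_history when operation is not one of the four supported names, even though nothing was applied. As an alternative design, here is a hedged, framework-free sketch that uses a dispatch table and rejects unknown operations. OPERATIONS and process_text are hypothetical names, not part of AI-Graph.

```python
from typing import Any, Callable, Dict, Tuple

# Each operation maps to (output key, function applied to the input text)
OPERATIONS: Dict[str, Tuple[str, Callable[[str], Any]]] = {
    "upper": ("text", str.upper),
    "lower": ("text", str.lower),
    "reverse": ("text", lambda s: s[::-1]),
    "word_count": ("word_count", lambda s: len(s.split())),
}


def process_text(data: Dict[str, Any], operation: str) -> Dict[str, Any]:
    # Reject unsupported operations instead of recording them as applied
    if operation not in OPERATIONS:
        raise ValueError(f"Unsupported operation: {operation!r}")
    if "text" in data:
        out_key, fn = OPERATIONS[operation]
        data[out_key] = fn(data["text"])
        data.setdefault("processing_history", []).append(f"Applied {operation} operation")
    return data


print(process_text({"text": "Hello World"}, "reverse"))
# {'text': 'dlroW olleH', 'processing_history': ['Applied reverse operation']}
```

With this layout, supporting a new operation means adding one dictionary entry.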

3. Basic Pipeline Example

Now let’s create a simple pipeline that processes text data through multiple steps.

[3]:
# Create a pipeline with multiple steps
pipeline = Pipeline(name="TextProcessingPipeline")

# Add steps to the pipeline
pipeline.add_step(AddKeyStep("timestamp", "2025-07-09", "AddTimestamp"))
pipeline.add_step(TextProcessingStep("word_count", "CountWords"))
pipeline.add_step(TextProcessingStep("upper", "ToUpperCase"))
pipeline.add_step(AddKeyStep("processed_by", "AI-Graph Framework", "AddProcessedBy"))

# Test data
input_data = {
    "text": "Hello, World! This is a test of the AI-Graph framework.",
    "source": "example_notebook"
}

print("Input data:")
print(json.dumps(input_data, indent=2))

# Process the data through the pipeline
result = pipeline.process(input_data)

print("\nProcessed result:")
print(json.dumps(result, indent=2))
Input data:
{
  "text": "Hello, World! This is a test of the AI-Graph framework.",
  "source": "example_notebook"
}

Processed result:
{
  "text": "HELLO, WORLD! THIS IS A TEST OF THE AI-GRAPH FRAMEWORK.",
  "source": "example_notebook",
  "timestamp": "2025-07-09",
  "word_count": 10,
  "processing_history": [
    "Applied word_count operation",
    "Applied upper operation"
  ],
  "processed_by": "AI-Graph Framework"
}
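
Conceptually, pipeline.process here behaves like a left fold: each step receives the dictionary returned by the previous one. The framework-free sketch below replays the four steps as plain functions with functools.reduce. It illustrates the data flow only and is not how Pipeline is implemented internally; run is a hypothetical helper.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]
StepFn = Callable[[Data], Data]


def run(steps: List[StepFn], data: Data) -> Data:
    # Feed the output of each step into the next one, left to right
    return reduce(lambda acc, step: step(acc), steps, data)


steps: List[StepFn] = [
    lambda d: {**d, "timestamp": "2025-07-09"},
    lambda d: {**d, "word_count": len(d["text"].split())},
    lambda d: {**d, "text": d["text"].upper()},
    lambda d: {**d, "processed_by": "AI-Graph Framework"},
]

result = run(steps, {"text": "Hello, World! This is a test of the AI-Graph framework."})
print(result["word_count"], result["text"])
# 10 HELLO, WORLD! THIS IS A TEST OF THE AI-GRAPH FRAMEWORK.
```

Order matters in a chain like this: a step that reads word_count, for example, must come after the step that writes it.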

4. Using ForEach Step

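The ForEachStep allows you to process collections of items or run a fixed number of iterations; this section and the next demonstrate both approaches. For each iteration, ForEachStep runs its sub-steps on a copy of the pipeline data extended with two keys, _current_item (the item) and _iteration_index, and collects the results in a list under results_key. Note that the sub-steps see the item only under _current_item. TextProcessingStep reads the text key instead, so in the example below it leaves the items unchanged: the only visible effect in each result is the processed flag added by AddKeyStep.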

[4]:
# Example 1: Processing a list of items
foreach_pipeline = Pipeline(name="ForEachTextProcessing")

# Create a ForEach step that processes each text item
foreach_step = ForEachStep(
    items_key="texts",
    results_key="processed_texts",
    name="ProcessEachText"
)

# Add sub-steps to the ForEach step
foreach_step.add_sub_step(TextProcessingStep("lower", "ToLowerCase"))
foreach_step.add_sub_step(TextProcessingStep("reverse", "ReverseText"))
foreach_step.add_sub_step(AddKeyStep("processed", True, "MarkAsProcessed"))

# Add the ForEach step to the pipeline
foreach_pipeline.add_step(foreach_step)

# Test data with multiple text items
foreach_input = {
    "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
    ],
    "batch_id": "example_batch_001"
}

print("ForEach input data:")
print(json.dumps(foreach_input, indent=2))

# Process the data
foreach_result = foreach_pipeline.process(foreach_input)

print("\nForEach processing result:")
print(json.dumps(foreach_result, indent=2))
ForEach input data:
{
  "texts": [
    "Hello World",
    "AI-Graph Framework",
    "Pipeline Processing",
    "Chain of Responsibility"
  ],
  "batch_id": "example_batch_001"
}
Processing ProcessEachText: 100%|██████████| 4/4 [00:00<00:00, 35246.25item/s]

ForEach processing result:
{
  "texts": [
    "Hello World",
    "AI-Graph Framework",
    "Pipeline Processing",
    "Chain of Responsibility"
  ],
  "batch_id": "example_batch_001",
  "processed_texts": [
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Hello World",
      "_iteration_index": 0,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "AI-Graph Framework",
      "_iteration_index": 1,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Pipeline Processing",
      "_iteration_index": 2,
      "processed": true
    },
    {
      "texts": [
        "Hello World",
        "AI-Graph Framework",
        "Pipeline Processing",
        "Chain of Responsibility"
      ],
      "batch_id": "example_batch_001",
      "_current_item": "Chain of Responsibility",
      "_iteration_index": 3,
      "processed": true
    }
  ]
}
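
TextProcessingStep reads the text key, but, as the output above shows, each item is exposed only under _current_item, so the lower and reverse steps have nothing to work on. One way to make them take effect is to copy the item into text before they run. The sketch below emulates the per-item flow in plain Python: a copy of the input per item (a deep copy here, which is an assumption), the _current_item and _iteration_index keys seen above, and a hypothetical binding step. for_each and bind_item_to_text are illustrative names that mirror the observed behavior, not ForEachStep's implementation.

```python
import copy
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]


def bind_item_to_text(data: Data) -> Data:
    # Hypothetical step: expose the current item under the key the text steps read
    data["text"] = data["_current_item"]
    return data


def for_each(data: Data, items_key: str, results_key: str,
             sub_steps: List[Callable[[Data], Data]]) -> Data:
    results = []
    for index, item in enumerate(data[items_key]):
        # Each iteration works on its own copy of the input
        item_data = copy.deepcopy(data)
        item_data["_current_item"] = item
        item_data["_iteration_index"] = index
        for step in sub_steps:
            item_data = step(item_data)
        results.append(item_data)
    data[results_key] = results
    return data


result = for_each(
    {"texts": ["Hello World", "Chain of Responsibility"]},
    items_key="texts",
    results_key="processed_texts",
    sub_steps=[bind_item_to_text,
               lambda d: {**d, "text": d["text"].lower()},
               lambda d: {**d, "text": d["text"][::-1]}],
)
print([r["text"] for r in result["processed_texts"]])
# ['dlrow olleh', 'ytilibisnopser fo niahc']
```

An alternative with the same effect would be giving TextProcessingStep a configurable input key, so it could read _current_item directly.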

5. Fixed Iterations Example

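Now let’s demonstrate using ForEachStep with a fixed number of iterations instead of processing a collection. In this mode, _current_item holds the iteration index. Note in the output that every iteration starts from a copy of the original input: counter is 1 in each result and stays 0 at the top level, so the iterations do not accumulate state.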

[5]:
class CounterStep(BasePipelineStep):
    """A step that increments a counter."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        current_count = data.get("counter", 0)
        data["counter"] = current_count + 1
        data["iteration_info"] = f"Iteration {data['counter']} completed"
        return data

# Create a pipeline with fixed iterations
iteration_pipeline = Pipeline(name="IterationExample")

# Create a ForEach step with fixed iterations
iteration_step = ForEachStep(
    iterations=5,  # Run 5 iterations
    results_key="iteration_results",
    name="RunIterations"
)

# Add sub-steps
iteration_step.add_sub_step(CounterStep("IncrementCounter"))
iteration_step.add_sub_step(AddKeyStep("status", "completed", "MarkCompleted"))

# Add to pipeline
iteration_pipeline.add_step(iteration_step)

# Test data
iteration_input = {
    "counter": 0,
    "experiment_name": "counter_test"
}

print("Iteration input data:")
print(json.dumps(iteration_input, indent=2))

# Process the data
iteration_result = iteration_pipeline.process(iteration_input)

print("\nIteration processing result:")
print(json.dumps(iteration_result, indent=2))
Iteration input data:
{
  "counter": 0,
  "experiment_name": "counter_test"
}
Processing RunIterations: 100%|██████████| 5/5 [00:00<00:00, 58908.76item/s]

Iteration processing result:
{
  "counter": 0,
  "experiment_name": "counter_test",
  "iteration_results": [
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 0,
      "_iteration_index": 0,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 1,
      "_iteration_index": 1,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 2,
      "_iteration_index": 2,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 3,
      "_iteration_index": 3,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    },
    {
      "counter": 1,
      "experiment_name": "counter_test",
      "_current_item": 4,
      "_iteration_index": 4,
      "iteration_info": "Iteration 1 completed",
      "status": "completed"
    }
  ]
}
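
The results above show that the iterations are independent: each starts from a copy of the input, so counter never exceeds 1. If state should carry over between iterations, each iteration's output has to be fed into the next. The plain-Python sketch below contrasts the two behaviors; isolated and accumulated are illustrative helpers, not AI-Graph APIs.

```python
import copy
from typing import Any, Dict, List

Data = Dict[str, Any]


def increment(data: Data) -> Data:
    data["counter"] = data.get("counter", 0) + 1
    return data


def isolated(data: Data, iterations: int) -> List[int]:
    # Every iteration starts from a fresh copy of the input (the behavior shown above)
    return [increment(copy.deepcopy(data))["counter"] for _ in range(iterations)]


def accumulated(data: Data, iterations: int) -> List[int]:
    # Each iteration receives the previous iteration's output
    counts = []
    state = copy.deepcopy(data)
    for _ in range(iterations):
        state = increment(state)
        counts.append(state["counter"])
    return counts


print(isolated({"counter": 0}, 5))     # [1, 1, 1, 1, 1]
print(accumulated({"counter": 0}, 5))  # [1, 2, 3, 4, 5]
```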

6. Complex Pipeline Example

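Let’s create a more complex pipeline that combines validation, metadata, a ForEachStep, a summary, and cleanup. Two things are worth noting in the output. First, as in Section 4, the TextProcessingStep sub-steps find no text key, so they do not change the items and no processing_history is recorded (has_processing_history is false). Second, each entry in processed_data is a snapshot taken when ForEachStep ran, so it still contains validation_result even though DelKeyStep later removes that key from the top level.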

[6]:
class DataValidationStep(BasePipelineStep):
    """A step that validates required fields in the data."""

    def __init__(self, required_fields: list, name: str = None):
        super().__init__(name or "DataValidation")
        self.required_fields = required_fields

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        missing_fields = [field for field in self.required_fields if field not in data]

        data["validation_result"] = {
            "is_valid": len(missing_fields) == 0,
            "missing_fields": missing_fields,
            "required_fields": self.required_fields
        }

        if missing_fields:
            data["validation_errors"] = f"Missing required fields: {', '.join(missing_fields)}"

        return data

class SummaryStep(BasePipelineStep):
    """A step that creates a summary of the processed data."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        summary = {
            "total_keys": len(data.keys()),
            "has_processing_history": "processing_history" in data,
            "processing_steps": len(data.get("processing_history", [])),
            "validation_passed": data.get("validation_result", {}).get("is_valid", False)
        }

        data["summary"] = summary
        return data

# Create a complex pipeline
complex_pipeline = Pipeline(name="ComplexDataProcessing")

# Step 1: Validate input data
complex_pipeline.add_step(DataValidationStep(["name", "data"], "ValidateInput"))

# Step 2: Add metadata
complex_pipeline.add_step(AddKeyStep("pipeline_version", "1.0", "AddVersion"))
complex_pipeline.add_step(AddKeyStep("processing_date", "2025-07-09", "AddDate"))

# Step 3: Process data items if they exist
data_foreach = ForEachStep(
    items_key="data",
    results_key="processed_data",
    name="ProcessDataItems"
)
data_foreach.add_sub_step(TextProcessingStep("upper", "ToUpper"))
data_foreach.add_sub_step(TextProcessingStep("word_count", "CountWords"))
complex_pipeline.add_step(data_foreach)

# Step 4: Create summary
complex_pipeline.add_step(SummaryStep("CreateSummary"))

# Step 5: Clean up temporary data
complex_pipeline.add_step(DelKeyStep("validation_result", "CleanupValidation"))

# Test the complex pipeline
complex_input = {
    "name": "ComplexDataProcessingExample",
    "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
    ],
    "metadata": {
        "source": "example_notebook",
        "type": "demo"
    }
}

print("Complex pipeline input:")
print(json.dumps(complex_input, indent=2))

# Process the data
complex_result = complex_pipeline.process(complex_input)

print("\nComplex pipeline result:")
print(json.dumps(complex_result, indent=2))
Complex pipeline input:
{
  "name": "ComplexDataProcessingExample",
  "data": [
    "artificial intelligence",
    "machine learning pipeline",
    "data processing framework",
    "chain of responsibility pattern"
  ],
  "metadata": {
    "source": "example_notebook",
    "type": "demo"
  }
}
Processing ProcessDataItems: 100%|██████████| 4/4 [00:00<00:00, 57065.36item/s]

Complex pipeline result:
{
  "name": "ComplexDataProcessingExample",
  "data": [
    "artificial intelligence",
    "machine learning pipeline",
    "data processing framework",
    "chain of responsibility pattern"
  ],
  "metadata": {
    "source": "example_notebook",
    "type": "demo"
  },
  "pipeline_version": "1.0",
  "processing_date": "2025-07-09",
  "processed_data": [
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "artificial intelligence",
      "_iteration_index": 0
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "machine learning pipeline",
      "_iteration_index": 1
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "data processing framework",
      "_iteration_index": 2
    },
    {
      "name": "ComplexDataProcessingExample",
      "data": [
        "artificial intelligence",
        "machine learning pipeline",
        "data processing framework",
        "chain of responsibility pattern"
      ],
      "metadata": {
        "source": "example_notebook",
        "type": "demo"
      },
      "validation_result": {
        "is_valid": true,
        "missing_fields": [],
        "required_fields": [
          "name",
          "data"
        ]
      },
      "pipeline_version": "1.0",
      "processing_date": "2025-07-09",
      "_current_item": "chain of responsibility pattern",
      "_iteration_index": 3
    }
  ],
  "summary": {
    "total_keys": 7,
    "has_processing_history": false,
    "processing_steps": 0,
    "validation_passed": true
  }
}
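
DataValidationStep only records problems in the data; the steps after it still run on invalid input. If a pipeline should stop at the first invalid record, a runner can check the validation result after each step. The sketch below shows such a fail-fast variant in plain Python. validate, run_until_invalid, and ValidationError are hypothetical names, and AI-Graph may provide its own way of stopping a chain.

```python
from typing import Any, Callable, Dict, List

Data = Dict[str, Any]


class ValidationError(Exception):
    """Raised when required fields are missing."""


def validate(required: List[str]) -> Callable[[Data], Data]:
    def step(data: Data) -> Data:
        missing = [field for field in required if field not in data]
        data["validation_result"] = {"is_valid": not missing, "missing_fields": missing}
        return data

    return step


def run_until_invalid(steps: List[Callable[[Data], Data]], data: Data) -> Data:
    for step in steps:
        data = step(data)
        result = data.get("validation_result")
        # Stop the chain as soon as a validation step reports a failure
        if result is not None and not result["is_valid"]:
            raise ValidationError(f"Missing required fields: {', '.join(result['missing_fields'])}")
    return data


steps = [validate(["name", "data"]), lambda d: {**d, "pipeline_version": "1.0"}]

try:
    run_until_invalid(steps, {"name": "example"})
except ValidationError as err:
    print(err)  # Missing required fields: data
```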

7. Error Handling Example

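Let’s demonstrate how to handle errors in the pipeline. Here the step catches its own exception and records it in the data instead of letting it propagate, so the following step still runs: final_step is "reached" in both results.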

[7]:
class SafeProcessingStep(BasePipelineStep):
    """A step that demonstrates error handling."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            # Simulate some processing that might fail
            if "error_trigger" in data:
                raise ValueError("Simulated processing error")

            # Normal processing
            data["safe_processing_completed"] = True

        except Exception as e:
            # Handle the error gracefully
            data["error_occurred"] = True
            data["error_message"] = str(e)
            data["error_step"] = self.name

        return data

# Create a pipeline with error handling
error_pipeline = Pipeline(name="ErrorHandlingExample")
error_pipeline.add_step(SafeProcessingStep("SafeProcessor"))
error_pipeline.add_step(AddKeyStep("final_step", "reached", "FinalStep"))

# Test with normal data
normal_data = {"message": "This should work fine"}
print("Testing with normal data:")
normal_result = error_pipeline.process(normal_data)
print(json.dumps(normal_result, indent=2))

print("\n" + "="*50 + "\n")

# Test with error-triggering data
error_data = {"message": "This will trigger an error", "error_trigger": True}
print("Testing with error-triggering data:")
error_result = error_pipeline.process(error_data)
print(json.dumps(error_result, indent=2))
Testing with normal data:
{
  "message": "This should work fine",
  "safe_processing_completed": true,
  "final_step": "reached"
}

==================================================

Testing with error-triggering data:
{
  "message": "This will trigger an error",
  "error_trigger": true,
  "error_occurred": true,
  "error_message": "Simulated processing error",
  "error_step": "SafeProcessor",
  "final_step": "reached"
}
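
SafeProcessingStep puts the try/except inside the step itself, so every step that should fail softly has to repeat it. An alternative is to wrap steps in a single error-capturing helper. The sketch below is plain Python with a hypothetical catch_errors decorator that records the same keys as the example (error_occurred, error_message, error_step); it is not an AI-Graph feature.

```python
import functools
from typing import Any, Callable, Dict

Data = Dict[str, Any]


def catch_errors(step: Callable[[Data], Data]) -> Callable[[Data], Data]:
    @functools.wraps(step)
    def wrapper(data: Data) -> Data:
        try:
            return step(data)
        except Exception as exc:
            # Record the failure and hand the data on so later steps still run
            data["error_occurred"] = True
            data["error_message"] = str(exc)
            data["error_step"] = step.__name__
            return data

    return wrapper


@catch_errors
def risky_step(data: Data) -> Data:
    if "error_trigger" in data:
        raise ValueError("Simulated processing error")
    data["safe_processing_completed"] = True
    return data


print(risky_step({"error_trigger": True}))
```

Because the wrapped step mutates data in place, any changes it made before raising are kept; copy the input first if partial updates must not leak through.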

8. Performance Monitoring Example

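Let’s create a step that records its own processing time. The time.sleep(0.1) call stands in for real work, so the items-per-second figures below reflect the sleep rather than actual throughput, and total_items_processed counts the same five items once per step.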

[8]:
import time
from typing import Dict, Any

class PerformanceMonitoringStep(BasePipelineStep):
    """A step that monitors performance metrics."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
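        # perf_counter() is the clock meant for measuring short durations;
        # time.time() is wall-clock time and can jump if the system clock is adjusted
        start_time = time.perf_counter()

        # Simulate some processing work
        time.sleep(0.1)  # Simulate 100ms of work

        # Count the items this step handles (no actual computation happens here)
        if "items" in data:
            processed_items = len(data["items"])
        else:
            processed_items = 1

        end_time = time.perf_counter()
        processing_time = end_time - start_time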

        # Add performance metrics
        if "performance_metrics" not in data:
            data["performance_metrics"] = []

        data["performance_metrics"].append({
            "step_name": self.name,
            "processing_time_seconds": round(processing_time, 4),
            "items_processed": processed_items,
            "items_per_second": round(processed_items / processing_time, 2) if processing_time > 0 else 0
        })

        return data

# Create a performance monitoring pipeline
perf_pipeline = Pipeline(name="PerformanceMonitoring")

# Add multiple performance-monitored steps
perf_pipeline.add_step(PerformanceMonitoringStep("DataPreprocessing"))
perf_pipeline.add_step(PerformanceMonitoringStep("MainProcessing"))
perf_pipeline.add_step(PerformanceMonitoringStep("PostProcessing"))

# Add a summary step
class PerformanceSummaryStep(BasePipelineStep):
    """Summarize performance metrics."""

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if "performance_metrics" in data:
            metrics = data["performance_metrics"]
            total_time = sum(m["processing_time_seconds"] for m in metrics)
            total_items = sum(m["items_processed"] for m in metrics)

            data["performance_summary"] = {
                "total_processing_time_seconds": round(total_time, 4),
                "total_items_processed": total_items,
                "average_items_per_second": round(total_items / total_time, 2) if total_time > 0 else 0,
                "steps_executed": len(metrics)
            }

        return data

perf_pipeline.add_step(PerformanceSummaryStep("CreatePerformanceSummary"))

# Test the performance monitoring
perf_input = {
    "items": ["item1", "item2", "item3", "item4", "item5"],
    "operation": "performance_test"
}

print("Running performance monitoring pipeline...")
perf_result = perf_pipeline.process(perf_input)

print("\nPerformance monitoring result:")
print(json.dumps(perf_result, indent=2))
Running performance monitoring pipeline...

Performance monitoring result:
{
  "items": [
    "item1",
    "item2",
    "item3",
    "item4",
    "item5"
  ],
  "operation": "performance_test",
  "performance_metrics": [
    {
      "step_name": "DataPreprocessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.95
    },
    {
      "step_name": "MainProcessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.95
    },
    {
      "step_name": "PostProcessing",
      "processing_time_seconds": 0.1001,
      "items_processed": 5,
      "items_per_second": 49.96
    }
  ],
  "performance_summary": {
    "total_processing_time_seconds": 0.3003,
    "total_items_processed": 15,
    "average_items_per_second": 49.95,
    "steps_executed": 3
  }
}
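
Instead of timing inside each step, timing can be applied from outside so that steps stay focused on their own work. The sketch below uses time.perf_counter(), which the Python documentation describes as a high-resolution clock for measuring short durations, in a hypothetical timed helper that appends to performance_metrics like the example does. It is framework-free and not part of AI-Graph.

```python
import time
from typing import Any, Callable, Dict

Data = Dict[str, Any]


def timed(name: str, step: Callable[[Data], Data]) -> Callable[[Data], Data]:
    def wrapper(data: Data) -> Data:
        start = time.perf_counter()
        data = step(data)
        elapsed = time.perf_counter() - start
        # Append a metrics record for this step after it finishes
        data.setdefault("performance_metrics", []).append(
            {"step_name": name, "processing_time_seconds": round(elapsed, 4)}
        )
        return data

    return wrapper


def slow_step(data: Data) -> Data:
    time.sleep(0.05)  # Stand-in for real work
    return data


data: Data = {}
for step in (timed("First", slow_step), timed("Second", slow_step)):
    data = step(data)

print([m["step_name"] for m in data["performance_metrics"]])  # ['First', 'Second']
```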

9. Conclusion

This notebook has demonstrated the key features of the AI-Graph framework:

Key Features Demonstrated:

  1. Pipeline Creation: Building sequential processing workflows

  2. Custom Steps: Creating custom processing steps by extending BasePipelineStep

  3. Built-in Steps: Using AddKeyStep, DelKeyStep, and ForEachStep

  4. Chain of Responsibility: Steps automatically chain together

  5. ForEach Processing: Handling collections and fixed iterations

  6. Error Handling: Graceful error handling within pipeline steps

  7. Performance Monitoring: Tracking processing metrics

  8. Data Validation: Validating input data structure

Benefits of the AI-Graph Framework:

  • Modular Design: Easy to add, remove, or modify processing steps

  • Reusable Components: Steps can be reused across different pipelines

  • Error Resilience: Steps can catch and record their own errors so that later steps still run

  • Performance Monitoring: Easy to add performance tracking

  • Flexible Data Flow: Data is passed through the chain automatically

  • Extensible: Easy to create custom steps for specific needs

Use Cases:

  • Data Processing Pipelines: ETL operations, data transformation

  • AI/ML Workflows: Model training, inference, and post-processing

  • Content Processing: Text processing, image processing

  • Business Logic: Sequential business rule processing

  • API Request Processing: Multi-step API request handling

The AI-Graph framework provides a foundation for building modular, maintainable AI workflows, with each processing concern isolated in its own step and new behavior added by writing new steps.