{ "cells": [ { "cell_type": "markdown", "id": "48aa125a", "metadata": {}, "source": [ "# AI-Graph Framework Example\n", "\n", "This notebook demonstrates how to use the AI-Graph framework to build and manage AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.\n", "\n", "## Overview\n", "\n", "The AI-Graph framework provides:\n", "- **Pipeline**: A container for managing sequential processing steps\n", "- **BasePipelineStep**: Abstract base class for creating custom processing steps\n", "- **Built-in Steps**: Ready-to-use steps like `AddKeyStep`, `DelKeyStep`, and `ForEachStep`\n", "- **Chain of Responsibility**: Steps can be chained together for complex workflows" ] }, { "cell_type": "markdown", "id": "3d34a121", "metadata": {}, "source": [ "## 1. Basic Setup and Imports\n", "\n", "First, let's import the necessary components from the AI-Graph framework." ] }, { "cell_type": "code", "execution_count": null, "id": "f929b202", "metadata": {}, "outputs": [], "source": [ "# Import the core components from AI-Graph\n", "from ai_graph.pipeline.base import Pipeline\n", "from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep\n", "from ai_graph.step.foreach import ForEachStep\n", "\n", "# Additional imports for our examples\n", "import json\n", "from typing import Dict, Any\n", "\n", "print(\"AI-Graph framework imported successfully!\")" ] }, { "cell_type": "markdown", "id": "9d9ee588", "metadata": {}, "source": [ "## 2. Creating a Custom Pipeline Step\n", "\n", "Let's create a custom step that processes text data. This demonstrates how to extend the `BasePipelineStep` class." 
] }, { "cell_type": "code", "execution_count": null, "id": "f3771560", "metadata": {}, "outputs": [], "source": [ "from typing import Optional\n", "\n", "class TextProcessingStep(BasePipelineStep):\n", "    \"\"\"A custom step that processes text data.\"\"\"\n", "\n", "    SUPPORTED_OPERATIONS = (\"upper\", \"lower\", \"reverse\", \"word_count\")\n", "\n", "    def __init__(self, operation: str = \"upper\", name: Optional[str] = None):\n", "        # Reject unknown operations up front instead of silently doing nothing later\n", "        if operation not in self.SUPPORTED_OPERATIONS:\n", "            raise ValueError(f\"Unsupported operation: {operation!r}\")\n", "        super().__init__(name or f\"TextProcessing_{operation}\")\n", "        self.operation = operation\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        \"\"\"Process text data based on the specified operation.\"\"\"\n", "        if \"text\" in data:\n", "            text = data[\"text\"]\n", "\n", "            if self.operation == \"upper\":\n", "                data[\"text\"] = text.upper()\n", "            elif self.operation == \"lower\":\n", "                data[\"text\"] = text.lower()\n", "            elif self.operation == \"reverse\":\n", "                data[\"text\"] = text[::-1]\n", "            elif self.operation == \"word_count\":\n", "                data[\"word_count\"] = len(text.split())\n", "\n", "            # Add processing metadata\n", "            if \"processing_history\" not in data:\n", "                data[\"processing_history\"] = []\n", "            data[\"processing_history\"].append(f\"Applied {self.operation} operation\")\n", "\n", "        return data\n", "\n", "print(\"Custom TextProcessingStep created!\")" ] }, { "cell_type": "markdown", "id": "2a46bd64", "metadata": {}, "source": [ "## 3. Basic Pipeline Example\n", "\n", "Now let's create a simple pipeline that processes text data through multiple steps."
] }, { "cell_type": "code", "execution_count": null, "id": "da9d87ac", "metadata": {}, "outputs": [], "source": [ "# Create a pipeline with multiple steps\n", "pipeline = Pipeline(name=\"TextProcessingPipeline\")\n", "\n", "# Add steps to the pipeline\n", "pipeline.add_step(AddKeyStep(\"timestamp\", \"2025-07-09\", \"AddTimestamp\"))\n", "pipeline.add_step(TextProcessingStep(\"word_count\", \"CountWords\"))\n", "pipeline.add_step(TextProcessingStep(\"upper\", \"ToUpperCase\"))\n", "pipeline.add_step(AddKeyStep(\"processed_by\", \"AI-Graph Framework\", \"AddProcessedBy\"))\n", "\n", "# Test data\n", "input_data = {\n", "    \"text\": \"Hello, World! This is a test of the AI-Graph framework.\",\n", "    \"source\": \"example_notebook\"\n", "}\n", "\n", "print(\"Input data:\")\n", "print(json.dumps(input_data, indent=2))\n", "\n", "# Process the data through the pipeline\n", "result = pipeline.process(input_data)\n", "\n", "print(\"\\nProcessed result:\")\n", "print(json.dumps(result, indent=2))" ] }, { "cell_type": "markdown", "id": "fbb2fa7c", "metadata": {}, "source": [ "## 4. Using the ForEach Step\n", "\n", "The `ForEachStep` can either run its sub-steps once per item in a collection or run them a fixed number of times. This section processes a list of items; Section 5 covers fixed iterations."
] }, { "cell_type": "code", "execution_count": null, "id": "2e5af005", "metadata": {}, "outputs": [], "source": [ "# Example 1: Processing a list of items\n", "foreach_pipeline = Pipeline(name=\"ForEachTextProcessing\")\n", "\n", "# Create a ForEach step that processes each text item\n", "foreach_step = ForEachStep(\n", "    items_key=\"texts\",\n", "    results_key=\"processed_texts\",\n", "    name=\"ProcessEachText\"\n", ")\n", "\n", "# Add sub-steps to the ForEach step\n", "foreach_step.add_sub_step(TextProcessingStep(\"lower\", \"ToLowerCase\"))\n", "foreach_step.add_sub_step(TextProcessingStep(\"reverse\", \"ReverseText\"))\n", "foreach_step.add_sub_step(AddKeyStep(\"processed\", True, \"MarkAsProcessed\"))\n", "\n", "# Add the ForEach step to the pipeline\n", "foreach_pipeline.add_step(foreach_step)\n", "\n", "# Test data with multiple text items\n", "foreach_input = {\n", "    \"texts\": [\n", "        \"Hello World\",\n", "        \"AI-Graph Framework\",\n", "        \"Pipeline Processing\",\n", "        \"Chain of Responsibility\"\n", "    ],\n", "    \"batch_id\": \"example_batch_001\"\n", "}\n", "\n", "print(\"ForEach input data:\")\n", "print(json.dumps(foreach_input, indent=2))\n", "\n", "# Process the data\n", "foreach_result = foreach_pipeline.process(foreach_input)\n", "\n", "print(\"\\nForEach processing result:\")\n", "print(json.dumps(foreach_result, indent=2))" ] }, { "cell_type": "markdown", "id": "3f583490", "metadata": {}, "source": [ "## 5. Fixed Iterations Example\n", "\n", "Now let's demonstrate using `ForEachStep` with a fixed number of iterations instead of processing a collection."
] }, { "cell_type": "code", "execution_count": null, "id": "a95b24b8", "metadata": {}, "outputs": [], "source": [ "class CounterStep(BasePipelineStep):\n", "    \"\"\"A step that increments a counter.\"\"\"\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        current_count = data.get(\"counter\", 0)\n", "        data[\"counter\"] = current_count + 1\n", "        data[\"iteration_info\"] = f\"Iteration {data['counter']} completed\"\n", "        return data\n", "\n", "# Create a pipeline with fixed iterations\n", "iteration_pipeline = Pipeline(name=\"IterationExample\")\n", "\n", "# Create a ForEach step with fixed iterations\n", "iteration_step = ForEachStep(\n", "    iterations=5,  # Run 5 iterations\n", "    results_key=\"iteration_results\",\n", "    name=\"RunIterations\"\n", ")\n", "\n", "# Add sub-steps\n", "iteration_step.add_sub_step(CounterStep(\"IncrementCounter\"))\n", "iteration_step.add_sub_step(AddKeyStep(\"status\", \"completed\", \"MarkCompleted\"))\n", "\n", "# Add to pipeline\n", "iteration_pipeline.add_step(iteration_step)\n", "\n", "# Test data\n", "iteration_input = {\n", "    \"counter\": 0,\n", "    \"experiment_name\": \"counter_test\"\n", "}\n", "\n", "print(\"Iteration input data:\")\n", "print(json.dumps(iteration_input, indent=2))\n", "\n", "# Process the data\n", "iteration_result = iteration_pipeline.process(iteration_input)\n", "\n", "print(\"\\nIteration processing result:\")\n", "print(json.dumps(iteration_result, indent=2))" ] }, { "cell_type": "markdown", "id": "5af9d321", "metadata": {}, "source": [ "## 6. Complex Pipeline Example\n", "\n", "Let's build a larger pipeline that combines input validation, metadata steps, a `ForEachStep`, a summary step, and cleanup with `DelKeyStep`."
] }, { "cell_type": "code", "execution_count": null, "id": "eaad4c47", "metadata": {}, "outputs": [], "source": [ "from typing import List, Optional\n", "\n", "class DataValidationStep(BasePipelineStep):\n", "    \"\"\"A step that validates required fields in the data.\"\"\"\n", "\n", "    def __init__(self, required_fields: List[str], name: Optional[str] = None):\n", "        super().__init__(name or \"DataValidation\")\n", "        self.required_fields = required_fields\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        missing_fields = [field for field in self.required_fields if field not in data]\n", "\n", "        data[\"validation_result\"] = {\n", "            \"is_valid\": len(missing_fields) == 0,\n", "            \"missing_fields\": missing_fields,\n", "            \"required_fields\": self.required_fields\n", "        }\n", "\n", "        if missing_fields:\n", "            data[\"validation_errors\"] = f\"Missing required fields: {', '.join(missing_fields)}\"\n", "\n", "        return data\n", "\n", "class SummaryStep(BasePipelineStep):\n", "    \"\"\"A step that creates a summary of the processed data.\"\"\"\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        summary = {\n", "            \"total_keys\": len(data),\n", "            \"has_processing_history\": \"processing_history\" in data,\n", "            \"processing_steps\": len(data.get(\"processing_history\", [])),\n", "            \"validation_passed\": data.get(\"validation_result\", {}).get(\"is_valid\", False)\n", "        }\n", "\n", "        data[\"summary\"] = summary\n", "        return data\n", "\n", "# Create a complex pipeline\n", "complex_pipeline = Pipeline(name=\"ComplexDataProcessing\")\n", "\n", "# Step 1: Validate input data\n", "complex_pipeline.add_step(DataValidationStep([\"name\", \"data\"], \"ValidateInput\"))\n", "\n", "# Step 2: Add metadata\n", "complex_pipeline.add_step(AddKeyStep(\"pipeline_version\", \"1.0\", \"AddVersion\"))\n", "complex_pipeline.add_step(AddKeyStep(\"processing_date\", \"2025-07-09\", \"AddDate\"))\n", "\n", "# Step 3: Process data items if they exist\n", "data_foreach = ForEachStep(\n",
"    items_key=\"data\",\n", "    results_key=\"processed_data\",\n", "    name=\"ProcessDataItems\"\n", ")\n", "data_foreach.add_sub_step(TextProcessingStep(\"upper\", \"ToUpper\"))\n", "data_foreach.add_sub_step(TextProcessingStep(\"word_count\", \"CountWords\"))\n", "complex_pipeline.add_step(data_foreach)\n", "\n", "# Step 4: Create summary\n", "complex_pipeline.add_step(SummaryStep(\"CreateSummary\"))\n", "\n", "# Step 5: Remove the intermediate validation result (its pass/fail status is kept in the summary)\n", "complex_pipeline.add_step(DelKeyStep(\"validation_result\", \"CleanupValidation\"))\n", "\n", "# Test the complex pipeline\n", "complex_input = {\n", "    \"name\": \"ComplexDataProcessingExample\",\n", "    \"data\": [\n", "        \"artificial intelligence\",\n", "        \"machine learning pipeline\",\n", "        \"data processing framework\",\n", "        \"chain of responsibility pattern\"\n", "    ],\n", "    \"metadata\": {\n", "        \"source\": \"example_notebook\",\n", "        \"type\": \"demo\"\n", "    }\n", "}\n", "\n", "print(\"Complex pipeline input:\")\n", "print(json.dumps(complex_input, indent=2))\n", "\n", "# Process the data\n", "complex_result = complex_pipeline.process(complex_input)\n", "\n", "print(\"\\nComplex pipeline result:\")\n", "print(json.dumps(complex_result, indent=2))" ] }, { "cell_type": "markdown", "id": "f1471c66", "metadata": {}, "source": [ "## 7. Error Handling Example\n", "\n", "A step can catch exceptions raised during its own processing and record them in the data dictionary, so that the remaining steps in the pipeline still run."
] }, { "cell_type": "code", "execution_count": null, "id": "ed70dd9d", "metadata": {}, "outputs": [], "source": [ "class SafeProcessingStep(BasePipelineStep):\n", "    \"\"\"A step that demonstrates error handling.\"\"\"\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        try:\n", "            # Simulate some processing that might fail\n", "            if \"error_trigger\" in data:\n", "                raise ValueError(\"Simulated processing error\")\n", "\n", "            # Normal processing\n", "            data[\"safe_processing_completed\"] = True\n", "\n", "        except Exception as e:\n", "            # Record the error instead of re-raising, so later steps still run\n", "            data[\"error_occurred\"] = True\n", "            data[\"error_message\"] = str(e)\n", "            data[\"error_step\"] = self.name\n", "\n", "        return data\n", "\n", "# Create a pipeline with error handling\n", "error_pipeline = Pipeline(name=\"ErrorHandlingExample\")\n", "error_pipeline.add_step(SafeProcessingStep(\"SafeProcessor\"))\n", "error_pipeline.add_step(AddKeyStep(\"final_step\", \"reached\", \"FinalStep\"))\n", "\n", "# Test with normal data\n", "normal_data = {\"message\": \"This should work fine\"}\n", "print(\"Testing with normal data:\")\n", "normal_result = error_pipeline.process(normal_data)\n", "print(json.dumps(normal_result, indent=2))\n", "\n", "print(\"\\n\" + \"=\" * 50 + \"\\n\")\n", "\n", "# Test with error-triggering data\n", "error_data = {\"message\": \"This will trigger an error\", \"error_trigger\": True}\n", "print(\"Testing with error-triggering data:\")\n", "error_result = error_pipeline.process(error_data)\n", "print(json.dumps(error_result, indent=2))" ] }, { "cell_type": "markdown", "id": "aba4c78a", "metadata": {}, "source": [ "## 8. Performance Monitoring Example\n", "\n", "Let's create steps that time their own execution, followed by a step that aggregates those timings into a summary."
] }, { "cell_type": "code", "execution_count": null, "id": "09dea0e4", "metadata": {}, "outputs": [], "source": [ "import time\n", "from typing import Dict, Any\n", "\n", "class PerformanceMonitoringStep(BasePipelineStep):\n", "    \"\"\"A step that monitors performance metrics.\"\"\"\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        # perf_counter() is a monotonic, high-resolution clock suited to timing intervals\n", "        start_time = time.perf_counter()\n", "\n", "        # Simulate some processing work\n", "        time.sleep(0.1)  # Simulate 100ms of work\n", "\n", "        # Count the items handled by this step (1 if there is no \"items\" list)\n", "        if \"items\" in data:\n", "            processed_items = len(data[\"items\"])\n", "        else:\n", "            processed_items = 1\n", "\n", "        end_time = time.perf_counter()\n", "        processing_time = end_time - start_time\n", "\n", "        # Add performance metrics\n", "        if \"performance_metrics\" not in data:\n", "            data[\"performance_metrics\"] = []\n", "\n", "        data[\"performance_metrics\"].append({\n", "            \"step_name\": self.name,\n", "            \"processing_time_seconds\": round(processing_time, 4),\n", "            \"items_processed\": processed_items,\n", "            \"items_per_second\": round(processed_items / processing_time, 2) if processing_time > 0 else 0\n", "        })\n", "\n", "        return data\n", "\n", "# Create a performance monitoring pipeline\n", "perf_pipeline = Pipeline(name=\"PerformanceMonitoring\")\n", "\n", "# Add multiple performance-monitored steps\n", "perf_pipeline.add_step(PerformanceMonitoringStep(\"DataPreprocessing\"))\n", "perf_pipeline.add_step(PerformanceMonitoringStep(\"MainProcessing\"))\n", "perf_pipeline.add_step(PerformanceMonitoringStep(\"PostProcessing\"))\n", "\n", "# Add a summary step\n", "class PerformanceSummaryStep(BasePipelineStep):\n", "    \"\"\"Summarize performance metrics.\"\"\"\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if \"performance_metrics\" in data:\n", "            metrics = data[\"performance_metrics\"]\n", "            total_time = sum(m[\"processing_time_seconds\"] for m in metrics)\n", "            total_items = sum(m[\"items_processed\"] for m in metrics)\n", "\n", "            data[\"performance_summary\"] = {\n", "                \"total_processing_time_seconds\": round(total_time, 4),\n", "                \"total_items_processed\": total_items,\n", "                \"average_items_per_second\": round(total_items / total_time, 2) if total_time > 0 else 0,\n", "                \"steps_executed\": len(metrics)\n", "            }\n", "\n", "        return data\n", "\n", "perf_pipeline.add_step(PerformanceSummaryStep(\"CreatePerformanceSummary\"))\n", "\n", "# Test the performance monitoring\n", "perf_input = {\n", "    \"items\": [\"item1\", \"item2\", \"item3\", \"item4\", \"item5\"],\n", "    \"operation\": \"performance_test\"\n", "}\n", "\n", "print(\"Running performance monitoring pipeline...\")\n", "perf_result = perf_pipeline.process(perf_input)\n", "\n", "print(\"\\nPerformance monitoring result:\")\n", "print(json.dumps(perf_result, indent=2))" ] },
{ "cell_type": "markdown", "id": "2a565862", "metadata": {}, "source": [ "## 9. Conclusion\n", "\n", "This notebook has demonstrated the key features of the AI-Graph framework:\n", "\n", "### Key Features Demonstrated:\n", "\n", "1. **Pipeline Creation**: Building sequential processing workflows\n", "2. **Custom Steps**: Creating custom processing steps by extending `BasePipelineStep`\n", "3. **Built-in Steps**: Using `AddKeyStep`, `DelKeyStep`, and `ForEachStep`\n", "4. **Chain of Responsibility**: Steps added to a pipeline run in sequence, passing the data dictionary along\n", "5. **ForEach Processing**: Handling collections and fixed iterations\n", "6. **Error Handling**: Catching and recording errors inside a step so that later steps still run\n", "7. **Performance Monitoring**: Recording per-step timing metrics\n", "8. **Data Validation**: Validating input data structure\n", "\n", "### Benefits of the AI-Graph Framework:\n", "\n", "- **Modular Design**: Easy to add, remove, or modify processing steps\n", "- **Reusable Components**: Steps can be reused across different pipelines\n", "- **Error Resilience**: Steps can catch and record their own errors, so one failure need not stop the whole pipeline\n", "- **Performance Monitoring**: Timing metrics can be collected with ordinary custom steps\n", "- **Flexible Data Flow**: A single data dictionary is passed from step to step through the chain\n", "- **Extensible**: Easy to create custom steps for specific needs\n", "\n", "### Use Cases:\n", "\n", "- **Data Processing Pipelines**: ETL operations, data transformation\n", "- **AI/ML Workflows**: Model training, inference, and post-processing\n", "- **Content Processing**: Text processing, image processing\n", "- **Business Logic**: Sequential business rule processing\n", "- **API Request Processing**: Multi-step API request handling\n", "\n", "The AI-Graph framework provides a foundation for building modular, maintainable AI workflows with clear separation of concerns and easy extensibility." ] } ],
"metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.1" } }, "nbformat": 4, "nbformat_minor": 5 }