{ "cells": [ { "cell_type": "markdown", "id": "5de7e682", "metadata": {}, "source": [ "# Quick Start Guide\n", "\n", "This guide will help you get started with AI-Graph in just a few minutes. AI-Graph is a framework for building and managing AI workflows using a pipeline-based approach with the Chain of Responsibility pattern.\n", "\n", "## What You'll Learn\n", "\n", "- How to create custom pipeline steps\n", "- How to build and run pipelines\n", "- How to chain multiple steps together\n", "- How to work with collections using ForEach steps\n", "- Error handling best practices\n", "- Real-world examples" ] }, { "cell_type": "markdown", "id": "b89beb4f", "metadata": {}, "source": [ "## Setup and Imports\n", "\n", "First, let's import the necessary components from the AI-Graph framework:" ] }, { "cell_type": "code", "execution_count": 1, "id": "4eb48b7f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "AI-Graph framework imported successfully!\n" ] } ], "source": [ "# Import the core components from AI-Graph\n", "from ai_graph.pipeline.base import Pipeline\n", "from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep\n", "from ai_graph.step.foreach import ForEachStep\n", "\n", "# Additional imports for our examples\n", "import json\n", "import time\n", "import re\n", "from typing import Dict, Any\n", "\n", "print(\"AI-Graph framework imported successfully!\")" ] }, { "cell_type": "markdown", "id": "5ea4f1d0", "metadata": {}, "source": [ "## Your First Pipeline\n", "\n", "Let's create a simple pipeline step that processes numerical data. In AI-Graph, all steps work with dictionary data structures and inherit from `BasePipelineStep`:" ] }, { "cell_type": "code", "execution_count": 2, "id": "88578770", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Input: {'value': 10}\n", "Result: {'value': 10}\n" ] } ], "source": [ "class DoubleStep(BasePipelineStep):\n", " \"\"\"A step that doubles the input value.\"\"\"\n", "\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Double the 'value' key in the input data.\"\"\"\n", " if 'value' in data:\n", " data['value'] = data['value'] * 2\n", " return data\n", "\n", "# Create and run the pipeline\n", "pipeline = Pipeline(name=\"DoubleValuePipeline\")\n", "pipeline.add_step(DoubleStep())\n", "\n", "# Input data as a dictionary\n", "input_data = {\"value\": 5}\n", "result = pipeline.process(input_data)\n", "print(f\"Input: {input_data}\")\n", "print(f\"Result: {result}\")" ] }, { "cell_type": "markdown", "id": "a653d49a", "metadata": {}, "source": [ "## Multiple Steps in a Pipeline\n", "\n", "You can chain multiple steps together. Each step processes the data and passes it to the next step:" ] }, { "cell_type": "code", "execution_count": 3, "id": "231211a3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Input: {'value': 30}\n", "Result: {'value': 30}\n", "Calculation: (5 + 10) * 2 = 30\n" ] } ], "source": [ "class AddStep(BasePipelineStep):\n", " def __init__(self, add_value: int, name: str = None):\n", " super().__init__(name or f\"AddStep_{add_value}\")\n", " self.add_value = add_value\n", "\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " if 'value' in data:\n", " data['value'] = data['value'] + self.add_value\n", " return data\n", "\n", "class MultiplyStep(BasePipelineStep):\n", " def __init__(self, multiply_value: int, name: str = None):\n", " super().__init__(name or f\"MultiplyStep_{multiply_value}\")\n", " self.multiply_value = multiply_value\n", "\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " if 'value' in data:\n", " data['value'] = data['value'] * self.multiply_value\n", " return data\n", "\n", "# Create pipeline with multiple steps\n", "pipeline = Pipeline(name=\"MathPipeline\")\n", "pipeline.add_step(AddStep(10)) # Add 10\n", "pipeline.add_step(MultiplyStep(2)) # Multiply by 2\n", "\n", "input_data = {\"value\": 5}\n", "result = pipeline.process(input_data)\n", "print(f\"Input: {input_data}\")\n", "print(f\"Result: {result}\")\n", "print(f\"Calculation: (5 + 10) * 2 = {result['value']}\")" ] }, { "cell_type": "markdown", "id": "2aac0fbb", "metadata": {}, "source": [ "## Using Built-in Steps\n", "\n", "AI-Graph provides several built-in steps for common operations. Let's use `AddKeyStep` and `DelKeyStep`:" ] }, { "cell_type": "code", "execution_count": null, "id": "c18595a9", "metadata": {}, "outputs": [], "source": [ "# Create a pipeline that adds metadata and processes data\n", "pipeline = Pipeline(name=\"MetadataPipeline\")\n", "pipeline.add_step(AddKeyStep(\"timestamp\", \"2025-07-16T10:00:00Z\"))\n", "pipeline.add_step(AddKeyStep(\"processed_by\", \"AI-Graph\"))\n", "pipeline.add_step(DoubleStep()) # Our custom step from before\n", "pipeline.add_step(DelKeyStep(\"timestamp\")) # Remove timestamp after processing\n", "\n", "input_data = {\"value\": 42, \"user_id\": \"user123\"}\n", "result = pipeline.process(input_data)\n", "print(f\"Input: {input_data}\")\n", "print(f\"Result: {result}\")" ] }, { "cell_type": "markdown", "id": "7dc63b4c", "metadata": {}, "source": [ "## Working with Collections using ForEach\n", "\n", "Use ForEach steps to process collections of items. The ForEach step creates a sub-pipeline that processes each item:" ] }, { "cell_type": "code", "execution_count": 4, "id": "ee72cf73", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 48998.88item/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Input numbers: [1, 2, 3, 4, 5]\n", "Squared numbers: [1, 4, 9, 16, 25]\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "class SquareStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Square the current item in a ForEach iteration.\"\"\"\n", " if '_current_item' in data:\n", " # ForEach puts the current item in '_current_item'\n", " current_value = data['_current_item']\n", " data['_current_item'] = current_value ** 2\n", " return data\n", "\n", "# Create main pipeline with ForEach\n", "pipeline = Pipeline(name=\"SquareNumbersPipeline\")\n", "\n", "# Create ForEach step that will process each number in the 'numbers' key\n", "foreach_step = ForEachStep(\n", " items_key=\"numbers\",\n", " results_key=\"squared_numbers\"\n", ")\n", "foreach_step.add_sub_step(SquareStep())\n", "\n", "pipeline.add_step(foreach_step)\n", "\n", "input_data = {\"numbers\": [1, 2, 3, 4, 5]}\n", "result = pipeline.process(input_data)\n", "\n", "print(f\"Input numbers: {input_data['numbers']}\")\n", "print(f\"Squared numbers: {[item['_current_item'] for item in result['squared_numbers']]}\")" ] }, { "cell_type": "markdown", "id": "779ee9f4", "metadata": {}, "source": [ "## Error Handling\n", "\n", "AI-Graph provides built-in error handling. Let's create a step that might fail and see how to handle it:" ] }, { "cell_type": "code", "execution_count": 5, "id": "9c9af172", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Safe division - Input: {'value': 5.0}, Result: {'value': 5.0}\n", "Caught expected error: Cannot divide by zero\n" ] } ], "source": [ "class DivideStep(BasePipelineStep):\n", " def __init__(self, divisor: float, name: str = None):\n", " super().__init__(name or f\"DivideStep_{divisor}\")\n", " self.divisor = divisor\n", "\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " if self.divisor == 0:\n", " raise ValueError(\"Cannot divide by zero\")\n", " \n", " if 'value' in data:\n", " data['value'] = data['value'] / self.divisor\n", " return data\n", "\n", "# Test with a safe divisor first\n", "pipeline = Pipeline(name=\"SafeDivisionPipeline\")\n", "pipeline.add_step(DivideStep(2))\n", "\n", "input_data = {\"value\": 10}\n", "result = pipeline.process(input_data)\n", "print(f\"Safe division - Input: {input_data}, Result: {result}\")\n", "\n", "# Now test with zero (this will raise an error)\n", "pipeline_with_error = Pipeline(name=\"ErrorPipeline\")\n", "pipeline_with_error.add_step(DivideStep(0))\n", "\n", "try:\n", " input_data = {\"value\": 10}\n", " result = pipeline_with_error.process(input_data)\n", "except ValueError as e:\n", " print(f\"Caught expected error: {e}\")" ] }, { "cell_type": "markdown", "id": "60a03405", "metadata": {}, "source": [ "## Real-World Example: Text Processing Pipeline\n", "\n", "Here's a more practical example for text processing that demonstrates multiple concepts:" ] }, { "cell_type": "code", "execution_count": 6, "id": "3f934c83", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Processing ForEachStep: 100%|██████████| 4/4 [00:00<00:00, 25930.78item/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Text Processing Results:\n", "========================================\n", "Original: 'Hello World!'\n", "Cleaned: 'hello world!'\n", "Words: 2\n", "Filtered: Yes\n", "----------------------------------------\n", "Original: 'This is a longer text with many words'\n", "Cleaned: 'this is a longer text with many words'\n", "Words: 8\n", "Filtered: No\n", "----------------------------------------\n", "Original: 'Short'\n", "Cleaned: 'short'\n", "Words: 1\n", "Filtered: Yes\n", "----------------------------------------\n", "Original: 'AI-Graph makes pipeline processing easy and efficient'\n", "Cleaned: 'ai-graph makes pipeline processing easy and efficient'\n", "Words: 7\n", "Filtered: No\n", "----------------------------------------\n", "\n", "Valid texts (>= 3 words): 2 out of 4\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "class CleanTextStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Clean text by removing extra whitespace and converting to lowercase.\"\"\"\n", " if '_current_item' in data:\n", " text = data['_current_item']\n", " # Remove extra whitespace and convert to lowercase\n", " cleaned = re.sub(r'\\s+', ' ', text.strip().lower())\n", " data['_current_item'] = cleaned\n", " return data\n", "\n", "class CountWordsStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Count words in the text.\"\"\"\n", " if '_current_item' in data:\n", " text = data['_current_item']\n", " word_count = len(text.split())\n", " data['word_count'] = word_count\n", " data['text'] = text # Keep the cleaned text\n", " return data\n", "\n", "class FilterLongTextsStep(BasePipelineStep):\n", " def __init__(self, min_words: int = 5, name: str = None):\n", " super().__init__(name or f\"FilterLongTexts_{min_words}\")\n", " self.min_words = min_words\n", "\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Filter out texts with fewer than min_words.\"\"\"\n", " if 'word_count' in data and data['word_count'] < self.min_words:\n", " data['filtered_out'] = True\n", " else:\n", " data['filtered_out'] = False\n", " return data\n", "\n", "# Create the text processing sub-pipeline\n", "foreach_step = ForEachStep(\n", " items_key=\"texts\",\n", " results_key=\"processed_texts\"\n", ")\n", "foreach_step.add_sub_step(CleanTextStep())\n", "foreach_step.add_sub_step(CountWordsStep())\n", "foreach_step.add_sub_step(FilterLongTextsStep(min_words=3))\n", "\n", "# Create main pipeline\n", "main_pipeline = Pipeline(name=\"TextProcessingPipeline\")\n", "main_pipeline.add_step(foreach_step)\n", "\n", "# Test data\n", "input_data = {\n", " \"texts\": [\n", " \"Hello World!\",\n", " \"This is a longer text with many words\",\n", " \"Short\",\n", " \"AI-Graph makes pipeline processing easy and efficient\"\n", " ]\n", "}\n", "\n", "result = main_pipeline.process(input_data)\n", "\n", "print(\"Text Processing Results:\")\n", "print(\"=\" * 40)\n", "for i, processed_item in enumerate(result['processed_texts']):\n", " original_text = input_data['texts'][i]\n", " cleaned_text = processed_item.get('text', 'N/A')\n", " word_count = processed_item.get('word_count', 0)\n", " filtered = processed_item.get('filtered_out', False)\n", " \n", " print(f\"Original: '{original_text}'\")\n", " print(f\"Cleaned: '{cleaned_text}'\")\n", " print(f\"Words: {word_count}\")\n", " print(f\"Filtered: {'Yes' if filtered else 'No'}\")\n", " print(\"-\" * 40)\n", "\n", "# Extract only non-filtered results\n", "valid_results = [\n", " item for item in result['processed_texts'] \n", " if not item.get('filtered_out', False)\n", "]\n", "print(f\"\\nValid texts (>= 3 words): {len(valid_results)} out of {len(input_data['texts'])}\")" ] }, { "cell_type": "markdown", "id": "2a11f3ad", "metadata": {}, "source": [ "## Progress Tracking with ForEach\n", "\n", "The ForEach step automatically shows progress bars for long-running operations. Let's see this in action:" ] }, { "cell_type": "code", "execution_count": 8, "id": "8fbb7ef1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Processing numbers with progress tracking:\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Processing ForEachStep: 100%|██████████| 10/10 [00:01<00:00, 9.91item/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "Original: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n", "Doubled: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "class SlowProcessingStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Simulate slow processing.\"\"\"\n", " time.sleep(0.1) # Simulate slow processing\n", " if '_current_item' in data:\n", " data['_current_item'] = data['_current_item'] * 2\n", " return data\n", "\n", "# Create pipeline with progress tracking\n", "foreach_step = ForEachStep(\n", " items_key=\"numbers\",\n", " results_key=\"processed_numbers\"\n", ")\n", "foreach_step.add_sub_step(SlowProcessingStep())\n", "\n", "pipeline = Pipeline(name=\"SlowProcessingPipeline\")\n", "pipeline.add_step(foreach_step)\n", "\n", "# Process a list of numbers with progress tracking\n", "input_data = {\"numbers\": list(range(10))}\n", "print(\"Processing numbers with progress tracking:\")\n", "result = pipeline.process(input_data)\n", "\n", "processed_numbers = [item['_current_item'] for item in result['processed_numbers']]\n", "print(f\"\\nOriginal: {input_data['numbers']}\")\n", "print(f\"Doubled: {processed_numbers}\")" ] }, { "cell_type": "markdown", "id": "dc3f6166", "metadata": {}, "source": [ "## Advanced Example: Data Validation Pipeline\n", "\n", "Let's create a more complex example that validates and processes user data:" ] }, { "cell_type": "code", "execution_count": 7, "id": "cb1cf0e4", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "User Validation Results:\n", "==================================================\n", "Name: John Doe\n", "Email: john@example.com (✓)\n", "Age: 25 (✓)\n", "Valid: ✓\n", "--------------------------------------------------\n", "Name: Jane Smith\n", "Email: invalid-email (✗)\n", "Age: 30 (✓)\n", "Valid: ✗\n", "Errors: Invalid email format\n", "--------------------------------------------------\n", "Name: Bob Johnson\n", "Email: bob@test.com (✓)\n", "Age: -5 (✗)\n", "Valid: ✗\n", "Errors: Invalid age\n", "--------------------------------------------------\n", "Name: Alice Brown\n", "Email: alice@company.org (✓)\n", "Age: 200 (✗)\n", "Valid: ✗\n", "Errors: Invalid age\n", "--------------------------------------------------\n" ] } ], "source": [ "class ValidateEmailStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Validate email format.\"\"\"\n", " if 'email' in data:\n", " email = data['email']\n", " # Simple email validation\n", " email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'\n", " data['email_valid'] = bool(re.match(email_pattern, email))\n", " return data\n", "\n", "class ValidateAgeStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Validate age is reasonable.\"\"\"\n", " if 'age' in data:\n", " age = data['age']\n", " data['age_valid'] = isinstance(age, int) and 0 <= age <= 150\n", " return data\n", "\n", "class SummarizeValidationStep(BasePipelineStep):\n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Summarize validation results.\"\"\"\n", " email_valid = data.get('email_valid', False)\n", " age_valid = data.get('age_valid', False)\n", " \n", " data['is_valid'] = email_valid and age_valid\n", " data['validation_errors'] = []\n", " \n", " if not email_valid:\n", " data['validation_errors'].append('Invalid email format')\n", " if not age_valid:\n", " data['validation_errors'].append('Invalid age')\n", " \n", " return data\n", "\n", "# Create validation pipeline\n", "validation_pipeline = Pipeline(name=\"UserValidationPipeline\")\n", "validation_pipeline.add_step(ValidateEmailStep())\n", "validation_pipeline.add_step(ValidateAgeStep())\n", "validation_pipeline.add_step(SummarizeValidationStep())\n", "\n", "# Test with various user data\n", "test_users = [\n", " {\"name\": \"John Doe\", \"email\": \"john@example.com\", \"age\": 25},\n", " {\"name\": \"Jane Smith\", \"email\": \"invalid-email\", \"age\": 30},\n", " {\"name\": \"Bob Johnson\", \"email\": \"bob@test.com\", \"age\": -5},\n", " {\"name\": \"Alice Brown\", \"email\": \"alice@company.org\", \"age\": 200}\n", "]\n", "\n", "print(\"User Validation Results:\")\n", "print(\"=\" * 50)\n", "\n", "for user in test_users:\n", " result = validation_pipeline.process(user.copy())\n", " \n", " print(f\"Name: {result['name']}\")\n", " print(f\"Email: {result['email']} ({'✓' if result['email_valid'] else '✗'})\")\n", " print(f\"Age: {result['age']} ({'✓' if result['age_valid'] else '✗'})\")\n", " print(f\"Valid: {'✓' if result['is_valid'] else '✗'}\")\n", " \n", " if result['validation_errors']:\n", " print(f\"Errors: {', '.join(result['validation_errors'])}\")\n", " \n", " print(\"-\" * 50)" ] }, { "cell_type": "markdown", "id": "7d3f8a98", "metadata": {}, "source": [ "## Next Steps\n", "\n", "Now that you've learned the basics of AI-Graph, here are some next steps to explore:\n", "\n", "### Documentation\n", "- **Concepts**: Learn about the core concepts in detail\n", "- **API Reference**: Full API documentation for all classes and methods\n", "\n", "### Best Practices\n", "\n", "1. **Keep steps small and focused** - Each step should do one thing well\n", "2. **Use meaningful names** - Name your steps clearly to improve readability\n", "3. **Handle errors gracefully** - Always consider what might go wrong\n", "4. **Test your pipelines** - Write unit tests for your custom steps\n", "5. **Use type hints** - AI-Graph supports full type checking with mypy\n", "\n", "### Advanced Features\n", "\n", "- **Custom ForEach implementations** - Create specialized iteration patterns\n", "- **Pipeline composition** - Combine multiple pipelines\n", "- **Performance optimization** - Tips for handling large datasets\n", "- **Integration patterns** - How to integrate with other frameworks" ] }, { "cell_type": "markdown", "id": "e957b7dd", "metadata": {}, "source": [ "## Summary\n", "\n", "In this quick start guide, you learned:\n", "\n", "✅ How to create custom pipeline steps by extending `BasePipelineStep` \n", "✅ How to build pipelines and chain multiple steps together \n", "✅ How to use built-in steps like `AddKeyStep` and `DelKeyStep` \n", "✅ How to process collections with `ForEachStep` \n", "✅ Error handling patterns and best practices \n", "✅ Real-world examples for text processing and data validation \n", "\n", "The AI-Graph framework provides a powerful and flexible way to build processing pipelines that are easy to understand, test, and maintain. Happy coding! 🚀" ] } ], "metadata": { "kernelspec": { "display_name": "ai-graph", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.1" } }, "nbformat": 4, "nbformat_minor": 5 }