{ "cells": [ { "cell_type": "markdown", "id": "acde1a86", "metadata": {}, "source": [ "# Core Concepts\n", "\n", "Understanding the fundamental concepts of AI-Graph will help you build more effective processing pipelines. This interactive guide walks you through each concept with practical examples you can run and experiment with.\n", "\n", "## What You'll Learn\n", "\n", "- **Chain of Responsibility Pattern** - How AI-Graph orchestrates data flow\n", "- **Steps** - The building blocks of processing pipelines\n", "- **Pipelines** - How to orchestrate multiple steps\n", "- **ForEach Processing** - Working with collections and iterations\n", "- **Data Flow Patterns** - Common patterns for data transformation\n", "- **Best Practices** - Guidelines for effective pipeline design" ] }, { "cell_type": "markdown", "id": "d0e93310", "metadata": {}, "source": [ "## Setup and Imports\n", "\n", "Let's start by importing the necessary components:" ] }, { "cell_type": "code", "execution_count": 1, "id": "07a6cf58", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "✅ AI-Graph framework loaded successfully!\n", "📚 Ready to explore core concepts...\n" ] } ], "source": [ "# Core AI-Graph imports\n", "from ai_graph.pipeline.base import Pipeline\n", "from ai_graph.step.base import BasePipelineStep, AddKeyStep, DelKeyStep\n", "from ai_graph.step.foreach import ForEachStep\n", "\n", "# Additional imports for examples\n", "import json\n", "import time\n", "import re\n", "from datetime import datetime\n", "from typing import Dict, Any, List, Optional\n", "\n", "print(\"✅ AI-Graph framework loaded successfully!\")\n", "print(\"📚 Ready to explore core concepts...\")" ] }, { "cell_type": "markdown", "id": "f51a3f66", "metadata": {}, "source": [ "## The Chain of Responsibility Pattern\n", "\n", "AI-Graph is built on the Chain of Responsibility design pattern, where:\n", "\n", "- Each **Step** is a handler that processes data\n", "- 
**Pipelines** chain these handlers together\n", "- Data flows through the chain sequentially\n", "- Each step can transform, filter, or enrich the data\n", "\n", "```\n", "Input → Step 1 → Step 2 → Step 3 → Output\n", "```\n", "\n", "Let's see this in action:" ] }, { "cell_type": "code", "execution_count": 2, "id": "d454deb4", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "🚀 Starting chain processing...\n", "\n", "🔗 Step 1: Processing {'value': 10, 'user_id': 'demo_user'}\n", "🔗 Step 2: Processing {'value': 10, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-07-16T09:37:58.050940'}\n", "🔗 Step 3: Processing {'value': 20, 'user_id': 'demo_user', 'processed_by_step1': True, 'step1_timestamp': '2025-07-16T09:37:58.050940', 'transformed_by_step2': True}\n", "\n", "📊 Final Result:\n", "  value: 20\n", "  user_id: demo_user\n", "  processed_by_step1: True\n", "  step1_timestamp: 2025-07-16T09:37:58.050940\n", "  transformed_by_step2: True\n", "  final_result: Processed value: 20\n", "  completed: True\n" ] } ], "source": [ "# Demonstration of the Chain of Responsibility pattern\n", "\n", "class Step1(BasePipelineStep):\n", "    \"\"\"First step: Add metadata\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        print(f\"🔗 Step 1: Processing {data}\")\n", "        data['processed_by_step1'] = True\n", "        data['step1_timestamp'] = datetime.now().isoformat()\n", "        return data\n", "\n", "class Step2(BasePipelineStep):\n", "    \"\"\"Second step: Transform data\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        print(f\"🔗 Step 2: Processing {data}\")\n", "        if 'value' in data:\n", "            data['value'] = data['value'] * 2\n", "            data['transformed_by_step2'] = True\n", "        return data\n", "\n", "class Step3(BasePipelineStep):\n", "    \"\"\"Third step: Add final metadata\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        print(f\"🔗 Step 3: Processing {data}\")\n", "        data['final_result'] = f\"Processed value: {data.get('value', 'N/A')}\"\n", "        data['completed'] = True\n", "        return data\n", "\n", "# Create the pipeline and chain the steps\n", "chain_demo = Pipeline(name=\"ChainOfResponsibilityDemo\")\n", "chain_demo.add_step(Step1())\n", "chain_demo.add_step(Step2())\n", "chain_demo.add_step(Step3())\n", "\n", "# Test the chain\n", "input_data = {\"value\": 10, \"user_id\": \"demo_user\"}\n", "print(\"🚀 Starting chain processing...\\n\")\n", "\n", "result = chain_demo.process(input_data)\n", "\n", "print(\"\\n📊 Final Result:\")\n", "for key, value in result.items():\n", "    print(f\"  {key}: {value}\")" ] }, { "cell_type": "markdown", "id": "64f41e5b", "metadata": {}, "source": [ "## Steps: The Building Blocks\n", "\n", "Steps are the fundamental processing units in AI-Graph. Every step inherits from `BasePipelineStep` and implements the `_process_step` method.\n", "\n", "### Step Lifecycle\n", "\n", "Each step goes through this lifecycle:\n", "\n", "1. **Initialization** - Create the step instance\n", "2. **Processing** - Execute the `_process_step` method\n", "3. 
**Result** - Return the processed data\n", "\n", "Let's start with a simple pass-through step that logs the data it receives, combined with the built-in `AddKeyStep`:" ] }, { "cell_type": "code", "execution_count": null, "id": "b271ef04", "metadata": {}, "outputs": [], "source": [ "# Example 1: Logging Step\n", "class LoggingStep(BasePipelineStep):\n", "    def __init__(self, message: str, name: Optional[str] = None):\n", "        super().__init__(name or f\"LoggingStep_{message}\")\n", "        self.message = message\n", "\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        print(f\"📝 {self.message}: {data}\")\n", "        return data  # Pass data through unchanged\n", "\n", "# Test the logging step\n", "logging_pipeline = Pipeline(name=\"LoggingDemo\")\n", "logging_pipeline.add_step(LoggingStep(\"Before processing\"))\n", "logging_pipeline.add_step(AddKeyStep(\"processed\", True))\n", "logging_pipeline.add_step(LoggingStep(\"After processing\"))\n", "\n", "test_data = {\"id\": 123, \"name\": \"test\"}\n", "result = logging_pipeline.process(test_data)\n", "print(f\"\\n✅ Final result: {result}\")" ] }, { "cell_type": "markdown", "id": "e9eb2762", "metadata": {}, "source": [ "## Data Transformation Patterns\n", "\n", "Steps can transform data in several ways. 
Let's implement and test each pattern:" ] }, { "cell_type": "code", "execution_count": 3, "id": "f78313d2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "🔄 Transform result: {'text': 'HELLO WORLD', 'id': 1, 'transformed': True}\n" ] } ], "source": [ "# Pattern 1: Transform - Change the data structure or values\n", "class UppercaseStep(BasePipelineStep):\n", "    \"\"\"Transforms text to uppercase\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            data['text'] = data['text'].upper()\n", "            data['transformed'] = True\n", "        return data\n", "\n", "# Test transform pattern\n", "transform_pipeline = Pipeline(name=\"TransformDemo\")\n", "transform_pipeline.add_step(UppercaseStep())\n", "\n", "test_data = {\"text\": \"hello world\", \"id\": 1}\n", "result = transform_pipeline.process(test_data)\n", "print(f\"🔄 Transform result: {result}\")" ] }, { "cell_type": "code", "execution_count": 4, "id": "391fbf99", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "🔍 Filter Pattern Results:\n", "  positive: ✅ Passed - {'number': 5, 'id': 'positive', 'filtered_out': False}\n", "  negative: ❌ Filtered - {'number': -3, 'id': 'negative', 'filtered_out': True, 'reason': 'Number is not positive'}\n", "  zero: ❌ Filtered - {'number': 0, 'id': 'zero', 'filtered_out': True, 'reason': 'Number is not positive'}\n" ] } ], "source": [ "# Pattern 2: Filter - Flag data that fails a condition so later steps can skip it\n", "class FilterPositiveStep(BasePipelineStep):\n", "    \"\"\"Flags non-positive numbers as filtered out (the record itself is kept)\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'number' in data:\n", "            if data['number'] <= 0:\n", "                data['filtered_out'] = True\n", "                data['reason'] = \"Number is not positive\"\n", "            else:\n", "                data['filtered_out'] = False\n", "        return data\n", "\n", "# Test filter pattern with positive, negative, and zero values\n", 
"filter_pipeline = Pipeline(name=\"FilterDemo\")\n", "filter_pipeline.add_step(FilterPositiveStep())\n", "\n", "test_cases = [\n", "    {\"number\": 5, \"id\": \"positive\"},\n", "    {\"number\": -3, \"id\": \"negative\"},\n", "    {\"number\": 0, \"id\": \"zero\"}\n", "]\n", "\n", "print(\"🔍 Filter Pattern Results:\")\n", "for test_data in test_cases:\n", "    result = filter_pipeline.process(test_data.copy())\n", "    status = \"❌ Filtered\" if result.get('filtered_out') else \"✅ Passed\"\n", "    print(f\"  {test_data['id']}: {status} - {result}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "8c2beed9", "metadata": {}, "outputs": [], "source": [ "# Pattern 3: Enrich - Add additional information\n", "class AddTimestampStep(BasePipelineStep):\n", "    \"\"\"Enriches data with timestamp information\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        now = datetime.now()\n", "        data['enrichment'] = {\n", "            'timestamp': now.isoformat(),\n", "            'day_of_week': now.strftime('%A'),\n", "            'processing_time': now.strftime('%H:%M:%S')\n", "        }\n", "        return data\n", "\n", "# Test enrich pattern\n", "enrich_pipeline = Pipeline(name=\"EnrichDemo\")\n", "enrich_pipeline.add_step(AddTimestampStep())\n", "\n", "test_data = {\"user_id\": \"user123\", \"action\": \"login\"}\n", "result = enrich_pipeline.process(test_data)\n", "print(\"📈 Enrich Pattern Result:\")\n", "for key, value in result.items():\n", "    print(f\"  {key}: {value}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "9dc90311", "metadata": {}, "outputs": [], "source": [ "# Pattern 4: Aggregate - Combine multiple pieces of data\n", "class SumStep(BasePipelineStep):\n", "    \"\"\"Aggregates numeric data\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'numbers' in data and isinstance(data['numbers'], list):\n", "            data['sum'] = sum(data['numbers'])\n", "            data['count'] = len(data['numbers'])\n", "            data['average'] = data['sum'] / data['count'] 
if data['count'] > 0 else 0\n", "            data['aggregated'] = True\n", "        return data\n", "\n", "# Test aggregate pattern\n", "aggregate_pipeline = Pipeline(name=\"AggregateDemo\")\n", "aggregate_pipeline.add_step(SumStep())\n", "\n", "test_data = {\"numbers\": [1, 2, 3, 4, 5], \"dataset\": \"sample\"}\n", "result = aggregate_pipeline.process(test_data)\n", "print(\"📊 Aggregate Pattern Result:\")\n", "for key, value in result.items():\n", "    print(f\"  {key}: {value}\")" ] }, { "cell_type": "markdown", "id": "6934fb8c", "metadata": {}, "source": [ "## Pipelines: Orchestrating the Flow\n", "\n", "A pipeline runs its steps in the order they were added and passes each step's output to the next. Let's trace that execution order, then see what happens when a step fails:" ] }, { "cell_type": "code", "execution_count": null, "id": "83143798", "metadata": {}, "outputs": [], "source": [ "# Pipeline Execution Model Demonstration\n", "class Step1Process(BasePipelineStep):\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            data['text'] = data['text'].upper()\n", "            print(f\"  Step 1: Uppercase → '{data['text']}'\")\n", "        return data\n", "\n", "class Step2Process(BasePipelineStep):\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            data['text'] = f\"PREFIX: {data['text']}\"\n", "            print(f\"  Step 2: Add prefix → '{data['text']}'\")\n", "        return data\n", "\n", "class Step3Process(BasePipelineStep):\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            char_count = len(data['text'])\n", "            data['char_count'] = char_count\n", "            print(f\"  Step 3: Count chars → {char_count}\")\n", "        return data\n", "\n", "# Create and test the pipeline\n", "execution_demo = Pipeline(name=\"ExecutionFlowDemo\")\n", "execution_demo.add_step(Step1Process())\n", "execution_demo.add_step(Step2Process())\n", "execution_demo.add_step(Step3Process())\n", "\n", "print(\"🔄 Pipeline Execution Flow:\")\n", "test_data = {\"text\": 
\"hello\", \"id\": \"demo\"}\n", "print(f\"  Input: {test_data}\")\n", "\n", "result = execution_demo.process(test_data)\n", "print(f\"  Final Result: {result}\")" ] }, { "cell_type": "markdown", "id": "567c8e55", "metadata": {}, "source": [ "### Error Handling in Pipelines\n", "\n", "A step signals failure by raising an exception. This example assumes the exception propagates out of `process()` to the caller, which decides how to handle it. Here, a `try`/`except` around each call reports the error instead of stopping the notebook:" ] }, { "cell_type": "code", "execution_count": null, "id": "da08a121", "metadata": {}, "outputs": [], "source": [ "class RiskyStep(BasePipelineStep):\n", "    \"\"\"A step that raises ValueError for negative numbers\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'number' in data and data['number'] < 0:\n", "            raise ValueError(f\"Negative values not allowed: {data['number']}\")\n", "\n", "        if 'number' in data:\n", "            data['number'] = data['number'] * 2\n", "        data['processed_successfully'] = True\n", "        return data\n", "\n", "class SafeStep(BasePipelineStep):\n", "    \"\"\"A step that always succeeds\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        data['safe_step_completed'] = True\n", "        return data\n", "\n", "# Create pipeline with risky step\n", "error_demo = Pipeline(name=\"ErrorHandlingDemo\")\n", "error_demo.add_step(SafeStep())\n", "error_demo.add_step(RiskyStep())\n", "error_demo.add_step(SafeStep())\n", "\n", "# Test with safe data\n", "print(\"✅ Testing with safe data:\")\n", "safe_data = {\"number\": 5, \"id\": \"safe_test\"}\n", "try:\n", "    result = error_demo.process(safe_data.copy())\n", "    print(f\"  Success: {result}\")\n", "except Exception as e:\n", "    print(f\"  Error: {e}\")\n", "\n", "# Test with risky data\n", "print(\"\\n❌ Testing with risky data:\")\n", "risky_data = {\"number\": -5, \"id\": \"risky_test\"}\n", "try:\n", "    result = error_demo.process(risky_data.copy())\n", "    print(f\"  Success: {result}\")\n", "except ValueError as e:\n", "    print(f\"  Caught ValueError: {e}\")\n", "except Exception as e:\n", "    print(f\"  Unexpected error: 
{e}\")" ] }, { "cell_type": "markdown", "id": "50728aa1", "metadata": {}, "source": [ "## ForEach: Processing Collections\n", "\n", "`ForEachStep` applies a sub-pipeline (the steps registered with `add_sub_step`) to each item in a collection, or runs it a fixed number of times. When iterating over a collection, sub-steps read the current item from `data['_current_item']`, and each item's final data is collected into a list stored under `results_key`:" ] }, { "cell_type": "code", "execution_count": 5, "id": "e65807a9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "🔄 ForEach Processing:\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Processing ForEachStep: 100%|██████████| 5/5 [00:00<00:00, 18808.54item/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "  Doubling: 1 → 2\n", "  Doubling: 2 → 4\n", "  Doubling: 3 → 6\n", "  Doubling: 4 → 8\n", "  Doubling: 5 → 10\n", "\n", "📊 Results:\n", "  Original: [1, 2, 3, 4, 5]\n", "  Doubled: [2, 4, 6, 8, 10]\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "# Basic ForEach Usage\n", "class DoubleItemStep(BasePipelineStep):\n", "    \"\"\"Doubles the current item in a ForEach iteration\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if '_current_item' in data:\n", "            original = data['_current_item']\n", "            data['_current_item'] = original * 2\n", "            print(f\"  Doubling: {original} → {data['_current_item']}\")\n", "        return data\n", "\n", "# Create ForEach pipeline\n", "foreach_demo = Pipeline(name=\"ForEachDemo\")\n", "foreach_step = ForEachStep(\n", "    items_key=\"numbers\",\n", "    results_key=\"doubled_numbers\"\n", ")\n", "foreach_step.add_sub_step(DoubleItemStep())\n", "foreach_demo.add_step(foreach_step)\n", "\n", "# Test ForEach\n", "print(\"🔄 ForEach Processing:\")\n", "test_data = {\"numbers\": [1, 2, 3, 4, 5], \"operation\": \"double\"}\n", "result = foreach_demo.process(test_data)\n", "\n", "original_numbers = test_data['numbers']\n", "doubled_numbers = [item['_current_item'] for item in result['doubled_numbers']]\n", 
"print(f\"\\n📊 Results:\")\n", "print(f\"  Original: {original_numbers}\")\n", "print(f\"  Doubled: {doubled_numbers}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "84d1fba3", "metadata": {}, "outputs": [], "source": [ "# ForEach with Fixed Iterations\n", "class CounterStep(BasePipelineStep):\n", "    \"\"\"Adds iteration count to data\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        iteration = data.get('_iteration_index', 0)\n", "        data['iteration_result'] = f\"Iteration {iteration + 1} completed\"\n", "        print(f\"  {data['iteration_result']}\")\n", "        return data\n", "\n", "# Create fixed iteration pipeline\n", "iteration_demo = Pipeline(name=\"IterationDemo\")\n", "iteration_foreach = ForEachStep(\n", "    iterations=5,\n", "    results_key=\"iteration_results\"\n", ")\n", "iteration_foreach.add_sub_step(CounterStep())\n", "iteration_demo.add_step(iteration_foreach)\n", "\n", "print(\"🔢 Fixed Iterations Processing:\")\n", "test_data = {\"task\": \"repeated_operation\"}\n", "result = iteration_demo.process(test_data)\n", "\n", "print(f\"\\n📊 Completed {len(result['iteration_results'])} iterations\")\n", "for i, iteration_result in enumerate(result['iteration_results'], 1):\n", "    print(f\"  Result {i}: {iteration_result.get('iteration_result')}\")" ] }, { "cell_type": "markdown", "id": "70c37991", "metadata": {}, "source": [ "## Data Flow Patterns\n", "\n", "Understanding how data flows through your pipeline is crucial for design. 
Let's implement and test common data flow patterns:" ] }, { "cell_type": "code", "execution_count": null, "id": "df6e11db", "metadata": {}, "outputs": [], "source": [ "# Linear Flow Pattern\n", "# Input → Clean → Validate → Transform → Output\n", "\n", "class CleanDataStep(BasePipelineStep):\n", "    \"\"\"Cleans input data\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            # Trim, lowercase, and collapse runs of whitespace into single spaces\n", "            data['text'] = re.sub(r'\\s+', ' ', data['text'].strip().lower())\n", "            data['cleaned'] = True\n", "            print(f\"  🧹 Cleaned: '{data['text']}'\")\n", "        return data\n", "\n", "class ValidateDataStep(BasePipelineStep):\n", "    \"\"\"Marks text as valid if it has between 1 and 99 characters\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'text' in data:\n", "            is_valid = 0 < len(data['text']) < 100\n", "            data['is_valid'] = is_valid\n", "            data['validation_passed'] = is_valid\n", "            status = \"✅ Valid\" if is_valid else \"❌ Invalid\"\n", "            print(f\"  🔍 Validation: {status}\")\n", "        return data\n", "\n", "class TransformDataStep(BasePipelineStep):\n", "    \"\"\"Transforms validated data\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if data.get('is_valid', False) and 'text' in data:\n", "            # Add word count and create summary\n", "            words = data['text'].split()\n", "            data['word_count'] = len(words)\n", "            data['summary'] = f\"Text with {len(words)} words: '{data['text'][:50]}...'\"\n", "            print(f\"  🔄 Transformed: {data['word_count']} words\")\n", "        else:\n", "            print(\"  ⚠️ Skipped transformation (invalid data)\")\n", "        return data\n", "\n", "# Create linear flow pipeline\n", "linear_flow = Pipeline(name=\"LinearFlowDemo\")\n", "linear_flow.add_step(CleanDataStep())\n", "linear_flow.add_step(ValidateDataStep())\n", "linear_flow.add_step(TransformDataStep())\n", "\n", "# Test linear flow\n", "print(\"📈 Linear Flow 
Pattern:\")\n", "test_cases = [\n", "    {\"text\": \"  Hello   World!  \", \"id\": \"valid_case\"},\n", "    {\"text\": \"\", \"id\": \"empty_case\"},\n", "    {\"text\": \"A\" * 150, \"id\": \"too_long_case\"}\n", "]\n", "\n", "for i, test_data in enumerate(test_cases, 1):\n", "    print(f\"\\nTest {i} ({test_data['id']}):\")\n", "    result = linear_flow.process(test_data.copy())\n", "    print(f\"  Result: Valid={result.get('is_valid')}, Words={result.get('word_count', 'N/A')}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "91416e69", "metadata": {}, "outputs": [], "source": [ "# Branching Flow Pattern\n", "# A single step routes each record down one of several paths\n", "\n", "class ConditionalStep(BasePipelineStep):\n", "    \"\"\"Processes data differently depending on the value of 'number'\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'number' in data:\n", "            number = data['number']\n", "\n", "            if number > 100:\n", "                # Path A: Large numbers\n", "                data['category'] = 'large'\n", "                data['processed_value'] = number // 10  # Integer-divide by 10\n", "                data['processing_path'] = 'A'\n", "                print(f\"  🔀 Path A (Large): {number} → {data['processed_value']}\")\n", "            elif number > 0:\n", "                # Path B: Small positive numbers\n", "                data['category'] = 'small_positive'\n", "                data['processed_value'] = number * 2  # Double\n", "                data['processing_path'] = 'B'\n", "                print(f\"  🔀 Path B (Small+): {number} → {data['processed_value']}\")\n", "            else:\n", "                # Path C: Zero or negative\n", "                data['category'] = 'zero_or_negative'\n", "                data['processed_value'] = 0  # Set to zero\n", "                data['processing_path'] = 'C'\n", "                print(f\"  🔀 Path C (Zero/Neg): {number} → {data['processed_value']}\")\n", "\n", "        return data\n", "\n", "# Create branching flow pipeline\n", "branching_flow = Pipeline(name=\"BranchingFlowDemo\")\n", "branching_flow.add_step(ConditionalStep())\n", "\n", "# Test branching flow\n", "print(\"🌳 Branching Flow Pattern:\")\n", "test_numbers = [150, 25, 0, -10, 1000, 
5]\n", "\n", "# Process each number once, counting which path it took\n", "path_counts = {'A': 0, 'B': 0, 'C': 0}\n", "for number in test_numbers:\n", "    test_data = {\"number\": number, \"id\": f\"test_{number}\"}\n", "    result = branching_flow.process(test_data)\n", "    path = result.get('processing_path', 'Unknown')\n", "    if path in path_counts:\n", "        path_counts[path] += 1\n", "\n", "print(\"\\n📊 Summary of branching results:\")\n", "for path, count in path_counts.items():\n", "    print(f\"  Path {path}: {count} items\")" ] }, { "cell_type": "markdown", "id": "5df6e7cc", "metadata": {}, "source": [ "## Complete Example: Email Processing Pipeline\n", "\n", "Let's combine these concepts in a realistic example: a `ForEachStep` validates, normalizes, and classifies each email address, and a final step aggregates statistics over the results:" ] }, { "cell_type": "code", "execution_count": 6, "id": "0cb31668", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "📧 Building Email Processing Pipeline...\n", "\n", "🚀 Processing 9 emails...\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Processing ForEachStep: 100%|██████████| 9/9 [00:00<00:00, 11234.74item/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "  ✅ Valid: John.Doe@GMAIL.COM\n", "  🔄 Normalized: John.Doe@GMAIL.COM → john.doe@gmail.com\n", "  🏷️ Domain: gmail.com (common)\n", "  ❌ Invalid: invalid-email\n", "  ✅ Valid: alice@example.org\n", "  ➡️ Already normalized: alice@example.org\n", "  🏷️ Domain: example.org (other)\n", "  ✅ Valid: bob@company.com\n", "  ➡️ Already normalized: bob@company.com\n", "  🏷️ Domain: company.com (other)\n", "  ❌ Invalid:  JANE@yahoo.com \n", "  ✅ Valid: test@outlook.com\n", "  ➡️ Already normalized: test@outlook.com\n", "  🏷️ Domain: outlook.com (common)\n", "  ❌ Invalid: @invalid.com\n", "  ✅ Valid: user@hotmail.com\n", "  ➡️ Already normalized: user@hotmail.com\n", "  🏷️ Domain: hotmail.com (common)\n", "  ✅ Valid: another@custom-domain.net\n", "  ➡️ Already normalized: another@custom-domain.net\n", "  🏷️ Domain: custom-domain.net (other)\n", "\n", "📊 Email Processing Statistics:\n", "  Total processed: 9\n", "  Valid emails: 6 (66.7%)\n", "  Invalid emails: 3\n", "  Common domains: 3\n", "  Other domains: 3\n", "\n", "📋 Valid Email Results:\n", "  1. john.doe@gmail.com → gmail.com (common)\n", "  2. alice@example.org → example.org (other)\n", "  3. bob@company.com → company.com (other)\n", "  4. test@outlook.com → outlook.com (common)\n", "  5. user@hotmail.com → hotmail.com (common)\n", "  6. another@custom-domain.net → custom-domain.net (other)\n", "\n", "🎯 Final Statistics:\n", "  total_processed: 9\n", "  valid_emails: 6\n", "  invalid_emails: 3\n", "  success_rate: 66.7%\n", "  domain_distribution:\n", "    gmail.com: 1\n", "    example.org: 1\n", "    company.com: 1\n", "    outlook.com: 1\n", "    hotmail.com: 1\n", "    custom-domain.net: 1\n", "  common_domains: 3\n", "  other_domains: 3\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "# Complete Email Processing Pipeline Example\n", "\n", "class ValidateEmailStep(BasePipelineStep):\n", "    \"\"\"Validates email format using regex\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if '_current_item' in data:\n", "            email = data['_current_item']\n", "            # Simple format check. Validation runs before normalization, so an address\n", "            # with surrounding whitespace (e.g. ' JANE@yahoo.com ') is rejected here.\n", "            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$'\n", "            is_valid = bool(re.match(email_pattern, email))\n", "\n", "            if is_valid:\n", "                data['email_valid'] = True\n", "                print(f\"  ✅ Valid: {email}\")\n", "            else:\n", "                data['email_valid'] = False\n", "                data['_current_item'] = None  # Blank out the item; the result is kept but flagged invalid\n", "                print(f\"  ❌ Invalid: {email}\")\n", "        return data\n", "\n", "class NormalizeEmailStep(BasePipelineStep):\n", "    \"\"\"Normalizes email to lowercase and trims whitespace\"\"\"\n", "    def
_process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if data.get('email_valid') and '_current_item' in data and data['_current_item']:\n", "            original = data['_current_item']\n", "            normalized = original.lower().strip()\n", "            data['_current_item'] = normalized\n", "            data['normalized'] = True\n", "            if original != normalized:\n", "                print(f\"  🔄 Normalized: {original} → {normalized}\")\n", "            else:\n", "                print(f\"  ➡️ Already normalized: {normalized}\")\n", "        return data\n", "\n", "class ExtractDomainStep(BasePipelineStep):\n", "    \"\"\"Extracts domain from email and adds metadata\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if data.get('email_valid') and '_current_item' in data and data['_current_item']:\n", "            email = data['_current_item']\n", "            domain = email.split('@')[1]\n", "            data['domain'] = domain\n", "            data['email'] = email\n", "\n", "            # Add domain type classification\n", "            common_domains = ['gmail.com', 'yahoo.com', 'outlook.com', 'hotmail.com']\n", "            data['domain_type'] = 'common' if domain in common_domains else 'other'\n", "\n", "            print(f\"  🏷️ Domain: {domain} ({data['domain_type']})\")\n", "        return data\n", "\n", "class EmailStatisticsStep(BasePipelineStep):\n", "    \"\"\"Aggregates statistics about processed emails\"\"\"\n", "    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", "        if 'processed_emails' in data:\n", "            results = data['processed_emails']\n", "\n", "            # Keep only results flagged as valid\n", "            valid_results = [r for r in results if r.get('email_valid', False)]\n", "\n", "            # Calculate statistics\n", "            total_processed = len(results)\n", "            total_valid = len(valid_results)\n", "            total_invalid = total_processed - total_valid\n", "\n", "            # Domain statistics\n", "            domains = [r.get('domain') for r in valid_results if r.get('domain')]\n", "            domain_counts = {}\n", "            for domain in domains:\n", "                domain_counts[domain] = domain_counts.get(domain, 0) + 1\n", "\n", "            # Type 
statistics\n", "            common_count = sum(1 for r in valid_results if r.get('domain_type') == 'common')\n", "            other_count = total_valid - common_count\n", "\n", "            data['statistics'] = {\n", "                'total_processed': total_processed,\n", "                'valid_emails': total_valid,\n", "                'invalid_emails': total_invalid,\n", "                'success_rate': f\"{(total_valid/total_processed*100):.1f}%\" if total_processed > 0 else \"0%\",\n", "                'domain_distribution': domain_counts,\n", "                'common_domains': common_count,\n", "                'other_domains': other_count\n", "            }\n", "\n", "            print(f\"\\n📊 Email Processing Statistics:\")\n", "            print(f\"  Total processed: {total_processed}\")\n", "            print(f\"  Valid emails: {total_valid} ({data['statistics']['success_rate']})\")\n", "            print(f\"  Invalid emails: {total_invalid}\")\n", "            print(f\"  Common domains: {common_count}\")\n", "            print(f\"  Other domains: {other_count}\")\n", "\n", "        return data\n", "\n", "# Build the complete email processing pipeline\n", "print(\"📧 Building Email Processing Pipeline...\\n\")\n", "\n", "# Create the sub-pipeline for individual email processing\n", "email_foreach = ForEachStep(\n", "    items_key=\"emails\",\n", "    results_key=\"processed_emails\"\n", ")\n", "email_foreach.add_sub_step(ValidateEmailStep())\n", "email_foreach.add_sub_step(NormalizeEmailStep())\n", "email_foreach.add_sub_step(ExtractDomainStep())\n", "\n", "# Create the main pipeline\n", "email_pipeline = Pipeline(name=\"EmailProcessingPipeline\")\n", "email_pipeline.add_step(email_foreach)\n", "email_pipeline.add_step(EmailStatisticsStep())\n", "\n", "# Test with sample emails\n", "test_emails = {\n", "    \"emails\": [\n", "        \"John.Doe@GMAIL.COM\",\n", "        \"invalid-email\",\n", "        \"alice@example.org\",\n", "        \"bob@company.com\",\n", "        \" JANE@yahoo.com \",\n", "        \"test@outlook.com\",\n", "        \"@invalid.com\",\n", "        \"user@hotmail.com\",\n", "        \"another@custom-domain.net\"\n", "    ],\n", "    \"batch_id\": \"demo_batch_001\"\n", "}\n", "\n", "print(f\"🚀 Processing 
{len(test_emails['emails'])} emails...\\n\")\n", "result = email_pipeline.process(test_emails)\n", "\n", "# Display results\n", "print(\"\\n📋 Valid Email Results:\")\n", "valid_emails = [r for r in result['processed_emails'] if r.get('email_valid', False)]\n", "for i, email_result in enumerate(valid_emails, 1):\n", "    email = email_result.get('email', 'N/A')\n", "    domain = email_result.get('domain', 'N/A')\n", "    domain_type = email_result.get('domain_type', 'N/A')\n", "    print(f\"  {i}. {email} → {domain} ({domain_type})\")\n", "\n", "print(\"\\n🎯 Final Statistics:\")\n", "stats = result.get('statistics', {})\n", "for key, value in stats.items():\n", "    if key == 'domain_distribution':\n", "        print(f\"  {key}:\")\n", "        for domain, count in value.items():\n", "            print(f\"    {domain}: {count}\")\n", "    else:\n", "        print(f\"  {key}: {value}\")" ] }, { "cell_type": "markdown", "id": "7700779c", "metadata": {}, "source": [ "## Best Practices Summary\n", "\n", "Based on the examples we've explored, here are the key best practices for using AI-Graph effectively:" ] }, { "cell_type": "code", "execution_count": 7, "id": "d7f585f7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "🎯 AI-Graph Best Practices:\n", "\n", "1. 🎯 Single Responsibility\n", "   Each step should have one clear purpose.\n", "   ✅ Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep\n", "   ❌ Bad: ProcessAllEmailDataStep\n", "\n", "2. 🔒 Immutable Data\n", "   Avoid mutating the input dict; copy it, then change and return the copy.\n", "   ✅ Good: result = dict(data); result['new_field'] = processed_value\n", "   ❌ Bad: Modifying data['existing_field'] in place without copying\n", "\n", "3. 🛡️ Error Handling\n", "   Always consider what can go wrong and handle it gracefully.\n", "   ✅ Good: Check for required fields, validate data types\n", "   ❌ Bad: Assume data is always in the expected format\n", "\n", "4. 🏷️ Type Safety\n", "   Use type hints to make your code more robust.\n", "   ✅ Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]\n", "   ❌ Bad: def _process_step(self, data) -> dict\n", "\n", "5. 🧪 Testing\n", "   Write unit tests for each step individually.\n", "   ✅ Good: Test each step with various input scenarios\n", "   ❌ Bad: Only test the complete pipeline\n", "\n", "6. 📚 Documentation\n", "   Document what each step does and its expected input/output.\n", "   ✅ Good: Clear docstrings and parameter documentation\n", "   ❌ Bad: No documentation or unclear naming\n", "\n", "🎉 Example of Best Practice Step Created!\n", "See the BestPracticeStep class above for a complete example.\n" ] } ], "source": [ "# Best Practices Demonstration\n", "\n", "print(\"🎯 AI-Graph Best Practices:\")\n", "print(\"\\n1. 🎯 Single Responsibility\")\n", "print(\"   Each step should have one clear purpose.\")\n", "print(\"   ✅ Good: ValidateEmailStep, NormalizeEmailStep, ExtractDomainStep\")\n", "print(\"   ❌ Bad: ProcessAllEmailDataStep\")\n", "\n", "print(\"\\n2. 🔒 Immutable Data\")\n", "print(\"   Avoid mutating the input dict; copy it, then change and return the copy.\")\n", "print(\"   ✅ Good: result = dict(data); result['new_field'] = processed_value\")\n", "print(\"   ❌ Bad: Modifying data['existing_field'] in place without copying\")\n", "\n", "print(\"\\n3. 🛡️ Error Handling\")\n", "print(\"   Always consider what can go wrong and handle it gracefully.\")\n", "print(\"   ✅ Good: Check for required fields, validate data types\")\n", "print(\"   ❌ Bad: Assume data is always in the expected format\")\n", "\n", "print(\"\\n4. 🏷️ Type Safety\")\n", "print(\"   Use type hints to make your code more robust.\")\n", "print(\"   ✅ Good: def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]\")\n", "print(\"   ❌ Bad: def _process_step(self, data) -> dict\")\n", "\n", "print(\"\\n5. 
๐Ÿงช Testing\")\n", "print(\" Write unit tests for each step individually.\")\n", "print(\" โœ… Good: Test each step with various input scenarios\")\n", "print(\" โŒ Bad: Only test the complete pipeline\")\n", "\n", "print(\"\\n6. ๐Ÿ“š Documentation\")\n", "print(\" Document what each step does and its expected input/output.\")\n", "print(\" โœ… Good: Clear docstrings and parameter documentation\")\n", "print(\" โŒ Bad: No documentation or unclear naming\")\n", "\n", "# Example of a well-designed step following all best practices\n", "class BestPracticeStep(BasePipelineStep):\n", " \"\"\"\n", " Example step following all best practices.\n", " \n", " This step validates and normalizes user names in the input data.\n", " \n", " Expected input:\n", " - data['name']: str - The user name to process\n", " \n", " Output:\n", " - data['name']: str - The normalized name (if valid)\n", " - data['name_valid']: bool - Whether the name is valid\n", " - data['name_length']: int - Length of the normalized name\n", " \"\"\"\n", " \n", " def __init__(self, min_length: int = 2, max_length: int = 50, name: str = None):\n", " super().__init__(name or f\"NameValidator_{min_length}_{max_length}\")\n", " self.min_length = min_length\n", " self.max_length = max_length\n", " \n", " def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Process and validate user name.\"\"\"\n", " try:\n", " # Check if name field exists and is string\n", " if 'name' not in data:\n", " data['name_valid'] = False\n", " data['validation_error'] = \"Name field missing\"\n", " return data\n", " \n", " raw_name = data['name']\n", " if not isinstance(raw_name, str):\n", " data['name_valid'] = False\n", " data['validation_error'] = \"Name must be string\"\n", " return data\n", " \n", " # Normalize name (immutable approach)\n", " normalized_name = raw_name.strip().title()\n", " \n", " # Validate length\n", " if len(normalized_name) < self.min_length:\n", " data['name_valid'] = False\n", " 
data['validation_error'] = f\"Name too short (min {self.min_length})\"\n", " return data\n", " \n", " if len(normalized_name) > self.max_length:\n", " data['name_valid'] = False\n", " data['validation_error'] = f\"Name too long (max {self.max_length})\"\n", " return data\n", " \n", " # Success case\n", " data['name'] = normalized_name\n", " data['name_valid'] = True\n", " data['name_length'] = len(normalized_name)\n", " data['validation_error'] = None\n", " \n", " except Exception as e:\n", " # Graceful error handling\n", " data['name_valid'] = False\n", " data['validation_error'] = f\"Unexpected error: {str(e)}\"\n", " \n", " return data\n", "\n", "print(\"\\n๐ŸŽ‰ Example of Best Practice Step Created!\")\n", "print(\"See the BestPracticeStep class above for a complete example.\")" ] }, { "cell_type": "markdown", "id": "f24be563", "metadata": {}, "source": [ "## Summary\n", "\n", "In this comprehensive concepts guide, you've learned:\n", "\n", "โœ… **Chain of Responsibility Pattern** - How data flows through connected steps \n", "โœ… **Step Design Patterns** - Transform, Filter, Enrich, and Aggregate patterns \n", "โœ… **Pipeline Orchestration** - Managing step execution and error handling \n", "โœ… **ForEach Processing** - Handling collections and iterations effectively \n", "โœ… **Data Flow Patterns** - Linear, branching, filtering, and aggregation flows \n", "โœ… **Best Practices** - Guidelines for robust, maintainable pipeline code \n", "โœ… **Real-world Example** - Complete email processing pipeline with statistics \n", "\n", "### Key Takeaways\n", "\n", "1. **Keep steps focused** - Each step should do one thing well\n", "2. **Plan your data flow** - Understand how data transforms through your pipeline\n", "3. **Handle errors gracefully** - Always consider edge cases and failures\n", "4. **Use type hints** - Make your code more robust and maintainable\n", "5. **Test thoroughly** - Validate each step and the complete pipeline\n", "6. 
**Document clearly** - Help others (and future you) understand your design\n", "\n", "### Next Steps\n", "\n", "Now that you understand the core concepts, you can:\n", "\n", "- **Build custom pipelines** for your specific use cases\n", "- **Combine patterns** to create sophisticated data processing workflows\n", "- **Optimize performance** for large-scale data processing\n", "- **Integrate with other systems** using AI-Graph as a processing engine\n", "\n", "Happy pipeline building! ๐Ÿš€" ] } ], "metadata": { "kernelspec": { "display_name": "ai-graph", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.1" } }, "nbformat": 4, "nbformat_minor": 5 }