CrewAI flows

⏱ Czas czytania: ~15 minut | 📊 Poziom: Sredniozaawansowany | 📅 Aktualizacja: Grudzien 2025

CrewAI Flows to warstwa orkiestracji pozwalajaca budowac zlozone, wieloetapowe procesy laczace Crews, funkcje Python i logike warunkowa. Flows rozwiazuja problem, ktorego same Crews nie moga — budowe zlozonych pipeline’ow z rozgalezieniami, petlami i zarzadzaniem stanem.

🎯 Czego sie nauczysz:
  • Roznice miedzy Crews a Flows
  • Dekoratory: @start, @listen, @router, @and_, @or_
  • Zarzadzanie stanem (State)
  • Routing warunkowy
  • Laczenie Flows z Crews
  • Praktyczne wzorce i przyklady

Crews vs Flows — kiedy co uzywac?

Crews i Flows to dwa rozne poziomy abstrakcji w CrewAI:

Aspekt Crews Flows
Zakres Jedno zadanie / grupa zadan Caly workflow / pipeline
Przeplyw Sekwencyjny lub hierarchiczny Event-driven, dowolny graf
Stan Context (przekazywany) Centralny State (Pydantic)
Rozgalezienia ❌ Brak ✅ @router
Rownolegle wykonanie Ograniczone ✅ @and_, @or_
Typowe uzycie Zespol agentow nad zadaniem Orkiestracja wielu Crews
Flow orkiestruje wiele Crews FLOW START Crew 1 Research Crew 2 Analysis Crew 3 Writing State Flow zarzadza stanem i przeplywem miedzy Crews

Instalacja

# CrewAI z Flows (wbudowane od wersji 0.30+)
pip install crewai

# Sprawdz wersje
pip show crewai

Podstawy Flows

Flow to klasa dziedziczaca po Flow, gdzie metody polaczone sa dekoratorami:

from crewai.flow.flow import Flow, listen, start

class SimpleFlow(Flow):
    """Prosty przyklad Flow z trzema krokami."""
    
    @start()
    def get_topic(self):
        """Punkt startowy - zwraca temat."""
        print("Step 1: Getting topic")
        return "AI Agents in 2025"
    
    @listen(get_topic)
    def research(self, topic: str):
        """Wykonuje sie po get_topic, otrzymuje jego wynik."""
        print(f"Step 2: Researching '{topic}'")
        return {
            "topic": topic,
            "findings": ["Finding 1", "Finding 2", "Finding 3"]
        }
    
    @listen(research)
    def summarize(self, data: dict):
        """Wykonuje sie po research, otrzymuje jego wynik."""
        print(f"Step 3: Summarizing {len(data['findings'])} findings")
        return f"Summary of {data['topic']}: {len(data['findings'])} key points found"

# Uruchomienie
flow = SimpleFlow()
result = flow.kickoff()
print(f"Final result: {result}")

# Output:
# Step 1: Getting topic
# Step 2: Researching 'AI Agents in 2025'
# Step 3: Summarizing 3 findings
# Final result: Summary of AI Agents in 2025: 3 key points found

Dekoratory Flow

Flows uzywaja dekoratorow do definiowania przeplywu:

Dekorator Opis Przyklad
@start() Punkt wejscia flow @start() def init(self)
@listen(fn) Wykonaj gdy fn zakonczy @listen(init) def step2(self, data)
@listen("name") Sluchaj na nazwanym zdarzeniu @listen(“tech_path”)
@router(fn) Dynamiczny wybor sciezki return “path_a” lub “path_b”
@and_(f1, f2) Gdy WSZYSTKIE zakoncza @and_(research, validate)
@or_(f1, f2) Gdy KTORYKOLWIEK zakonczy @or_(fast_path, slow_path)

State Management

Flows moga miec centralny stan (State) oparty na Pydantic:

from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import Optional

# 1. DEFINICJA STANU
class ResearchState(BaseModel):
    """Centralny stan flow - typowany dzieki Pydantic."""
    topic: str = ""
    research_data: list[str] = []
    analysis: str = ""
    final_report: str = ""
    quality_score: Optional[float] = None

# 2. FLOW Z TYPOWANYM STANEM
class ResearchFlow(Flow[ResearchState]):
    """Flow z centralnym stanem."""
    
    @start()
    def initialize(self):
        """Inicjalizacja - ustaw temat w stanie."""
        self.state.topic = "AI Agents"
        print(f"Initialized with topic: {self.state.topic}")
        return self.state.topic
    
    @listen(initialize)
    def research(self, topic: str):
        """Research - zapisz wyniki w stanie."""
        self.state.research_data = [
            "LangGraph dominates stateful agents",
            "CrewAI best for team collaboration",
            "MCP becomes universal standard"
        ]
        print(f"Found {len(self.state.research_data)} insights")
        return self.state.research_data
    
    @listen(research)
    def analyze(self, data: list):
        """Analiza - wykorzystaj dane ze stanu."""
        self.state.analysis = f"Analysis of {len(data)} points for '{self.state.topic}'"
        self.state.quality_score = 0.85
        return self.state.analysis
    
    @listen(analyze)
    def generate_report(self, analysis: str):
        """Raport koncowy - wszystko z self.state."""
        self.state.final_report = f"""
=== REPORT: {self.state.topic} ===
Findings: {len(self.state.research_data)}
Quality: {self.state.quality_score}
Analysis: {self.state.analysis}
"""
        return self.state.final_report

# Uruchomienie
flow = ResearchFlow()
result = flow.kickoff()

# Dostep do koncowego stanu
print(f"Final state topic: {flow.state.topic}")
print(f"Quality score: {flow.state.quality_score}")
💡 Zalety State:
  • Typowanie — Pydantic waliduje typy
  • Centralnosc — kazda metoda ma dostep przez self.state
  • Debugowanie — latwo sprawdzic stan po kazdym kroku
  • Serializacja — mozna zapisac/wczytac stan (persystencja)

Routing warunkowy (@router)

Router pozwala na dynamiczne wybieranie sciezki na podstawie danych:

from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel

class QueryState(BaseModel):
    query: str = ""
    query_type: str = ""
    response: str = ""

class QueryRouter(Flow[QueryState]):
    """Flow z routingiem warunkowym."""
    
    @start()
    def classify_query(self):
        """Klasyfikuj zapytanie."""
        self.state.query = "How do neural networks work?"
        
        # Prosta klasyfikacja (w produkcji uzyj LLM)
        if "how" in self.state.query.lower():
            self.state.query_type = "technical"
        elif "price" in self.state.query.lower():
            self.state.query_type = "sales"
        else:
            self.state.query_type = "general"
        
        return self.state.query_type
    
    @router(classify_query)
    def route_query(self, query_type: str) -> str:
        """Router - zwraca nazwe sciezki."""
        if query_type == "technical":
            return "technical_path"
        elif query_type == "sales":
            return "sales_path"
        else:
            return "general_path"
    
    # Rozne sciezki - sluchaja na stringach
    @listen("technical_path")
    def handle_technical(self):
        """Obsluga pytan technicznych."""
        self.state.response = f"[TECH] Detailed explanation for: {self.state.query}"
        return self.state.response
    
    @listen("sales_path")
    def handle_sales(self):
        """Obsluga pytan sprzedazowych."""
        self.state.response = f"[SALES] Pricing info for: {self.state.query}"
        return self.state.response
    
    @listen("general_path")
    def handle_general(self):
        """Obsluga ogolnych pytan."""
        self.state.response = f"[GENERAL] Answer for: {self.state.query}"
        return self.state.response

# Test
flow = QueryRouter()
result = flow.kickoff()
print(result)  # [TECH] Detailed explanation for: How do neural networks work?

Rownolegle wykonanie (@and_, @or_)

from crewai.flow.flow import Flow, listen, start, and_, or_

class ParallelFlow(Flow):
    """Flow z rownoleglym wykonaniem."""
    
    @start()
    def init(self):
        return "Start"
    
    # Te dwie metody uruchomia sie ROWNOLEGLE po init
    @listen(init)
    def research_web(self, data):
        """Szukaj w internecie."""
        print("Researching web...")
        return "Web results"
    
    @listen(init)
    def research_database(self, data):
        """Szukaj w bazie."""
        print("Researching database...")
        return "Database results"
    
    # Poczekaj az OBA zakoncza
    @and_(research_web, research_database)
    def merge_results(self, web_data, db_data):
        """Polacz wyniki gdy oba gotowe."""
        print(f"Merging: {web_data} + {db_data}")
        return f"Merged: {web_data} | {db_data}"

class RaceFlow(Flow):
    """Flow gdzie wygrywa pierwszy wynik."""
    
    @start()
    def init(self):
        return "Query"
    
    @listen(init)
    def fast_api(self, data):
        return "Fast API result"
    
    @listen(init)
    def slow_api(self, data):
        import time; time.sleep(2)
        return "Slow API result"
    
    # Kontynuuj gdy KTORYKOLWIEK zakonczy
    @or_(fast_api, slow_api)
    def use_first_result(self, result):
        """Uzyj pierwszego dostepnego wyniku."""
        return f"Using: {result}"

Laczenie Flows z Crews

Prawdziwa moc Flows to orkiestracja wielu Crews:

from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ContentState(BaseModel):
    topic: str = ""
    research: str = ""
    article: str = ""
    review: str = ""

class ContentPipeline(Flow[ContentState]):
    """Pipeline laczacy 3 Crews."""
    
    @start()
    def get_topic(self):
        self.state.topic = "AI Trends 2025"
        return self.state.topic
    
    @listen(get_topic)
    def research_phase(self, topic: str):
        """Crew 1: Research team."""
        researcher = Agent(
            role="Researcher",
            goal=f"Research {topic} thoroughly",
            backstory="Expert researcher with deep knowledge"
        )
        
        task = Task(
            description=f"Research the topic: {topic}",
            expected_output="Comprehensive research findings",
            agent=researcher
        )
        
        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        
        self.state.research = str(result)
        return self.state.research
    
    @listen(research_phase)
    def writing_phase(self, research: str):
        """Crew 2: Writing team."""
        writer = Agent(
            role="Writer",
            goal="Write engaging article",
            backstory="Professional content writer"
        )
        
        task = Task(
            description=f"Write article based on: {research[:500]}",
            expected_output="Well-written article",
            agent=writer
        )
        
        crew = Crew(agents=[writer], tasks=[task])
        result = crew.kickoff()
        
        self.state.article = str(result)
        return self.state.article
    
    @listen(writing_phase)
    def review_phase(self, article: str):
        """Crew 3: Review team."""
        editor = Agent(
            role="Editor",
            goal="Review and improve article",
            backstory="Senior editor with keen eye"
        )
        
        task = Task(
            description=f"Review article: {article[:500]}",
            expected_output="Editorial feedback and score",
            agent=editor
        )
        
        crew = Crew(agents=[editor], tasks=[task])
        result = crew.kickoff()
        
        self.state.review = str(result)
        return self.state.review

# Uruchomienie pipeline
pipeline = ContentPipeline()
final_result = pipeline.kickoff()
print(f"Pipeline complete! Review: {pipeline.state.review}")

Wizualizacja Flow

# Generowanie wizualizacji grafu flow
flow = ContentPipeline()

# Zapisz jako obrazek
flow.plot("content_pipeline_graph")  # tworzy .png

# Lub wyswietl w Jupyter
flow.plot()  # wyswietli inline

Praktyczne wzorce

🔁 Retry Pattern

Router sprawdza jakosc i wraca do poprzedniego kroku jesli niezadowalajaca.

👥 Fan-out / Fan-in

Jeden start → wiele rownoleglych krokow → @and_ laczy wyniki.

🎯 Pipeline Pattern

Sekwencja Crews: Research → Analysis → Writing → Review.

🔌 Human-in-loop

Router czeka na input uzytkownika przed kontynuacja.

✅ Kiedy uzywac Flows:
  • Multi-stage pipeline — wiele etapow z roznymi Crews
  • Warunkowa logika — rozne sciezki na podstawie danych
  • Rownolegle przetwarzanie — @and_, @or_
  • Centralny stan — dane wspoldzielone miedzy krokami
  • Orkiestracja Crews — koordynacja wielu zespolow

📚 Bibliografia

  1. CrewAI. (2025). Flows Documentation. docs.crewai.com/concepts/flows
  2. CrewAI. (2025). Flow State Management. docs.crewai.com/concepts/flows#state-management
  3. CrewAI GitHub. (2025). Flow Examples. github.com/crewAIInc/crewAI
  4. CrewAI Blog. (2024). Introducing Flows. blog.crewai.com