CrewAI Flows to warstwa orkiestracji pozwalajaca budowac zlozone, wieloetapowe procesy laczace Crews, funkcje Python i logike warunkowa. Flows rozwiazuja problem, ktorego same Crews nie moga — budowe zlozonych pipeline’ow z rozgalezieniami, petlami i zarzadzaniem stanem.
- Roznice miedzy Crews a Flows
- Dekoratory: @start, @listen, @router, @and_, @or_
- Zarzadzanie stanem (State)
- Routing warunkowy
- Laczenie Flows z Crews
- Praktyczne wzorce i przyklady
Crews vs Flows — kiedy co uzywac?
Crews i Flows to dwa rozne poziomy abstrakcji w CrewAI:
Instalacja
# CrewAI z Flows (wbudowane od wersji 0.30+)
pip install crewai
# Sprawdz wersje
pip show crewai
Podstawy Flows
Flow to klasa dziedziczaca po Flow, gdzie metody polaczone sa dekoratorami:
from crewai.flow.flow import Flow, listen, start
class SimpleFlow(Flow):
"""Prosty przyklad Flow z trzema krokami."""
@start()
def get_topic(self):
"""Punkt startowy - zwraca temat."""
print("Step 1: Getting topic")
return "AI Agents in 2025"
@listen(get_topic)
def research(self, topic: str):
"""Wykonuje sie po get_topic, otrzymuje jego wynik."""
print(f"Step 2: Researching '{topic}'")
return {
"topic": topic,
"findings": ["Finding 1", "Finding 2", "Finding 3"]
}
@listen(research)
def summarize(self, data: dict):
"""Wykonuje sie po research, otrzymuje jego wynik."""
print(f"Step 3: Summarizing {len(data['findings'])} findings")
return f"Summary of {data['topic']}: {len(data['findings'])} key points found"
# Uruchomienie
flow = SimpleFlow()
result = flow.kickoff()
print(f"Final result: {result}")
# Output:
# Step 1: Getting topic
# Step 2: Researching 'AI Agents in 2025'
# Step 3: Summarizing 3 findings
# Final result: Summary of AI Agents in 2025: 3 key points found
Dekoratory Flow
Flows uzywaja dekoratorow do definiowania przeplywu:
State Management
Flows moga miec centralny stan (State) oparty na Pydantic:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
from typing import Optional
# 1. DEFINICJA STANU
class ResearchState(BaseModel):
"""Centralny stan flow - typowany dzieki Pydantic."""
topic: str = ""
research_data: list[str] = []
analysis: str = ""
final_report: str = ""
quality_score: Optional[float] = None
# 2. FLOW Z TYPOWANYM STANEM
class ResearchFlow(Flow[ResearchState]):
"""Flow z centralnym stanem."""
@start()
def initialize(self):
"""Inicjalizacja - ustaw temat w stanie."""
self.state.topic = "AI Agents"
print(f"Initialized with topic: {self.state.topic}")
return self.state.topic
@listen(initialize)
def research(self, topic: str):
"""Research - zapisz wyniki w stanie."""
self.state.research_data = [
"LangGraph dominates stateful agents",
"CrewAI best for team collaboration",
"MCP becomes universal standard"
]
print(f"Found {len(self.state.research_data)} insights")
return self.state.research_data
@listen(research)
def analyze(self, data: list):
"""Analiza - wykorzystaj dane ze stanu."""
self.state.analysis = f"Analysis of {len(data)} points for '{self.state.topic}'"
self.state.quality_score = 0.85
return self.state.analysis
@listen(analyze)
def generate_report(self, analysis: str):
"""Raport koncowy - wszystko z self.state."""
self.state.final_report = f"""
=== REPORT: {self.state.topic} ===
Findings: {len(self.state.research_data)}
Quality: {self.state.quality_score}
Analysis: {self.state.analysis}
"""
return self.state.final_report
# Uruchomienie
flow = ResearchFlow()
result = flow.kickoff()
# Dostep do koncowego stanu
print(f"Final state topic: {flow.state.topic}")
print(f"Quality score: {flow.state.quality_score}")
- Typowanie — Pydantic waliduje typy
- Centralnosc — kazda metoda ma dostep przez
self.state - Debugowanie — latwo sprawdzic stan po kazdym kroku
- Serializacja — mozna zapisac/wczytac stan (persystencja)
Routing warunkowy (@router)
Router pozwala na dynamiczne wybieranie sciezki na podstawie danych:
from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel
class QueryState(BaseModel):
query: str = ""
query_type: str = ""
response: str = ""
class QueryRouter(Flow[QueryState]):
"""Flow z routingiem warunkowym."""
@start()
def classify_query(self):
"""Klasyfikuj zapytanie."""
self.state.query = "How do neural networks work?"
# Prosta klasyfikacja (w produkcji uzyj LLM)
if "how" in self.state.query.lower():
self.state.query_type = "technical"
elif "price" in self.state.query.lower():
self.state.query_type = "sales"
else:
self.state.query_type = "general"
return self.state.query_type
@router(classify_query)
def route_query(self, query_type: str) -> str:
"""Router - zwraca nazwe sciezki."""
if query_type == "technical":
return "technical_path"
elif query_type == "sales":
return "sales_path"
else:
return "general_path"
# Rozne sciezki - sluchaja na stringach
@listen("technical_path")
def handle_technical(self):
"""Obsluga pytan technicznych."""
self.state.response = f"[TECH] Detailed explanation for: {self.state.query}"
return self.state.response
@listen("sales_path")
def handle_sales(self):
"""Obsluga pytan sprzedazowych."""
self.state.response = f"[SALES] Pricing info for: {self.state.query}"
return self.state.response
@listen("general_path")
def handle_general(self):
"""Obsluga ogolnych pytan."""
self.state.response = f"[GENERAL] Answer for: {self.state.query}"
return self.state.response
# Test
flow = QueryRouter()
result = flow.kickoff()
print(result) # [TECH] Detailed explanation for: How do neural networks work?
Rownolegle wykonanie (@and_, @or_)
from crewai.flow.flow import Flow, listen, start, and_, or_
class ParallelFlow(Flow):
"""Flow z rownoleglym wykonaniem."""
@start()
def init(self):
return "Start"
# Te dwie metody uruchomia sie ROWNOLEGLE po init
@listen(init)
def research_web(self, data):
"""Szukaj w internecie."""
print("Researching web...")
return "Web results"
@listen(init)
def research_database(self, data):
"""Szukaj w bazie."""
print("Researching database...")
return "Database results"
# Poczekaj az OBA zakoncza
@and_(research_web, research_database)
def merge_results(self, web_data, db_data):
"""Polacz wyniki gdy oba gotowe."""
print(f"Merging: {web_data} + {db_data}")
return f"Merged: {web_data} | {db_data}"
class RaceFlow(Flow):
"""Flow gdzie wygrywa pierwszy wynik."""
@start()
def init(self):
return "Query"
@listen(init)
def fast_api(self, data):
return "Fast API result"
@listen(init)
def slow_api(self, data):
import time; time.sleep(2)
return "Slow API result"
# Kontynuuj gdy KTORYKOLWIEK zakonczy
@or_(fast_api, slow_api)
def use_first_result(self, result):
"""Uzyj pierwszego dostepnego wyniku."""
return f"Using: {result}"
Laczenie Flows z Crews
Prawdziwa moc Flows to orkiestracja wielu Crews:
from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
class ContentState(BaseModel):
topic: str = ""
research: str = ""
article: str = ""
review: str = ""
class ContentPipeline(Flow[ContentState]):
"""Pipeline laczacy 3 Crews."""
@start()
def get_topic(self):
self.state.topic = "AI Trends 2025"
return self.state.topic
@listen(get_topic)
def research_phase(self, topic: str):
"""Crew 1: Research team."""
researcher = Agent(
role="Researcher",
goal=f"Research {topic} thoroughly",
backstory="Expert researcher with deep knowledge"
)
task = Task(
description=f"Research the topic: {topic}",
expected_output="Comprehensive research findings",
agent=researcher
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
self.state.research = str(result)
return self.state.research
@listen(research_phase)
def writing_phase(self, research: str):
"""Crew 2: Writing team."""
writer = Agent(
role="Writer",
goal="Write engaging article",
backstory="Professional content writer"
)
task = Task(
description=f"Write article based on: {research[:500]}",
expected_output="Well-written article",
agent=writer
)
crew = Crew(agents=[writer], tasks=[task])
result = crew.kickoff()
self.state.article = str(result)
return self.state.article
@listen(writing_phase)
def review_phase(self, article: str):
"""Crew 3: Review team."""
editor = Agent(
role="Editor",
goal="Review and improve article",
backstory="Senior editor with keen eye"
)
task = Task(
description=f"Review article: {article[:500]}",
expected_output="Editorial feedback and score",
agent=editor
)
crew = Crew(agents=[editor], tasks=[task])
result = crew.kickoff()
self.state.review = str(result)
return self.state.review
# Uruchomienie pipeline
pipeline = ContentPipeline()
final_result = pipeline.kickoff()
print(f"Pipeline complete! Review: {pipeline.state.review}")
Wizualizacja Flow
# Generowanie wizualizacji grafu flow
flow = ContentPipeline()
# Zapisz jako obrazek
flow.plot("content_pipeline_graph") # tworzy .png
# Lub wyswietl w Jupyter
flow.plot() # wyswietli inline
Praktyczne wzorce
🔁 Retry Pattern
Router sprawdza jakosc i wraca do poprzedniego kroku jesli niezadowalajaca.
👥 Fan-out / Fan-in
Jeden start → wiele rownoleglych krokow → @and_ laczy wyniki.
🎯 Pipeline Pattern
Sekwencja Crews: Research → Analysis → Writing → Review.
🔌 Human-in-loop
Router czeka na input uzytkownika przed kontynuacja.
- Multi-stage pipeline — wiele etapow z roznymi Crews
- Warunkowa logika — rozne sciezki na podstawie danych
- Rownolegle przetwarzanie — @and_, @or_
- Centralny stan — dane wspoldzielone miedzy krokami
- Orkiestracja Crews — koordynacja wielu zespolow
📚 Bibliografia
- CrewAI. (2025). Flows Documentation. docs.crewai.com/concepts/flows
- CrewAI. (2025). Flow State Management. docs.crewai.com/concepts/flows#state-management
- CrewAI GitHub. (2025). Flow Examples. github.com/crewAIInc/crewAI
- CrewAI Blog. (2024). Introducing Flows. blog.crewai.com