Skip to content

MotiaDev/build-your-first-app

Repository files navigation

Pet Management System - Progressive Tutorial

A Step-by-Step Tutorial: From Basic API to Streaming AI Agents

This comprehensive tutorial demonstrates building an intelligent pet management system with Motia, progressively introducing concepts from simple REST APIs to advanced streaming AI agents. Each section builds upon the previous, showcasing real-world patterns across TypeScript, JavaScript, and Python.


πŸ“š Tutorial Overview - 5 Progressive Levels

This tutorial is structured as a progressive learning path where each level adds complexity and demonstrates new Motia capabilities:

Level 1: API Endpoints          β†’  Basic CRUD operations
          ↓
Level 2: Background Jobs        β†’  Async processing (Queue + Cron)
          ↓
Level 3: Workflow Orchestrator  β†’  Centralized state management
          ↓
Level 4: AI Agents             β†’  Intelligent decision making
          ↓
Level 5: Streaming AI Agents   β†’  Real-time updates with SSE

What You'll Build

By the end of this tutorial, you'll have a complete pet management system with:

  • βœ… RESTful APIs - Full CRUD operations for pet records
  • βœ… Background Jobs - Queue-based and scheduled async processing
  • βœ… Workflow Orchestration - Automated lifecycle management with guard enforcement
  • βœ… AI Decision Making - OpenAI-powered health and adoption reviews
  • βœ… Real-Time Streaming - Server-Sent Events for live progress updates
  • βœ… Multi-Language Support - Identical functionality in TypeScript, JavaScript, and Python

🌟 Features by Tutorial Level

Level 1: API Endpoints

  • Multi-language support (TypeScript, JavaScript, Python)
  • RESTful API design with proper HTTP methods
  • Request validation using Zod
  • File-based JSON storage
  • Standard error handling

Level 2: Background Jobs

  • Queue-Based Jobs: Event-triggered async processing
  • Cron-Based Jobs: Scheduled maintenance tasks
  • Soft delete pattern with 30-day retention
  • Non-blocking API responses
  • Event-driven job triggering

Level 3: Workflow Orchestrator

  • Centralized state management
  • Automated lifecycle transitions
  • Guard enforcement for valid transitions
  • Staff decision points vs automatic progressions
  • Status validation and rejection handling

Level 4: AI Agents

  • AI Profile Enrichment: Automatic pet profile generation using OpenAI
  • Health Review Agent: AI-driven health assessments with emit selection
  • Adoption Review Agent: AI-driven adoption readiness evaluation
  • Structured decision artifacts with rationale
  • Idempotent decision caching

Level 5: Streaming AI Agents ⭐

  • Real-Time SSE Streaming: Live progress updates during pet creation
  • Progressive Updates: Stream messages as background jobs execute
  • AI Enrichment Streaming: Real-time AI profile generation progress
  • Health Check Streaming: Live quarantine and health status updates
  • Orchestrator Streaming: Status transition notifications
  • Non-Blocking UX: Immediate API response with ongoing updates

πŸš€ Getting Started

Prerequisites

  1. Install Dependencies

    npm install
    pip install -r requirements.txt
  2. Set Up Environment Variables

    Create a .env file in the project root:

    # .env
    OPENAI_API_KEY=your_openai_api_key_here

    Get your OpenAI API key:

    • Visit OpenAI Platform
    • Create an account or sign in
    • Generate a new API key
    • Copy the key and add it to your environment
  3. Start Motia Server

    npm run dev
    # or
    motia dev
  4. Open Workbench

    • Navigate to Motia Workbench in your browser
    • Select the appropriate workflow (TsPetManagement, JsPetManagement, or PyPetManagement)
    • View the visual workflow diagram showing all steps and connections

πŸ“– Level 1: API Endpoints - Basic CRUD

The foundation of our system is a RESTful API for pet management. This level demonstrates basic API design, validation, and data persistence.

API Endpoints

All three languages provide identical CRUD functionality:

TypeScript Endpoints (/ts/pets)

Method Endpoint Description
POST /ts/pets Create a new pet
GET /ts/pets List all pets
GET /ts/pets/:id Get pet by ID
PUT /ts/pets/:id Update pet
DELETE /ts/pets/:id Soft delete pet

JavaScript Endpoints (/js/pets)

Method Endpoint Description
POST /js/pets Create a new pet
GET /js/pets List all pets
GET /js/pets/:id Get pet by ID
PUT /js/pets/:id Update pet
DELETE /js/pets/:id Soft delete pet

Python Endpoints (/py/pets)

Method Endpoint Description
POST /py/pets Create a new pet
GET /py/pets List all pets
GET /py/pets/:id Get pet by ID
PUT /py/pets/:id Update pet
DELETE /py/pets/:id Soft delete pet

Pet Data Model

{
  "id": "1",
  "name": "Buddy",
  "species": "dog",
  "ageMonths": 24,
  "status": "new",
  "createdAt": 1640995200000,
  "updatedAt": 1640995200000,
  "weightKg": 15.5,
  "symptoms": ["coughing", "lethargy"],
  "notes": "Welcome to our pet store!",
  "nextFeedingAt": 1641081600000,
  "deletedAt": null,
  "purgeAt": null,
  "profile": {
    "bio": "Buddy is a friendly and energetic golden retriever...",
    "breedGuess": "Golden Retriever Mix",
    "temperamentTags": ["friendly", "energetic", "loyal"],
    "adopterHints": "Perfect for active families..."
  }
}

Testing Level 1

# Create a pet
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Buddy",
    "species": "dog",
    "ageMonths": 24
  }'

# List all pets
curl http://localhost:3000/ts/pets

# Get specific pet
curl http://localhost:3000/ts/pets/1

# Update pet
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"ageMonths": 25}'

# Soft delete pet
curl -X DELETE http://localhost:3000/ts/pets/1

πŸ“– Level 2: Background Jobs - Async Processing

Building on Level 1, we add asynchronous background processing to handle time-consuming tasks without blocking API responses.

Queue-Based Job: SetNextFeedingReminder

Triggered by: Creating a pet via any POST /*/pets endpoint

Purpose: Sets next feeding reminder and adds welcome notes after pet creation

Process:

  1. Pet creation API completes immediately with status: 201
  2. API emits language-specific event (e.g., ts.feeding.reminder.enqueued)
  3. Background job picks up the event and processes asynchronously
  4. Job adds welcome notes, calculates next feeding time, and sets status to in_quarantine
  5. Job emits completion event with processing metrics

Console Output:

🐾 Pet created { petId: '1', name: 'Buddy', species: 'dog', status: 'new' }
πŸ”„ Setting next feeding reminder { petId: '1', enqueuedAt: 1640995200000 }
βœ… Next feeding reminder set { petId: '1', notes: 'Welcome to our pet store!...', nextFeedingAt: '2022-01-02T00:00:00.000Z' }

Cron-Based Job: Deletion Reaper

Schedule: Daily at 2:00 AM (0 2 * * *)

Purpose: Permanently removes pets that have been soft deleted and are past their purge date

Process:

  1. Scans for pets with status="deleted" and purgeAt <= now
  2. Permanently removes matching pets from the datastore
  3. Emits audit events for each purged pet
  4. Reports completion statistics

Console Output:

πŸ”„ Deletion Reaper started - scanning for pets to purge
πŸ’€ Pet permanently purged { petId: '1', name: 'Buddy', deletedAt: '2022-01-01T00:00:00.000Z', purgeAt: '2022-01-31T00:00:00.000Z' }
βœ… Deletion Reaper completed { totalScanned: 5, purgedCount: 2, failedCount: 0 }

Testing Level 2

# Create a pet and watch console for background job processing
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "Whiskers", "species": "cat", "ageMonths": 12}'

# Check pet record after job completion (wait a few seconds)
curl http://localhost:3000/ts/pets/1

# Test soft delete
curl -X DELETE http://localhost:3000/ts/pets/1

# Verify pet is marked as deleted but still exists
curl http://localhost:3000/ts/pets/1

πŸ“– Level 3: Workflow Orchestrator - Centralized State Management

Level 3 introduces a Pet Lifecycle Orchestrator that manages all status transitions through a realistic shelter workflow with guard enforcement.

Core Architecture

Centralized Control: Only the orchestrator can modify pet.status. All other components emit events.

Staff-Driven Workflow: Status changes require staff decisions through the existing PUT API.

Event-Driven: Orchestrator subscribes to domain events and validates transition rules.

Language Isolation: Each language has its own orchestrator with language-specific event namespaces.

Pet Lifecycle States

  • new - Initial status when pet is created
  • in_quarantine - Pet is in quarantine after feeding reminder setup
  • healthy - Pet is healthy and cleared from quarantine
  • available - Pet is ready for adoption
  • pending - Adoption application in progress
  • adopted - Pet has been adopted
  • ill - Pet is identified as ill
  • under_treatment - Pet is receiving medical treatment
  • recovered - Pet has recovered from illness
  • deleted - Pet is soft deleted (outside orchestrator)

Transition Rules

Trigger Event From Status To Status Action Type
feeding.reminder.completed new in_quarantine Automatic
status.update.requested in_quarantine healthy Staff Decision
status.update.requested healthy available Automatic Progression
status.update.requested healthy, in_quarantine, available ill Staff Assessment
status.update.requested ill under_treatment Automatic Progression
status.update.requested under_treatment recovered Staff Decision
status.update.requested recovered healthy Automatic Progression
status.update.requested available pending Staff Decision
status.update.requested pending adopted Staff Decision

Workflow Flow

POST /pets β†’ new
↓ (SetNextFeedingReminder completes - automatic)
in_quarantine
↓ (Staff health check via PUT API)
healthy
↓ (AUTOMATIC - orchestrator progression)
available
↓ (Staff adoption process via PUT API)
pending β†’ adopted

Illness Can Happen Anytime:
in_quarantine β†’ ill (Staff finds illness)
healthy β†’ ill (Staff assessment)
available β†’ ill (Staff discovers illness)
↓ (AUTOMATIC - orchestrator starts treatment)
under_treatment
↓ (Staff marks recovered via PUT API)
recovered
↓ (AUTOMATIC - orchestrator clears recovery)
healthy
↓ (AUTOMATIC - orchestrator marks ready)
available

Testing Level 3

# 1. Create a pet - starts with status=new, automatically moves to in_quarantine
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "Buddy", "species": "dog", "ageMonths": 24}'

# 2. Check current status (should be in_quarantine after feeding reminder)
curl http://localhost:3000/ts/pets/1

# 3. Staff health check - clear from quarantine to healthy
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "healthy"}'

# 4. Check status (should be available - automatic progression)
curl http://localhost:3000/ts/pets/1

# 5. Mark as ill
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "ill"}'

# 6. Check status (should be under_treatment - automatic progression)
curl http://localhost:3000/ts/pets/1

πŸ“– Level 4: AI Agents - Intelligent Decision Making

Level 4 introduces AI-powered agents that make intelligent decisions based on pet context using OpenAI.

Three Types of AI Agents

1. AI Profile Enrichment (Non-Routing)

Trigger: Automatic on pet.created event

Purpose: Generate detailed pet profiles using AI

AI-Generated Fields:

  • bio - Warm, engaging 2-3 sentence description
  • breedGuess - AI's best guess at breed or breed mix
  • temperamentTags - Array of 3-5 personality traits
  • adopterHints - Practical advice for potential adopters

Example Output:

{
  "profile": {
    "bio": "Luna is a graceful and independent cat with striking green eyes who enjoys sunny windowsills and gentle head scratches.",
    "breedGuess": "Domestic Shorthair",
    "temperamentTags": ["independent", "calm", "affectionate", "observant"],
    "adopterHints": "Ideal for singles or couples seeking a low-maintenance companion. Prefers quiet environments."
  }
}

2. Health Review Agent (Routing)

Endpoint: POST /ts/pets/:id/health-review

Purpose: AI evaluates pet health and chooses appropriate action

Available Decisions:

  • emit.health.treatment_required β†’ Pet needs medical treatment
  • emit.health.no_treatment_needed β†’ Pet is healthy

Decision Process:

  1. Analyze pet symptoms, age, species, current status
  2. LLM chooses appropriate emit based on context
  3. Provides structured rationale for decision
  4. Orchestrator consumes emit and applies transitions

3. Adoption Review Agent (Routing)

Endpoint: POST /ts/pets/:id/adoption-review

Purpose: AI evaluates adoption readiness

Available Decisions:

  • emit.adoption.needs_data β†’ Pet needs additional data
  • emit.adoption.ready β†’ Pet is ready for adoption

Decision Artifact:

{
  "petId": "1",
  "agentType": "health-review",
  "timestamp": 1640995200000,
  "parsedDecision": {
    "chosenEmit": "emit.health.treatment_required",
    "rationale": "Pet shows concerning symptoms of coughing and lethargy that require veterinary evaluation."
  },
  "success": true
}

Testing Level 4

# Test AI Profile Enrichment (automatic)
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "Luna", "species": "cat", "ageMonths": 18}'

# Wait 2-3 seconds, then check for AI-generated profile
curl http://localhost:3000/ts/pets/1

# Test Health Review Agent with symptoms
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Max",
    "species": "dog",
    "ageMonths": 36,
    "symptoms": ["coughing", "lethargy", "fever"]
  }'

# Trigger health review
curl -X POST http://localhost:3000/ts/pets/2/health-review

# Test Adoption Review Agent
curl -X POST http://localhost:3000/ts/pets/1/adoption-review

πŸ“– Level 5: Streaming AI Agents ⭐ - Real-Time Updates

The Final Level: Building on all previous levels, we add real-time streaming to provide immediate user feedback while background processes execute.

What is Streaming?

Traditional APIs return a single response after all processing completes. With Motia's native Streams API, your API can return immediately with a stream that updates in real-time as background jobs, AI agents, and orchestrators process asynchronously.

Stream Configuration

The streaming feature is defined in pet-creation.stream.ts:

import { StreamConfig } from 'motia'
import { z } from 'zod'

export const config: StreamConfig = {
  name: 'petCreation',
  schema: z.object({ 
    message: z.string()
  }),
  baseConfig: {
    storageType: 'default',
  },
}

This stream is available as context.streams.petCreation in all steps within the flow.

How Streaming Works

1. API Initializes Stream

// create-pet.step.ts
const result = await streams.petCreation.set(traceId, 'message', { 
  message: `Pet ${pet.name} (ID: ${pet.id}) created successfully` 
});

return { 
  status: 201, 
  body: result  // Returns stream immediately
};

2. Background Jobs Update Stream

// set-next-feeding-reminder.job.step.ts
await streams.petCreation.set(traceId, 'message', { 
  message: `Pet ${pet.name} entered quarantine period` 
});

await streams.petCreation.set(traceId, 'message', { 
  message: `Health check passed for ${pet.name} - no symptoms found` 
});

await streams.petCreation.set(traceId, 'message', { 
  message: `${pet.name} is healthy and ready for adoption! βœ…` 
});

3. AI Enrichment Streams Progress

// ai-profile-enrichment.step.ts
await streams.petCreation.set(traceId, 'enrichment_started', { 
  message: `AI enrichment started for ${pet.name}` 
});

await streams.petCreation.set(traceId, `progress_bio`, { 
  message: `Generated bio for ${pet.name}` 
});

await streams.petCreation.set(traceId, 'completed', { 
  message: `AI enrichment completed for ${pet.name}`,
  profile: generatedProfile
});

4. Orchestrator Streams Transitions

// pet-lifecycle-orchestrator.step.ts
await streams.petCreation.set(traceId, transitionTo, {
  message: `Status transition: ${currentStatus} β†’ ${transitionTo}`,
  petId,
  timestamp: Date.now()
});

Real-Time Stream Response

When you create a pet, the API returns a stream object immediately:

{
  "streamId": "trace-abc123",
  "updates": [
    {
      "type": "message",
      "timestamp": 1640995200000,
      "data": {
        "message": "Pet Buddy (ID: 1) created successfully - Species: dog, Age: 24 months, Status: new"
      }
    }
  ]
}

As background jobs execute, the stream receives real-time updates:

{
  "streamId": "trace-abc123",
  "updates": [
    {
      "type": "message",
      "timestamp": 1640995200000,
      "data": { "message": "Pet Buddy (ID: 1) created successfully..." }
    },
    {
      "type": "message",
      "timestamp": 1640995201500,
      "data": { "message": "Pet Buddy entered quarantine period" }
    },
    {
      "type": "message",
      "timestamp": 1640995203000,
      "data": { "message": "Health check passed for Buddy - no symptoms found" }
    },
    {
      "type": "enrichment_started",
      "timestamp": 1640995201000,
      "data": { "message": "AI enrichment started for Buddy" }
    },
    {
      "type": "progress_bio",
      "timestamp": 1640995202500,
      "data": { "message": "Generated bio for Buddy" }
    },
    {
      "type": "completed",
      "timestamp": 1640995204000,
      "data": { 
        "message": "AI enrichment completed for Buddy",
        "profile": { "bio": "...", "breedGuess": "...", ... }
      }
    },
    {
      "type": "message",
      "timestamp": 1640995205000,
      "data": { "message": "Buddy is healthy and ready for adoption! βœ…" }
    }
  ]
}

Testing Level 5 - Streaming

Test 1: Basic Streaming (Healthy Pet)

# Create a healthy pet without symptoms
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Luna",
    "species": "cat",
    "ageMonths": 18
  }'

Expected Stream Updates:

  1. βœ… "Pet Luna (ID: X) created successfully"
  2. πŸ”„ "Pet Luna entered quarantine period"
  3. πŸ€– "AI enrichment started for Luna"
  4. πŸ“ "Generated bio for Luna"
  5. πŸ“ "Generated breed guess for Luna"
  6. βœ… "AI enrichment completed for Luna"
  7. βœ… "Health check passed for Luna - no symptoms found"
  8. βœ… "Luna is healthy and ready for adoption! βœ…"

Test 2: Streaming with Symptoms (Sick Pet)

# Create a pet with symptoms
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Max",
    "species": "dog",
    "ageMonths": 36,
    "symptoms": ["coughing", "lethargy", "fever"],
    "weightKg": 25.5
  }'

Expected Stream Updates:

  1. βœ… "Pet Max (ID: X) created successfully"
  2. πŸ”„ "Pet Max entered quarantine period"
  3. πŸ€– "AI enrichment started for Max"
  4. πŸ“ AI profile generation progress updates
  5. βœ… "AI enrichment completed for Max"
  6. ⚠️ "Health check failed for Max - symptoms detected: coughing, lethargy, fever"
  7. ❌ "Max needs medical treatment ❌"

Test 3: Monitor Stream in Real-Time

You can also fetch the stream directly to see updates:

# Get stream updates (replace STREAM_ID with actual streamId from response)
curl http://localhost:3000/streams/STREAM_ID

Benefits of Streaming

Traditional API Streaming API
⏳ Wait for all processing βœ… Immediate response
🀷 No progress visibility πŸ“Š Real-time progress updates
❌ Timeout on long operations βœ… No timeout issues
😴 Poor user experience 😊 Engaging user experience
πŸ”‡ Silent background jobs πŸ“’ Visible workflow execution

Real-World Use Cases

  1. Order Processing: Show shipping, payment, fulfillment status in real-time
  2. Document Generation: Stream progress as AI generates reports/documents
  3. Data Import: Show validation, processing, completion status
  4. ML Model Training: Stream training progress, metrics, completion
  5. Video Processing: Stream transcoding, quality checks, upload progress

πŸ–ΌοΈ Workbench Screenshot

Complete Workflow Visualization

The Motia Workbench provides a visual representation of the entire workflow showing all 5 levels:

Pet Management Workflow - Complete System Architecture

This screenshot shows the complete TsPetManagement workflow including:

  1. API Steps (Left side) - Create, Read, Update, Delete operations
  2. Background Jobs - set-next-feeding-reminder (queue-based), deletion-reaper (cron-based)
  3. Orchestrator (Center) - pet-lifecycle-orchestrator managing all state transitions
  4. AI Agents - health-review-agent, adoption-review-agent, ai-profile-enrichment
  5. Stream ⭐ - pet-creation.stream providing real-time SSE updates
  6. Staff Actions - treatment-scheduler, adoption-posting, recovery-monitor
  7. Event Connections - Visual arrows showing data flow between all components

The diagram clearly visualizes how the progressive tutorial builds from simple APIs (Level 1) through background jobs (Level 2), orchestration (Level 3), AI agents (Level 4), and finally streaming (Level 5).


πŸ“ File Structure

steps/
β”œβ”€β”€ javascript/
β”‚   β”œβ”€β”€ create-pet.step.js              # POST /js/pets (with streaming)
β”‚   β”œβ”€β”€ get-pets.step.js                # GET /js/pets
β”‚   β”œβ”€β”€ get-pet.step.js                 # GET /js/pets/:id
β”‚   β”œβ”€β”€ update-pet.step.js              # PUT /js/pets/:id
β”‚   β”œβ”€β”€ delete-pet.step.js              # DELETE /js/pets/:id
β”‚   β”œβ”€β”€ set-next-feeding-reminder.job.step.js  # Background job (streams updates)
β”‚   β”œβ”€β”€ deletion-reaper.cron.step.js    # Cron job (daily cleanup)
β”‚   β”œβ”€β”€ pet-lifecycle-orchestrator.step.js     # Workflow orchestrator (streams transitions)
β”‚   β”œβ”€β”€ ai-profile-enrichment.step.js   # AI profile generation (streams progress)
β”‚   β”œβ”€β”€ health-review-agent.step.js     # POST /js/pets/:id/health-review
β”‚   β”œβ”€β”€ adoption-review-agent.step.js   # POST /js/pets/:id/adoption-review
β”‚   β”œβ”€β”€ treatment-scheduler.step.js     # Staff action automation
β”‚   β”œβ”€β”€ adoption-posting.step.js        # Staff action automation
β”‚   β”œβ”€β”€ recovery-monitor.step.js        # Staff action automation
β”‚   β”œβ”€β”€ pet-creation.stream.js          # Stream configuration ⭐
β”‚   └── js-store.js                     # Data persistence layer
β”‚
β”œβ”€β”€ typescript/
β”‚   β”œβ”€β”€ create-pet.step.ts              # POST /ts/pets (with streaming)
β”‚   β”œβ”€β”€ get-pets.step.ts                # GET /ts/pets
β”‚   β”œβ”€β”€ get-pet.step.ts                 # GET /ts/pets/:id
β”‚   β”œβ”€β”€ update-pet.step.ts              # PUT /ts/pets/:id
β”‚   β”œβ”€β”€ delete-pet.step.ts              # DELETE /ts/pets/:id
β”‚   β”œβ”€β”€ set-next-feeding-reminder.job.step.ts  # Background job (streams updates)
β”‚   β”œβ”€β”€ deletion-reaper.cron.step.ts    # Cron job (daily cleanup)
β”‚   β”œβ”€β”€ pet-lifecycle-orchestrator.step.ts     # Workflow orchestrator (streams transitions)
β”‚   β”œβ”€β”€ ai-profile-enrichment.step.ts   # AI profile generation (streams progress)
β”‚   β”œβ”€β”€ health-review-agent.step.ts     # POST /ts/pets/:id/health-review
β”‚   β”œβ”€β”€ adoption-review-agent.step.ts   # POST /ts/pets/:id/adoption-review
β”‚   β”œβ”€β”€ treatment-scheduler.step.ts     # Staff action automation
β”‚   β”œβ”€β”€ adoption-posting.step.ts        # Staff action automation
β”‚   β”œβ”€β”€ recovery-monitor.step.ts        # Staff action automation
β”‚   β”œβ”€β”€ pet-creation.stream.ts          # Stream configuration ⭐
β”‚   β”œβ”€β”€ agent-decision-framework.ts     # Shared agent decision logic
β”‚   └── ts-store.ts                     # Data persistence layer
β”‚
β”œβ”€β”€ python/
β”‚   β”œβ”€β”€ create_pet_step.py              # POST /py/pets (with streaming)
β”‚   β”œβ”€β”€ get_pets_step.py                # GET /py/pets
β”‚   β”œβ”€β”€ get_pet_step.py                 # GET /py/pets/:id
β”‚   β”œβ”€β”€ update_pet_step.py              # PUT /py/pets/:id
β”‚   β”œβ”€β”€ delete_pet_step.py              # DELETE /py/pets/:id
β”‚   β”œβ”€β”€ set_next_feeding_reminder.job_step.py  # Background job (streams updates)
β”‚   β”œβ”€β”€ deletion_reaper.cron_step.py    # Cron job (daily cleanup)
β”‚   β”œβ”€β”€ pet_lifecycle_orchestrator_step.py     # Workflow orchestrator (streams transitions)
β”‚   β”œβ”€β”€ ai_profile_enrichment_step.py   # AI profile generation (streams progress)
β”‚   β”œβ”€β”€ health_review_agent_step.py     # POST /py/pets/:id/health-review
β”‚   β”œβ”€β”€ adoption_review_agent_step.py   # POST /py/pets/:id/adoption-review
β”‚   β”œβ”€β”€ treatment_scheduler_step.py     # Staff action automation
β”‚   β”œβ”€β”€ adoption_posting_step.py        # Staff action automation
β”‚   β”œβ”€β”€ recovery_monitor_step.py        # Staff action automation
β”‚   β”œβ”€β”€ pet_creation.stream.py          # Stream configuration ⭐
β”‚   └── services/
β”‚       β”œβ”€β”€ pet_store.py                # Data persistence layer
β”‚       └── types.py                    # Type definitions
β”‚
└── motia-workbench.json                # Workflow configuration

πŸŽ“ Complete Tutorial Walkthrough

Follow this step-by-step guide to experience all 5 levels in sequence:

Step 1: Basic Pet Creation (Levels 1-2)

# Create a healthy pet
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "Buddy", "species": "dog", "ageMonths": 24}'

# Expected: Immediate API response with stream
# Expected: Background job processes feeding reminder (watch console)
# Expected: AI enrichment generates profile (watch console)

Step 2: Verify Streaming Updates (Level 5)

# Get the pet to see final state
curl http://localhost:3000/ts/pets/1

# Expected: Pet with status "available", complete profile, notes, nextFeedingAt

Step 3: Test Orchestrator (Level 3)

# Try invalid transition
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "adopted"}'

# Expected: Transition rejected (can't go directly from available to adopted)

# Valid transition: pending adoption
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "pending"}'

# Valid transition: complete adoption
curl -X PUT http://localhost:3000/ts/pets/1 \
  -H "Content-Type: application/json" \
  -d '{"status": "adopted"}'

Step 4: Test AI Health Review (Level 4)

# Create a pet with symptoms
curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Max",
    "species": "dog",
    "ageMonths": 36,
    "symptoms": ["coughing", "lethargy", "fever"],
    "weightKg": 25.5
  }'

# Wait for pet to reach "healthy" or "available" status
# Then trigger AI health review
curl -X POST http://localhost:3000/ts/pets/2/health-review

# Expected: AI agent chooses "treatment_required"
# Expected: Orchestrator automatically transitions: healthy β†’ ill β†’ under_treatment

Step 5: Test AI Adoption Review (Level 4)

# Trigger adoption review on available pet
curl -X POST http://localhost:3000/ts/pets/2/adoption-review

# Expected: AI agent evaluates readiness
# Expected: Decision artifact with rationale

Step 6: Test Multi-Language Parity

# Test JavaScript implementation
curl -X POST http://localhost:3000/js/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "JS_Pet", "species": "cat", "ageMonths": 12}'

curl -X POST http://localhost:3000/js/pets/1/health-review

# Test Python implementation
curl -X POST http://localhost:3000/py/pets \
  -H "Content-Type: application/json" \
  -d '{"name": "Py_Pet", "species": "bird", "ageMonths": 6}'

curl -X POST http://localhost:3000/py/pets/1/health-review

# Expected: Identical behavior across all three languages

πŸ”‘ Key Learning Points

This progressive tutorial demonstrates:

Level 1 - API Endpoints

βœ… RESTful API design with proper HTTP methods
βœ… Request validation using Zod
βœ… File-based data persistence
βœ… Multi-language implementation (TypeScript, JavaScript, Python)

Level 2 - Background Jobs

βœ… Queue-based jobs (event-triggered)
βœ… Cron-based jobs (scheduled)
βœ… Non-blocking API responses
βœ… Event-driven job triggering
βœ… Soft delete pattern with cleanup

Level 3 - Workflow Orchestrator

βœ… Centralized state management
βœ… Guard enforcement for valid transitions
βœ… Automatic progressions vs staff decisions
βœ… Status validation and rejection handling
βœ… Event-driven workflow control

Level 4 - AI Agents

βœ… AI Profile Enrichment (content generation)
βœ… Agentic Decision Making (routing decisions)
βœ… Structured decision artifacts with rationale
βœ… Idempotent decision caching
βœ… OpenAI integration patterns

Level 5 - Streaming AI Agents ⭐

βœ… Real-time Server-Sent Events (SSE)
βœ… Progressive updates during async processing
βœ… Stream initialization and management
βœ… Multi-step workflow streaming
βœ… Non-blocking user experience
βœ… Visible workflow execution


🎯 Tutorial Objectives Achieved

By completing this tutorial, you've learned:

  1. Progressive Complexity: How to build from simple APIs to advanced streaming AI agents
  2. Motia Patterns: API steps, event steps, cron steps, streams, and agent patterns
  3. Real-World Architecture: Orchestrators, background jobs, AI integration, streaming
  4. Multi-Language Development: Identical functionality across TypeScript, JavaScript, Python
  5. Production-Ready Features: Validation, error handling, idempotency, guard enforcement
  6. Modern UX: Real-time feedback and progressive updates

πŸš€ Next Steps

Now that you've completed the tutorial, you can:

  1. Customize the Workflow: Add your own status states and transition rules
  2. Extend AI Agents: Create new agent endpoints with custom decision logic
  3. Add More Streams: Implement streaming in other workflows
  4. Build Your Own: Apply these patterns to your own domain and use cases
  5. Explore Motia Docs: Learn about advanced features and patterns

πŸ“š Additional Resources


🀝 Contributing

This is a demonstration project for Motia workflow capabilities. Feel free to use it as a template for your own projects!


Built with ❀️ using Motia - The Modern Workflow Platform

About

Build your First Motia App repository

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •