📡 Realtime Research Workflow Documentation

ระบบ Research Workflow แบบ Realtime ที่ช่วยให้ client สามารถติดตามความคืบหน้าการค้นหาข้อมูลและการวิเคราะห์แบบเรียลไทม์ผ่าน WebSocket

🎯 จุดประสงค์หลัก

🏗️ สถาปัตยกรรมระบบ

📱 Frontend (Vue.js)

  • pusher-js client
  • Event subscription & handling
  • Real-time UI updates
  • Progress visualization

🌐 API Gateway (nginx)

  • HTTP API proxying
  • WebSocket proxying
  • Load balancing
  • SSL termination

🐍 Research Service (FastAPI)

  • Background task processing
  • Event emission
  • Database operations
  • Multi-provider search

📡 Realtime Server (soketi)

  • Pusher protocol compatible
  • WebSocket connections
  • Event broadcasting
  • Channel management

🔄 การทำงานของระบบ

sequenceDiagram
    participant C as Client (Vue.js)
    participant N as nginx
    participant R as Research Service
    participant S as soketi
    participant E as External APIs

    Note over C,E: 1. Client Preparation Phase
    C->>S: Connect WebSocket
    C->>S: Subscribe to channel 'inference.{id}'
    S-->C: subscription_succeeded

    Note over C,E: 2. Research Initiation
    C->>N: POST /research (with channel param)
    N->>R: Forward request
    R->>R: Create research record
    R-->>C: Return research_id
    
    Note over C,E: 3. Background Processing
    R->>S: emit('started') with delay
    S->>C: Event: started
    
    R->>E: Multi-provider search
    E-->>R: Search results
    R->>S: emit('phase', 'searching')
    S->>C: Event: phase (searching)
    R->>S: emit('progress', search_count)
    S->>C: Event: progress (search results)
    
    R->>E: Web scraping
    E-->>R: Scraped content
    loop For each page
        R->>S: emit('progress', scraped_count)
        S->>C: Event: progress (scraping)
    end
    
    R->>E: Ollama analysis
    E-->>R: Analysis result
    R->>S: emit('phase', 'analyzing')
    S->>C: Event: phase (analyzing)
    R->>S: emit('progress', analysis_ratio)
    S->>C: Event: progress (analyzing)
    
    R->>S: emit('completed')
    S->>C: Event: completed
    
    Note over C,E: 4. Late Subscriber Handling
    R->>S: emit('progress', late_snapshot=true)
    S->>C: Event: progress (snapshot)
            

🔌 API Reference

POST /research

เริ่มต้นการค้นหาข้อมูล

{
  "entity_name": "การไฟฟ้าส่วนภูมิภาค",
  "max_pages": 5,
  "search_language": "th",
  "additional_keywords": ["ข้อมูล", "รายงาน"],
  "channel": "inference.123"  // ⚠️ สำคัญ: ระบุเพื่อเปิด realtime
}

Response:

{
  "success": true,
  "research_id": 123,
  "entity_name": "การไฟฟ้าส่วนภูมิภาค",
  "search_results_count": 0,
  "scraped_pages_count": 0,
  "processing_time": 0.0,
  "created_at": "2025-10-03T10:30:00"
}

POST /debug/realtime/{research_id}

ทดสอบการส่ง event แบบ manual

// Response
{
  "ok": true,
  "sent": true,
  "research_id": 123
}

GET /research/{research_id}

ดูรายละเอียดการค้นหา

GET /history

ดูประวัติการค้นหา

📨 Realtime Events

Event Name Description Payload Example
started เริ่มต้นการประมวลผล
{
  "v": 1,
  "research_id": 123,
  "entity_name": "การไฟฟ้า",
  "state": "started"
}
phase เปลี่ยนขั้นตอนการทำงาน
{
  "v": 1,
  "research_id": 123,
  "phase": "searching",
  "type": "phase"
}
progress อัปเดตความคืบหน้า
{
  "v": 1,
  "research_id": 123,
  "phase": "scraping",
  "type": "progress",
  "metrics": {
    "scraped_pages_count": 3,
    "total_pages_target": 5
  }
}
completed เสร็จสิ้นการประมวลผล
{
  "v": 1,
  "research_id": 123,
  "state": "completed",
  "processing_time": 45.67,
  "success": true
}
error เกิดข้อผิดพลาด
{
  "v": 1,
  "research_id": 123,
  "state": "error",
  "phase": "scraping",
  "message": "Network timeout"
}

🛠️ การตั้งค่าระบบ

Environment Variables

Variable Default Description
REALTIME_ENABLED true เปิด/ปิดการส่ง realtime events
REALTIME_VERBOSE false แสดง log การส่ง events
REALTIME_INITIAL_DELAY_MS 0 Delay ก่อนส่ง event แรก (ms)
SOKETI_HOST soketi Hostname ของ soketi server
SOKETI_PORT 6001 Port ของ soketi server
SOKETI_KEY key-app1 App key สำหรับ soketi
SOKETI_SECRET secret-app1 App secret สำหรับ soketi

💡 การใช้งานขั้นสูง

🔄 Race Condition Handling

ระบบมีกลไกป้องกัน race condition หลายระดับ:

🎛️ Client Implementation

// 1. Connect และ Subscribe
const pusher = new Pusher('key-app1', {
  cluster: 'mt1',
  wsHost: 'soketi.example.com',
  wsPort: 443,
  forceTLS: true,
  enabledTransports: ['ws', 'wss']
});

const channel = pusher.subscribe('inference.123');

// 2. Bind Events
channel.bind_global((eventName, data) => {
  console.log('Event:', eventName, data);
  // Handle different event types
});

// 3. Call API with Channel
fetch('/research', {
  method: 'POST',
  headers: {'Content-Type': 'application/json'},
  body: JSON.stringify({
    entity_name: 'การไฟฟ้า',
    channel: 'inference.123'  // Important!
  })
});

🔧 Troubleshooting

📋 ตัวอย่างการใช้งาน

การทำงานแบบ Realtime

// Client Code
const channel = pusher.subscribe('inference.123');
channel.bind_global((event, data) => {
  updateUI(event, data);
});

// API Call
const response = await fetch('/research', {
  method: 'POST',
  body: JSON.stringify({
    entity_name: 'การไฟฟ้าส่วนภูมิภาค',
    channel: 'inference.123'  // Enable realtime
  })
});

// จะได้รับ events: started → phase → progress → completed

การทำงานแบบ Synchronous (เดิม)

// API Call (ไม่ระบุ channel)
const response = await fetch('/research', {
  method: 'POST',
  body: JSON.stringify({
    entity_name: 'การไฟฟ้าส่วนภูมิภาค'
    // ไม่ระบุ channel = ไม่ส่ง events
  })
});

// ใช้ polling หรือรอ response แบบเดิม