Documentação de Eventos

WebSocket & Redis Streams

Visão Geral

A API oferece três formas de receber eventos em tempo real das instâncias do WhatsApp:

WebSocket

Conexão bidirecional em tempo real

Redis Streams

Sistema de mensagens em tempo real

Webhooks

HTTP POST para seu servidor

Dica: Você pode configurar individualmente quais eventos cada método deve receber através do Dashboard.

WebSocket

Conexão

Cada instância possui seu próprio endpoint WebSocket:

ws://SEU_HOST:PORTA/websocket/NOME_DA_INSTANCIA
wss://SEU_HOST:PORTA/websocket/NOME_DA_INSTANCIA  # Para HTTPS

Exemplo: ws://localhost:3003/websocket/minha-instancia

Formato dos Eventos

Os eventos são enviados em formato JSON:

{
  "type": "messages_upsert",
  "instance": "minha-instancia",
  "data": {
    "key": {
      "remoteJid": "[email protected]",
      "fromMe": false,
      "id": "3EB0C7F8..."
    },
    "message": {
      "conversation": "Olá, mundo!"
    },
    "messageTimestamp": 1234567890
  }
}

Exemplo JavaScript (Node.js)

const WebSocket = require('ws');

const ws = new WebSocket('ws://localhost:3003/websocket/minha-instancia');

ws.on('open', () => {
    console.log('✅ Conectado ao WebSocket');
});

ws.on('message', (data) => {
    const event = JSON.parse(data);
    console.log('📨 Evento recebido:', event.type);
    console.log('📦 Dados:', event.data);

    // Processar diferentes tipos de eventos
    switch(event.type) {
        case 'messages_upsert':
            console.log('Nova mensagem:', event.data.message);
            break;
        case 'qrcode_updated':
            console.log('QR Code:', event.data.qrcode);
            break;
        case 'connection_update':
            console.log('Status:', event.data.state);
            break;
    }
});

ws.on('error', (error) => {
    console.error('❌ Erro:', error);
});

ws.on('close', () => {
    console.log('🔌 Desconectado');
});

Exemplo Python

import asyncio
import websockets
import json

async def connect_websocket():
    uri = "ws://localhost:3003/websocket/minha-instancia"

    async with websockets.connect(uri) as websocket:
        print("✅ Conectado ao WebSocket")

        async for message in websocket:
            event = json.loads(message)
            print(f"📨 Evento recebido: {event['type']}")

            if event['type'] == 'messages_upsert':
                print(f"Nova mensagem: {event['data']['message']}")
            elif event['type'] == 'qrcode_updated':
                print(f"QR Code atualizado")
            elif event['type'] == 'connection_update':
                print(f"Status: {event['data']['state']}")

asyncio.run(connect_websocket())

Redis Streams

Os eventos são publicados em Redis Streams com o formato de chave:

stream:NOME_DA_INSTANCIA

Exemplos de streams:

  • stream:minha-instancia - Todos os eventos da instância
  • stream:instancia-2 - Todos os eventos da instancia-2

Cada mensagem contém os campos: event_type, payload e timestamp

Configuração

Configure o Redis no arquivo .env:

REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=""

Exemplo Node.js (ioredis)

const Redis = require('ioredis');

const redis = new Redis({
    host: 'localhost',
    port: 6379
});

async function consumeEvents(instanceName) {
    const streamKey = `stream:${instanceName}`;
    const groupName = `consumers:${instanceName}`;
    const consumerName = 'consumer-1';

    // Criar consumer group se não existir
    try {
        await redis.xgroup('CREATE', streamKey, groupName, '0', 'MKSTREAM');
    } catch (err) {
        // Group já existe
    }

    console.log(`✅ Consumindo eventos de ${streamKey}...`);

    while (true) {
        const messages = await redis.xreadgroup(
            'GROUP', groupName, consumerName,
            'BLOCK', 5000,
            'COUNT', 10,
            'STREAMS', streamKey, '>'
        );

        if (messages) {
            for (const [stream, streamMessages] of messages) {
                for (const [id, fields] of streamMessages) {
                    const eventType = fields[1]; // event_type
                    const payload = JSON.parse(fields[3]); // payload

                    console.log('📨 Event Type:', eventType);
                    console.log('📦 Payload:', payload);

                    // Confirmar processamento
                    await redis.xack(streamKey, groupName, id);
                }
            }
        }
    }
}

consumeEvents('minha-instancia').catch(console.error);

Exemplo Python (redis-py)

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

instance_name = 'minha-instancia'
stream_key = f'stream:{instance_name}'
group_name = f'consumers:{instance_name}'
consumer_name = 'consumer-1'

# Criar consumer group
try:
    r.xgroup_create(stream_key, group_name, '0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # Group já existe

print(f'✅ Consumindo eventos de {stream_key}...')

while True:
    messages = r.xreadgroup(group_name, consumer_name, {stream_key: '>'}, count=10, block=5000)

    for stream, msgs in messages:
        for msg_id, fields in msgs:
            event_type = fields['event_type']
            payload = json.loads(fields['payload'])

            print(f'📨 Event Type: {event_type}')
            print(f'📦 Payload: {payload}')

            # Confirmar processamento
            r.xack(stream_key, group_name, msg_id)

Redis Streams vs RabbitMQ

❌ RabbitMQ (Antigo)

Queue: minhainstancia.messages_upsert
Queue: minhainstancia.connection_update
Queue: minhainstancia.qrcode_updated
...
➡️ 1 queue por tipo de evento

✅ Redis Streams (Atual)

Stream: stream:minhainstancia
  ├─ event_type: messages_upsert
  ├─ event_type: connection_update
  ├─ event_type: qrcode_updated
  ...
➡️ 1 stream por instância

✅ Vantagens:

  • Menos overhead: 1 stream vs 30+ queues por instância
  • Mais performático: Redis é in-memory
  • Consumer Groups: Múltiplos consumers paralelos
  • Persistência: Mensagens ficam no stream (máx 10k)
  • Replay: Pode reprocessar mensagens antigas pelo ID

Consumindo Eventos Específicos

Para consumir apenas eventos específicos (como messages_upsert), você filtra pelo campo event_type dentro do consumer:

Node.js - Consumer com Filtro

const Redis = require('ioredis');
const redis = new Redis({ host: 'localhost', port: 6379 });

const instanceName = 'minhainstancia';
const streamKey = `stream:${instanceName}`;
const groupName = `consumer:${instanceName}`;
const consumerName = 'consumer-1';

// Criar consumer group (apenas uma vez)
async function createConsumerGroup() {
  try {
    await redis.xgroup('CREATE', streamKey, groupName, '0', 'MKSTREAM');
    console.log('✅ Consumer group criado');
  } catch (err) {
    if (err.message.includes('BUSYGROUP')) {
      console.log('ℹ️  Consumer group já existe');
    }
  }
}

// Consumir apenas messages_upsert
async function consumeMessages() {
  while (true) {
    try {
      const results = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'BLOCK', 5000,  // Aguardar 5 segundos
        'COUNT', 10,    // Máximo de 10 mensagens
        'STREAMS', streamKey, '>'
      );

      if (!results) continue;

      for (const [stream, messages] of results) {
        for (const [id, fields] of messages) {
          // Converter array [key, value, key, value] para objeto
          const data = {};
          for (let i = 0; i < fields.length; i += 2) {
            data[fields[i]] = fields[i + 1];
          }

          const eventType = data.event_type;
          const payload = JSON.parse(data.payload);
          const timestamp = data.timestamp;

          // 🔍 Filtrar apenas messages_upsert
          if (eventType === 'messages_upsert') {
            console.log('📨 Nova mensagem recebida:', {
              id, eventType, timestamp, payload
            });

            // Processar a mensagem
            await handleMessage(payload);
          }

          // Acknowledge (marcar como processada)
          await redis.xack(streamKey, groupName, id);
        }
      }
    } catch (err) {
      console.error('❌ Erro ao consumir:', err);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

async function handleMessage(payload) {
  console.log('Processando mensagem:', payload);
  // Sua lógica aqui
}

// Iniciar
createConsumerGroup().then(consumeMessages);

Python - Consumer com Filtro

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

instance_name = 'minhainstancia'
stream_key = f'stream:{instance_name}'
group_name = f'consumer:{instance_name}'
consumer_name = 'consumer-1'

# Criar consumer group
try:
    r.xgroup_create(stream_key, group_name, id='0', mkstream=True)
    print('✅ Consumer group criado')
except redis.exceptions.ResponseError as e:
    if 'BUSYGROUP' in str(e):
        print('ℹ️  Consumer group já existe')

# Consumir apenas messages_upsert
def consume_messages():
    while True:
        try:
            messages = r.xreadgroup(
                groupname=group_name,
                consumername=consumer_name,
                streams={stream_key: '>'},
                count=10,
                block=5000
            )

            if not messages:
                continue

            for stream, message_list in messages:
                for message_id, data in message_list:
                    event_type = data['event_type']
                    payload = json.loads(data['payload'])
                    timestamp = data['timestamp']

                    # 🔍 Filtrar apenas messages_upsert
                    if event_type == 'messages_upsert':
                        print(f'📨 Nova mensagem: {payload}')
                        handle_message(payload)

                    # Acknowledge
                    r.xack(stream_key, group_name, message_id)

        except Exception as e:
            print(f'❌ Erro: {e}')
            time.sleep(5)

def handle_message(payload):
    print(f'Processando: {payload}')
    # Sua lógica aqui

# Iniciar
consume_messages()

Go - Consumer com Filtro

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()

    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    instanceName := "minhainstancia"
    streamKey := fmt.Sprintf("stream:%s", instanceName)
    groupName := fmt.Sprintf("consumer:%s", instanceName)
    consumerName := "consumer-1"

    // Criar consumer group
    err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "0").Err()
    if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
        log.Fatalf("Erro ao criar group: %v", err)
    }

    log.Println("✅ Consumindo eventos de", streamKey)

    // Consumir mensagens
    for {
        entries, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, ">"},
            Count:    10,
            Block:    5 * time.Second,
        }).Result()

        if err != nil {
            if err == redis.Nil {
                continue
            }
            log.Printf("Erro ao ler: %v", err)
            time.Sleep(5 * time.Second)
            continue
        }

        for _, entry := range entries {
            for _, message := range entry.Messages {
                eventType := message.Values["event_type"].(string)
                payloadStr := message.Values["payload"].(string)

                // 🔍 Filtrar apenas messages_upsert
                if eventType == "messages_upsert" {
                    var payload map[string]interface{}
                    json.Unmarshal([]byte(payloadStr), &payload)

                    fmt.Printf("📨 Nova mensagem: %v\n", payload)
                    handleMessage(payload)
                }

                // Acknowledge
                rdb.XAck(ctx, streamKey, groupName, message.ID)
            }
        }
    }
}

func handleMessage(payload map[string]interface{}) {
    fmt.Printf("Processando: %v\n", payload)
    // Sua lógica aqui
}

Consumir Múltiplos Tipos de Evento

Para processar vários tipos de evento da mesma instância:

// Filtrar múltiplos tipos
const allowedEvents = [
  'messages_upsert',
  'connection_update',
  'qrcode_updated'
];

if (allowedEvents.includes(eventType)) {
  console.log(`📨 Evento ${eventType}:`, payload);

  switch(eventType) {
    case 'messages_upsert':
      await handleMessage(payload);
      break;
    case 'connection_update':
      await handleConnection(payload);
      break;
    case 'qrcode_updated':
      await handleQRCode(payload);
      break;
  }

  await redis.xack(streamKey, groupName, id);
}

Dicas Importantes

  • Consumer Groups: Permitem múltiplos consumers processarem mensagens em paralelo
  • XACK: Sempre confirme o processamento com xack() para não reprocessar
  • Blocking: Use BLOCK para aguardar novas mensagens (evita busy-wait)
  • Retry: Mensagens não confirmadas podem ser reprocessadas por outros consumers
  • MAXLEN ~10000: Streams mantêm no máximo 10.000 mensagens (automático)

Webhooks

Webhooks enviam eventos via HTTP POST para URLs configuradas. Configure através do Dashboard em Configurações da Instância → Webhooks.

Formato da Requisição

POST https://seu-servidor.com/webhook
Content-Type: application/json

{
  "instance": "minha-instancia",
  "eventType": "messages_upsert",
  "data": { ... }
}

Exemplo Express.js

const express = require('express');
const app = express();

app.use(express.json());

app.post('/webhook', (req, res) => {
    const { instance, eventType, data } = req.body;

    console.log(`📨 Evento: ${eventType} da instância: ${instance}`);

    // Processar o evento
    if (eventType === 'messages_upsert') {
        console.log('Nova mensagem:', data.message);
    }

    // IMPORTANTE: Responder rapidamente (< 5 segundos)
    res.status(200).json({ success: true });
});

app.listen(3000, () => {
    console.log('✅ Webhook server rodando na porta 3000');
});
Importante: Seu endpoint deve responder em menos de 5 segundos. Se demorar mais, a API tentará reenviar automaticamente.

Tipos de Eventos

messages_upsert

Nova mensagem recebida ou enviada

messages_update

Atualização de status de mensagem (lida, entregue, etc)

connection_update

Mudança no status de conexão da instância

qrcode_updated

Novo QR Code disponível para escaneamento

presence_update

Atualização de presença (online/offline/digitando)

groups_upsert

Novo grupo criado ou atualizado

group_participants_update

Mudança nos participantes de um grupo

Exemplos Práticos

Bot de Auto-Resposta

// WebSocket
ws.on('message', async (data) => {
    const event = JSON.parse(data);

    if (event.type === 'messages_upsert') {
        const msg = event.data;

        // Ignorar mensagens enviadas por mim
        if (msg.key.fromMe) return;

        const text = msg.message?.conversation || '';
        const from = msg.key.remoteJid;

        // Responder "oi" quando receber "olá"
        if (text.toLowerCase().includes('olá')) {
            await fetch('http://localhost:3003/messages/send/text', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'X-API-Key': 'SUA_API_KEY'
                },
                body: JSON.stringify({
                    instance: 'minha-instancia',
                    to: from,
                    text: 'Oi! Como posso ajudar?'
                })
            });
        }
    }
});

Salvar Mensagens no Banco de Dados

import asyncio
import websockets
import json
import sqlite3

# Conectar ao banco de dados
conn = sqlite3.connect('mensagens.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS mensagens
             (id TEXT, remetente TEXT, texto TEXT, timestamp INTEGER)''')

async def salvar_mensagens():
    uri = "ws://localhost:3003/websocket/minha-instancia"

    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            event = json.loads(message)

            if event['type'] == 'messages_upsert':
                data = event['data']
                msg_id = data['key']['id']
                from_jid = data['key']['remoteJid']
                text = data['message'].get('conversation', '')
                timestamp = data['messageTimestamp']

                c.execute(
                    "INSERT INTO mensagens VALUES (?, ?, ?, ?)",
                    (msg_id, from_jid, text, timestamp)
                )
                conn.commit()
                print(f"💾 Mensagem salva: {text[:30]}...")

asyncio.run(salvar_mensagens())

Precisa de ajuda?

Acesse o Dashboard para configurar eventos ou consulte a documentação da API.