Dialer SaaS multi-tenant con PJSUA2 e FastAPI: architettura e pitfalls

Come ho costruito un dialer SaaS multi-tenant da zero usando PJSUA2 come engine SIP, FastAPI per le API REST, Redis per la coda chiamate e PostgreSQL per la persistenza. Architettura, scelte tecniche e problemi reali.

Cos'è un dialer SaaS e perché costruirne uno

Un dialer è un sistema che effettua chiamate telefoniche in modo automatico — tipicamente usato per campagne outbound, promemoria, notifiche vocali. La parte "SaaS multi-tenant" significa che la stessa istanza del software serve più clienti (tenant) in modo isolato, ognuno con le proprie campagne, contatti e configurazioni SIP.

Ho costruito questo sistema per integrarlo con infrastrutture VoIP esistenti, senza dipendere da provider commerciali come Twilio o Vonage che hanno costi variabili e limitazioni sulla personalizzazione.

Lo stack scelto:

  • PJSUA2 — libreria SIP/media in Python, wrapper di PJSIP
  • FastAPI — framework REST asincrono
  • PostgreSQL — persistenza (tenant, campagne, contatti, log)
  • Redis — coda chiamate e stato in tempo reale
  • Celery — worker asincroni per l'esecuzione delle campagne

Architettura generale

Il sistema è diviso in tre componenti principali che comunicano tra loro:

┌─────────────────┐     REST API      ┌──────────────────┐
│   Frontend      │ ◄────────────── ► │   FastAPI        │
│   (Next.js)     │                   │   (API layer)    │
└─────────────────┘                   └────────┬─────────┘
                                               │
                              ┌────────────────┼────────────────┐
                              │                │                │
                    ┌─────────▼──────┐ ┌───────▼──────┐ ┌──────▼──────┐
                    │  PostgreSQL    │ │    Redis     │ │   Celery    │
                    │  (dati)        │ │  (code/stato)│ │  (workers)  │
                    └────────────────┘ └──────────────┘ └──────┬──────┘
                                                               │
                                                    ┌──────────▼──────────┐
                                                    │   PJSUA2 Engine     │
                                                    │   (chiamate SIP)    │
                                                    └─────────────────────┘

Il frontend non parla mai direttamente con PJSUA2 — tutto passa per FastAPI, che espone endpoint REST per creare campagne, avviare/fermare chiamate e recuperare statistiche.


Multi-tenancy: come isolare i tenant

La multi-tenancy è la parte più delicata. Ogni tenant deve vedere solo i propri dati e usare il proprio trunk SIP. Ho scelto un approccio schema condiviso con discriminatore — tutti i tenant stanno nello stesso database PostgreSQL, ma ogni tabella ha una colonna tenant_id.

Modello dati base

-- Tenant
CREATE TABLE tenants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    sip_host TEXT NOT NULL,
    sip_username TEXT NOT NULL,
    sip_password TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Campagne
CREATE TABLE campaigns (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID REFERENCES tenants(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'draft', -- draft, running, paused, completed
    caller_id TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Contatti da chiamare
CREATE TABLE contacts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID REFERENCES tenants(id) ON DELETE CASCADE,
    campaign_id UUID REFERENCES campaigns(id) ON DELETE CASCADE,
    phone TEXT NOT NULL,
    status TEXT DEFAULT 'pending', -- pending, calling, answered, failed, busy
    called_at TIMESTAMPTZ,
    duration_seconds INTEGER
);

Dependency injection del tenant in FastAPI

Ogni richiesta API porta un token JWT che identifica il tenant. FastAPI estrae il tenant_id tramite una dependency:

from fastapi import Depends, HTTPException, Header
from jose import jwt, JWTError

SECRET_KEY = "cambia-questa-chiave"

async def get_current_tenant(authorization: str = Header(...)) -> str:
    try:
        token = authorization.split("Bearer ")[-1]
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        tenant_id = payload.get("tenant_id")
        if not tenant_id:
            raise HTTPException(status_code=401, detail="Token non valido")
        return tenant_id
    except JWTError:
        raise HTTPException(status_code=401, detail="Token non valido")

Uso nelle route

@app.get("/campaigns") async def list_campaigns( tenant_id: str = Depends(get_current_tenant), db: AsyncSession = Depends(get_db) ): result = await db.execute( select(Campaign).where(Campaign.tenant_id == tenant_id) ) return result.scalars().all()

In questo modo è impossibile che un tenant veda i dati di un altro — il filtro è applicato a livello di codice su ogni query.


La coda chiamate con Redis

Quando una campagna viene avviata, i contatti da chiamare vengono caricati in una lista Redis. Il worker Celery li consuma uno alla volta rispettando il limite di chiamate concorrenti configurato per tenant.

import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")

async def enqueue_campaign(campaign_id: str, contacts: list[str]):
    # Chiave specifica per campagna: evita collisioni tra tenant
    key = f"queue:campaign:{campaign_id}"
    await r.delete(key)
    if contacts:
        await r.rpush(key, *contacts)

async def dequeue_contact(campaign_id: str) -> str | None:
    key = f"queue:campaign:{campaign_id}"
    # LPOP è atomico — nessun rischio di doppia chiamata allo stesso numero
    result = await r.lpop(key)
    return result.decode() if result else None

Lo stato in tempo reale (quante chiamate attive, quante completate) viene tenuto in Redis con chiavi separate:

async def update_campaign_stats(campaign_id: str, field: str, increment: int = 1):
    key = f"stats:campaign:{campaign_id}"
    await r.hincrby(key, field, increment)
    await r.expire(key, 86400)  # TTL 24h

PJSUA2: setup e pitfalls principali

PJSUA2 è la parte più complessa del sistema. È una libreria C++ con binding Python — potente ma con alcuni comportamenti non ovvi.

Inizializzazione

import pjsua2 as pj

class SIPEngine:
    def __init__(self):
        self.ep = pj.Endpoint()
        ep_cfg = pj.EpConfig()
        ep_cfg.logConfig.level = 0  # silenzia i log in produzione
        self.ep.libCreate()
        self.ep.libInit(ep_cfg)

        # Transport UDP
        tcp_cfg = pj.TransportConfig()
        tcp_cfg.port = 5060
        self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, tcp_cfg)
        self.ep.libStart()

Pitfall 1: PJSUA2 non è thread-safe

Il problema più comune. PJSUA2 ha il suo event loop interno — non puoi chiamare metodi da thread arbitrari. La soluzione è usare un thread dedicato e comunicare tramite queue:

import threading
import queue

call_queue = queue.Queue()

def pjsua2_thread():
    engine = SIPEngine()
    while True:
        try:
            task = call_queue.get(timeout=1)
            if task['action'] == 'call':
                engine.make_call(task['number'], task['account'])
        except queue.Empty:
            pass
        # Importante: dai tempo all'event loop di PJSUA2
        engine.ep.libHandleEvents(10)

threading.Thread(target=pjsua2_thread, daemon=True).start()

Pitfall 2: un account SIP per tenant

Ogni tenant ha le proprie credenziali SIP. Non puoi registrare tutti sull'account di default. PJSUA2 supporta account multipli — creane uno per ogni tenant attivo e memorizza il riferimento:

accounts: dict[str, pj.Account] = {}

def register_tenant_account(tenant_id: str, sip_config: dict) -> pj.Account:
    acc_cfg = pj.AccountConfig()
    acc_cfg.idUri = f"sip:{sip_config['username']}@{sip_config['host']}"
    acc_cfg.regConfig.registrarUri = f"sip:{sip_config['host']}"

    cred = pj.AuthCredInfo()
    cred.scheme = "digest"
    cred.realm = "*"
    cred.username = sip_config['username']
    cred.dataType = 0
    cred.data = sip_config['password']
    acc_cfg.sipConfig.authCreds.append(cred)

    account = pj.Account()
    account.create(acc_cfg)
    accounts[tenant_id] = account
    return account

Pitfall 3: gestione degli errori nelle chiamate

Le chiamate falliscono per mille motivi — numero occupato, non raggiungibile, rifiutato. PJSUA2 notifica lo stato tramite callback. Devi sempre gestire tutti i casi:

class CallHandler(pj.Call):
    def onCallState(self, prm):
        info = self.getInfo()
        if info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            cause = info.lastStatusCode
            if cause == 486:    # Busy Here
                update_contact_status(self.contact_id, 'busy')
            elif cause == 404:  # Not Found
                update_contact_status(self.contact_id, 'invalid')
            else:
                update_contact_status(self.contact_id, 'failed')

Integrazione FastAPI ↔ Celery ↔ PJSUA2

Il flusso completo quando si avvia una campagna:

  1. Il frontend chiama POST /campaigns/{id}/start
  2. FastAPI valida la richiesta e pubblica un task su Celery
  3. Il worker Celery carica i contatti in Redis e avvia il loop di chiamate
  4. Per ogni contatto, il worker invia un comando alla queue di PJSUA2
  5. PJSUA2 effettua la chiamata e notifica lo stato tramite callback
  6. La callback aggiorna PostgreSQL e Redis con il risultato
# FastAPI route
@app.post("/campaigns/{campaign_id}/start")
async def start_campaign(
    campaign_id: str,
    tenant_id: str = Depends(get_current_tenant),
    db: AsyncSession = Depends(get_db)
):
    campaign = await get_campaign(db, campaign_id, tenant_id)
    if campaign.status != 'draft':
        raise HTTPException(400, "Campagna già avviata o completata")

    # Aggiorna stato
    campaign.status = 'running'
    await db.commit()

    # Lancia il worker Celery in background
    run_campaign.delay(campaign_id, tenant_id)
    return {"status": "started"}

Celery task

@celery_app.task def run_campaign(campaign_id: str, tenant_id: str): contacts = fetch_pending_contacts(campaign_id) enqueue_campaign_sync(campaign_id, [c.phone for c in contacts]) while True: phone = dequeue_contact_sync(campaign_id) if not phone: break call_queue.put({ 'action': 'call', 'number': phone, 'campaign_id': campaign_id, 'tenant_id': tenant_id }) time.sleep(2) # intervallo tra una chiamata e la successiva

Conclusioni

Il progetto ha richiesto circa tre settimane di sviluppo full-time. I punti che richiedono più attenzione sono la gestione del thread PJSUA2 e l'isolamento corretto dei tenant — errori in questi due punti portano a bug difficili da riprodurre in produzione.

Il vantaggio rispetto a usare un servizio cloud è il controllo totale: nessun costo per chiamata, nessun limite di concorrenza imposto dal fornitore, integrazione diretta con i trunk SIP aziendali.

← Articolo precedente
FreeRADIUS + MikroTik PPPoE: setup completo da zero
Articolo successivo →
FreePBX module development: struttura corretta per sopravvivere agli update
← Torna al blog