Cos'è un dialer SaaS e perché costruirne uno
Un dialer è un sistema che effettua chiamate telefoniche in modo automatico — tipicamente usato per campagne outbound, promemoria, notifiche vocali. La parte "SaaS multi-tenant" significa che la stessa istanza del software serve più clienti (tenant) in modo isolato, ognuno con le proprie campagne, contatti e configurazioni SIP.
Ho costruito questo sistema per integrarlo con infrastrutture VoIP esistenti, senza dipendere da provider commerciali come Twilio o Vonage che hanno costi variabili e limitazioni sulla personalizzazione.
Lo stack scelto:
- PJSUA2 — libreria SIP/media in Python, wrapper di PJSIP
- FastAPI — framework REST asincrono
- PostgreSQL — persistenza (tenant, campagne, contatti, log)
- Redis — coda chiamate e stato in tempo reale
- Celery — worker asincroni per l'esecuzione delle campagne
Architettura generale
Il sistema è diviso in tre componenti principali che comunicano tra loro:
┌─────────────────┐ REST API ┌──────────────────┐
│ Frontend │ ◄────────────── ► │ FastAPI │
│ (Next.js) │ │ (API layer) │
└─────────────────┘ └────────┬─────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────────▼──────┐ ┌───────▼──────┐ ┌──────▼──────┐
│ PostgreSQL │ │ Redis │ │ Celery │
│ (dati) │ │ (code/stato)│ │ (workers) │
└────────────────┘ └──────────────┘ └──────┬──────┘
│
┌──────────▼──────────┐
│ PJSUA2 Engine │
│ (chiamate SIP) │
└─────────────────────┘
Il frontend non parla mai direttamente con PJSUA2 — tutto passa per FastAPI, che espone endpoint REST per creare campagne, avviare/fermare chiamate e recuperare statistiche.
Multi-tenancy: come isolare i tenant
La multi-tenancy è la parte più delicata. Ogni tenant deve vedere solo i propri dati e usare il proprio trunk SIP. Ho scelto un approccio schema condiviso con discriminatore — tutti i tenant stanno nello stesso database PostgreSQL, ma ogni tabella ha una colonna tenant_id.
Modello dati base
-- Tenant
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
sip_host TEXT NOT NULL,
sip_username TEXT NOT NULL,
sip_password TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Campagne
CREATE TABLE campaigns (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID REFERENCES tenants(id) ON DELETE CASCADE,
name TEXT NOT NULL,
status TEXT DEFAULT 'draft', -- draft, running, paused, completed
caller_id TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Contatti da chiamare
CREATE TABLE contacts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID REFERENCES tenants(id) ON DELETE CASCADE,
campaign_id UUID REFERENCES campaigns(id) ON DELETE CASCADE,
phone TEXT NOT NULL,
status TEXT DEFAULT 'pending', -- pending, calling, answered, failed, busy
called_at TIMESTAMPTZ,
duration_seconds INTEGER
);
Dependency injection del tenant in FastAPI
Ogni richiesta API porta un token JWT che identifica il tenant. FastAPI estrae il tenant_id tramite una dependency:
from fastapi import Depends, HTTPException, Header
from jose import jwt, JWTError
SECRET_KEY = "cambia-questa-chiave"
async def get_current_tenant(authorization: str = Header(...)) -> str:
try:
token = authorization.split("Bearer ")[-1]
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
tenant_id = payload.get("tenant_id")
if not tenant_id:
raise HTTPException(status_code=401, detail="Token non valido")
return tenant_id
except JWTError:
raise HTTPException(status_code=401, detail="Token non valido")
Uso nelle route
@app.get("/campaigns")
async def list_campaigns(
tenant_id: str = Depends(get_current_tenant),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(
select(Campaign).where(Campaign.tenant_id == tenant_id)
)
return result.scalars().all()
In questo modo è impossibile che un tenant veda i dati di un altro — il filtro è applicato a livello di codice su ogni query.
La coda chiamate con Redis
Quando una campagna viene avviata, i contatti da chiamare vengono caricati in una lista Redis. Il worker Celery li consuma uno alla volta rispettando il limite di chiamate concorrenti configurato per tenant.
import redis.asyncio as redis
r = redis.from_url("redis://localhost:6379")
async def enqueue_campaign(campaign_id: str, contacts: list[str]):
# Chiave specifica per campagna: evita collisioni tra tenant
key = f"queue:campaign:{campaign_id}"
await r.delete(key)
if contacts:
await r.rpush(key, *contacts)
async def dequeue_contact(campaign_id: str) -> str | None:
key = f"queue:campaign:{campaign_id}"
# LPOP è atomico — nessun rischio di doppia chiamata allo stesso numero
result = await r.lpop(key)
return result.decode() if result else None
Lo stato in tempo reale (quante chiamate attive, quante completate) viene tenuto in Redis con chiavi separate:
async def update_campaign_stats(campaign_id: str, field: str, increment: int = 1):
key = f"stats:campaign:{campaign_id}"
await r.hincrby(key, field, increment)
await r.expire(key, 86400) # TTL 24h
PJSUA2: setup e pitfalls principali
PJSUA2 è la parte più complessa del sistema. È una libreria C++ con binding Python — potente ma con alcuni comportamenti non ovvi.
Inizializzazione
import pjsua2 as pj
class SIPEngine:
def __init__(self):
self.ep = pj.Endpoint()
ep_cfg = pj.EpConfig()
ep_cfg.logConfig.level = 0 # silenzia i log in produzione
self.ep.libCreate()
self.ep.libInit(ep_cfg)
# Transport UDP
tcp_cfg = pj.TransportConfig()
tcp_cfg.port = 5060
self.ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, tcp_cfg)
self.ep.libStart()
Pitfall 1: PJSUA2 non è thread-safe
Il problema più comune. PJSUA2 ha il suo event loop interno — non puoi chiamare metodi da thread arbitrari. La soluzione è usare un thread dedicato e comunicare tramite queue:
import threading
import queue
call_queue = queue.Queue()
def pjsua2_thread():
engine = SIPEngine()
while True:
try:
task = call_queue.get(timeout=1)
if task['action'] == 'call':
engine.make_call(task['number'], task['account'])
except queue.Empty:
pass
# Importante: dai tempo all'event loop di PJSUA2
engine.ep.libHandleEvents(10)
threading.Thread(target=pjsua2_thread, daemon=True).start()
Pitfall 2: un account SIP per tenant
Ogni tenant ha le proprie credenziali SIP. Non puoi registrare tutti sull'account di default. PJSUA2 supporta account multipli — creane uno per ogni tenant attivo e memorizza il riferimento:
accounts: dict[str, pj.Account] = {}
def register_tenant_account(tenant_id: str, sip_config: dict) -> pj.Account:
acc_cfg = pj.AccountConfig()
acc_cfg.idUri = f"sip:{sip_config['username']}@{sip_config['host']}"
acc_cfg.regConfig.registrarUri = f"sip:{sip_config['host']}"
cred = pj.AuthCredInfo()
cred.scheme = "digest"
cred.realm = "*"
cred.username = sip_config['username']
cred.dataType = 0
cred.data = sip_config['password']
acc_cfg.sipConfig.authCreds.append(cred)
account = pj.Account()
account.create(acc_cfg)
accounts[tenant_id] = account
return account
Pitfall 3: gestione degli errori nelle chiamate
Le chiamate falliscono per mille motivi — numero occupato, non raggiungibile, rifiutato. PJSUA2 notifica lo stato tramite callback. Devi sempre gestire tutti i casi:
class CallHandler(pj.Call):
def onCallState(self, prm):
info = self.getInfo()
if info.state == pj.PJSIP_INV_STATE_DISCONNECTED:
cause = info.lastStatusCode
if cause == 486: # Busy Here
update_contact_status(self.contact_id, 'busy')
elif cause == 404: # Not Found
update_contact_status(self.contact_id, 'invalid')
else:
update_contact_status(self.contact_id, 'failed')
Integrazione FastAPI ↔ Celery ↔ PJSUA2
Il flusso completo quando si avvia una campagna:
- Il frontend chiama
POST /campaigns/{id}/start - FastAPI valida la richiesta e pubblica un task su Celery
- Il worker Celery carica i contatti in Redis e avvia il loop di chiamate
- Per ogni contatto, il worker invia un comando alla queue di PJSUA2
- PJSUA2 effettua la chiamata e notifica lo stato tramite callback
- La callback aggiorna PostgreSQL e Redis con il risultato
# FastAPI route
@app.post("/campaigns/{campaign_id}/start")
async def start_campaign(
campaign_id: str,
tenant_id: str = Depends(get_current_tenant),
db: AsyncSession = Depends(get_db)
):
campaign = await get_campaign(db, campaign_id, tenant_id)
if campaign.status != 'draft':
raise HTTPException(400, "Campagna già avviata o completata")
# Aggiorna stato
campaign.status = 'running'
await db.commit()
# Lancia il worker Celery in background
run_campaign.delay(campaign_id, tenant_id)
return {"status": "started"}
Celery task
@celery_app.task
def run_campaign(campaign_id: str, tenant_id: str):
contacts = fetch_pending_contacts(campaign_id)
enqueue_campaign_sync(campaign_id, [c.phone for c in contacts])
while True:
phone = dequeue_contact_sync(campaign_id)
if not phone:
break
call_queue.put({
'action': 'call',
'number': phone,
'campaign_id': campaign_id,
'tenant_id': tenant_id
})
time.sleep(2) # intervallo tra una chiamata e la successiva
Conclusioni
Il progetto ha richiesto circa tre settimane di sviluppo full-time. I punti che richiedono più attenzione sono la gestione del thread PJSUA2 e l'isolamento corretto dei tenant — errori in questi due punti portano a bug difficili da riprodurre in produzione.
Il vantaggio rispetto a usare un servizio cloud è il controllo totale: nessun costo per chiamata, nessun limite di concorrenza imposto dal fornitore, integrazione diretta con i trunk SIP aziendali.