Esempi Python - API Salabam Solutions
Questa sezione fornisce esempi completi per integrare le API Salabam Solutions in applicazioni Python.
Prerequisiti
- Python: 3.8 o superiore
- pip per gestione dipendenze
- Conoscenza base di Python e API REST
Installazione dipendenze
Per progetti asincroni:
Per progetti Django/Flask:
Client API Base
1. Client sincrono
# salabam_client.py
import requests
import time
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class SalabamAPIError(Exception):
"""Eccezione personalizzata per errori API Salabam"""
def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[Dict] = None):
super().__init__(message)
self.status_code = status_code
self.response = response
class SalabamAPIClient:
"""Client per le API Salabam Solutions"""
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.salabam.solutions/v3"):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = base_url.rstrip('/')
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.token_expiry: Optional[datetime] = None
# Configurazione session
self.session = requests.Session()
self.session.headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'SalabamPythonClient/1.0'
})
# Timeout e retry
self.timeout = 30
self.max_retries = 3
def authenticate(self) -> Dict[str, Any]:
"""Esegue autenticazione e ottiene token"""
try:
response = self.session.post(
f"{self.base_url}/auth/login",
json={
'client_id': self.client_id,
'client_secret': self.client_secret
},
timeout=self.timeout
)
if not response.ok:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
raise SalabamAPIError(
f"Authentication failed: {error_data.get('message', response.text)}",
response.status_code,
error_data
)
data = response.json()
self.access_token = data['access_token']
self.refresh_token = data.get('refresh_token')
expires_in = data.get('expires_in', 3600)
self.token_expiry = datetime.now() + timedelta(seconds=expires_in)
logger.info("Successfully authenticated with Salabam API")
return data
except requests.exceptions.RequestException as e:
raise SalabamAPIError(f"Network error during authentication: {str(e)}")
def refresh_access_token(self) -> Dict[str, Any]:
"""Refresh del token di accesso"""
if not self.refresh_token:
logger.warning("No refresh token available, re-authenticating")
return self.authenticate()
try:
response = self.session.post(
f"{self.base_url}/auth/refresh",
json={'refresh_token': self.refresh_token},
timeout=self.timeout
)
if not response.ok:
logger.warning("Refresh token failed, re-authenticating")
return self.authenticate()
data = response.json()
self.access_token = data['access_token']
expires_in = data.get('expires_in', 3600)
self.token_expiry = datetime.now() + timedelta(seconds=expires_in)
logger.info("Successfully refreshed access token")
return data
except requests.exceptions.RequestException:
logger.warning("Network error during token refresh, re-authenticating")
return self.authenticate()
def _is_token_expired(self) -> bool:
"""Controlla se il token è scaduto o vicino alla scadenza"""
if not self.token_expiry:
return True
# Refresh 5 minuti prima della scadenza
return datetime.now() > (self.token_expiry - timedelta(minutes=5))
def _ensure_valid_token(self):
"""Assicura che ci sia un token valido"""
if not self.access_token or self._is_token_expired():
if self.refresh_token:
self.refresh_access_token()
else:
self.authenticate()
def request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""Esegue richiesta autenticata con retry automatico"""
self._ensure_valid_token()
url = f"{self.base_url}{endpoint}"
headers = kwargs.pop('headers', {})
headers['Authorization'] = f"Bearer {self.access_token}"
for attempt in range(self.max_retries):
try:
response = self.session.request(
method=method,
url=url,
headers=headers,
timeout=self.timeout,
**kwargs
)
# Auto-retry su 401 (token scaduto)
if response.status_code == 401 and attempt == 0:
logger.info("Token expired, refreshing...")
self.refresh_access_token()
headers['Authorization'] = f"Bearer {self.access_token}"
continue
# Log rate limiting
self._log_rate_limit(response)
return response
except requests.exceptions.RequestException as e:
if attempt == self.max_retries - 1:
raise SalabamAPIError(f"Request failed after {self.max_retries} attempts: {str(e)}")
wait_time = 2 ** attempt # Exponential backoff
logger.warning(f"Request failed (attempt {attempt + 1}), retrying in {wait_time}s...")
time.sleep(wait_time)
raise SalabamAPIError("Max retries exceeded")
def _log_rate_limit(self, response: requests.Response):
"""Log informazioni rate limiting"""
remaining = response.headers.get('X-RateLimit-Remaining')
reset = response.headers.get('X-RateLimit-Reset')
if remaining and int(remaining) < 10:
logger.warning(f"Rate limit warning: {remaining} requests remaining")
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET request"""
response = self.request('GET', endpoint, params=params)
if not response.ok:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
raise SalabamAPIError(
f"GET {endpoint} failed: {error_data.get('message', response.text)}",
response.status_code,
error_data
)
return response.json()
def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""POST request"""
response = self.request('POST', endpoint, json=data)
if not response.ok:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
raise SalabamAPIError(
f"POST {endpoint} failed: {error_data.get('message', response.text)}",
response.status_code,
error_data
)
return response.json()
def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""PUT request"""
response = self.request('PUT', endpoint, json=data)
if not response.ok:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
raise SalabamAPIError(
f"PUT {endpoint} failed: {error_data.get('message', response.text)}",
response.status_code,
error_data
)
return response.json()
def delete(self, endpoint: str) -> bool:
"""DELETE request"""
response = self.request('DELETE', endpoint)
if response.status_code == 204:
return True
elif not response.ok:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
raise SalabamAPIError(
f"DELETE {endpoint} failed: {error_data.get('message', response.text)}",
response.status_code,
error_data
)
return True
2. Modelli dati con Pydantic
# models.py
from pydantic import BaseModel, EmailStr, validator
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
class BeneficiaryStatus(str, Enum):
PENDING = "pending"
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
REJECTED = "rejected"
class Address(BaseModel):
street: str
city: str
postal_code: str
country: str = "IT"
province: Optional[str] = None
class Preferences(BaseModel):
newsletter: bool = True
sms_notifications: bool = False
language: str = "it"
class BeneficiaryCreate(BaseModel):
first_name: str
last_name: str
email: EmailStr
phone: Optional[str] = None
date_of_birth: Optional[str] = None
fiscal_code: Optional[str] = None
address: Optional[Address] = None
metadata: Optional[Dict[str, Any]] = None
preferences: Optional[Preferences] = None
@validator('first_name', 'last_name')
def validate_names(cls, v):
if not v or len(v.strip()) < 2:
raise ValueError('Nome e cognome devono avere almeno 2 caratteri')
return v.strip()
@validator('phone')
def validate_phone(cls, v):
if v is None:
return v
import re
if not re.match(r'^\+\d{1,3}\s\d{3}\s\d{3}\s\d{4}$', v):
raise ValueError('Formato telefono non valido (usa +39 XXX XXX XXXX)')
return v
@validator('date_of_birth')
def validate_birth_date(cls, v):
if v is None:
return v
try:
birth_date = datetime.strptime(v, '%Y-%m-%d')
age = (datetime.now() - birth_date).days / 365.25
if age < 16:
raise ValueError('Età minima 16 anni')
return v
except ValueError as e:
if "does not match format" in str(e):
raise ValueError('Formato data non valido (usa YYYY-MM-DD)')
raise
class Beneficiary(BeneficiaryCreate):
id: int
status: BeneficiaryStatus
created_at: datetime
updated_at: datetime
last_activity: Optional[datetime] = None
class BeneficiaryUpdate(BaseModel):
first_name: Optional[str] = None
last_name: Optional[str] = None
email: Optional[EmailStr] = None
phone: Optional[str] = None
date_of_birth: Optional[str] = None
fiscal_code: Optional[str] = None
address: Optional[Address] = None
metadata: Optional[Dict[str, Any]] = None
preferences: Optional[Preferences] = None
@validator('first_name', 'last_name')
def validate_names(cls, v):
if v is not None and len(v.strip()) < 2:
raise ValueError('Nome e cognome devono avere almeno 2 caratteri')
return v.strip() if v else v
class PaginationMeta(BaseModel):
current_page: int
total_pages: int
total_count: int
per_page: int
class PaginationLinks(BaseModel):
first: Optional[str] = None
last: Optional[str] = None
prev: Optional[str] = None
next: Optional[str] = None
class BeneficiaryListResponse(BaseModel):
data: List[Beneficiary]
meta: PaginationMeta
links: PaginationLinks
3. Manager beneficiari
# beneficiary_manager.py
import logging
from typing import Optional, Dict, List, Any
from models import BeneficiaryCreate, BeneficiaryUpdate, Beneficiary, BeneficiaryListResponse
from salabam_client import SalabamAPIClient, SalabamAPIError
logger = logging.getLogger(__name__)
class BeneficiaryManager:
"""Manager per operazioni sui beneficiari"""
def __init__(self, api_client: SalabamAPIClient):
self.api_client = api_client
def create(self, beneficiary_data: BeneficiaryCreate) -> Beneficiary:
"""Crea un nuovo beneficiario"""
try:
# Validazione tramite Pydantic
validated_data = beneficiary_data.dict(exclude_none=True)
result = self.api_client.post('/beneficiaries', validated_data)
logger.info(f"Beneficiario creato: ID {result['id']}, Email: {beneficiary_data.email}")
return Beneficiary(**result)
except SalabamAPIError as e:
logger.error(f"Errore API creazione beneficiario: {e}")
raise
except Exception as e:
logger.error(f"Errore creazione beneficiario: {e}")
raise SalabamAPIError(f"Errore durante creazione: {str(e)}")
def get_by_id(self, beneficiary_id: int) -> Beneficiary:
"""Ottieni beneficiario per ID"""
try:
result = self.api_client.get(f'/beneficiaries/{beneficiary_id}')
return Beneficiary(**result)
except SalabamAPIError as e:
if e.status_code == 404:
raise SalabamAPIError(f"Beneficiario {beneficiary_id} non trovato", 404)
raise
def search(self, filters: Optional[Dict[str, Any]] = None) -> BeneficiaryListResponse:
"""Ricerca beneficiari con filtri"""
params = filters or {}
# Valori di default
if 'page' not in params:
params['page'] = 1
if 'per_page' not in params:
params['per_page'] = 20
# Rimuovi parametri vuoti
params = {k: v for k, v in params.items() if v is not None and v != ''}
try:
result = self.api_client.get('/beneficiaries', params)
return BeneficiaryListResponse(**result)
except SalabamAPIError as e:
logger.error(f"Errore ricerca beneficiari: {e}")
raise
def update(self, beneficiary_id: int, update_data: BeneficiaryUpdate) -> Beneficiary:
"""Aggiorna beneficiario esistente"""
try:
validated_data = update_data.dict(exclude_none=True)
if not validated_data:
raise ValueError("Nessun dato da aggiornare")
result = self.api_client.put(f'/beneficiaries/{beneficiary_id}', validated_data)
logger.info(f"Beneficiario {beneficiary_id} aggiornato")
return Beneficiary(**result)
except SalabamAPIError as e:
logger.error(f"Errore aggiornamento beneficiario {beneficiary_id}: {e}")
raise
def delete(self, beneficiary_id: int) -> bool:
"""Elimina beneficiario"""
try:
self.api_client.delete(f'/beneficiaries/{beneficiary_id}')
logger.info(f"Beneficiario {beneficiary_id} eliminato")
return True
except SalabamAPIError as e:
logger.error(f"Errore eliminazione beneficiario {beneficiary_id}: {e}")
raise
def autocomplete(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
"""Ricerca con autocomplete"""
if len(query) < 2:
return []
try:
filters = {
'search': query,
'per_page': limit
}
response = self.search(filters)
return [
{
'id': b.id,
'label': f"{b.first_name} {b.last_name}",
'email': b.email,
'value': b.dict()
}
for b in response.data
]
except SalabamAPIError as e:
logger.error(f"Errore autocomplete: {e}")
return []
def bulk_import(self, beneficiaries: List[BeneficiaryCreate],
batch_size: int = 5, delay: float = 0.2,
on_progress: Optional[callable] = None) -> Dict[str, Any]:
"""Import di massa con controllo rate limiting"""
import time
results = {
'processed': 0,
'created': 0,
'errors': 0,
'duplicates': 0,
'details': []
}
total = len(beneficiaries)
for i in range(0, total, batch_size):
batch = beneficiaries[i:i + batch_size]
for j, beneficiary in enumerate(batch):
global_index = i + j
results['processed'] += 1
try:
# Aggiungi metadata di import
if not beneficiary.metadata:
beneficiary.metadata = {}
beneficiary.metadata.update({
'source': 'bulk_import',
'import_batch': datetime.now().isoformat(),
'original_index': global_index
})
result = self.create(beneficiary)
results['created'] += 1
results['details'].append({
'index': global_index,
'status': 'success',
'id': result.id,
'email': beneficiary.email
})
logger.info(f"✓ Importato: {beneficiary.email} (ID: {result.id})")
except SalabamAPIError as e:
results['errors'] += 1
if 'già esistente' in str(e) or 'already exists' in str(e):
results['duplicates'] += 1
status = 'duplicate'
logger.warning(f"⚠ Duplicato: {beneficiary.email}")
else:
status = 'error'
logger.error(f"✗ Errore: {beneficiary.email} - {e}")
results['details'].append({
'index': global_index,
'status': status,
'email': beneficiary.email,
'error': str(e)
})
# Progress callback
if on_progress:
on_progress({
'processed': results['processed'],
'total': total,
'created': results['created'],
'errors': results['errors']
})
# Pausa tra batch per rate limiting
if i + batch_size < total:
time.sleep(delay)
return results
Esempi pratici
1. Script CLI di base
# cli_example.py
#!/usr/bin/env python3
import os
import sys
import argparse
import logging
from datetime import datetime
from dotenv import load_dotenv
from salabam_client import SalabamAPIClient, SalabamAPIError
from beneficiary_manager import BeneficiaryManager
from models import BeneficiaryCreate, BeneficiaryUpdate, Address
# Carica variabili d'ambiente
load_dotenv()
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def setup_client():
"""Inizializza client API"""
client_id = os.getenv('SALABAM_CLIENT_ID')
client_secret = os.getenv('SALABAM_CLIENT_SECRET')
api_url = os.getenv('SALABAM_API_URL', 'https://api.salabam.solutions/v3')
if not client_id or not client_secret:
logger.error("Variabili d'ambiente SALABAM_CLIENT_ID e SALABAM_CLIENT_SECRET richieste")
sys.exit(1)
api_client = SalabamAPIClient(client_id, client_secret, api_url)
return BeneficiaryManager(api_client)
def create_beneficiary(args):
"""Crea nuovo beneficiario"""
manager = setup_client()
try:
# Prepara dati beneficiario
beneficiary_data = BeneficiaryCreate(
first_name=args.first_name,
last_name=args.last_name,
email=args.email,
phone=args.phone,
date_of_birth=args.birth_date
)
# Aggiungi indirizzo se fornito
if args.address:
parts = args.address.split(',')
if len(parts) >= 3:
beneficiary_data.address = Address(
street=parts[0].strip(),
city=parts[1].strip(),
postal_code=parts[2].strip()
)
result = manager.create(beneficiary_data)
print(f"✅ Beneficiario creato: ID {result.id}")
print(f" Nome: {result.first_name} {result.last_name}")
print(f" Email: {result.email}")
print(f" Stato: {result.status}")
except SalabamAPIError as e:
logger.error(f"Errore API: {e}")
sys.exit(1)
except Exception as e:
logger.error(f"Errore: {e}")
sys.exit(1)
def search_beneficiaries(args):
"""Ricerca beneficiari"""
manager = setup_client()
try:
filters = {}
if args.search:
filters['search'] = args.search
if args.status:
filters['status'] = args.status
if args.page:
filters['page'] = args.page
response = manager.search(filters)
print(f"📊 Trovati {response.meta.total_count} beneficiari")
print(f" Pagina {response.meta.current_page} di {response.meta.total_pages}")
print()
for beneficiary in response.data:
print(f"🆔 {beneficiary.id} - {beneficiary.first_name} {beneficiary.last_name}")
print(f" 📧 {beneficiary.email}")
print(f" 📱 {beneficiary.phone or 'N/A'}")
print(f" 📊 {beneficiary.status}")
print(f" 📅 {beneficiary.created_at.strftime('%d/%m/%Y')}")
print()
except SalabamAPIError as e:
logger.error(f"Errore ricerca: {e}")
sys.exit(1)
def import_csv(args):
"""Import da file CSV"""
import csv
manager = setup_client()
if not os.path.exists(args.csv_file):
logger.error(f"File CSV non trovato: {args.csv_file}")
sys.exit(1)
beneficiaries = []
try:
with open(args.csv_file, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
beneficiary = BeneficiaryCreate(
first_name=row['nome'].strip(),
last_name=row['cognome'].strip(),
email=row['email'].strip().lower(),
phone=row.get('telefono', '').strip() or None,
date_of_birth=row.get('data_nascita', '') or None
)
# Aggiungi metadata
beneficiary.metadata = {
'source': 'csv_import',
'filename': os.path.basename(args.csv_file)
}
beneficiaries.append(beneficiary)
print(f"📄 CSV caricato: {len(beneficiaries)} beneficiari")
# Progress callback
def show_progress(progress):
percent = int((progress['processed'] / progress['total']) * 100)
print(f"\r📊 Progresso: {percent}% ({progress['processed']}/{progress['total']})", end='')
results = manager.bulk_import(
beneficiaries,
batch_size=args.batch_size,
delay=args.delay,
on_progress=show_progress
)
print(f"\n\n✅ Import completato!")
print(f" Processati: {results['processed']}")
print(f" Creati: {results['created']}")
print(f" Errori: {results['errors']}")
print(f" Duplicati: {results['duplicates']}")
# Mostra dettagli errori
errors = [d for d in results['details'] if d['status'] == 'error']
if errors:
print(f"\n❌ Dettagli errori:")
for error in errors[:10]: # Mostra solo i primi 10
print(f" {error['email']}: {error['error']}")
if len(errors) > 10:
print(f" ... e altri {len(errors) - 10} errori")
except Exception as e:
logger.error(f"Errore durante import: {e}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser(description='CLI per API Salabam Solutions')
subparsers = parser.add_subparsers(dest='command', help='Comandi disponibili')
# Comando create
create_parser = subparsers.add_parser('create', help='Crea nuovo beneficiario')
create_parser.add_argument('--first-name', required=True, help='Nome')
create_parser.add_argument('--last-name', required=True, help='Cognome')
create_parser.add_argument('--email', required=True, help='Email')
create_parser.add_argument('--phone', help='Telefono (+39 XXX XXX XXXX)')
create_parser.add_argument('--birth-date', help='Data nascita (YYYY-MM-DD)')
create_parser.add_argument('--address', help='Indirizzo (strada,città,cap)')
# Comando search
search_parser = subparsers.add_parser('search', help='Ricerca beneficiari')
search_parser.add_argument('--search', help='Testo da cercare')
search_parser.add_argument('--status', choices=['active', 'pending', 'inactive'], help='Filtra per stato')
search_parser.add_argument('--page', type=int, default=1, help='Numero pagina')
# Comando import
import_parser = subparsers.add_parser('import', help='Import da CSV')
import_parser.add_argument('csv_file', help='File CSV da importare')
import_parser.add_argument('--batch-size', type=int, default=5, help='Dimensione batch')
import_parser.add_argument('--delay', type=float, default=0.2, help='Delay tra batch (secondi)')
args = parser.parse_args()
if args.command == 'create':
create_beneficiary(args)
elif args.command == 'search':
search_beneficiaries(args)
elif args.command == 'import':
import_csv(args)
else:
parser.print_help()
if __name__ == '__main__':
main()
2. Integrazione Flask
# flask_app.py
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import os
import logging
from dotenv import load_dotenv
from salabam_client import SalabamAPIClient, SalabamAPIError
from beneficiary_manager import BeneficiaryManager
from models import BeneficiaryCreate, BeneficiaryUpdate
load_dotenv()
app = Flask(__name__)
CORS(app)
# Configurazione logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Inizializzazione client API
api_client = SalabamAPIClient(
os.getenv('SALABAM_CLIENT_ID'),
os.getenv('SALABAM_CLIENT_SECRET'),
os.getenv('SALABAM_API_URL', 'https://api.salabam.solutions/v3')
)
beneficiary_manager = BeneficiaryManager(api_client)
@app.errorhandler(SalabamAPIError)
def handle_api_error(e):
"""Gestione errori API"""
return jsonify({
'error': str(e),
'status_code': e.status_code
}), e.status_code or 500
@app.errorhandler(ValueError)
def handle_validation_error(e):
"""Gestione errori di validazione"""
return jsonify({
'error': 'Validation error',
'message': str(e)
}), 422
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'ok',
'timestamp': datetime.now().isoformat()
})
@app.route('/api/beneficiaries', methods=['GET'])
def get_beneficiaries():
"""Lista beneficiari con filtri"""
try:
filters = {
'search': request.args.get('search'),
'status': request.args.get('status'),
'page': request.args.get('page', 1, type=int),
'per_page': min(request.args.get('per_page', 20, type=int), 100)
}
# Rimuovi filtri vuoti
filters = {k: v for k, v in filters.items() if v is not None and v != ''}
response = beneficiary_manager.search(filters)
return jsonify({
'data': [b.dict() for b in response.data],
'meta': response.meta.dict(),
'links': response.links.dict()
})
except SalabamAPIError as e:
logger.error(f"Error fetching beneficiaries: {e}")
raise
@app.route('/api/beneficiaries', methods=['POST'])
def create_beneficiary():
"""Crea nuovo beneficiario"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'JSON data required'}), 400
# Aggiungi metadata della richiesta
if 'metadata' not in data:
data['metadata'] = {}
data['metadata'].update({
'source': 'flask_api',
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent', '')
})
beneficiary_data = BeneficiaryCreate(**data)
result = beneficiary_manager.create(beneficiary_data)
return jsonify(result.dict()), 201
except ValueError as e:
logger.warning(f"Validation error: {e}")
raise
except SalabamAPIError as e:
logger.error(f"API error creating beneficiary: {e}")
raise
@app.route('/api/beneficiaries/<int:beneficiary_id>', methods=['GET'])
def get_beneficiary(beneficiary_id):
"""Ottieni beneficiario per ID"""
try:
beneficiary = beneficiary_manager.get_by_id(beneficiary_id)
return jsonify(beneficiary.dict())
except SalabamAPIError as e:
if e.status_code == 404:
abort(404)
raise
@app.route('/api/beneficiaries/<int:beneficiary_id>', methods=['PUT'])
def update_beneficiary(beneficiary_id):
"""Aggiorna beneficiario"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'JSON data required'}), 400
update_data = BeneficiaryUpdate(**data)
result = beneficiary_manager.update(beneficiary_id, update_data)
return jsonify(result.dict())
except ValueError as e:
logger.warning(f"Validation error: {e}")
raise
except SalabamAPIError as e:
logger.error(f"API error updating beneficiary {beneficiary_id}: {e}")
raise
@app.route('/api/beneficiaries/<int:beneficiary_id>', methods=['DELETE'])
def delete_beneficiary(beneficiary_id):
"""Elimina beneficiario"""
try:
beneficiary_manager.delete(beneficiary_id)
return '', 204
except SalabamAPIError as e:
logger.error(f"API error deleting beneficiary {beneficiary_id}: {e}")
raise
@app.route('/api/beneficiaries/autocomplete', methods=['GET'])
def autocomplete_beneficiaries():
"""Autocomplete beneficiari"""
query = request.args.get('q', '')
limit = min(request.args.get('limit', 10, type=int), 50)
if len(query) < 2:
return jsonify([])
try:
results = beneficiary_manager.autocomplete(query, limit)
return jsonify(results)
except SalabamAPIError as e:
logger.error(f"Error in autocomplete: {e}")
return jsonify([])
@app.route('/api/beneficiaries/bulk-import', methods=['POST'])
def bulk_import_beneficiaries():
"""Import di massa"""
try:
data = request.get_json()
if not data or 'beneficiaries' not in data:
return jsonify({'error': 'beneficiaries array required'}), 400
beneficiaries_data = data['beneficiaries']
options = data.get('options', {})
# Validazione dati
beneficiaries = []
for i, b_data in enumerate(beneficiaries_data):
try:
beneficiary = BeneficiaryCreate(**b_data)
beneficiaries.append(beneficiary)
except ValueError as e:
return jsonify({
'error': f'Validation error at index {i}',
'message': str(e)
}), 422
# Progress tracking (in una implementazione reale useresti WebSockets o SSE)
progress_data = []
def track_progress(progress):
progress_data.append(progress.copy())
results = beneficiary_manager.bulk_import(
beneficiaries,
batch_size=options.get('batch_size', 5),
delay=options.get('delay', 0.2),
on_progress=track_progress
)
return jsonify({
'results': results,
'progress': progress_data
})
except Exception as e:
logger.error(f"Error in bulk import: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True)
3. Client asincrono
# async_client.py
import asyncio
import aiohttp
import time
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class AsyncSalabamAPIClient:
"""Client asincrono per le API Salabam Solutions"""
def __init__(self, client_id: str, client_secret: str, base_url: str = "https://api.salabam.solutions/v3"):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = base_url.rstrip('/')
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.token_expiry: Optional[datetime] = None
# Session asincrona
self.session: Optional[aiohttp.ClientSession] = None
# Rate limiting
self._last_request_time = 0
self._min_request_interval = 0.1 # 100ms tra richieste
async def __aenter__(self):
"""Context manager entry"""
timeout = aiohttp.ClientTimeout(total=30)
self.session = aiohttp.ClientSession(
timeout=timeout,
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
'User-Agent': 'SalabamAsyncPythonClient/1.0'
}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
if self.session:
await self.session.close()
async def _rate_limit(self):
"""Applica rate limiting"""
now = time.time()
elapsed = now - self._last_request_time
if elapsed < self._min_request_interval:
await asyncio.sleep(self._min_request_interval - elapsed)
self._last_request_time = time.time()
async def authenticate(self) -> Dict[str, Any]:
"""Autenticazione asincrona"""
await self._rate_limit()
try:
async with self.session.post(
f"{self.base_url}/auth/login",
json={
'client_id': self.client_id,
'client_secret': self.client_secret
}
) as response:
if not response.ok:
error_text = await response.text()
raise aiohttp.ClientError(f"Authentication failed: {error_text}")
data = await response.json()
self.access_token = data['access_token']
self.refresh_token = data.get('refresh_token')
expires_in = data.get('expires_in', 3600)
self.token_expiry = datetime.now() + timedelta(seconds=expires_in)
logger.info("Successfully authenticated with Salabam API")
return data
except Exception as e:
logger.error(f"Authentication error: {e}")
raise
async def _ensure_valid_token(self):
"""Assicura token valido"""
if not self.access_token or self._is_token_expired():
await self.authenticate()
def _is_token_expired(self) -> bool:
"""Controlla se token è scaduto"""
if not self.token_expiry:
return True
return datetime.now() > (self.token_expiry - timedelta(minutes=5))
async def request(self, method: str, endpoint: str, **kwargs) -> aiohttp.ClientResponse:
"""Richiesta asincrona autenticata"""
await self._ensure_valid_token()
await self._rate_limit()
url = f"{self.base_url}{endpoint}"
headers = kwargs.pop('headers', {})
headers['Authorization'] = f"Bearer {self.access_token}"
try:
async with self.session.request(
method=method,
url=url,
headers=headers,
**kwargs
) as response:
return response
except Exception as e:
logger.error(f"Request failed for {endpoint}: {e}")
raise
async def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""GET asincrona"""
response = await self.request('GET', endpoint, params=params)
if not response.ok:
error_text = await response.text()
raise aiohttp.ClientError(f"GET {endpoint} failed: {error_text}")
return await response.json()
async def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
"""POST asincrona"""
response = await self.request('POST', endpoint, json=data)
if not response.ok:
error_text = await response.text()
raise aiohttp.ClientError(f"POST {endpoint} failed: {error_text}")
return await response.json()
class AsyncBeneficiaryManager:
"""Manager asincrono per beneficiari"""
def __init__(self, api_client: AsyncSalabamAPIClient):
self.api_client = api_client
async def create(self, beneficiary_data: Dict[str, Any]) -> Dict[str, Any]:
"""Crea beneficiario asincronamente"""
return await self.api_client.post('/beneficiaries', beneficiary_data)
async def search(self, filters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Ricerca asincrona"""
params = filters or {}
return await self.api_client.get('/beneficiaries', params)
async def bulk_create_concurrent(self, beneficiaries: List[Dict[str, Any]],
max_concurrent: int = 5) -> List[Dict[str, Any]]:
"""Creazione concorrente di beneficiari"""
semaphore = asyncio.Semaphore(max_concurrent)
async def create_with_semaphore(beneficiary_data):
async with semaphore:
try:
return await self.create(beneficiary_data)
except Exception as e:
logger.error(f"Error creating beneficiary {beneficiary_data.get('email')}: {e}")
return {'error': str(e), 'email': beneficiary_data.get('email')}
tasks = [create_with_semaphore(b) for b in beneficiaries]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# Esempio di utilizzo asincrono
async def async_example():
"""Esempio di utilizzo del client asincrono"""
async with AsyncSalabamAPIClient(
client_id=os.getenv('SALABAM_CLIENT_ID'),
client_secret=os.getenv('SALABAM_CLIENT_SECRET')
) as client:
manager = AsyncBeneficiaryManager(client)
# Ricerca asincrona
results = await manager.search({'status': 'active', 'per_page': 10})
print(f"Trovati {len(results['data'])} beneficiari attivi")
# Creazione concorrente
new_beneficiaries = [
{
'first_name': f'Test{i}',
'last_name': f'User{i}',
'email': f'test{i}@example.com',
'metadata': {'source': 'async_test'}
}
for i in range(5)
]
results = await manager.bulk_create_concurrent(new_beneficiaries, max_concurrent=3)
successful = [r for r in results if 'error' not in r]
errors = [r for r in results if 'error' in r]
print(f"Creati: {len(successful)}, Errori: {len(errors)}")
if __name__ == '__main__':
import os
from dotenv import load_dotenv
load_dotenv()
asyncio.run(async_example())
4. Integrazione Django
# django_integration/models.py
from django.db import models
from django.contrib.auth.models import User
class BeneficiarySync(models.Model):
"""Model per sincronizzazione beneficiari con API Salabam"""
salabam_id = models.IntegerField(unique=True)
email = models.EmailField()
first_name = models.CharField(max_length=100)
last_name = models.CharField(max_length=100)
status = models.CharField(max_length=20)
last_synced = models.DateTimeField(auto_now=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'beneficiary_sync'
indexes = [
models.Index(fields=['salabam_id']),
models.Index(fields=['email']),
]
# django_integration/services.py
from django.conf import settings
from django.core.cache import cache
import logging
from salabam_client import SalabamAPIClient, SalabamAPIError
from beneficiary_manager import BeneficiaryManager
from .models import BeneficiarySync
logger = logging.getLogger(__name__)
class DjangoSalabamService:
"""Servizio Django per integrazione API Salabam"""
def __init__(self):
self.api_client = SalabamAPIClient(
settings.SALABAM_CLIENT_ID,
settings.SALABAM_CLIENT_SECRET,
settings.SALABAM_API_URL
)
self.beneficiary_manager = BeneficiaryManager(self.api_client)
def sync_beneficiary_from_api(self, salabam_id: int) -> BeneficiarySync:
"""Sincronizza beneficiario dalla API"""
try:
# Controlla cache
cache_key = f"beneficiary_{salabam_id}"
cached_data = cache.get(cache_key)
if cached_data:
beneficiary_data = cached_data
else:
beneficiary = self.beneficiary_manager.get_by_id(salabam_id)
beneficiary_data = beneficiary.dict()
# Cache per 5 minuti
cache.set(cache_key, beneficiary_data, 300)
# Aggiorna o crea record locale
sync_obj, created = BeneficiarySync.objects.update_or_create(
salabam_id=salabam_id,
defaults={
'email': beneficiary_data['email'],
'first_name': beneficiary_data['first_name'],
'last_name': beneficiary_data['last_name'],
'status': beneficiary_data['status']
}
)
action = "created" if created else "updated"
logger.info(f"Beneficiary {salabam_id} {action} locally")
return sync_obj
except SalabamAPIError as e:
logger.error(f"Error syncing beneficiary {salabam_id}: {e}")
raise
# django_integration/views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils.decorators import method_decorator
from django.views import View
import json
from .services import DjangoSalabamService
from salabam_client import SalabamAPIError
@method_decorator(csrf_exempt, name='dispatch')
class BeneficiaryAPIView(View):
"""Vista API per gestione beneficiari"""
def __init__(self):
super().__init__()
self.service = DjangoSalabamService()
def get(self, request):
"""Lista beneficiari"""
try:
filters = {
'search': request.GET.get('search'),
'status': request.GET.get('status'),
'page': int(request.GET.get('page', 1)),
'per_page': min(int(request.GET.get('per_page', 20)), 100)
}
# Rimuovi filtri vuoti
filters = {k: v for k, v in filters.items() if v is not None and v != ''}
response = self.service.beneficiary_manager.search(filters)
return JsonResponse({
'data': [b.dict() for b in response.data],
'meta': response.meta.dict(),
'links': response.links.dict()
})
except SalabamAPIError as e:
return JsonResponse({'error': str(e)}, status=e.status_code or 500)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
def post(self, request):
"""Crea beneficiario"""
try:
data = json.loads(request.body)
# Aggiungi metadata Django
if 'metadata' not in data:
data['metadata'] = {}
data['metadata'].update({
'source': 'django_app',
'user_id': request.user.id if request.user.is_authenticated else None,
'ip_address': self._get_client_ip(request)
})
from models import BeneficiaryCreate
beneficiary_data = BeneficiaryCreate(**data)
result = self.service.beneficiary_manager.create(beneficiary_data)
# Sincronizza localmente
self.service.sync_beneficiary_from_api(result.id)
return JsonResponse(result.dict(), status=201)
except ValueError as e:
return JsonResponse({'error': str(e)}, status=422)
except SalabamAPIError as e:
return JsonResponse({'error': str(e)}, status=e.status_code or 500)
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
def _get_client_ip(self, request):
"""Ottieni IP del client"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
# django_integration/management/commands/sync_beneficiaries.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta
from ...services import DjangoSalabamService
from ...models import BeneficiarySync
class Command(BaseCommand):
help = 'Sincronizza beneficiari con API Salabam'
def add_arguments(self, parser):
parser.add_argument(
'--full',
action='store_true',
help='Sincronizzazione completa (ignora ultima sincronizzazione)',
)
parser.add_argument(
'--limit',
type=int,
default=100,
help='Numero massimo di beneficiari da sincronizzare',
)
def handle(self, *args, **options):
service = DjangoSalabamService()
try:
# Determina filtri
filters = {'per_page': 100}
if not options['full']:
# Solo beneficiari modificati nelle ultime 24h
yesterday = timezone.now() - timedelta(days=1)
# Nota: questo richiederebbe un filtro 'updated_since' nell'API
# filters['updated_since'] = yesterday.isoformat()
page = 1
total_synced = 0
while total_synced < options['limit']:
self.stdout.write(f"Sincronizzando pagina {page}...")
filters['page'] = page
response = service.beneficiary_manager.search(filters)
if not response.data:
break
for beneficiary in response.data:
if total_synced >= options['limit']:
break
try:
service.sync_beneficiary_from_api(beneficiary.id)
total_synced += 1
if total_synced % 10 == 0:
self.stdout.write(f"Sincronizzati: {total_synced}")
except Exception as e:
self.stderr.write(f"Errore sync beneficiario {beneficiary.id}: {e}")
page += 1
# Se abbiamo processato tutti i beneficiari della pagina corrente
if len(response.data) < filters['per_page']:
break
self.stdout.write(
self.style.SUCCESS(f'Sincronizzazione completata: {total_synced} beneficiari')
)
except Exception as e:
self.stderr.write(f"Errore durante sincronizzazione: {e}")
raise
Best Practices e Performance
1. Gestione errori robusta
# error_handling.py
import functools
import time
import random
from typing import Callable, Any
def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
"""Decorator per retry con exponential backoff"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except SalabamAPIError as e:
if e.status_code == 429: # Rate limited
# Calcola delay con jitter
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, waiting {delay:.2f}s...")
time.sleep(delay)
continue
elif e.status_code >= 500: # Server error
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f"Server error, retrying in {delay:.2f}s...")
time.sleep(delay)
continue
# Altri errori o ultimo tentativo
raise
except Exception as e:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f"Unexpected error, retrying in {delay:.2f}s...")
time.sleep(delay)
continue
raise
raise SalabamAPIError(f"Operation failed after {max_retries} attempts")
return wrapper
return decorator
# Utilizzo
@retry_with_backoff(max_retries=3, base_delay=0.5)
def create_beneficiary_with_retry(manager, beneficiary_data):
return manager.create(beneficiary_data)
2. Caching intelligente
# caching.py
import json
import hashlib
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
class APICache:
"""Cache per risultati API"""
def __init__(self, default_ttl: int = 300):
self._cache = {}
self.default_ttl = default_ttl
def _generate_key(self, endpoint: str, params: Optional[Dict] = None) -> str:
"""Genera chiave cache da endpoint e parametri"""
cache_data = {
'endpoint': endpoint,
'params': params or {}
}
cache_string = json.dumps(cache_data, sort_keys=True)
return hashlib.md5(cache_string.encode()).hexdigest()
def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Any]:
"""Recupera valore dalla cache"""
key = self._generate_key(endpoint, params)
if key in self._cache:
entry = self._cache[key]
if datetime.now() < entry['expires']:
return entry['data']
else:
del self._cache[key]
return None
def set(self, endpoint: str, data: Any, params: Optional[Dict] = None, ttl: Optional[int] = None):
"""Salva valore in cache"""
key = self._generate_key(endpoint, params)
expires = datetime.now() + timedelta(seconds=ttl or self.default_ttl)
self._cache[key] = {
'data': data,
'expires': expires
}
def clear(self):
"""Pulisce cache"""
self._cache.clear()
class CachedBeneficiaryManager(BeneficiaryManager):
"""Manager con caching automatico"""
def __init__(self, api_client: SalabamAPIClient):
super().__init__(api_client)
self.cache = APICache(default_ttl=300) # 5 minuti
def search(self, filters: Optional[Dict[str, Any]] = None) -> BeneficiaryListResponse:
"""Ricerca con cache"""
# Controlla cache per ricerche di sola lettura
cache_key = '/beneficiaries'
cached_result = self.cache.get(cache_key, filters)
if cached_result:
return BeneficiaryListResponse(**cached_result)
# Esegui richiesta API
result = super().search(filters)
# Cache solo per filtri specifici (non ricerche generiche)
if filters and len(filters) > 0:
self.cache.set(cache_key, result.dict(), filters, ttl=180) # 3 minuti
return result
def get_by_id(self, beneficiary_id: int) -> Beneficiary:
"""Get con cache"""
cache_key = f'/beneficiaries/{beneficiary_id}'
cached_result = self.cache.get(cache_key)
if cached_result:
return Beneficiary(**cached_result)
result = super().get_by_id(beneficiary_id)
# Cache dettagli beneficiario per 10 minuti
self.cache.set(cache_key, result.dict(), ttl=600)
return result
def create(self, beneficiary_data: BeneficiaryCreate) -> Beneficiary:
"""Create con invalidazione cache"""
result = super().create(beneficiary_data)
# Invalida cache delle liste
self.cache.clear()
return result
Questa documentazione Python fornisce esempi completi e professionali per integrare efficacemente le API Salabam Solutions in applicazioni Python moderne, sia sincrone che asincrone, con particolare attenzione alle best practices e alla gestione degli errori.