Exécuter des tests sur plusieurs dispositifs sous test simultanément réduit le temps de cycle et diminue le coût par unité. Ce guide montre comment structurer une station de test Python qui exécute des instances de test OpenHTF indépendantes en parallèle, chacune journalisant ses résultats dans TofuPilot.
Pourquoi le test parallèle de DUT est important
Le test séquentiel sur une station multi-fixture laisse de la capacité de test inutilisée. Si votre fixture accueille quatre DUT mais que votre logiciel les teste un par un, vous payez du matériel que vous n'utilisez pas.
| Stratégie | Fixture 4 DUT, test de 30 s | Débit (unités/heure) |
|---|---|---|
| Séquentiel | 4 x 30 s = 120 s par cycle | 30 |
| Parallèle (4 workers) | ~32 s par cycle (overhead) | ~112 |
Le test parallèle est le plus utile quand :
- Le temps de test est dominé par les attentes d'E/S (lectures série, stabilisation de l'alimentation, mesures RF)
- Votre fixture accueille physiquement plusieurs DUT simultanément
- Le takt time est un goulot d'étranglement en fabrication
Vue d'ensemble de l'architecture
Chaque DUT a sa propre instance de test OpenHTF exécutée dans un processus séparé. Les résultats de chaque DUT sont journalisés indépendamment dans TofuPilot.
Contrôleur de station
├── DUT 0 → Processus 0 → Test OpenHTF → TofuPilot (série : SN-001)
├── DUT 1 → Processus 1 → Test OpenHTF → TofuPilot (série : SN-002)
├── DUT 2 → Processus 2 → Test OpenHTF → TofuPilot (série : SN-003)
└── DUT 3 → Processus 3 → Test OpenHTF → TofuPilot (série : SN-004)Choisir un modèle de parallélisme
| Modèle | Idéal pour | État partagé | Impact du GIL |
|---|---|---|---|
multiprocessing | DUT CPU-bound ou isolés par instrument | Nécessite IPC (Queue, Manager) | Aucun |
threading | Tests I/O-bound, objets instrument partagés | Direct (avec verrous) | S'applique |
ProcessPoolExecutor | Dispatch parallèle simple, collecte de résultats | Queue / Manager | Aucun |
ThreadPoolExecutor | Phases de test I/O-bound légères | Direct (avec verrous) | S'applique |
Pour la plupart des stations de test hardware, multiprocessing avec un processus par DUT est le choix le plus sûr. Il isole les défauts, évite la contention du GIL et garde les logs et l'état de chaque DUT totalement indépendants.
Étape 1 : Définir les phases de test
import openhtf as htf
from openhtf.plugs import BasePlug
from openhtf.util import units
class PowerSupplyPlug(BasePlug):
"""Alimentation simulée pour un emplacement DUT unique."""
def setUp(self):
self._voltage = 5.01
self._current = 0.042
def measure_voltage(self) -> float:
return self._voltage
def measure_current(self) -> float:
return self._current
def tearDown(self):
pass
class SerialPortPlug(BasePlug):
"""Connexion série simulée vers un DUT unique."""
def setUp(self):
self._version = "2.4.1"
def query(self, command: str) -> str:
return self._version
def tearDown(self):
pass
@htf.measures(
htf.Measurement("supply_voltage")
.in_range(minimum=4.8, maximum=5.2)
.with_units(units.VOLT)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_supply_voltage(test, psu):
test.measurements.supply_voltage = psu.measure_voltage()
@htf.measures(
htf.Measurement("idle_current")
.in_range(minimum=0.010, maximum=0.150)
.with_units(units.AMPERE)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_idle_current(test, psu):
test.measurements.idle_current = psu.measure_current()
@htf.measures(
htf.Measurement("firmware_version").equals("2.4.1")
)
@htf.plug(serial=SerialPortPlug)
def check_firmware(test, serial):
version = serial.query("VERSION?").strip()
test.measurements.firmware_version = versionÉtape 2 : Créer le processus worker
Chaque worker crée son propre objet Test OpenHTF et l'exécute jusqu'à la fin.
import openhtf as htf
from tofupilot.openhtf import TofuPilot
from station.test_phases import (
measure_supply_voltage,
measure_idle_current,
check_firmware,
)
def run_test_for_dut(slot_index: int, serial_number: str) -> str:
"""Point d'entrée pour un processus worker de DUT unique."""
test = htf.Test(
measure_supply_voltage,
measure_idle_current,
check_firmware,
test_name=f"Test de production - Emplacement {slot_index}",
)
with TofuPilot(test):
test.execute(test_start=lambda: serial_number)
return serial_numberÉtape 3 : Lancer tous les DUT en parallèle
Le contrôleur scanne les numéros de série depuis la fixture, puis dispatche un processus par emplacement.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from station.worker import run_test_for_dut
def scan_serials() -> dict[int, str]:
"""Lire les numéros de série depuis la fixture."""
return {
0: "SN-2024-001",
1: "SN-2024-002",
2: "SN-2024-003",
3: "SN-2024-004",
}
def run_parallel_cycle():
serials = scan_serials()
slots = list(serials.keys())
print(f"Démarrage du cycle de test parallèle pour {len(slots)} DUT...")
with ProcessPoolExecutor(max_workers=len(slots)) as executor:
futures = {
executor.submit(run_test_for_dut, slot, serials[slot]): slot
for slot in slots
}
for future in as_completed(futures):
slot = futures[future]
try:
sn = future.result()
print(f"Emplacement {slot} ({sn}) : terminé")
except Exception as exc:
print(f"Emplacement {slot} : ÉCHOUÉ avec {exc}")
if __name__ == "__main__":
multiprocessing.set_start_method("spawn")
run_parallel_cycle()Gestion de l'occupation partielle de la fixture
Tous les emplacements de la fixture ne sont pas forcément occupés à chaque cycle. Protégez-vous contre les emplacements vides avant de soumettre les futures.
def detect_present_duts() -> dict[int, str]:
"""Retourner uniquement les emplacements où un DUT est physiquement présent."""
present = {}
for slot in range(4):
if fixture_sense_pin_active(slot):
present[slot] = read_barcode(slot)
return presentComparaison des stratégies de parallélisme
| Approche | Isolation des défauts | Accès instrument partagé | Recommandé pour |
|---|---|---|---|
ProcessPoolExecutor | Complète (processus séparés) | Nécessite IPC ou verrous par canal | La plupart des stations de production |
ThreadPoolExecutor | Aucune (processus partagé) | Direct (verrous requis) | Tests I/O-bound légers |
asyncio avec async VISA | Aucune | Coopératif (pas de préemption) | Drivers d'instruments entièrement async |
| Un processus par DUT (manuel) | Complète | Nécessite IPC | Contrôleurs de fixture personnalisés |
Pièges courants
Méthode de démarrage des processus sur macOS et Windows. Appelez toujours multiprocessing.set_start_method("spawn") avant de lancer les workers. La méthode par défaut fork sur macOS peut provoquer des deadlocks lors du fork après l'initialisation VISA ou du port série.
tearDown des plugs en cas d'échec du test. OpenHTF appelle tearDown() sur les plugs même lorsqu'une phase lève une exception. Assurez-vous que tearDown est idempotent et ne lève pas d'exception.
Collision de numéros de série. Si deux processus reçoivent le même numéro de série, TofuPilot crée deux exécutions pour la même unité. Validez l'unicité des numéros de série dans scan_serials() avant le dispatch.
Timeout d'instrument sous charge. Les instruments partagés peuvent expirer lorsque plusieurs canaux sont interrogés simultanément. Ajustez les timeouts de verrou par canal et les timeouts de lecture d'instrument pour correspondre à votre mesure la plus lente.