Test Station Setup

Tests parallèles sur plusieurs DUT

Apprenez à exécuter des instances de test OpenHTF indépendantes en parallèle sur plusieurs DUT avec Python multiprocessing, et la journalisation des.

JJulien Buteau
advanced12 min de lecture14 mars 2026

Exécuter des tests sur plusieurs dispositifs sous test simultanément réduit le temps de cycle et diminue le coût par unité. Ce guide montre comment structurer une station de test Python qui exécute des instances de test OpenHTF indépendantes en parallèle, chacune journalisant ses résultats dans TofuPilot.

Pourquoi le test parallèle de DUT est important

Le test séquentiel sur une station multi-fixture laisse de la capacité de test inutilisée. Si votre fixture accueille quatre DUT mais que votre logiciel les teste un par un, vous payez du matériel que vous n'utilisez pas.

StratégieFixture 4 DUT, test de 30 sDébit (unités/heure)
Séquentiel4 x 30 s = 120 s par cycle30
Parallèle (4 workers)~32 s par cycle (overhead)~112

Le test parallèle est le plus utile quand :

  • Le temps de test est dominé par les attentes d'E/S (lectures série, stabilisation de l'alimentation, mesures RF)
  • Votre fixture accueille physiquement plusieurs DUT simultanément
  • Le takt time est un goulot d'étranglement en fabrication

Vue d'ensemble de l'architecture

Chaque DUT a sa propre instance de test OpenHTF exécutée dans un processus séparé. Les résultats de chaque DUT sont journalisés indépendamment dans TofuPilot.

architecture.txt
Contrôleur de station
├── DUT 0 → Processus 0 → Test OpenHTF → TofuPilot (série : SN-001)
├── DUT 1 → Processus 1 → Test OpenHTF → TofuPilot (série : SN-002)
├── DUT 2 → Processus 2 → Test OpenHTF → TofuPilot (série : SN-003)
└── DUT 3 → Processus 3 → Test OpenHTF → TofuPilot (série : SN-004)

Choisir un modèle de parallélisme

ModèleIdéal pourÉtat partagéImpact du GIL
multiprocessingDUT CPU-bound ou isolés par instrumentNécessite IPC (Queue, Manager)Aucun
threadingTests I/O-bound, objets instrument partagésDirect (avec verrous)S'applique
ProcessPoolExecutorDispatch parallèle simple, collecte de résultatsQueue / ManagerAucun
ThreadPoolExecutorPhases de test I/O-bound légèresDirect (avec verrous)S'applique

Pour la plupart des stations de test hardware, multiprocessing avec un processus par DUT est le choix le plus sûr. Il isole les défauts, évite la contention du GIL et garde les logs et l'état de chaque DUT totalement indépendants.

Étape 1 : Définir les phases de test

station/test_phases.py
import openhtf as htf
from openhtf.plugs import BasePlug
from openhtf.util import units


class PowerSupplyPlug(BasePlug):
    """Alimentation simulée pour un emplacement DUT unique."""

    def setUp(self):
        self._voltage = 5.01
        self._current = 0.042

    def measure_voltage(self) -> float:
        return self._voltage

    def measure_current(self) -> float:
        return self._current

    def tearDown(self):
        pass


class SerialPortPlug(BasePlug):
    """Connexion série simulée vers un DUT unique."""

    def setUp(self):
        self._version = "2.4.1"

    def query(self, command: str) -> str:
        return self._version

    def tearDown(self):
        pass


@htf.measures(
    htf.Measurement("supply_voltage")
    .in_range(minimum=4.8, maximum=5.2)
    .with_units(units.VOLT)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_supply_voltage(test, psu):
    test.measurements.supply_voltage = psu.measure_voltage()


@htf.measures(
    htf.Measurement("idle_current")
    .in_range(minimum=0.010, maximum=0.150)
    .with_units(units.AMPERE)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_idle_current(test, psu):
    test.measurements.idle_current = psu.measure_current()


@htf.measures(
    htf.Measurement("firmware_version").equals("2.4.1")
)
@htf.plug(serial=SerialPortPlug)
def check_firmware(test, serial):
    version = serial.query("VERSION?").strip()
    test.measurements.firmware_version = version

Étape 2 : Créer le processus worker

Chaque worker crée son propre objet Test OpenHTF et l'exécute jusqu'à la fin.

station/worker.py
import openhtf as htf
from tofupilot.openhtf import TofuPilot

from station.test_phases import (
    measure_supply_voltage,
    measure_idle_current,
    check_firmware,
)


def run_test_for_dut(slot_index: int, serial_number: str) -> str:
    """Point d'entrée pour un processus worker de DUT unique."""
    test = htf.Test(
        measure_supply_voltage,
        measure_idle_current,
        check_firmware,
        test_name=f"Test de production - Emplacement {slot_index}",
    )

    with TofuPilot(test):
        test.execute(test_start=lambda: serial_number)

    return serial_number

Étape 3 : Lancer tous les DUT en parallèle

Le contrôleur scanne les numéros de série depuis la fixture, puis dispatche un processus par emplacement.

station/controller.py
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

from station.worker import run_test_for_dut


def scan_serials() -> dict[int, str]:
    """Lire les numéros de série depuis la fixture."""
    return {
        0: "SN-2024-001",
        1: "SN-2024-002",
        2: "SN-2024-003",
        3: "SN-2024-004",
    }


def run_parallel_cycle():
    serials = scan_serials()
    slots = list(serials.keys())

    print(f"Démarrage du cycle de test parallèle pour {len(slots)} DUT...")

    with ProcessPoolExecutor(max_workers=len(slots)) as executor:
        futures = {
            executor.submit(run_test_for_dut, slot, serials[slot]): slot
            for slot in slots
        }

        for future in as_completed(futures):
            slot = futures[future]
            try:
                sn = future.result()
                print(f"Emplacement {slot} ({sn}) : terminé")
            except Exception as exc:
                print(f"Emplacement {slot} : ÉCHOUÉ avec {exc}")


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    run_parallel_cycle()

Gestion de l'occupation partielle de la fixture

Tous les emplacements de la fixture ne sont pas forcément occupés à chaque cycle. Protégez-vous contre les emplacements vides avant de soumettre les futures.

station/controller.py
def detect_present_duts() -> dict[int, str]:
    """Retourner uniquement les emplacements où un DUT est physiquement présent."""
    present = {}
    for slot in range(4):
        if fixture_sense_pin_active(slot):
            present[slot] = read_barcode(slot)
    return present

Comparaison des stratégies de parallélisme

ApprocheIsolation des défautsAccès instrument partagéRecommandé pour
ProcessPoolExecutorComplète (processus séparés)Nécessite IPC ou verrous par canalLa plupart des stations de production
ThreadPoolExecutorAucune (processus partagé)Direct (verrous requis)Tests I/O-bound légers
asyncio avec async VISAAucuneCoopératif (pas de préemption)Drivers d'instruments entièrement async
Un processus par DUT (manuel)ComplèteNécessite IPCContrôleurs de fixture personnalisés

Pièges courants

Méthode de démarrage des processus sur macOS et Windows. Appelez toujours multiprocessing.set_start_method("spawn") avant de lancer les workers. La méthode par défaut fork sur macOS peut provoquer des deadlocks lors du fork après l'initialisation VISA ou du port série.

tearDown des plugs en cas d'échec du test. OpenHTF appelle tearDown() sur les plugs même lorsqu'une phase lève une exception. Assurez-vous que tearDown est idempotent et ne lève pas d'exception.

Collision de numéros de série. Si deux processus reçoivent le même numéro de série, TofuPilot crée deux exécutions pour la même unité. Validez l'unicité des numéros de série dans scan_serials() avant le dispatch.

Timeout d'instrument sous charge. Les instruments partagés peuvent expirer lorsque plusieurs canaux sont interrogés simultanément. Ajustez les timeouts de verrou par canal et les timeouts de lecture d'instrument pour correspondre à votre mesure la plus lente.

Plus de guides

Mettez ce guide en pratique