
Run Parallel Tests on Multiple DUTs

Learn how to run independent OpenHTF test instances in parallel across multiple DUTs using Python multiprocessing, with per-DUT result logging to TofuPilot.

Julien Buteau
Advanced · 12 min read · March 14, 2026

Running tests on multiple devices under test (DUTs) simultaneously shortens the effective cycle time per unit and reduces the cost per unit. This guide shows how to structure a Python test station that runs independent OpenHTF test instances in parallel, each logging results to TofuPilot.

Why Parallel DUT Testing Matters

Sequential testing on a multi-fixture station leaves test capacity on the table. If your fixture holds four DUTs but your software tests one at a time, you're paying for hardware you're not using.

| Strategy | 4-DUT fixture, 30 s test | Throughput (units/hour) |
|---|---|---|
| Sequential | 4 × 30 s = 120 s per cycle | 120 |
| Parallel (4 workers) | ~32 s per cycle (including overhead) | ~450 |
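
As a rough check on the comparison above, the arithmetic can be written out directly. This is a minimal sketch: the `throughput` helper is hypothetical, and the 32 s parallel cycle assumes about 2 s of dispatch overhead on top of the 30 s test.

```python
def throughput(duts_per_cycle: int, cycle_time_s: float) -> tuple[float, float]:
    """Return (fixture cycles per hour, units per hour) for one station."""
    cycles_per_hour = 3600 / cycle_time_s
    return cycles_per_hour, cycles_per_hour * duts_per_cycle


# Sequential: four 30 s tests run back to back on a 4-DUT fixture.
seq_cycles, seq_units = throughput(duts_per_cycle=4, cycle_time_s=4 * 30)

# Parallel: all four tests run at once, assuming ~2 s of overhead per cycle.
par_cycles, par_units = throughput(duts_per_cycle=4, cycle_time_s=32)

print(f"Sequential: {seq_cycles:.1f} cycles/h, {seq_units:.0f} units/h")
print(f"Parallel:   {par_cycles:.1f} cycles/h, {par_units:.0f} units/h")
```

Both cases process four units per fixture cycle, so the gain comes entirely from the shorter cycle: 30 cycles per hour sequentially versus 112.5 in parallel.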

Parallel testing is most valuable when:

  • Test time is dominated by I/O waits (serial reads, power supply settling, RF measurements)
  • Your fixture physically holds multiple DUTs simultaneously
  • Takt time is a bottleneck in manufacturing

Architecture Overview

Each DUT gets its own OpenHTF test instance running in a separate process. Results for each DUT are logged independently to TofuPilot.

architecture.txt
Station Controller
├── DUT 0 → Process 0 → OpenHTF Test → TofuPilot (serial: SN-001)
├── DUT 1 → Process 1 → OpenHTF Test → TofuPilot (serial: SN-002)
├── DUT 2 → Process 2 → OpenHTF Test → TofuPilot (serial: SN-003)
└── DUT 3 → Process 3 → OpenHTF Test → TofuPilot (serial: SN-004)

Choosing a Parallelism Model

| Model | Best for | Shared state | GIL impact |
|---|---|---|---|
| multiprocessing | CPU-bound or instrument-isolated DUTs | Requires IPC (Queue, Manager) | None |
| threading | I/O-bound tests, shared instrument objects | Direct (with locks) | Applies |
| ProcessPoolExecutor | Simple parallel dispatch, result collection | Queue / Manager | None |
| ThreadPoolExecutor | Lightweight I/O-bound test phases | Direct (with locks) | Applies |

For most hardware test stations, multiprocessing with one process per DUT is the safest choice. It isolates faults, avoids GIL contention, and keeps each DUT's logs and state fully independent.
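
Because `ProcessPoolExecutor` and `ThreadPoolExecutor` share the `concurrent.futures` interface, the dispatch code can stay the same while you evaluate the two models. The sketch below is illustrative only: `run_slots` and `fake_worker` are hypothetical names, and the worker is a stand-in for a real per-DUT test.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed
from typing import Callable


def run_slots(
    executor_cls: Callable[..., Executor],
    worker: Callable[[int], str],
    slots: list[int],
) -> dict[int, str]:
    """Run worker(slot) for every slot and collect one outcome per slot."""
    outcomes: dict[int, str] = {}
    with executor_cls(max_workers=len(slots)) as executor:
        futures = {executor.submit(worker, slot): slot for slot in slots}
        for future in as_completed(futures):
            slot = futures[future]
            try:
                outcomes[slot] = future.result()
            except Exception as exc:
                # An exception raised in one worker is recorded for that slot only.
                outcomes[slot] = f"error: {exc}"
    return outcomes


def fake_worker(slot: int) -> str:
    """Hypothetical stand-in for a per-DUT test; slot 2 simulates a fault."""
    if slot == 2:
        raise RuntimeError("instrument not responding")
    return "done"


print(run_slots(ThreadPoolExecutor, fake_worker, [0, 1, 2, 3]))
```

Passing `ProcessPoolExecutor` instead gives each slot its own process. In that case the worker must be a module-level function so it can be pickled, and the call belongs under an `if __name__ == "__main__":` guard.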

Step 1: Define Test Phases

station/test_phases.py
import openhtf as htf
from openhtf.plugs import BasePlug
from openhtf.util import units


class PowerSupplyPlug(BasePlug):
    """Mock power supply for a single DUT slot."""

    def __init__(self):
        # OpenHTF instantiates plugs with no arguments; it does not call setUp().
        self._voltage = 5.01
        self._current = 0.042

    def measure_voltage(self) -> float:
        return self._voltage

    def measure_current(self) -> float:
        return self._current

    def tearDown(self):
        pass


class SerialPortPlug(BasePlug):
    """Mock serial connection to a single DUT."""

    def __init__(self):
        # OpenHTF instantiates plugs with no arguments; it does not call setUp().
        self._version = "2.4.1"

    def query(self, command: str) -> str:
        return self._version

    def tearDown(self):
        pass


@htf.measures(
    htf.Measurement("supply_voltage")
    .in_range(minimum=4.8, maximum=5.2)
    .with_units(units.VOLT)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_supply_voltage(test, psu):
    test.measurements.supply_voltage = psu.measure_voltage()


@htf.measures(
    htf.Measurement("idle_current")
    .in_range(minimum=0.010, maximum=0.150)
    .with_units(units.AMPERE)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_idle_current(test, psu):
    test.measurements.idle_current = psu.measure_current()


@htf.measures(
    htf.Measurement("firmware_version").equals("2.4.1")
)
@htf.plug(serial=SerialPortPlug)
def check_firmware(test, serial):
    version = serial.query("VERSION?").strip()
    test.measurements.firmware_version = version

Step 2: Create the Worker Process

Each worker creates its own OpenHTF Test object and runs it to completion.

station/worker.py
import openhtf as htf
from tofupilot.openhtf import TofuPilot

from station.test_phases import (
    measure_supply_voltage,
    measure_idle_current,
    check_firmware,
)


def run_test_for_dut(slot_index: int, serial_number: str) -> str:
    """Entry point for a single DUT worker process."""
    test = htf.Test(
        measure_supply_voltage,
        measure_idle_current,
        check_firmware,
        test_name=f"Production Test - Slot {slot_index}",
    )

    with TofuPilot(test):
        test.execute(test_start=lambda: serial_number)

    return serial_number

Step 3: Launch All DUTs in Parallel

The controller scans DUT serials from the fixture, then dispatches one process per slot.

station/controller.py
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

from station.worker import run_test_for_dut


def scan_serials() -> dict[int, str]:
    """Read serial numbers from the fixture."""
    return {
        0: "SN-2024-001",
        1: "SN-2024-002",
        2: "SN-2024-003",
        3: "SN-2024-004",
    }


def run_parallel_cycle():
    serials = scan_serials()
    slots = list(serials.keys())

    print(f"Starting parallel test cycle for {len(slots)} DUTs...")

    with ProcessPoolExecutor(max_workers=len(slots)) as executor:
        futures = {
            executor.submit(run_test_for_dut, slot, serials[slot]): slot
            for slot in slots
        }

        for future in as_completed(futures):
            slot = futures[future]
            try:
                sn = future.result()
                print(f"Slot {slot} ({sn}): complete")
            except Exception as exc:
                print(f"Slot {slot}: FAILED with {exc}")


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    run_parallel_cycle()

Handling Partial Fixture Occupancy

Not all fixture slots may be occupied on every cycle. Guard against empty slots before submitting futures.

station/controller.py
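def detect_present_duts() -> dict[int, str]:
    """Return only slots where a DUT is physically present."""
    # fixture_sense_pin_active() and read_barcode() are station-specific
    # helpers (not shown here) that you implement for your fixture hardware.
    present = {}
    for slot in range(4):  # Four-slot fixture
        if fixture_sense_pin_active(slot):
            present[slot] = read_barcode(slot)
    return present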
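
To try the occupancy guard without fixture hardware, the two hardware helpers can be injected as callables. This is a sketch under assumptions: `scan_fixture`, `fake_pins`, and `fake_barcodes` are hypothetical names, and the dictionaries stand in for real sense pins and a barcode reader.

```python
from typing import Callable


def scan_fixture(
    slot_count: int,
    sense_pin_active: Callable[[int], bool],
    read_barcode: Callable[[int], str],
) -> dict[int, str]:
    """Return {slot: serial} for occupied slots only."""
    present: dict[int, str] = {}
    for slot in range(slot_count):
        if sense_pin_active(slot):
            present[slot] = read_barcode(slot)
    return present


# Hypothetical fixture state: slots 0 and 2 are loaded, 1 and 3 are empty.
fake_pins = {0: True, 1: False, 2: True, 3: False}
fake_barcodes = {0: "SN-2024-001", 2: "SN-2024-003"}

present = scan_fixture(4, fake_pins.__getitem__, fake_barcodes.__getitem__)

if present:
    print(f"Dispatching slots {sorted(present)}")
else:
    # ProcessPoolExecutor(max_workers=0) raises ValueError, so skip empty cycles.
    print("No DUTs detected; skipping this cycle")
```

The barcode reader is only called for occupied slots, so an empty slot never triggers a read attempt.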

Comparison of Parallel Strategies

| Approach | Fault isolation | Shared instrument access | Recommended for |
|---|---|---|---|
| ProcessPoolExecutor | Full (separate processes) | Requires IPC or per-channel locks | Most production stations |
| ThreadPoolExecutor | None (shared process) | Direct (locks required) | Lightweight I/O-bound tests |
| asyncio with async VISA | None | Cooperative (no preemption) | Fully async instrument drivers |
| One process per DUT (manual) | Full | Requires IPC | Custom fixture controllers |

Common Pitfalls

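Process start method. Call multiprocessing.set_start_method("spawn") before launching workers so that every platform behaves the same way. Windows only supports spawn, and macOS has defaulted to it since Python 3.8, but Linux has historically defaulted to fork, which can deadlock when a process forks after VISA or serial port initialization.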
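
One alternative to setting the start method globally is to hand the pool an explicit context through the `mp_context` parameter of `ProcessPoolExecutor`. The sketch below assumes a hypothetical `make_executor` helper; it is one possible arrangement, not the only one.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def make_executor(worker_count: int) -> ProcessPoolExecutor:
    """Build a pool whose workers are started with the "spawn" method."""
    spawn_context = multiprocessing.get_context("spawn")
    return ProcessPoolExecutor(max_workers=worker_count, mp_context=spawn_context)
```

This keeps the choice local to the station controller, which helps when another library in the same program has already set a start method.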

Plug tearDown on test failure. OpenHTF calls tearDown() on plugs even when a phase raises an exception. Make sure tearDown is idempotent and doesn't raise.
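
A minimal sketch of an idempotent teardown follows. `FakePort` and `SlotSerialPlug` are hypothetical stand-ins written without OpenHTF so the example runs on its own; in a real station the plug would subclass `BasePlug` and wrap an actual port handle.

```python
class FakePort:
    """Hypothetical serial handle whose close() fails if called twice."""

    def __init__(self):
        self.closed = False

    def close(self):
        if self.closed:
            raise OSError("port already closed")
        self.closed = True


class SlotSerialPlug:
    """Stand-in for a BasePlug subclass; only the tearDown logic matters here."""

    def __init__(self):
        self._port = FakePort()

    def tearDown(self):
        # Take ownership of the handle first, so a second call is a no-op.
        port, self._port = self._port, None
        if port is None:
            return
        try:
            port.close()
        except OSError as exc:
            # Report the problem instead of raising out of teardown.
            print(f"tearDown: ignoring close error: {exc}")


plug = SlotSerialPlug()
plug.tearDown()
plug.tearDown()  # Safe: the handle was already released.
```

Clearing the handle before closing it means a repeated call never touches the port a second time.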

Serial number collision. If two processes receive the same serial number, TofuPilot creates two runs for the same unit. Validate serial uniqueness in scan_serials() before dispatching.
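
A uniqueness check of this kind might look like the following sketch. `find_duplicate_serials` and `validate_serials` are hypothetical helpers; the input has the same `{slot: serial}` shape that `scan_serials()` returns.

```python
from collections import Counter


def find_duplicate_serials(serials: dict[int, str]) -> dict[str, list[int]]:
    """Map each serial that appears more than once to the slots reporting it."""
    counts = Counter(serials.values())
    return {
        sn: sorted(slot for slot, value in serials.items() if value == sn)
        for sn, count in counts.items()
        if count > 1
    }


def validate_serials(serials: dict[int, str]) -> None:
    """Raise ValueError before dispatch if any serial is shared between slots."""
    duplicates = find_duplicate_serials(serials)
    if duplicates:
        raise ValueError(f"Duplicate serial numbers: {duplicates}")
```

Calling `validate_serials(serials)` before submitting any futures stops the cycle while no test has started yet, and the error message names the slots that need to be rescanned.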

Instrument timeout under load. Shared instruments may time out when multiple channels are queried simultaneously. Tune per-channel lock timeouts and instrument read timeouts to match your slowest measurement.
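
Per-channel locking with a timeout can be sketched as below. `ChannelLocks` is a hypothetical helper, and it uses `threading.Lock` to stay self-contained; between separate processes, a `multiprocessing` lock offers the same `acquire(timeout=...)` call.

```python
import threading
from contextlib import contextmanager


class ChannelLocks:
    """One lock per instrument channel, acquired with a timeout."""

    def __init__(self, channel_count: int):
        self._locks = [threading.Lock() for _ in range(channel_count)]

    @contextmanager
    def hold(self, channel: int, timeout_s: float):
        # Fail loudly instead of waiting forever on a busy channel.
        if not self._locks[channel].acquire(timeout=timeout_s):
            raise TimeoutError(f"channel {channel} busy for more than {timeout_s} s")
        try:
            yield
        finally:
            self._locks[channel].release()
```

A phase would then wrap each instrument query in `with locks.hold(channel, timeout_s=...):`, using a timeout somewhat longer than the slowest measurement on that channel.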
