Running tests on multiple devices under test simultaneously cuts cycle time and reduces the cost per unit. This guide shows how to structure a Python test station that runs independent OpenHTF test instances in parallel, each logging results to TofuPilot.
Why Parallel DUT Testing Matters
Sequential testing on a multi-fixture station leaves test capacity on the table. If your fixture holds four DUTs but your software tests one at a time, you're paying for hardware you're not using.
| Strategy | 4-DUT Fixture, 30s Test | Throughput (units/hour) |
|---|---|---|
| Sequential | 4 x 30s = 120s per cycle | 30 |
| Parallel (4 workers) | ~32s per cycle (overhead) | ~112 |
Parallel testing is most valuable when:
- Test time is dominated by I/O waits (serial reads, power supply settling, RF measurements)
- Your fixture physically holds multiple DUTs simultaneously
- Takt time is a bottleneck in manufacturing
Architecture Overview
Each DUT gets its own OpenHTF test instance running in a separate process. Results for each DUT are logged independently to TofuPilot.
Station Controller
├── DUT 0 → Process 0 → OpenHTF Test → TofuPilot (serial: SN-001)
├── DUT 1 → Process 1 → OpenHTF Test → TofuPilot (serial: SN-002)
├── DUT 2 → Process 2 → OpenHTF Test → TofuPilot (serial: SN-003)
└── DUT 3 → Process 3 → OpenHTF Test → TofuPilot (serial: SN-004)Choosing a Parallelism Model
| Model | Best For | Shared State | GIL Impact |
|---|---|---|---|
multiprocessing | CPU-bound or instrument-isolated DUTs | Requires IPC (Queue, Manager) | None |
threading | I/O-bound tests, shared instrument objects | Direct (with locks) | Applies |
ProcessPoolExecutor | Simple parallel dispatch, result collection | Queue / Manager | None |
ThreadPoolExecutor | Lightweight I/O-bound test phases | Direct (with locks) | Applies |
For most hardware test stations, multiprocessing with one process per DUT is the safest choice. It isolates faults, avoids GIL contention, and keeps each DUT's logs and state fully independent.
Step 1: Define Test Phases
import openhtf as htf
from openhtf.plugs import BasePlug
from openhtf.util import units
class PowerSupplyPlug(BasePlug):
"""Mock power supply for a single DUT slot."""
def setUp(self):
self._voltage = 5.01
self._current = 0.042
def measure_voltage(self) -> float:
return self._voltage
def measure_current(self) -> float:
return self._current
def tearDown(self):
pass
class SerialPortPlug(BasePlug):
"""Mock serial connection to a single DUT."""
def setUp(self):
self._version = "2.4.1"
def query(self, command: str) -> str:
return self._version
def tearDown(self):
pass
@htf.measures(
htf.Measurement("supply_voltage")
.in_range(minimum=4.8, maximum=5.2)
.with_units(units.VOLT)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_supply_voltage(test, psu):
test.measurements.supply_voltage = psu.measure_voltage()
@htf.measures(
htf.Measurement("idle_current")
.in_range(minimum=0.010, maximum=0.150)
.with_units(units.AMPERE)
)
@htf.plug(psu=PowerSupplyPlug)
def measure_idle_current(test, psu):
test.measurements.idle_current = psu.measure_current()
@htf.measures(
htf.Measurement("firmware_version").equals("2.4.1")
)
@htf.plug(serial=SerialPortPlug)
def check_firmware(test, serial):
version = serial.query("VERSION?").strip()
test.measurements.firmware_version = versionStep 2: Create the Worker Process
Each worker creates its own OpenHTF Test object and runs it to completion.
import openhtf as htf
from tofupilot.openhtf import TofuPilot
from station.test_phases import (
measure_supply_voltage,
measure_idle_current,
check_firmware,
)
def run_test_for_dut(slot_index: int, serial_number: str) -> str:
"""Entry point for a single DUT worker process."""
test = htf.Test(
measure_supply_voltage,
measure_idle_current,
check_firmware,
test_name=f"Production Test - Slot {slot_index}",
)
with TofuPilot(test):
test.execute(test_start=lambda: serial_number)
return serial_numberStep 3: Launch All DUTs in Parallel
The controller scans DUT serials from the fixture, then dispatches one process per slot.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from station.worker import run_test_for_dut
def scan_serials() -> dict[int, str]:
"""Read serial numbers from the fixture."""
return {
0: "SN-2024-001",
1: "SN-2024-002",
2: "SN-2024-003",
3: "SN-2024-004",
}
def run_parallel_cycle():
serials = scan_serials()
slots = list(serials.keys())
print(f"Starting parallel test cycle for {len(slots)} DUTs...")
with ProcessPoolExecutor(max_workers=len(slots)) as executor:
futures = {
executor.submit(run_test_for_dut, slot, serials[slot]): slot
for slot in slots
}
for future in as_completed(futures):
slot = futures[future]
try:
sn = future.result()
print(f"Slot {slot} ({sn}): complete")
except Exception as exc:
print(f"Slot {slot}: FAILED with {exc}")
if __name__ == "__main__":
multiprocessing.set_start_method("spawn")
run_parallel_cycle()Handling Partial Fixture Occupancy
Not all fixture slots may be occupied on every cycle. Guard against empty slots before submitting futures.
def detect_present_duts() -> dict[int, str]:
"""Return only slots where a DUT is physically present."""
present = {}
for slot in range(4):
if fixture_sense_pin_active(slot):
present[slot] = read_barcode(slot)
return presentComparison of Parallel Strategies
| Approach | Fault Isolation | Shared Instrument Access | Recommended For |
|---|---|---|---|
ProcessPoolExecutor | Full (separate processes) | Requires IPC or per-channel locks | Most production stations |
ThreadPoolExecutor | None (shared process) | Direct (locks required) | Lightweight I/O-bound tests |
asyncio with async VISA | None | Cooperative (no preemption) | Fully async instrument drivers |
| One process per DUT (manual) | Full | Requires IPC | Custom fixture controllers |
Common Pitfalls
Process start method on macOS and Windows. Always call multiprocessing.set_start_method("spawn") before launching workers. The default fork method on macOS can deadlock when forking after VISA or serial port initialization.
Plug tearDown on test failure. OpenHTF calls tearDown() on plugs even when a phase raises an exception. Make sure tearDown is idempotent and doesn't raise.
Serial number collision. If two processes receive the same serial number, TofuPilot creates two runs for the same unit. Validate serial uniqueness in scan_serials() before dispatching.
Instrument timeout under load. Shared instruments may time out when multiple channels are queried simultaneously. Tune per-channel lock timeouts and instrument read timeouts to match your slowest measurement.