AI Agent for Electronics Manufacturing: Automate SMT Lines, Quality Inspection & Supply Chain
The global electronics manufacturing services (EMS) industry produces over $600 billion in assembled PCBs annually, yet most factories still react to defects instead of preventing them. A single tombstoned component on a high-density BGA board can trigger a costly rework cycle, and a surprise allocation on a critical MLCC can halt an entire production line for weeks. These failure modes are predictable with the right data infrastructure and AI agents that reason across the full manufacturing chain.
Unlike traditional statistical process control (SPC) that flags when a parameter drifts out of spec, AI agents for electronics manufacturing correlate data across solder paste inspection, pick-and-place telemetry, reflow profiles, AOI/AXI results, and ICT/FCT outcomes simultaneously. They identify the root cause of a tombstone defect not as "component shifted" but as "nozzle 7 on head 2 showing 12-micron placement drift combined with paste deposit volume 18% below target on pad B7 due to stencil aperture wear at 43,000 prints."
This guide covers six areas where AI agents deliver measurable yield improvement and cost reduction in electronics manufacturing, with production-ready Python code for each module. Whether you run a single SMT line or a 50-line mega-factory, these patterns scale to your operation.
Table of Contents
1. SMT Line Optimization
Surface mount technology lines are the heartbeat of any electronics factory, and their efficiency determines throughput, yield, and ultimately margin. An AI agent monitoring the three critical stages — solder paste printing, component placement, and reflow soldering — can predict defects before they happen and optimize process parameters in real time. The agent ingests solder paste inspection (SPI) volumetric data, pick-and-place machine telemetry (nozzle vacuum levels, placement accuracy, feeder error rates), and reflow oven thermal profiles to build a holistic model of line health.
Solder Paste Inspection Data Analysis
SPI systems measure paste deposit volume, height, area, and offset for every pad on every board. The challenge is not collecting this data — modern SPI machines generate gigabytes per shift — but correlating it with downstream defects. An AI agent tracks rolling Cpk values for paste volume by stencil aperture size class (0201, 0402, 0603, fine-pitch QFP, BGA), detects when aperture clogging begins (gradual volume decrease across consecutive prints), and predicts when a stencil wipe or replacement is needed based on print count and paste rheology degradation.
Pick-and-place optimization is equally critical. The agent analyzes nozzle assignment efficiency (matching nozzle tip size to component body dimensions), feeder setup sequencing (minimizing head travel distance by clustering components used on the same board region), and placement accuracy trending per nozzle. A nozzle showing increasing X/Y offset over time signals vacuum seal degradation that will cause placement defects within hundreds of cycles — well before it triggers the machine's built-in alarm threshold.
Reflow Profile Optimization
Reflow soldering involves precise thermal management across multiple zones: preheat ramp (typically 1-3 C/sec), thermal soak (150-200C for 60-120 seconds to activate flux), peak reflow (liquidus at 217C for SAC305, with 20-40 seconds time above liquidus), and controlled cooling. The AI agent monitors thermocouple profiles from profiling boards and production-inline pyrometers, detecting zone temperature drift, conveyor speed inconsistencies, and board-to-board thermal variation caused by panel position effects or component thermal mass differences.
import numpy as np
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime
from enum import Enum
class ApertureClass(Enum):
BGA = "bga"
FINE_PITCH = "fine_pitch"
CHIP_0201 = "0201"
CHIP_0402 = "0402"
CHIP_0603 = "0603"
QFP = "qfp"
@dataclass
class SPIMeasurement:
board_id: str
pad_ref: str
aperture_class: ApertureClass
volume_percent: float # % of nominal volume
height_um: float # paste height in microns
area_percent: float # % of nominal area
x_offset_um: float # offset from pad center
y_offset_um: float
print_number: int # stencil print count
timestamp: datetime
@dataclass
class NozzleTelemetry:
nozzle_id: str
head_id: str
tip_size_mm: float
vacuum_level_kpa: float # typical -60 to -80 kPa
placement_x_offset_um: float
placement_y_offset_um: float
pickup_retry_count: int
cycle_count: int
@dataclass
class ReflowZone:
zone_id: int
set_temp_c: float
actual_temp_c: float
board_temp_c: float # measured at board surface
conveyor_speed_cm_min: float
class SMTLineOptimizationAgent:
"""AI agent for SPI analysis, P&P optimization, and reflow control."""
VOLUME_CPK_TARGET = 1.33
PASTE_VOLUME_LOW = 70.0 # % — below triggers alert
PASTE_VOLUME_HIGH = 150.0 # % — above triggers alert
NOZZLE_OFFSET_LIMIT_UM = 50 # placement accuracy threshold
VACUUM_DEGRADED_KPA = -55 # nozzle vacuum degradation
STENCIL_WIPE_INTERVAL = 5 # prints between auto-wipes
TAL_MIN_SEC = 20 # minimum time above liquidus
TAL_MAX_SEC = 60 # maximum time above liquidus
PEAK_TEMP_MAX_C = 250 # component damage threshold
def __init__(self):
self.spi_history: Dict[str, List[SPIMeasurement]] = {}
self.nozzle_stats: Dict[str, List[NozzleTelemetry]] = {}
self.stencil_print_count = 0
def analyze_spi_trend(self, measurements: List[SPIMeasurement]) -> dict:
"""Detect paste volume degradation and predict stencil maintenance."""
by_class = {}
for m in measurements:
cls = m.aperture_class.value
if cls not in by_class:
by_class[cls] = []
by_class[cls].append(m)
alerts = []
cpk_report = {}
for cls, data in by_class.items():
volumes = [d.volume_percent for d in data]
mean_vol = np.mean(volumes)
std_vol = np.std(volumes)
# Cpk calculation: min of upper and lower capability
usl, lsl = 150.0, 70.0
cpu = (usl - mean_vol) / (3 * std_vol) if std_vol > 0 else 99
cpl = (mean_vol - lsl) / (3 * std_vol) if std_vol > 0 else 99
cpk = min(cpu, cpl)
cpk_report[cls] = round(cpk, 3)
if cpk < self.VOLUME_CPK_TARGET:
alerts.append({
"type": "cpk_below_target",
"aperture_class": cls,
"cpk": round(cpk, 3),
"target": self.VOLUME_CPK_TARGET,
"mean_volume_pct": round(mean_vol, 1)
})
# Detect volume trend (linear regression on last 50 prints)
recent = sorted(data, key=lambda d: d.print_number)[-50:]
if len(recent) >= 20:
x = np.array([d.print_number for d in recent])
y = np.array([d.volume_percent for d in recent])
slope = np.polyfit(x, y, 1)[0]
if slope < -0.05: # losing > 0.05% per print
prints_to_limit = (mean_vol - lsl) / abs(slope)
alerts.append({
"type": "volume_declining",
"aperture_class": cls,
"slope_per_print": round(slope, 4),
"prints_until_ool": int(prints_to_limit),
"action": "schedule_stencil_clean_or_replace"
})
return {"cpk_by_class": cpk_report, "alerts": alerts,
"total_measurements": len(measurements)}
def optimize_nozzle_assignment(self,
nozzles: List[NozzleTelemetry],
bom_components: List[dict]) -> dict:
"""Match nozzle tips to components and flag degraded nozzles."""
degraded = []
assignments = []
for nozzle in nozzles:
# Check vacuum degradation
if nozzle.vacuum_level_kpa > self.VACUUM_DEGRADED_KPA:
degraded.append({
"nozzle_id": nozzle.nozzle_id,
"head": nozzle.head_id,
"vacuum_kpa": nozzle.vacuum_level_kpa,
"threshold": self.VACUUM_DEGRADED_KPA,
"action": "replace_nozzle_tip"
})
# Check placement accuracy drift
offset = np.sqrt(nozzle.placement_x_offset_um ** 2
+ nozzle.placement_y_offset_um ** 2)
if offset > self.NOZZLE_OFFSET_LIMIT_UM:
degraded.append({
"nozzle_id": nozzle.nozzle_id,
"offset_um": round(offset, 1),
"limit_um": self.NOZZLE_OFFSET_LIMIT_UM,
"cycles": nozzle.cycle_count,
"action": "recalibrate_or_replace"
})
# Assign nozzles to components by tip size match
available = [n for n in nozzles
if n.nozzle_id not in [d["nozzle_id"] for d in degraded]]
for comp in bom_components:
best = min(available,
key=lambda n: abs(n.tip_size_mm - comp["body_width_mm"]),
default=None)
if best:
assignments.append({
"component": comp["ref_des"],
"package": comp["package"],
"nozzle": best.nozzle_id,
"tip_mm": best.tip_size_mm
})
return {"assignments": assignments, "degraded_nozzles": degraded,
"healthy_nozzles": len(available), "total_nozzles": len(nozzles)}
def validate_reflow_profile(self,
zones: List[ReflowZone],
alloy: str = "SAC305") -> dict:
"""Validate reflow profile against IPC-7530 guidelines."""
liquidus = 217 if alloy == "SAC305" else 183 # SnPb
board_temps = [z.board_temp_c for z in zones]
peak_temp = max(board_temps)
# Calculate time above liquidus (TAL)
above_liquidus = [z for z in zones if z.board_temp_c > liquidus]
tal_seconds = len(above_liquidus) * 15 # ~15sec per zone transit
# Preheat ramp rate
ramp_rate = (zones[2].board_temp_c - zones[0].board_temp_c) / 30
violations = []
if tal_seconds < self.TAL_MIN_SEC:
violations.append(f"TAL too short: {tal_seconds}s < {self.TAL_MIN_SEC}s")
if tal_seconds > self.TAL_MAX_SEC:
violations.append(f"TAL too long: {tal_seconds}s > {self.TAL_MAX_SEC}s")
if peak_temp > self.PEAK_TEMP_MAX_C:
violations.append(f"Peak temp {peak_temp}C exceeds {self.PEAK_TEMP_MAX_C}C")
if ramp_rate > 3.0:
violations.append(f"Preheat ramp {ramp_rate:.1f}C/s exceeds 3.0C/s max")
# Zone deviation check
zone_drift = []
for z in zones:
delta = abs(z.actual_temp_c - z.set_temp_c)
if delta > 5:
zone_drift.append({"zone": z.zone_id, "set": z.set_temp_c,
"actual": z.actual_temp_c, "delta": round(delta, 1)})
return {
"alloy": alloy, "peak_temp_c": round(peak_temp, 1),
"tal_seconds": tal_seconds, "ramp_rate_c_per_s": round(ramp_rate, 2),
"profile_valid": len(violations) == 0,
"violations": violations, "zone_drift": zone_drift
}
2. Automated Optical & X-Ray Inspection
Automated optical inspection (AOI) and automated X-ray inspection (AXI) generate thousands of images per board, but the real value lies in defect classification and root cause correlation. Modern AOI systems detect defects — bridges, tombstones, insufficient solder, shifted components, missing parts, polarity reversals — but they produce significant false call rates, often 500-2000 ppm on complex assemblies. An AI agent that learns the boundary between real defects and false calls from operator verification data can reduce false call rates by 60-80%, dramatically cutting the verification labor burden.
X-ray inspection is essential for hidden joints beneath BGA, QFN, and LGA packages where optical inspection cannot see. The agent analyzes void percentage in BGA solder balls (IPC-7095 allows up to 25% voiding for standard applications, but many OEMs specify tighter limits of 10-15%), detects head-in-pillow defects from grayscale intensity patterns, and monitors void distribution across the array to identify systematic issues like warpage-induced non-wet opens at package corners.
The most powerful capability is defect-to-root-cause correlation. When the agent detects a cluster of bridging defects on 0.5mm-pitch QFP leads, it cross-references SPI data (paste volume and offset on those specific pads), stencil print count, paste lot and age, and placement offset data to determine whether the root cause is stencil aperture wear, excessive paste deposit, placement misalignment, or a combination. This transforms AOI from a detection tool into a process control feedback loop.
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime
from enum import Enum
import numpy as np
class DefectType(Enum):
BRIDGE = "solder_bridge"
TOMBSTONE = "tombstone"
INSUFFICIENT = "insufficient_solder"
SHIFTED = "component_shifted"
MISSING = "missing_component"
POLARITY = "polarity_reversed"
BILLBOARD = "billboard"
VOID_BGA = "bga_void"
HEAD_IN_PILLOW = "head_in_pillow"
NON_WET_OPEN = "non_wet_open"
class RootCause(Enum):
STENCIL_WEAR = "stencil_aperture_wear"
PASTE_VOLUME = "excess_paste_volume"
PASTE_DRY = "paste_drying_out"
NOZZLE_DRIFT = "nozzle_placement_drift"
NOZZLE_VACUUM = "nozzle_vacuum_loss"
REFLOW_PROFILE = "reflow_profile_deviation"
BOARD_WARPAGE = "pcb_warpage"
PAD_DESIGN = "pad_design_issue"
COMPONENT_COPLANARITY = "component_coplanarity"
@dataclass
class AOIDefect:
board_id: str
ref_des: str
package: str
defect_type: DefectType
confidence: float # 0-1, AOI algorithm confidence
image_path: str
pad_location: Tuple[float, float]
operator_verified: Optional[bool] = None # True=real, False=false_call
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class XRayResult:
board_id: str
ref_des: str
package: str # "BGA", "QFN", "LGA"
ball_count: int
void_percentages: List[float] # per-ball void %
max_void_pct: float
mean_void_pct: float
corner_void_pattern: bool # voids concentrated at corners
class InspectionAnalysisAgent:
"""AI agent for AOI false call reduction and defect root cause analysis."""
FALSE_CALL_CONFIDENCE_THRESHOLD = 0.72
BGA_VOID_LIMIT_PCT = 25.0 # IPC-7095 standard
BGA_VOID_TIGHT_PCT = 15.0 # OEM tightened spec
BRIDGE_CLUSTER_THRESHOLD = 3 # defects on same board region
CORRELATION_WINDOW_BOARDS = 50 # boards to analyze for patterns
def __init__(self):
self.defect_history: List[AOIDefect] = []
self.false_call_model: Dict[str, float] = {}
self.root_cause_rules: List[dict] = self._init_root_cause_rules()
def classify_with_false_call_filter(self,
defects: List[AOIDefect]) -> dict:
"""Filter likely false calls using historical verification data."""
real_defects = []
likely_false = []
for d in defects:
# Check against learned false call patterns
fc_key = f"{d.package}_{d.defect_type.value}"
historical_fc_rate = self.false_call_model.get(fc_key, 0.5)
# Combine AOI confidence with historical false call rate
adjusted_confidence = d.confidence * (1 - historical_fc_rate)
if adjusted_confidence >= self.FALSE_CALL_CONFIDENCE_THRESHOLD:
real_defects.append(d)
else:
likely_false.append({
"ref_des": d.ref_des,
"defect_type": d.defect_type.value,
"aoi_confidence": round(d.confidence, 3),
"adjusted_confidence": round(adjusted_confidence, 3),
"recommendation": "skip_verification"
})
return {
"real_defects": len(real_defects),
"filtered_false_calls": len(likely_false),
"reduction_pct": round(
len(likely_false) / max(len(defects), 1) * 100, 1),
"defects": real_defects,
"false_calls": likely_false
}
def analyze_xray_voids(self, results: List[XRayResult],
spec_limit: float = None) -> dict:
"""Analyze BGA/QFN void patterns for process issues."""
limit = spec_limit or self.BGA_VOID_LIMIT_PCT
failures = []
warnings = []
corner_warpage_suspects = []
for r in results:
if r.max_void_pct > limit:
failures.append({
"board": r.board_id, "ref_des": r.ref_des,
"max_void": round(r.max_void_pct, 1),
"mean_void": round(r.mean_void_pct, 1),
"limit": limit
})
elif r.mean_void_pct > limit * 0.6:
warnings.append({
"board": r.board_id, "ref_des": r.ref_des,
"mean_void": round(r.mean_void_pct, 1),
"trending_toward_limit": True
})
if r.corner_void_pattern:
corner_warpage_suspects.append({
"board": r.board_id, "ref_des": r.ref_des,
"package": r.package,
"root_cause": "pcb_or_component_warpage",
"action": "check_coplanarity_and_board_flatness"
})
return {
"total_inspected": len(results),
"void_failures": len(failures),
"void_warnings": len(warnings),
"warpage_suspects": corner_warpage_suspects,
"failures": failures,
"yield_pct": round(
(1 - len(failures) / max(len(results), 1)) * 100, 2)
}
def correlate_defect_to_root_cause(self,
defects: List[AOIDefect],
spi_data: Dict[str, dict],
nozzle_data: Dict[str, dict]) -> List[dict]:
"""Map defect patterns to manufacturing root causes."""
correlations = []
# Group defects by type and location
by_type = {}
for d in defects:
if d.defect_type not in by_type:
by_type[d.defect_type] = []
by_type[d.defect_type].append(d)
for defect_type, group in by_type.items():
for rule in self.root_cause_rules:
if rule["defect_type"] != defect_type:
continue
evidence = []
for d in group:
spi = spi_data.get(d.ref_des, {})
nozzle = nozzle_data.get(d.ref_des, {})
if rule["check"](spi, nozzle):
evidence.append(d.ref_des)
if len(evidence) >= rule.get("min_evidence", 2):
correlations.append({
"defect_type": defect_type.value,
"root_cause": rule["root_cause"].value,
"confidence": min(len(evidence) / len(group), 1.0),
"affected_refs": evidence,
"corrective_action": rule["action"]
})
return sorted(correlations, key=lambda c: c["confidence"], reverse=True)
def _init_root_cause_rules(self) -> List[dict]:
return [
{
"defect_type": DefectType.BRIDGE,
"root_cause": RootCause.STENCIL_WEAR,
"check": lambda spi, _: spi.get("volume_pct", 100) > 130,
"action": "inspect_stencil_apertures_for_wear",
"min_evidence": 3
},
{
"defect_type": DefectType.TOMBSTONE,
"root_cause": RootCause.PASTE_VOLUME,
"check": lambda spi, _: abs(
spi.get("pad_a_vol", 100) - spi.get("pad_b_vol", 100)
) > 30,
"action": "check_paste_balance_between_pads",
"min_evidence": 2
},
{
"defect_type": DefectType.SHIFTED,
"root_cause": RootCause.NOZZLE_DRIFT,
"check": lambda _, noz: noz.get("offset_um", 0) > 40,
"action": "recalibrate_nozzle_or_check_vacuum",
"min_evidence": 2
},
{
"defect_type": DefectType.HEAD_IN_PILLOW,
"root_cause": RootCause.BOARD_WARPAGE,
"check": lambda spi, _: spi.get("corner_position", False),
"action": "measure_board_warpage_and_component_coplanarity",
"min_evidence": 2
},
]
def update_false_call_model(self, verified: List[AOIDefect]):
"""Learn from operator verification to improve false call filtering."""
by_key = {}
for d in verified:
if d.operator_verified is None:
continue
key = f"{d.package}_{d.defect_type.value}"
if key not in by_key:
by_key[key] = {"total": 0, "false": 0}
by_key[key]["total"] += 1
if not d.operator_verified:
by_key[key]["false"] += 1
for key, counts in by_key.items():
self.false_call_model[key] = counts["false"] / counts["total"]
3. Component Supply Chain Intelligence
Electronics supply chain disruptions have become the norm rather than the exception. The 2020-2023 semiconductor shortage demonstrated how a single-source MOSFET going on 52-week lead time can halt production across an entire product family. An AI agent that continuously monitors BOM risk, identifies alternate parts before shortages hit, and forecasts demand against supplier allocation provides a strategic advantage that goes far beyond traditional MRP systems.
BOM risk scoring assigns a numeric risk level to every component in every active product based on multiple factors: single-source vs. multi-source availability, current lead time versus historical baseline, lifecycle status (active, NRND — not recommended for new designs, EOL — end of life), allocation status from distributor APIs, and geographic concentration of manufacturing (a component made exclusively in one fab carries higher risk). The agent updates these scores daily and escalates when a component crosses from "monitor" to "action required" status.
Alternate part matching goes beyond simple parametric equivalence. The agent checks not only electrical specifications (capacitance, voltage rating, tolerance, ESR for capacitors; RDS(on), Vgs threshold, package for MOSFETs) but also footprint compatibility (land pattern dimensions, pin assignment), moisture sensitivity level (MSL), and qualification status with end customers. A parametrically equivalent part that requires a new PCB revision or customer requalification is not a viable production alternate.
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
import statistics
class LifecycleStatus(Enum):
ACTIVE = "active"
NRND = "not_recommended_new_design"
EOL = "end_of_life"
OBSOLETE = "obsolete"
class RiskLevel(Enum):
LOW = "low"
MODERATE = "moderate"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class BOMComponent:
mpn: str # manufacturer part number
manufacturer: str
description: str
package: str # e.g., "0402", "SOIC-8", "BGA-256"
annual_usage: int
unit_cost_usd: float
lead_time_weeks: int
lifecycle: LifecycleStatus
num_sources: int # number of qualified sources
safety_stock_weeks: int
current_stock: int
allocated: bool # on allocation from supplier
msl: int # moisture sensitivity level 1-6
@dataclass
class AlternatePart:
mpn: str
manufacturer: str
parametric_match_pct: float # how close electrically
footprint_compatible: bool
pin_compatible: bool
lead_time_weeks: int
unit_cost_usd: float
qualification_required: bool
msl: int
class SupplyChainIntelligenceAgent:
"""BOM risk scoring, alternate part matching, demand forecasting."""
LEAD_TIME_RISK_MULTIPLIER = 2.0 # 2x historical = high risk
SINGLE_SOURCE_PENALTY = 30 # risk points for single source
EOL_PENALTY = 50
NRND_PENALTY = 25
ALLOCATION_PENALTY = 40
STOCK_COVERAGE_WARN_WEEKS = 4
def __init__(self):
self.historical_lead_times: Dict[str, List[int]] = {}
self.demand_forecasts: Dict[str, List[int]] = {}
def score_bom_risk(self, bom: List[BOMComponent]) -> dict:
"""Assign risk score 0-100 to each BOM component."""
scored = []
for comp in bom:
risk_score = 0
factors = []
# Single/dual source risk
if comp.num_sources == 1:
risk_score += self.SINGLE_SOURCE_PENALTY
factors.append("single_source")
elif comp.num_sources == 2:
risk_score += 10
factors.append("dual_source_only")
# Lifecycle risk
if comp.lifecycle == LifecycleStatus.EOL:
risk_score += self.EOL_PENALTY
factors.append("end_of_life")
elif comp.lifecycle == LifecycleStatus.NRND:
risk_score += self.NRND_PENALTY
factors.append("nrnd_status")
# Lead time risk vs historical
hist = self.historical_lead_times.get(comp.mpn, [])
if hist:
avg_lt = statistics.mean(hist)
if comp.lead_time_weeks > avg_lt * self.LEAD_TIME_RISK_MULTIPLIER:
risk_score += 20
factors.append(f"lead_time_{comp.lead_time_weeks}w_vs_avg_{avg_lt:.0f}w")
# Allocation risk
if comp.allocated:
risk_score += self.ALLOCATION_PENALTY
factors.append("on_allocation")
# Stock coverage
weekly_usage = comp.annual_usage / 52
stock_weeks = comp.current_stock / max(weekly_usage, 1)
if stock_weeks < self.STOCK_COVERAGE_WARN_WEEKS:
risk_score += 15
factors.append(f"stock_coverage_{stock_weeks:.1f}_weeks")
risk_score = min(risk_score, 100)
level = (RiskLevel.CRITICAL if risk_score >= 70 else
RiskLevel.HIGH if risk_score >= 45 else
RiskLevel.MODERATE if risk_score >= 20 else
RiskLevel.LOW)
scored.append({
"mpn": comp.mpn, "manufacturer": comp.manufacturer,
"package": comp.package, "risk_score": risk_score,
"risk_level": level.value, "factors": factors,
"stock_weeks_remaining": round(stock_weeks, 1),
"annual_spend": round(comp.annual_usage * comp.unit_cost_usd, 0)
})
scored.sort(key=lambda s: s["risk_score"], reverse=True)
critical_count = sum(1 for s in scored if s["risk_level"] == "critical")
return {
"total_components": len(scored),
"critical_risk": critical_count,
"high_risk": sum(1 for s in scored if s["risk_level"] == "high"),
"components": scored,
"total_bom_cost": round(
sum(c.annual_usage * c.unit_cost_usd for c in bom), 0),
"at_risk_spend": round(sum(
s["annual_spend"] for s in scored
if s["risk_level"] in ("critical", "high")), 0)
}
def find_alternates(self, component: BOMComponent,
candidates: List[AlternatePart]) -> List[dict]:
"""Rank alternate parts by drop-in compatibility and cost."""
ranked = []
for alt in candidates:
suitability_score = alt.parametric_match_pct
# Footprint and pin compatibility bonuses
if alt.footprint_compatible and alt.pin_compatible:
suitability_score *= 1.2 # 20% bonus for true drop-in
elif not alt.footprint_compatible:
suitability_score *= 0.3 # massive penalty — needs PCB change
# Lead time advantage
if alt.lead_time_weeks < component.lead_time_weeks:
suitability_score *= 1.1
# Cost comparison
cost_delta_pct = ((alt.unit_cost_usd - component.unit_cost_usd)
/ max(component.unit_cost_usd, 0.001)) * 100
# MSL compatibility
msl_compatible = alt.msl <= component.msl
ranked.append({
"mpn": alt.mpn, "manufacturer": alt.manufacturer,
"suitability_score": round(min(suitability_score, 100), 1),
"footprint_drop_in": alt.footprint_compatible and alt.pin_compatible,
"cost_delta_pct": round(cost_delta_pct, 1),
"lead_time_weeks": alt.lead_time_weeks,
"qualification_needed": alt.qualification_required,
"msl_compatible": msl_compatible
})
ranked.sort(key=lambda r: r["suitability_score"], reverse=True)
return ranked
def forecast_demand_with_alerts(self, mpn: str,
historical_monthly: List[int],
planned_builds: List[int]) -> dict:
"""Forecast component demand and detect allocation risk."""
avg_monthly = statistics.mean(historical_monthly[-6:])
trend = (statistics.mean(historical_monthly[-3:])
- statistics.mean(historical_monthly[-6:-3]))
forecast_3m = [int(avg_monthly + trend * (i + 1)) for i in range(3)]
planned_3m = planned_builds[:3] if len(planned_builds) >= 3 else planned_builds
gap = [max(0, p - f) for p, f in zip(planned_3m, forecast_3m)]
shortage_risk = any(g > 0 for g in gap)
return {
"mpn": mpn, "avg_monthly_usage": round(avg_monthly, 0),
"trend_monthly": round(trend, 0),
"forecast_next_3m": forecast_3m,
"planned_next_3m": planned_3m,
"potential_gap": gap,
"shortage_risk": shortage_risk,
"action": "place_buffer_order" if shortage_risk else "monitor"
}
4. Test Engineering Automation
In-circuit test (ICT) and functional test (FCT) are the last line of defense before a product ships to the customer, yet test engineering remains one of the most manual disciplines in electronics manufacturing. Test programs are hand-tuned, failure analysis depends on technician experience, and test data analytics rarely goes beyond basic yield reporting. An AI agent that optimizes test coverage, automates failure analysis, and provides real-time SPC on test parameters transforms test from a cost center into a process intelligence tool.
ICT/FCT test optimization starts with coverage analysis — mapping which components and circuit nodes are tested and which are not. Many test programs evolve organically over years, accumulating redundant tests while missing coverage gaps. The agent analyzes the correlation between individual test steps and field failure modes, identifying tests that have never caught a defect (candidates for removal to reduce cycle time) and failure modes that no test catches (coverage gaps requiring new test development). Test sequence optimization then reorders the remaining tests to put high-fail-rate tests first, enabling early abort that saves cycle time on defective units.
Failure analysis automation applies fault tree logic to test results. When a functional test fails, the agent traces the circuit path, identifies which component or solder joint is most likely responsible based on the specific failure signature (voltage out of range, timing violation, current consumption anomaly), and cross-references with AOI/SPI data from that specific board serial number to provide a targeted rework instruction rather than a generic "debug" disposition.
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime
import statistics
import numpy as np
@dataclass
class TestStep:
step_id: str
test_type: str # "ICT", "FCT", "boundary_scan"
description: str
node_or_signal: str
lower_limit: float
upper_limit: float
unit: str
avg_execution_ms: int
historical_fail_rate: float # 0-1
@dataclass
class TestResult:
board_serial: str
step_id: str
measured_value: float
pass_fail: bool
timestamp: datetime
execution_ms: int
@dataclass
class FaultSignature:
test_step: str
failure_mode: str # "high", "low", "open", "short", "timeout"
measured_value: float
expected_range: Tuple[float, float]
circuit_path: List[str] # component ref_des in signal path
class TestEngineeringAgent:
"""Optimize test coverage, automate failure analysis, run SPC analytics."""
CPK_TARGET = 1.33
CPK_WARNING = 1.0
PARETO_TOP_N = 10
REDUNDANCY_THRESHOLD = 0.0001 # fail rate below this = candidate for removal
EARLY_ABORT_FAIL_RATE = 0.005 # 0.5% — test steps above this run first
def __init__(self):
self.test_program: List[TestStep] = []
self.results_db: List[TestResult] = []
def optimize_test_sequence(self,
program: List[TestStep],
history: List[TestResult]) -> dict:
"""Reorder tests for early abort and remove redundant steps."""
step_stats = {}
for step in program:
results = [r for r in history if r.step_id == step.step_id]
fails = sum(1 for r in results if not r.pass_fail)
total = len(results)
fail_rate = fails / max(total, 1)
step_stats[step.step_id] = {
"fail_rate": fail_rate,
"avg_time_ms": step.avg_execution_ms,
"total_runs": total,
"description": step.description
}
# Identify redundant tests (never fail)
redundant = [sid for sid, s in step_stats.items()
if s["fail_rate"] < self.REDUNDANCY_THRESHOLD
and s["total_runs"] > 1000]
# Optimized sequence: high-fail-rate, fast tests first
active_steps = [s for s in program if s.step_id not in redundant]
active_steps.sort(
key=lambda s: (
-step_stats[s.step_id]["fail_rate"], # highest fail first
step_stats[s.step_id]["avg_time_ms"] # then fastest
)
)
# Calculate time saved by early abort
original_time = sum(s.avg_execution_ms for s in program)
optimized_time = sum(s.avg_execution_ms for s in active_steps)
early_abort_savings = sum(
s.avg_execution_ms for s in active_steps
if step_stats[s.step_id]["fail_rate"] > self.EARLY_ABORT_FAIL_RATE
) * 0.3 # ~30% of boards fail early on average
return {
"original_steps": len(program),
"optimized_steps": len(active_steps),
"removed_redundant": len(redundant),
"redundant_steps": redundant,
"original_cycle_ms": original_time,
"optimized_cycle_ms": optimized_time,
"early_abort_savings_ms": int(early_abort_savings),
"sequence": [s.step_id for s in active_steps],
"time_reduction_pct": round(
(1 - optimized_time / max(original_time, 1)) * 100, 1)
}
def analyze_failures_pareto(self,
results: List[TestResult]) -> dict:
"""Pareto analysis of test failures with fault tree mapping."""
failures = [r for r in results if not r.pass_fail]
by_step = {}
for f in failures:
if f.step_id not in by_step:
by_step[f.step_id] = {"count": 0, "values": []}
by_step[f.step_id]["count"] += 1
by_step[f.step_id]["values"].append(f.measured_value)
total_fails = len(failures)
pareto = sorted(by_step.items(), key=lambda x: x[1]["count"],
reverse=True)[:self.PARETO_TOP_N]
cumulative = 0
pareto_result = []
for step_id, data in pareto:
cumulative += data["count"]
pareto_result.append({
"step_id": step_id,
"fail_count": data["count"],
"fail_pct": round(data["count"] / max(total_fails, 1) * 100, 1),
"cumulative_pct": round(cumulative / max(total_fails, 1) * 100, 1),
"value_mean": round(statistics.mean(data["values"]), 4),
"value_stdev": round(statistics.stdev(data["values"]), 4)
if len(data["values"]) > 1 else 0
})
total_boards = len(set(r.board_serial for r in results))
failed_boards = len(set(f.board_serial for f in failures))
return {
"total_boards_tested": total_boards,
"failed_boards": failed_boards,
"first_pass_yield_pct": round(
(1 - failed_boards / max(total_boards, 1)) * 100, 2),
"total_failures": total_fails,
"pareto_top_n": pareto_result
}
def spc_analysis(self, step_id: str,
results: List[TestResult]) -> dict:
"""Statistical process control: Cpk, control limits, trend detection."""
step_results = [r for r in results if r.step_id == step_id]
values = [r.measured_value for r in step_results]
if len(values) < 30:
return {"error": "Insufficient data", "min_required": 30}
mean = statistics.mean(values)
stdev = statistics.stdev(values)
# Find limits from test program
step = next((s for s in self.test_program if s.step_id == step_id), None)
if not step:
return {"error": f"Step {step_id} not found in program"}
usl, lsl = step.upper_limit, step.lower_limit
cpu = (usl - mean) / (3 * stdev) if stdev > 0 else 99
cpl = (mean - lsl) / (3 * stdev) if stdev > 0 else 99
cpk = min(cpu, cpl)
# Control limits (3-sigma)
ucl = mean + 3 * stdev
lcl = mean - 3 * stdev
# Trend detection (last 50 measurements)
recent = values[-50:]
x = np.arange(len(recent))
slope = np.polyfit(x, recent, 1)[0]
drift_per_100 = slope * 100
# Nelson rules check (simplified: 8 consecutive above/below mean)
consecutive_above = 0
max_consecutive = 0
for v in recent:
if v > mean:
consecutive_above += 1
max_consecutive = max(max_consecutive, consecutive_above)
else:
consecutive_above = 0
nelson_violation = max_consecutive >= 8
status = "capable" if cpk >= self.CPK_TARGET else (
"marginal" if cpk >= self.CPK_WARNING else "not_capable")
return {
"step_id": step_id, "sample_size": len(values),
"mean": round(mean, 4), "stdev": round(stdev, 4),
"cpk": round(cpk, 3), "cpu": round(cpu, 3), "cpl": round(cpl, 3),
"ucl": round(ucl, 4), "lcl": round(lcl, 4),
"usl": usl, "lsl": lsl,
"drift_per_100_units": round(drift_per_100, 4),
"nelson_rule_violation": nelson_violation,
"status": status,
"action": "investigate_drift" if nelson_violation
else "improve_process" if status == "not_capable"
else "monitor"
}
5. Traceability & Compliance
Electronics manufacturing traceability extends from incoming component lot codes through every process step to the final shipped serial number. Regulatory requirements (RoHS, REACH, conflict minerals), industry standards (IPC/J-STD-001 workmanship, IPC-A-610 acceptability), and customer-specific mandates (automotive IATF 16949, medical ISO 13485, aerospace AS9100) all impose documentation requirements that are expensive to maintain manually and catastrophic to fail during an audit.
An AI agent for traceability continuously validates that every component on the line has a valid lot code linked to a certificate of conformance (CoC), that moisture-sensitive devices (MSD) are within their floor life exposure window per IPC/JEDEC J-STD-033, and that material declarations (RoHS compliance, REACH SVHC substance lists) are current for every part number. When a component lot is loaded onto a feeder, the agent verifies its MSD exposure timer — a BGA at MSL-3 has a maximum floor life of 168 hours after bag opening, and exceeding this requires baking at 125C for a duration that depends on package thickness before it can be safely reflowed.
Customer-specific requirements add another layer of complexity. Automotive customers may require PPAP (Production Part Approval Process) documentation for any process change, medical customers demand full device history records (DHR) traceable to individual component lots, and defense customers require DFARS-compliant sourcing with country-of-origin documentation. The agent maintains a requirements matrix per customer and validates compliance at each production step, flagging deviations before they become audit findings or field escapes.
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
class ComplianceStandard(Enum):
ROHS = "RoHS_2011_65_EU"
REACH = "REACH_SVHC"
JSTD_033 = "IPC_JEDEC_J-STD-033D"
JSTD_001 = "J-STD-001"
IPC_A_610 = "IPC-A-610"
IATF_16949 = "IATF_16949"
ISO_13485 = "ISO_13485"
AS9100 = "AS9100D"
@dataclass
class ComponentLot:
mpn: str
lot_code: str
date_code: str
quantity: int
msl: int # moisture sensitivity level
bag_opened: Optional[datetime]
floor_life_hours: int # per J-STD-033 table
baked: bool
bake_end_time: Optional[datetime]
coc_on_file: bool # certificate of conformance
rohs_compliant: bool
reach_compliant: bool
country_of_origin: str
@dataclass
class CustomerRequirement:
customer_id: str
standard: ComplianceStandard
additional_rules: List[str]
ppap_required: bool
full_lot_traceability: bool
country_restrictions: List[str]
max_component_age_months: Optional[int]
class TraceabilityComplianceAgent:
"""Component lot tracking, MSD management, and compliance verification."""
# J-STD-033D floor life table (hours at <30C / 60% RH)
MSL_FLOOR_LIFE = {
1: float("inf"), # unlimited
2: 8760, # 1 year
2.5: 8760, # 1 year (equivalent handling)
3: 168, # 7 days
4: 72, # 3 days
5: 48, # 2 days
5.5: 24, # 1 day
6: 6 # must reflow within 6 hours
}
BAKE_TEMPS = {
"standard": {"temp_c": 125, "hours_per_mm": 12},
"low_temp": {"temp_c": 40, "hours_per_mm": 192}
}
def __init__(self):
self.active_lots: Dict[str, ComponentLot] = {}
self.customer_reqs: Dict[str, CustomerRequirement] = {}
self.exposure_log: Dict[str, float] = {} # lot -> cumulative hours
def check_msd_floor_life(self, lot: ComponentLot) -> dict:
"""Verify moisture-sensitive device is within floor life per J-STD-033."""
if lot.msl <= 1:
return {"lot": lot.lot_code, "msl": lot.msl,
"status": "unlimited", "action": "none"}
max_hours = self.MSL_FLOOR_LIFE.get(lot.msl, 168)
if lot.bag_opened is None:
return {"lot": lot.lot_code, "status": "sealed",
"floor_life_hours": max_hours, "action": "none"}
# Reset floor life if baked
start_time = lot.bake_end_time if lot.baked else lot.bag_opened
elapsed = (datetime.now() - start_time).total_seconds() / 3600
remaining = max_hours - elapsed
remaining_pct = (remaining / max_hours) * 100
if remaining <= 0:
return {
"lot": lot.lot_code, "mpn": lot.mpn, "msl": lot.msl,
"status": "EXPIRED",
"elapsed_hours": round(elapsed, 1),
"floor_life_hours": max_hours,
"action": "bake_before_use",
"bake_profile": self.BAKE_TEMPS["standard"],
"severity": "critical"
}
elif remaining_pct < 20:
return {
"lot": lot.lot_code, "mpn": lot.mpn, "msl": lot.msl,
"status": "expiring_soon",
"remaining_hours": round(remaining, 1),
"remaining_pct": round(remaining_pct, 1),
"action": "prioritize_consumption_or_reseal"
}
return {
"lot": lot.lot_code, "mpn": lot.mpn, "msl": lot.msl,
"status": "ok",
"remaining_hours": round(remaining, 1),
"remaining_pct": round(remaining_pct, 1)
}
def validate_compliance(self, lot: ComponentLot,
customer_id: str) -> dict:
"""Check component lot against customer-specific requirements."""
req = self.customer_reqs.get(customer_id)
if not req:
return {"error": f"No requirements defined for {customer_id}"}
violations = []
warnings = []
# RoHS check
if not lot.rohs_compliant:
violations.append({
"standard": "RoHS",
"detail": f"MPN {lot.mpn} lot {lot.lot_code} not RoHS compliant",
"severity": "blocking"
})
# REACH check
if not lot.reach_compliant:
violations.append({
"standard": "REACH",
"detail": f"SVHC declaration missing for {lot.mpn}",
"severity": "blocking"
})
# CoC check
if not lot.coc_on_file and req.full_lot_traceability:
violations.append({
"standard": req.standard.value,
"detail": f"Certificate of Conformance missing for lot {lot.lot_code}",
"severity": "blocking"
})
# Country of origin restrictions
if lot.country_of_origin in req.country_restrictions:
violations.append({
"standard": "DFARS/country_restriction",
"detail": f"Component sourced from restricted country: "
f"{lot.country_of_origin}",
"severity": "blocking"
})
# Component age check
if req.max_component_age_months:
try:
year = int("20" + lot.date_code[:2])
week = int(lot.date_code[2:])
mfg_date = datetime(year, 1, 1) + timedelta(weeks=week - 1)
age_months = (datetime.now() - mfg_date).days / 30
if age_months > req.max_component_age_months:
violations.append({
"standard": "component_age",
"detail": f"Component age {age_months:.0f} months "
f"exceeds {req.max_component_age_months} month limit",
"severity": "blocking"
})
except (ValueError, IndexError):
warnings.append(f"Cannot parse date code: {lot.date_code}")
# MSD floor life
msd_status = self.check_msd_floor_life(lot)
if msd_status.get("status") == "EXPIRED":
violations.append({
"standard": "J-STD-033",
"detail": f"MSL-{lot.msl} floor life expired for lot {lot.lot_code}",
"severity": "blocking"
})
compliant = len(violations) == 0
return {
"lot": lot.lot_code, "mpn": lot.mpn,
"customer": customer_id,
"compliant": compliant,
"violations": violations,
"warnings": warnings,
"standards_checked": [
"RoHS", "REACH", req.standard.value, "J-STD-033"
]
}
def generate_lot_trace_report(self,
board_serial: str,
component_lots: Dict[str, str],
process_records: Dict[str, dict]) -> dict:
"""Full traceability report for a single board serial number."""
trace = {
"board_serial": board_serial,
"generated": datetime.now().isoformat(),
"component_lots": {},
"process_history": {},
"compliance_status": "pass"
}
for ref_des, lot_code in component_lots.items():
lot = self.active_lots.get(lot_code)
if lot:
trace["component_lots"][ref_des] = {
"mpn": lot.mpn, "lot_code": lot.lot_code,
"date_code": lot.date_code,
"country_of_origin": lot.country_of_origin,
"rohs": lot.rohs_compliant
}
else:
trace["compliance_status"] = "incomplete"
trace["process_history"] = process_records
return trace
6. ROI Analysis: Mid-Size EMS (5 SMT Lines)
For an EMS provider evaluating AI agent deployment, the return on investment depends on current yield levels, product mix complexity, and supply chain exposure. Below is a detailed breakdown for a mid-size EMS factory running 5 high-speed SMT lines, producing a mix of consumer electronics, industrial controls, and automotive modules at approximately 25,000 boards per day.
Assumptions
- Current first-pass yield: 98.5% (industry average for mixed product)
- Target first-pass yield: 99.4% (achievable with AI-driven process control)
- Average board value: $45 (component + PCB cost)
- Rework cost per defect: $12 (labor + materials)
- Annual production: 6.25 million boards (25,000/day x 250 days)
- Annual component spend: $180M
- NPI (new product introduction) cycles: 40 per year
| Category | Improvement | Annual Savings |
|---|---|---|
| Yield Improvement (98.5% to 99.4%) | 56,250 fewer defective boards/year | $675,000 - $1,125,000 |
| Reduced Scrap & Rework | 60% fewer rework cycles | $450,000 - $810,000 |
| AOI False Call Reduction | 70% fewer false calls = verification labor saved | $320,000 - $480,000 |
| Test Cycle Time Optimization | 20% faster test = higher throughput | $540,000 - $960,000 |
| Supply Chain Risk Avoidance | Prevent 3-5 line-down events per year | $900,000 - $2,400,000 |
| Faster NPI | 30% faster first-article to production ramp | $600,000 - $1,200,000 |
| Compliance & Audit Savings | Automated traceability reduces audit prep 80% | $180,000 - $360,000 |
| Predictive Stencil/Nozzle Maintenance | 40% fewer unplanned line stops | $360,000 - $720,000 |
| Total Annual Savings | $4,025,000 - $8,055,000 |
Implementation Cost vs. Return
from dataclasses import dataclass
@dataclass
class EMSFactoryROIModel:
"""Calculate ROI for AI agent deployment in electronics manufacturing."""
smt_lines: int = 5
boards_per_day: int = 25000
production_days_year: int = 250
avg_board_value_usd: float = 45.0
rework_cost_per_defect: float = 12.0
current_fpy_pct: float = 98.5
target_fpy_pct: float = 99.4
annual_component_spend: float = 180_000_000
npi_cycles_year: int = 40
aoi_false_calls_per_day: int = 350
verification_labor_cost_hr: float = 35.0
def calculate_yield_savings(self) -> dict:
"""Savings from first-pass yield improvement."""
annual_boards = self.boards_per_day * self.production_days_year
defects_before = annual_boards * (1 - self.current_fpy_pct / 100)
defects_after = annual_boards * (1 - self.target_fpy_pct / 100)
defects_avoided = defects_before - defects_after
rework_savings = defects_avoided * self.rework_cost_per_defect
scrap_savings = defects_avoided * 0.05 * self.avg_board_value_usd
return {
"defects_avoided": int(defects_avoided),
"rework_savings_usd": round(rework_savings, 0),
"scrap_savings_usd": round(scrap_savings, 0),
"total_yield_savings": round(rework_savings + scrap_savings, 0)
}
def calculate_aoi_savings(self, false_call_reduction: float = 0.70) -> dict:
"""Labor savings from reduced AOI false call verification."""
daily_false_calls_eliminated = (
self.aoi_false_calls_per_day * false_call_reduction)
minutes_per_verification = 2.5
daily_hours_saved = daily_false_calls_eliminated * minutes_per_verification / 60
annual_savings = (daily_hours_saved * self.production_days_year
* self.verification_labor_cost_hr)
return {
"false_calls_eliminated_daily": int(daily_false_calls_eliminated),
"hours_saved_daily": round(daily_hours_saved, 1),
"annual_savings_usd": round(annual_savings, 0)
}
def calculate_supply_chain_savings(self,
line_down_events_prevented: int = 4,
avg_line_down_cost_usd: float = 180_000) -> dict:
"""Savings from preventing supply chain disruptions."""
disruption_savings = line_down_events_prevented * avg_line_down_cost_usd
# Better alternate sourcing saves 0.5-1% on component spend
sourcing_savings = self.annual_component_spend * 0.005
return {
"disruption_savings_usd": round(disruption_savings, 0),
"sourcing_optimization_usd": round(sourcing_savings, 0),
"total_supply_chain_usd": round(
disruption_savings + sourcing_savings, 0)
}
def calculate_npi_acceleration(self,
time_reduction_pct: float = 0.30,
avg_npi_value_usd: float = 85_000) -> dict:
"""Revenue from faster new product introduction."""
weeks_saved = self.npi_cycles_year * time_reduction_pct * 2
revenue_acceleration = weeks_saved * avg_npi_value_usd / 8
return {
"npi_cycles": self.npi_cycles_year,
"total_weeks_saved": round(weeks_saved, 0),
"revenue_acceleration_usd": round(revenue_acceleration, 0)
}
def full_roi_analysis(self) -> dict:
"""Complete ROI model for AI agent EMS deployment."""
yield_sav = self.calculate_yield_savings()
aoi_sav = self.calculate_aoi_savings()
supply_sav = self.calculate_supply_chain_savings()
npi_sav = self.calculate_npi_acceleration()
# Implementation costs
setup_cost = 350_000 # integration, training, customization
annual_license = 180_000 # AI platform SaaS
annual_support = 85_000 # support + model retraining
total_annual_cost = annual_license + annual_support
total_year1_cost = setup_cost + total_annual_cost
total_annual_benefit = (
yield_sav["total_yield_savings"]
+ aoi_sav["annual_savings_usd"]
+ supply_sav["total_supply_chain_usd"]
+ npi_sav["revenue_acceleration_usd"]
)
roi_year1 = ((total_annual_benefit - total_year1_cost)
/ total_year1_cost) * 100
roi_year2 = ((total_annual_benefit - total_annual_cost)
/ total_annual_cost) * 100
payback_months = (total_year1_cost / total_annual_benefit) * 12
return {
"factory_profile": {
"smt_lines": self.smt_lines,
"daily_output": self.boards_per_day,
"annual_boards": self.boards_per_day * self.production_days_year,
"fpy_improvement": f"{self.current_fpy_pct}% -> {self.target_fpy_pct}%"
},
"annual_benefits": {
"yield_improvement": yield_sav["total_yield_savings"],
"aoi_optimization": aoi_sav["annual_savings_usd"],
"supply_chain": supply_sav["total_supply_chain_usd"],
"npi_acceleration": npi_sav["revenue_acceleration_usd"],
"total": round(total_annual_benefit, 0)
},
"costs": {
"year_1_total": total_year1_cost,
"annual_recurring": total_annual_cost
},
"returns": {
"roi_year_1_pct": round(roi_year1, 0),
"roi_year_2_pct": round(roi_year2, 0),
"payback_months": round(payback_months, 1),
"net_benefit_year_1": round(
total_annual_benefit - total_year1_cost, 0)
}
}
# Run the analysis
model = EMSFactoryROIModel(smt_lines=5, boards_per_day=25000)
results = model.full_roi_analysis()
print(f"Factory: {results['factory_profile']['smt_lines']} SMT lines")
print(f"Annual Output: {results['factory_profile']['annual_boards']:,} boards")
print(f"FPY Improvement: {results['factory_profile']['fpy_improvement']}")
print(f"Total Annual Benefits: ${results['annual_benefits']['total']:,.0f}")
print(f"Year 1 Cost: ${results['costs']['year_1_total']:,.0f}")
print(f"Year 1 ROI: {results['returns']['roi_year_1_pct']}%")
print(f"Year 2 ROI: {results['returns']['roi_year_2_pct']}%")
print(f"Payback Period: {results['returns']['payback_months']} months")
Getting Started: Implementation Roadmap
Deploying AI agents across electronics manufacturing should follow a phased approach, starting with the modules that deliver fastest ROI and generate the training data needed for more advanced capabilities:
- Month 1-2: SPI/AOI data integration. Connect SPI and AOI machines to a central data pipeline. Begin collecting structured defect and measurement data. Deploy the false call reduction model using historical verification data.
- Month 3-4: Supply chain intelligence. Import all active BOMs. Configure distributor API connections for real-time lead time and stock monitoring. Establish risk scores for every component and begin alternate part qualification workflows.
- Month 5-6: Process optimization. Deploy the SMT line optimization agent with stencil maintenance prediction, nozzle health monitoring, and reflow profile validation. Connect to pick-and-place machine telemetry feeds.
- Month 7-8: Test engineering and traceability. Integrate ICT/FCT test data. Optimize test sequences. Deploy automated floor life management and compliance verification modules.
- Month 9-12: Closed-loop feedback and NPI acceleration. Connect all agents into a unified factory intelligence layer. Enable closed-loop feedback from AOI to SPI to process parameters. Use accumulated data to accelerate NPI first-article runs.
The critical success factor is data infrastructure first, AI second. Most EMS factories have the data locked in machine vendor silos — Koh Young SPI data in one format, Yamaha pick-and-place logs in another, Viscom AOI results in a third. The first step is always building the unified data layer that lets the AI agent correlate across machines and process steps. Once that foundation is in place, the intelligence layers compound rapidly.
Build Your Own AI Agents for Manufacturing
Get the complete playbook with templates, architecture patterns, and deployment checklists for industrial AI agents.
Playbook — $19