AI Agent for Maritime & Shipping: Automate Fleet Management, Route Optimization & Port Operations

March 28, 2026 15 min read Maritime

The global shipping industry moves over 11 billion tons of cargo annually, yet most maritime operations still rely on spreadsheets, manual weather checks, and phone calls between port agents. Fuel alone accounts for 50-60% of a vessel's operating costs, and a single day of demurrage at a major port can exceed $40,000. These inefficiencies represent a massive opportunity for AI agents that can reason about complex, interconnected maritime systems.

Unlike simple rule-based automation, AI agents for maritime shipping can process real-time weather data, AIS feeds, market indices, and vessel telemetry simultaneously to make decisions that account for dozens of interdependent variables. From optimizing voyage routes around weather systems to predicting hull fouling degradation, these agents deliver measurable ROI from day one.

This guide covers six core areas where AI agents transform maritime operations, with production-ready Python code for each. Whether you manage a single vessel or a fleet of 200, these patterns scale to your operation.

Table of Contents

1. Voyage Planning & Route Optimization

Traditional voyage planning involves a master reviewing weather charts, plotting waypoints on an ECDIS, and manually calculating fuel estimates. An AI agent can evaluate thousands of potential routes simultaneously, factoring in wave height, wind speed, ocean currents, ECA zone boundaries, and CII rating impact to find the optimal path that balances speed, fuel cost, and regulatory compliance.

Weather Routing and Fuel Prediction

The core challenge in weather routing is that ocean conditions change constantly. A route that looks optimal at departure may encounter a developing low-pressure system 3 days into the voyage. AI agents continuously ingest GRIB weather data and recalculate routes every 6 hours, adjusting for wave height thresholds (typically keeping significant wave height below 4 meters for container vessels), wind-assisted propulsion opportunities, and favorable currents.

Fuel consumption prediction requires modeling the interaction between hull fouling (which increases drag over time), vessel draft (laden vs. ballast), speed-power curves specific to each vessel, and real-time sea state. A clean hull at 12 knots in calm water behaves very differently from a hull with 18 months of fouling pushing through 3-meter head seas.

ECA Zone Compliance and CII Optimization

Emission Control Areas (ECAs) in the North Sea, Baltic, and North American coasts enforce sulfur limits of 0.10%, requiring vessels to either switch to Low Sulfur Fuel Oil (LSFO) or operate exhaust gas scrubbers. The AI agent must calculate the cost trade-off: LSFO is more expensive per ton but avoids scrubber maintenance costs and washwater compliance issues. It also needs to time fuel switches precisely at ECA boundaries to minimize the use of premium fuel.

The IMO's Carbon Intensity Indicator (CII) assigns annual ratings from A to E. Vessels rated D for three consecutive years or E in any single year face operational restrictions. The agent tracks each vessel's running CII score and recommends speed adjustments, routing changes, or voyage clustering to maintain at least a C rating throughout the year.

import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Optional
import math

@dataclass
class WeatherCell:
    lat: float
    lon: float
    wave_height: float      # significant wave height (meters)
    wind_speed: float        # knots
    wind_direction: float    # degrees
    current_speed: float     # knots
    current_direction: float # degrees

@dataclass
class VesselProfile:
    name: str
    imo_number: str
    dwt: float
    hull_fouling_factor: float   # 1.0 = clean, 1.25 = 25% drag increase
    speed_power_curve: dict      # {speed_knots: power_kw}
    fuel_type: str               # "VLSFO" or "LSFO"
    scrubber_equipped: bool
    current_cii_score: float     # g CO2 / dwt-nm

@dataclass
class RouteWaypoint:
    lat: float
    lon: float
    eta: Optional[str] = None
    in_eca: bool = False

class VoyageOptimizationAgent:
    """AI agent for weather routing, fuel prediction, and CII optimization."""

    MAX_WAVE_HEIGHT = 4.0        # meters - container vessel threshold
    ECA_SULFUR_LIMIT = 0.001     # 0.10%
    CII_TARGET_RATING = "C"
    FUEL_SWITCH_LEAD_NM = 5      # switch fuel before ECA boundary

    def __init__(self, vessel: VesselProfile, weather_grid: List[WeatherCell]):
        self.vessel = vessel
        self.weather_grid = self._index_weather(weather_grid)

    def _index_weather(self, cells: List[WeatherCell]) -> dict:
        grid = {}
        for cell in cells:
            key = (round(cell.lat, 1), round(cell.lon, 1))
            grid[key] = cell
        return grid

    def calculate_fuel_consumption(self, speed_knots: float,
                                    wave_height: float,
                                    current_component: float) -> float:
        """Predict fuel consumption in MT/day including hull fouling and sea state."""
        base_power = self._interpolate_power(speed_knots)
        effective_speed = speed_knots - current_component

        # Hull fouling penalty: increases required power
        fouled_power = base_power * self.vessel.hull_fouling_factor

        # Sea state added resistance (Kwon's method simplified)
        wave_penalty = 1.0 + (0.03 * wave_height ** 2)
        total_power = fouled_power * wave_penalty

        # SFOC curve: g/kWh varies with engine load
        engine_load = total_power / max(self.vessel.speed_power_curve.values())
        sfoc = self._sfoc_curve(engine_load)

        fuel_mt_per_day = (total_power * sfoc * 24) / 1_000_000
        return round(fuel_mt_per_day, 2)

    def optimize_route(self, origin: RouteWaypoint,
                       destination: RouteWaypoint,
                       target_speed: float) -> dict:
        """Find optimal route considering weather, fuel, ECA, and CII."""
        candidates = self._generate_route_candidates(origin, destination)
        scored_routes = []

        for route in candidates:
            fuel_total = 0
            max_wave = 0
            eca_fuel_cost = 0
            cii_impact = 0

            for i in range(len(route) - 1):
                wp = route[i]
                weather = self._get_weather(wp.lat, wp.lon)
                leg_distance = self._haversine(
                    route[i].lat, route[i].lon,
                    route[i+1].lat, route[i+1].lon
                )
                current = self._resolve_current(weather, wp, route[i+1])
                fuel = self.calculate_fuel_consumption(
                    target_speed, weather.wave_height, current
                )
                leg_days = leg_distance / (target_speed * 24)
                fuel_total += fuel * leg_days
                max_wave = max(max_wave, weather.wave_height)

                if wp.in_eca and not self.vessel.scrubber_equipped:
                    eca_fuel_cost += leg_days * fuel * 120  # LSFO premium

                cii_impact += (fuel * leg_days * 3.114) / (
                    self.vessel.dwt * leg_distance
                )

            # Reject routes exceeding wave threshold
            if max_wave > self.MAX_WAVE_HEIGHT:
                continue

            score = (
                fuel_total * 550           # fuel cost at $550/MT
                + eca_fuel_cost
                + cii_impact * 10000       # CII penalty weighting
            )
            scored_routes.append({
                "route": route,
                "fuel_mt": round(fuel_total, 1),
                "max_wave_height": round(max_wave, 1),
                "eca_extra_cost": round(eca_fuel_cost, 0),
                "cii_delta": round(cii_impact, 4),
                "total_score": round(score, 0)
            })

        scored_routes.sort(key=lambda r: r["total_score"])
        return scored_routes[0] if scored_routes else None

    def _interpolate_power(self, speed: float) -> float:
        speeds = sorted(self.vessel.speed_power_curve.keys())
        for i in range(len(speeds) - 1):
            if speeds[i] <= speed <= speeds[i+1]:
                ratio = (speed - speeds[i]) / (speeds[i+1] - speeds[i])
                p1 = self.vessel.speed_power_curve[speeds[i]]
                p2 = self.vessel.speed_power_curve[speeds[i+1]]
                return p1 + ratio * (p2 - p1)
        return self.vessel.speed_power_curve[speeds[-1]]

    def _sfoc_curve(self, load_pct: float) -> float:
        """Specific fuel oil consumption (g/kWh) vs engine load."""
        if load_pct < 0.25:
            return 195
        elif load_pct < 0.50:
            return 180
        elif load_pct < 0.85:
            return 170
        return 175

    def _haversine(self, lat1, lon1, lat2, lon2) -> float:
        R = 3440.065  # Earth radius in nautical miles
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat/2)**2 +
             math.cos(math.radians(lat1)) *
             math.cos(math.radians(lat2)) *
             math.sin(dlon/2)**2)
        return R * 2 * math.asin(math.sqrt(a))

    def _get_weather(self, lat, lon) -> WeatherCell:
        key = (round(lat, 1), round(lon, 1))
        return self.weather_grid.get(key, WeatherCell(lat, lon, 1.5, 10, 0, 0.5, 0))

    def _resolve_current(self, weather, wp1, wp2) -> float:
        bearing = math.atan2(wp2.lon - wp1.lon, wp2.lat - wp1.lat)
        current_angle = math.radians(weather.current_direction) - bearing
        return weather.current_speed * math.cos(current_angle)

    def _generate_route_candidates(self, origin, dest) -> List[List[RouteWaypoint]]:
        """Generate 5 candidate routes with varying lateral offsets."""
        direct = [origin, dest]
        candidates = [direct]
        for offset_nm in [-60, -30, 30, 60]:
            mid_lat = (origin.lat + dest.lat) / 2 + offset_nm / 60
            mid_lon = (origin.lon + dest.lon) / 2
            candidates.append([origin, RouteWaypoint(mid_lat, mid_lon), dest])
        return candidates
Key insight: Hull fouling alone can increase fuel consumption by 15-25% between dry-docks. The agent's fouling factor multiplier lets you track degradation over time and trigger cleaning recommendations when the cost of extra fuel exceeds the cost of hull cleaning.

2. Fleet Management & Vessel Performance

Managing a fleet means monitoring dozens of vessels simultaneously, each with different maintenance cycles, charter commitments, and performance profiles. An AI agent that tracks hull performance degradation, engine health indicators, and market conditions can make proactive decisions that save millions annually.

Hull Performance Monitoring

Speed loss is the primary indicator of hull degradation. By comparing a vessel's actual speed at a given power output against its clean-hull baseline (from sea trials or the last dry-dock), the agent quantifies the speed loss percentage and translates it into a daily fuel penalty in dollars. When the accumulated fuel penalty exceeds the cost of hull cleaning or an early dry-dock, the agent flags the vessel for intervention.

Engine Health Analytics

Modern vessels transmit cylinder pressure readings, exhaust gas temperatures, turbocharger RPM, and scavenge air pressure via satellite. The agent monitors each cylinder's deviation from the fleet mean, detects turbocharger efficiency drops (which precede failures by weeks), and predicts when maintenance is needed. This shifts from calendar-based maintenance to condition-based maintenance, reducing both downtime and spare parts inventory.

Charter Rate Forecasting

Dry bulk and tanker charter rates are notoriously volatile. The agent ingests Baltic Exchange indices, port congestion data, seasonal trade patterns, and newbuilding order books to forecast rates 30-90 days out. This helps commercial teams decide whether to fix a vessel on a time charter or keep it in the spot market.

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict, Optional
import statistics

@dataclass
class PerformanceSnapshot:
    timestamp: datetime
    vessel_id: str
    speed_knots: float
    power_kw: float
    fuel_consumption_mt_day: float
    draft_meters: float
    wind_speed_knots: float
    wave_height_m: float

@dataclass
class EngineReading:
    timestamp: datetime
    vessel_id: str
    cylinder_pressures: List[float]   # bar, per cylinder
    exhaust_temps: List[float]        # celsius, per cylinder
    turbo_rpm: float
    turbo_efficiency: float           # percentage
    scavenge_air_pressure: float      # bar

@dataclass
class DryDockPlan:
    vessel_id: str
    last_drydock: datetime
    next_scheduled: datetime
    estimated_cost_usd: float
    hull_cleaning_cost_usd: float

class FleetPerformanceAgent:
    """Monitor hull performance, engine health, and optimize dry-dock timing."""

    SPEED_LOSS_CLEANING_THRESHOLD = 8.0   # % speed loss triggers cleaning
    SPEED_LOSS_DRYDOCK_THRESHOLD = 15.0   # % speed loss triggers early drydock
    TURBO_EFFICIENCY_ALERT = 72.0         # % below this = alert
    EXHAUST_TEMP_DEVIATION = 30.0         # celsius deviation from mean

    def __init__(self, fleet_baselines: Dict[str, dict]):
        self.baselines = fleet_baselines  # {vessel_id: {speed: power_baseline}}
        self.performance_history = {}
        self.engine_history = {}

    def ingest_performance(self, snapshot: PerformanceSnapshot):
        vid = snapshot.vessel_id
        if vid not in self.performance_history:
            self.performance_history[vid] = []
        self.performance_history[vid].append(snapshot)

    def calculate_speed_loss(self, vessel_id: str,
                              window_days: int = 30) -> dict:
        """Calculate hull speed loss vs clean-hull baseline."""
        history = self.performance_history.get(vessel_id, [])
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = [s for s in history if s.timestamp > cutoff]

        if not recent:
            return {"speed_loss_pct": 0, "fuel_penalty_usd_day": 0}

        baseline = self.baselines[vessel_id]
        losses = []
        for snap in recent:
            # Normalize to calm water: correct for wind and wave
            calm_power = snap.power_kw / (1 + 0.03 * snap.wave_height_m ** 2)
            expected_speed = self._baseline_speed(vessel_id, calm_power)
            actual_speed = snap.speed_knots
            if expected_speed > 0:
                loss = ((expected_speed - actual_speed) / expected_speed) * 100
                losses.append(max(0, loss))

        avg_loss = statistics.mean(losses) if losses else 0
        fuel_penalty = self._fuel_penalty_from_speed_loss(vessel_id, avg_loss)

        return {
            "vessel_id": vessel_id,
            "speed_loss_pct": round(avg_loss, 1),
            "fuel_penalty_usd_day": round(fuel_penalty, 0),
            "fuel_penalty_usd_month": round(fuel_penalty * 30, 0),
            "recommendation": self._hull_recommendation(avg_loss),
            "data_points": len(losses)
        }

    def analyze_engine_health(self, reading: EngineReading) -> dict:
        """Detect engine anomalies from cylinder and turbocharger data."""
        alerts = []

        # Cylinder pressure analysis - detect weak cylinders
        mean_pressure = statistics.mean(reading.cylinder_pressures)
        for i, pressure in enumerate(reading.cylinder_pressures):
            deviation = abs(pressure - mean_pressure)
            if deviation > mean_pressure * 0.08:
                alerts.append({
                    "type": "cylinder_pressure",
                    "cylinder": i + 1,
                    "value": pressure,
                    "mean": round(mean_pressure, 1),
                    "severity": "high" if deviation > mean_pressure * 0.15 else "medium"
                })

        # Exhaust temperature spread - indicates injector or valve issues
        mean_temp = statistics.mean(reading.exhaust_temps)
        for i, temp in enumerate(reading.exhaust_temps):
            if abs(temp - mean_temp) > self.EXHAUST_TEMP_DEVIATION:
                alerts.append({
                    "type": "exhaust_temp_deviation",
                    "cylinder": i + 1,
                    "value": temp,
                    "mean": round(mean_temp, 1),
                    "severity": "high" if abs(temp - mean_temp) > 50 else "medium"
                })

        # Turbocharger efficiency degradation
        if reading.turbo_efficiency < self.TURBO_EFFICIENCY_ALERT:
            alerts.append({
                "type": "turbocharger_efficiency",
                "value": reading.turbo_efficiency,
                "threshold": self.TURBO_EFFICIENCY_ALERT,
                "severity": "high",
                "action": "Schedule turbocharger washing within 48 hours"
            })

        return {
            "vessel_id": reading.vessel_id,
            "timestamp": reading.timestamp.isoformat(),
            "alerts": alerts,
            "overall_status": "critical" if any(
                a["severity"] == "high" for a in alerts
            ) else "normal" if not alerts else "watch"
        }

    def optimize_drydock_schedule(self, plans: List[DryDockPlan]) -> List[dict]:
        """Recommend early/delayed dry-docking based on hull performance."""
        recommendations = []
        for plan in plans:
            perf = self.calculate_speed_loss(plan.vessel_id)
            daily_penalty = perf["fuel_penalty_usd_day"]
            days_to_drydock = (plan.next_scheduled - datetime.now()).days

            cost_of_waiting = daily_penalty * days_to_drydock
            cleaning_roi = cost_of_waiting - plan.hull_cleaning_cost_usd

            if (perf["speed_loss_pct"] > self.SPEED_LOSS_DRYDOCK_THRESHOLD
                    and days_to_drydock > 90):
                action = "advance_drydock"
                savings = cost_of_waiting - plan.estimated_cost_usd
            elif cleaning_roi > 0 and days_to_drydock > 180:
                action = "hull_cleaning_now"
                savings = cleaning_roi
            else:
                action = "maintain_schedule"
                savings = 0

            recommendations.append({
                "vessel_id": plan.vessel_id,
                "current_speed_loss": perf["speed_loss_pct"],
                "days_to_scheduled_drydock": days_to_drydock,
                "action": action,
                "estimated_savings_usd": round(savings, 0)
            })
        return recommendations

    def forecast_charter_rate(self, vessel_type: str,
                               historical_rates: List[float],
                               congestion_index: float) -> dict:
        """Simple rate forecast using trend + congestion signal."""
        if len(historical_rates) < 14:
            return {"forecast": historical_rates[-1], "confidence": "low"}
        recent = statistics.mean(historical_rates[-7:])
        prior = statistics.mean(historical_rates[-14:-7])
        trend = (recent - prior) / prior if prior else 0
        congestion_boost = (congestion_index - 50) * 100
        forecast_30d = recent * (1 + trend) + congestion_boost
        return {
            "vessel_type": vessel_type,
            "current_rate_usd_day": round(recent, 0),
            "forecast_30d_usd_day": round(forecast_30d, 0),
            "trend_pct": round(trend * 100, 1),
            "confidence": "medium" if len(historical_rates) > 60 else "low"
        }

    def _baseline_speed(self, vessel_id: str, power_kw: float) -> float:
        baseline = self.baselines.get(vessel_id, {})
        powers = sorted(baseline.keys())
        for i in range(len(powers) - 1):
            if powers[i] <= power_kw <= powers[i+1]:
                ratio = (power_kw - powers[i]) / (powers[i+1] - powers[i])
                return baseline[powers[i]] + ratio * (
                    baseline[powers[i+1]] - baseline[powers[i]]
                )
        return 12.0  # fallback

    def _fuel_penalty_from_speed_loss(self, vessel_id: str,
                                       loss_pct: float) -> float:
        extra_fuel_pct = loss_pct * 1.8  # rough: 1% speed loss ~ 1.8% fuel
        base_consumption = 45  # MT/day average
        extra_mt = base_consumption * (extra_fuel_pct / 100)
        return extra_mt * 550  # USD/MT

    def _hull_recommendation(self, speed_loss: float) -> str:
        if speed_loss > self.SPEED_LOSS_DRYDOCK_THRESHOLD:
            return "URGENT: Consider early dry-dock or propeller polish"
        elif speed_loss > self.SPEED_LOSS_CLEANING_THRESHOLD:
            return "Schedule underwater hull cleaning at next port"
        elif speed_loss > 5.0:
            return "Monitor closely - cleaning recommended within 60 days"
        return "Hull performance within acceptable range"
Key insight: Turbocharger efficiency drops below 72% typically precede complete failure by 2-4 weeks. Catching this early with condition-based monitoring avoids emergency repairs at sea, which cost 3-5x more than planned maintenance in port.

3. Port & Terminal Operations

Port congestion costs the global shipping industry an estimated $22 billion annually. A vessel waiting at anchorage burns 5-8 MT of fuel per day while generating zero revenue. AI agents that optimize berth allocation, container yard planning, crane scheduling, and gate appointments can dramatically reduce turnaround times and unlock additional throughput without physical infrastructure expansion.

Berth Allocation Optimization

Berth allocation must consider vessel length and beam, cargo type compatibility (you cannot berth a chemical tanker next to a passenger ferry), available water depth at different tidal states, crane reach requirements, and landside connectivity. The agent solves this as a constraint satisfaction problem, maximizing berth utilization while respecting safety distances and operational windows.

Container Yard and Crane Scheduling

Every unnecessary container rehandle in the yard costs $30-50 and adds 3-5 minutes of delay. The agent optimizes stacking strategies based on vessel loading sequences, minimizing the number of times containers need to be shuffled. For crane scheduling, it sequences lifts to minimize trolley travel distance and coordinates multiple cranes on the same vessel to avoid interference zones.

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional, Tuple
import heapq

@dataclass
class Berth:
    id: str
    length_m: float
    max_draft_m: float
    crane_count: int
    cargo_types: List[str]   # ["container", "bulk", "tanker"]
    tidal_restriction: bool

@dataclass
class VesselCall:
    vessel_id: str
    vessel_name: str
    loa_m: float             # length overall
    draft_m: float
    cargo_type: str
    container_moves: int     # 0 for non-container
    eta: datetime
    priority: int            # 1=highest (liner), 3=lowest (tramp)
    tide_dependent: bool

@dataclass
class TideWindow:
    start: datetime
    end: datetime
    max_draft_m: float

@dataclass
class TruckAppointment:
    appointment_id: str
    container_id: str
    gate: str
    time_slot: datetime
    direction: str           # "pickup" or "delivery"

class PortOperationsAgent:
    """Optimize berth allocation, yard planning, crane scheduling, and gates."""

    SAFETY_DISTANCE_M = 15       # minimum gap between vessels at berth
    CRANE_MOVES_PER_HOUR = 28    # average STS crane productivity
    REHANDLE_COST_USD = 40       # cost per unnecessary container move
    GATE_SLOT_MINUTES = 30

    def __init__(self, berths: List[Berth], tide_windows: List[TideWindow]):
        self.berths = {b.id: b for b in berths}
        self.tides = tide_windows
        self.berth_schedule = {b.id: [] for b in berths}

    def allocate_berths(self, vessel_calls: List[VesselCall]) -> List[dict]:
        """Assign vessels to berths respecting constraints and priorities."""
        # Sort by priority then ETA
        sorted_calls = sorted(vessel_calls, key=lambda v: (v.priority, v.eta))
        allocations = []

        for call in sorted_calls:
            best_berth = None
            best_score = float("inf")

            for berth_id, berth in self.berths.items():
                # Hard constraints
                if call.loa_m + self.SAFETY_DISTANCE_M > berth.length_m:
                    continue
                if call.cargo_type not in berth.cargo_types:
                    continue
                if call.draft_m > berth.max_draft_m:
                    if not self._has_tide_window(call.eta, call.draft_m):
                        continue

                # Check berth availability at ETA
                service_hours = self._estimate_service_time(call, berth)
                start = call.eta
                end = start + timedelta(hours=service_hours)

                if self._berth_available(berth_id, start, end):
                    # Score: minimize wait time + crane distance
                    wait = max(0, (start - call.eta).total_seconds() / 3600)
                    crane_score = berth.crane_count * 10
                    score = wait * 100 - crane_score
                    if score < best_score:
                        best_score = score
                        best_berth = {
                            "berth_id": berth_id,
                            "start": start,
                            "end": end,
                            "service_hours": service_hours
                        }

            if best_berth:
                self.berth_schedule[best_berth["berth_id"]].append(
                    (best_berth["start"], best_berth["end"], call.vessel_id)
                )
                allocations.append({
                    "vessel_id": call.vessel_id,
                    "vessel_name": call.vessel_name,
                    "berth": best_berth["berth_id"],
                    "berthing_time": best_berth["start"].isoformat(),
                    "departure_time": best_berth["end"].isoformat(),
                    "service_hours": best_berth["service_hours"],
                    "wait_hours": 0
                })
            else:
                allocations.append({
                    "vessel_id": call.vessel_id,
                    "vessel_name": call.vessel_name,
                    "berth": None,
                    "status": "anchorage_queue",
                    "reason": "No berth available matching constraints"
                })

        return allocations

    def optimize_container_stacking(self, containers: List[dict],
                                     loading_sequence: List[str]) -> dict:
        """Minimize rehandles by stacking in reverse loading order."""
        # Build priority map: first to load = top of stack
        priority = {cid: i for i, cid in enumerate(loading_sequence)}
        bays = {}
        rehandles = 0
        max_tier = 5  # max stacking height

        for container in containers:
            cid = container["id"]
            weight_class = container["weight_class"]  # H, M, L
            load_priority = priority.get(cid, 999)

            # Find best bay: minimize expected rehandles
            best_bay = None
            best_cost = float("inf")

            for bay_id, stack in bays.items():
                if len(stack) >= max_tier:
                    continue
                # Cost = number of containers above that load later
                blocking = sum(1 for s in stack if priority.get(s, 999) > load_priority)
                # Weight constraint: heavier on bottom
                weight_ok = self._weight_compatible(weight_class, stack, container)
                cost = blocking + (0 if weight_ok else 100)
                if cost < best_cost:
                    best_cost = cost
                    best_bay = bay_id

            if best_bay is None:
                # Open new bay
                best_bay = f"bay_{len(bays) + 1}"
                bays[best_bay] = []

            bays[best_bay].append(cid)
            rehandles += best_cost if best_cost < 100 else 0

        return {
            "total_containers": len(containers),
            "bays_used": len(bays),
            "estimated_rehandles": rehandles,
            "rehandle_cost_usd": rehandles * self.REHANDLE_COST_USD,
            "avg_stack_height": round(
                sum(len(s) for s in bays.values()) / max(len(bays), 1), 1
            )
        }

    def schedule_gate_appointments(self, appointments: List[TruckAppointment],
                                    gates: int,
                                    capacity_per_slot: int) -> List[dict]:
        """Balance truck arrivals across gates and time slots."""
        slot_load = {}  # (gate, time_slot) -> count
        scheduled = []

        for appt in sorted(appointments, key=lambda a: a.time_slot):
            best_gate = None
            min_load = float("inf")

            for g in range(gates):
                slot_key = (g, appt.time_slot.strftime("%H:%M"))
                current_load = slot_load.get(slot_key, 0)
                if current_load < capacity_per_slot and current_load < min_load:
                    min_load = current_load
                    best_gate = g

            if best_gate is not None:
                slot_key = (best_gate, appt.time_slot.strftime("%H:%M"))
                slot_load[slot_key] = slot_load.get(slot_key, 0) + 1
                scheduled.append({
                    "appointment_id": appt.appointment_id,
                    "container_id": appt.container_id,
                    "assigned_gate": best_gate,
                    "time_slot": appt.time_slot.isoformat(),
                    "status": "confirmed"
                })
            else:
                # Suggest next available slot
                next_slot = appt.time_slot + timedelta(minutes=self.GATE_SLOT_MINUTES)
                scheduled.append({
                    "appointment_id": appt.appointment_id,
                    "container_id": appt.container_id,
                    "assigned_gate": None,
                    "suggested_time": next_slot.isoformat(),
                    "status": "rescheduled"
                })

        return scheduled

    def _estimate_service_time(self, call: VesselCall, berth: Berth) -> float:
        if call.container_moves > 0:
            hours = call.container_moves / (
                self.CRANE_MOVES_PER_HOUR * min(berth.crane_count, 3)
            )
            return max(hours + 2, 6)  # minimum 6 hours including mooring
        return 24  # default for bulk/tanker

    def _berth_available(self, berth_id: str, start: datetime,
                          end: datetime) -> bool:
        for (s, e, _) in self.berth_schedule[berth_id]:
            if start < e and end > s:
                return False
        return True

    def _has_tide_window(self, eta: datetime, draft: float) -> bool:
        for tw in self.tides:
            if tw.start <= eta <= tw.end and tw.max_draft_m >= draft:
                return True
        return False

    def _weight_compatible(self, weight_class, stack, container) -> bool:
        weight_order = {"H": 3, "M": 2, "L": 1}
        if not stack:
            return True
        return weight_order.get(weight_class, 1) <= weight_order.get("M", 2)
Key insight: Berth allocation that accounts for tidal windows can reduce anchorage waiting time by 30-40% at tide-restricted ports. Many operators still plan berths without dynamic tide data, causing vessels to miss windows and wait an extra 12 hours for the next cycle.

4. Cargo & Commercial Operations

Commercial maritime operations involve a web of negotiations, rate predictions, documentation, and physical cargo planning. A single miscalculation in cargo stowage can compromise vessel stability, while missing a demurrage clause deadline can cost tens of thousands of dollars. AI agents bring precision and speed to these high-stakes calculations.

Freight Rate Prediction

Freight rates are driven by supply-demand balance, seasonal patterns (grain season, winter heating oil), geopolitical events, and fleet utilization. The agent monitors Baltic Exchange indices (BDI for dry bulk, BDTI for tankers), port congestion levels, and newbuilding deliveries to generate short-term rate forecasts that inform chartering decisions.

Demurrage and Despatch Calculations

Demurrage (the penalty for exceeding allowed laytime) and despatch (the reward for finishing early) require tracking every hour of a port call against the charter party's laytime calculation. Weather delays, strike exceptions, shifting between berths, and the distinction between "SHINC" (Sundays and Holidays Included) and "SHEX" (Sundays and Holidays Excluded) all affect the calculation. Manual processing is error-prone and routinely contested between owners and charterers.

Cargo Stowage Optimization

The agent calculates cargo distribution to maintain safe stability (GM values), optimal trim (for fuel efficiency), and hull stress within class limits. For container vessels, this means assigning each container to a specific bay, row, and tier while respecting weight limits, reefer plug locations, dangerous goods segregation rules, and stack weight constraints.

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict, Tuple
import statistics

@dataclass
class FreightIndex:
    date: datetime
    route: str
    rate_usd: float           # $/day for TC, $/MT for voyage
    index_value: float

@dataclass
class LaytimeEvent:
    timestamp: datetime
    event_type: str           # "arrived", "nor_tendered", "berthed",
                              # "commenced", "completed", "sailed"
    remarks: str
    is_exception: bool        # weather, strike, etc.

@dataclass
class CargoParcel:
    cargo_id: str
    weight_mt: float
    volume_cbm: float
    cargo_type: str
    density: float
    is_dangerous: bool
    dg_class: Optional[str]   # IMO DG class
    reefer: bool
    discharge_port: str

class CargoCommercialAgent:
    """Freight forecasting, demurrage calculation, and stowage optimization."""

    def __init__(self):
        self.rate_history = {}

    def predict_freight_rate(self, route: str,
                              history: List[FreightIndex],
                              vessel_supply_growth: float,
                              cargo_demand_growth: float) -> dict:
        """Forecast freight rate using supply-demand balance and seasonality."""
        rates = [h.rate_usd for h in history if h.route == route]
        if len(rates) < 30:
            return {"error": "Insufficient data", "min_required": 30}

        # Decompose: trend + seasonal + residual
        trend = self._linear_trend(rates[-90:])
        seasonal = self._seasonal_factor(history, route)

        # Supply-demand imbalance signal
        sd_ratio = cargo_demand_growth / max(vessel_supply_growth, 0.01)
        sd_adjustment = (sd_ratio - 1.0) * 0.15  # 15% sensitivity

        current_rate = statistics.mean(rates[-7:])
        forecast_30d = current_rate * (1 + trend + seasonal + sd_adjustment)
        forecast_90d = current_rate * (1 + trend * 3 + seasonal + sd_adjustment * 2)

        # Confidence based on volatility
        volatility = statistics.stdev(rates[-30:]) / current_rate
        confidence = "high" if volatility < 0.1 else "medium" if volatility < 0.2 else "low"

        return {
            "route": route,
            "current_rate": round(current_rate, 0),
            "forecast_30d": round(forecast_30d, 0),
            "forecast_90d": round(forecast_90d, 0),
            "trend_monthly_pct": round(trend * 100, 1),
            "seasonal_factor": round(seasonal * 100, 1),
            "supply_demand_ratio": round(sd_ratio, 2),
            "volatility": round(volatility * 100, 1),
            "confidence": confidence,
            "recommendation": self._chartering_recommendation(
                trend, sd_ratio, confidence
            )
        }

    def calculate_demurrage(self, charter_party: dict,
                             events: List[LaytimeEvent]) -> dict:
        """Calculate demurrage/despatch from laytime events and CP terms."""
        allowed_hours = charter_party["allowed_laytime_hours"]
        demurrage_rate = charter_party["demurrage_usd_per_day"]
        despatch_rate = charter_party.get("despatch_usd_per_day",
                                           demurrage_rate / 2)
        terms = charter_party.get("terms", "SHINC")  # or SHEX

        # Build timeline of counting/non-counting time
        laytime_used = 0
        commenced = False
        last_event_time = None

        for event in sorted(events, key=lambda e: e.timestamp):
            if event.event_type == "commenced":
                commenced = True
                last_event_time = event.timestamp
            elif event.event_type == "completed":
                if commenced and last_event_time:
                    hours = self._count_laytime(
                        last_event_time, event.timestamp, terms
                    )
                    laytime_used += hours
                commenced = False
            elif commenced and event.is_exception:
                # Stop counting during exceptions
                if last_event_time:
                    hours = self._count_laytime(
                        last_event_time, event.timestamp, terms
                    )
                    laytime_used += hours
                    last_event_time = None
            elif commenced and last_event_time is None:
                # Resume after exception
                last_event_time = event.timestamp

        # Calculate result
        time_diff_hours = laytime_used - allowed_hours

        if time_diff_hours > 0:
            demurrage_usd = (time_diff_hours / 24) * demurrage_rate
            return {
                "status": "demurrage",
                "laytime_used_hours": round(laytime_used, 2),
                "allowed_hours": allowed_hours,
                "excess_hours": round(time_diff_hours, 2),
                "amount_usd": round(demurrage_usd, 2),
                "rate_usd_per_day": demurrage_rate,
                "events_processed": len(events)
            }
        else:
            despatch_usd = (abs(time_diff_hours) / 24) * despatch_rate
            return {
                "status": "despatch",
                "laytime_used_hours": round(laytime_used, 2),
                "allowed_hours": allowed_hours,
                "time_saved_hours": round(abs(time_diff_hours), 2),
                "amount_usd": round(despatch_usd, 2),
                "rate_usd_per_day": despatch_rate,
                "events_processed": len(events)
            }

    def optimize_stowage(self, parcels: List[CargoParcel],
                          vessel_capacity: dict) -> dict:
        """Optimize cargo distribution for stability, trim, and stress."""
        max_dwt = vessel_capacity["max_dwt_mt"]
        holds = vessel_capacity["holds"]  # [{id, capacity_mt, capacity_cbm, lcg}]
        target_trim = vessel_capacity.get("optimal_trim_m", -0.5)  # slight stern

        total_weight = sum(p.weight_mt for p in parcels)
        if total_weight > max_dwt:
            return {"error": f"Cargo {total_weight}MT exceeds DWT {max_dwt}MT"}

        # Sort parcels: heavy first, DG separated
        sorted_parcels = sorted(parcels, key=lambda p: -p.weight_mt)

        allocation = {h["id"]: [] for h in holds}
        hold_weights = {h["id"]: 0 for h in holds}
        hold_volumes = {h["id"]: 0 for h in holds}
        dg_holds = set()

        for parcel in sorted_parcels:
            best_hold = None
            best_trim_score = float("inf")

            for hold in holds:
                hid = hold["id"]
                # Capacity check
                if (hold_weights[hid] + parcel.weight_mt > hold["capacity_mt"]
                        or hold_volumes[hid] + parcel.volume_cbm > hold["capacity_cbm"]):
                    continue

                # DG segregation: no two DG classes in same hold
                if parcel.is_dangerous and hid in dg_holds:
                    continue

                # Trim impact: weight * lever arm from midship
                new_weight = hold_weights[hid] + parcel.weight_mt
                trim_contribution = new_weight * hold["lcg"]
                total_moment = sum(
                    hold_weights[h["id"]] * h["lcg"] for h in holds
                ) + parcel.weight_mt * hold["lcg"]
                est_trim = total_moment / max(total_weight, 1) * 0.01
                trim_score = abs(est_trim - target_trim)

                if trim_score < best_trim_score:
                    best_trim_score = trim_score
                    best_hold = hid

            if best_hold:
                allocation[best_hold].append(parcel.cargo_id)
                hold_weights[best_hold] += parcel.weight_mt
                hold_volumes[best_hold] += parcel.volume_cbm
                if parcel.is_dangerous:
                    dg_holds.add(best_hold)

        # Calculate final trim and GM
        total_moment = sum(
            hold_weights[h["id"]] * h["lcg"] for h in holds
        )
        final_trim = total_moment / max(total_weight, 1) * 0.01

        return {
            "allocation": {k: v for k, v in allocation.items() if v},
            "hold_utilization": {
                hid: {
                    "weight_mt": round(hold_weights[hid], 1),
                    "weight_pct": round(
                        hold_weights[hid] / h["capacity_mt"] * 100, 1
                    ),
                    "volume_pct": round(
                        hold_volumes[hid] / h["capacity_cbm"] * 100, 1
                    )
                }
                for h in holds for hid in [h["id"]]
            },
            "estimated_trim_m": round(final_trim, 2),
            "target_trim_m": target_trim,
            "total_cargo_mt": round(total_weight, 1),
            "dg_holds": list(dg_holds)
        }

    def _linear_trend(self, rates: List[float]) -> float:
        n = len(rates)
        if n < 2:
            return 0
        x_mean = (n - 1) / 2
        y_mean = statistics.mean(rates)
        num = sum((i - x_mean) * (r - y_mean) for i, r in enumerate(rates))
        den = sum((i - x_mean) ** 2 for i in range(n))
        slope = num / den if den else 0
        return slope / y_mean  # normalized monthly trend

    def _seasonal_factor(self, history: List[FreightIndex], route: str) -> float:
        monthly = {}
        for h in history:
            if h.route == route:
                m = h.date.month
                monthly.setdefault(m, []).append(h.rate_usd)
        if not monthly:
            return 0
        overall_mean = statistics.mean(
            r for rates in monthly.values() for r in rates
        )
        current_month = datetime.now().month
        month_mean = statistics.mean(monthly.get(current_month, [overall_mean]))
        return (month_mean - overall_mean) / overall_mean

    def _count_laytime(self, start: datetime, end: datetime,
                        terms: str) -> float:
        if terms == "SHINC":
            return (end - start).total_seconds() / 3600
        # SHEX: exclude Sundays and holidays
        hours = 0
        current = start
        while current < end:
            if current.weekday() != 6:  # not Sunday
                next_hour = min(current + timedelta(hours=1), end)
                hours += (next_hour - current).total_seconds() / 3600
            current += timedelta(hours=1)
        return hours

    def _chartering_recommendation(self, trend, sd_ratio, confidence) -> str:
        if trend > 0.05 and sd_ratio > 1.1:
            return "LOCK IN: Fix vessels on TC now, rates likely rising"
        elif trend < -0.05 and sd_ratio < 0.9:
            return "WAIT: Spot market improving, defer TC commitments"
        return "NEUTRAL: Monitor market, no strong directional signal"
Key insight: Automated demurrage calculation catches errors that manual processing misses. In practice, 15-20% of demurrage claims have discrepancies in laytime counting, especially around SHEX/SHINC terms and weather exception periods. The agent's audit trail provides indisputable documentation for claim resolution.

5. Maritime Safety & Compliance

Maritime safety regulations are some of the most stringent in any industry, and for good reason. The consequences of failure include loss of life, environmental catastrophe, and multimillion-dollar liabilities. AI agents add a layer of continuous monitoring that supplements human judgment without replacing the master's authority.

Collision Avoidance and AIS Anomaly Detection

COLREG (International Regulations for Preventing Collisions at Sea) define the rules of the road, but applying them in real-time with multiple crossing, overtaking, and head-on situations requires rapid calculation of CPA (Closest Point of Approach) and TCPA (Time to CPA). The agent processes AIS data from surrounding vessels, calculates collision risk, and recommends course or speed alterations that comply with COLREG rules 13-17. It also detects AIS anomalies (spoofing, gaps, impossible speed changes) that may indicate illicit activity.

ISM/ISPS Compliance and Crew Certification

The International Safety Management (ISM) Code and International Ship and Port Facility Security (ISPS) Code require ongoing documentation, drills, audits, and certifications. STCW (Standards of Training, Certification and Watchkeeping) mandates that every crew member holds valid certificates for their role. The agent tracks expiry dates, flag state requirements, and generates alerts before certificates lapse, preventing costly detentions during Port State Control (PSC) inspections.

import math
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class AISTarget:
    mmsi: str
    vessel_name: str
    lat: float
    lon: float
    cog: float          # course over ground, degrees
    sog: float          # speed over ground, knots
    heading: float
    nav_status: int     # 0=underway, 1=at anchor, 5=moored
    timestamp: datetime

@dataclass
class CrewCertificate:
    crew_id: str
    crew_name: str
    cert_type: str       # "STCW", "medical", "passport", "endorsement"
    issuing_authority: str
    issue_date: datetime
    expiry_date: datetime
    rank: str

@dataclass
class PSCDeficiency:
    category: str
    description: str
    severity: str        # "detainable", "non-detainable"
    ism_code_ref: str

class MaritimeSafetyAgent:
    """Collision avoidance, compliance monitoring, and PSC readiness."""

    CPA_WARNING_NM = 2.0       # closest point of approach warning
    CPA_DANGER_NM = 0.5        # immediate danger threshold
    TCPA_HORIZON_MIN = 30      # look-ahead time in minutes
    CERT_WARNING_DAYS = 90     # warn this many days before expiry

    def __init__(self, own_vessel: AISTarget):
        self.own_vessel = own_vessel
        self.targets = {}
        self.collision_log = []

    def update_ais_targets(self, targets: List[AISTarget]):
        for t in targets:
            # Anomaly detection: check for impossible values
            if t.mmsi in self.targets:
                prev = self.targets[t.mmsi]
                anomalies = self._detect_ais_anomalies(prev, t)
                if anomalies:
                    self.collision_log.append({
                        "type": "ais_anomaly",
                        "mmsi": t.mmsi,
                        "vessel": t.vessel_name,
                        "anomalies": anomalies,
                        "timestamp": t.timestamp.isoformat()
                    })
            self.targets[t.mmsi] = t

    def assess_collision_risk(self) -> List[dict]:
        """Calculate CPA/TCPA for all targets and recommend COLREG actions."""
        risks = []

        for mmsi, target in self.targets.items():
            cpa, tcpa = self._calculate_cpa_tcpa(self.own_vessel, target)

            if tcpa < 0:   # target moving away
                continue
            if tcpa > self.TCPA_HORIZON_MIN:
                continue

            risk_level = "safe"
            if cpa < self.CPA_DANGER_NM:
                risk_level = "danger"
            elif cpa < self.CPA_WARNING_NM:
                risk_level = "warning"

            if risk_level != "safe":
                situation = self._classify_encounter(self.own_vessel, target)
                action = self._colreg_recommendation(situation, cpa, tcpa)

                risks.append({
                    "mmsi": mmsi,
                    "vessel_name": target.vessel_name,
                    "cpa_nm": round(cpa, 2),
                    "tcpa_minutes": round(tcpa, 1),
                    "bearing": round(self._bearing_to(
                        self.own_vessel.lat, self.own_vessel.lon,
                        target.lat, target.lon
                    ), 1),
                    "risk_level": risk_level,
                    "encounter_type": situation,
                    "recommended_action": action
                })

        return sorted(risks, key=lambda r: r["cpa_nm"])

    def check_crew_compliance(self, certificates: List[CrewCertificate],
                                flag_state: str) -> dict:
        """Audit crew certifications for STCW compliance."""
        now = datetime.now()
        expired = []
        expiring_soon = []
        valid = []

        for cert in certificates:
            days_remaining = (cert.expiry_date - now).days

            if days_remaining < 0:
                expired.append({
                    "crew": cert.crew_name,
                    "rank": cert.rank,
                    "cert_type": cert.cert_type,
                    "expired_date": cert.expiry_date.strftime("%Y-%m-%d"),
                    "days_overdue": abs(days_remaining),
                    "severity": "critical"
                })
            elif days_remaining < self.CERT_WARNING_DAYS:
                expiring_soon.append({
                    "crew": cert.crew_name,
                    "rank": cert.rank,
                    "cert_type": cert.cert_type,
                    "expiry_date": cert.expiry_date.strftime("%Y-%m-%d"),
                    "days_remaining": days_remaining,
                    "severity": "warning"
                })
            else:
                valid.append(cert.crew_name)

        # PSC detention risk assessment
        detention_risk = "high" if expired else "medium" if expiring_soon else "low"

        return {
            "flag_state": flag_state,
            "total_certificates": len(certificates),
            "expired": expired,
            "expiring_within_90d": expiring_soon,
            "valid_count": len(valid),
            "detention_risk": detention_risk,
            "recommended_actions": self._cert_recommendations(expired, expiring_soon)
        }

    def psc_readiness_score(self, deficiencies: List[PSCDeficiency],
                             last_inspection: datetime,
                             open_ncrs: int) -> dict:
        """Score vessel's readiness for Port State Control inspection."""
        now = datetime.now()
        months_since_inspection = (now - last_inspection).days / 30

        # Base score starts at 100
        score = 100

        # Deductions for deficiencies
        for d in deficiencies:
            if d.severity == "detainable":
                score -= 25
            else:
                score -= 10

        # Deduction for open non-conformities
        score -= open_ncrs * 8

        # Bonus/penalty for inspection recency
        if months_since_inspection > 12:
            score -= 15
        elif months_since_inspection < 3:
            score += 5

        score = max(0, min(100, score))
        rating = (
            "excellent" if score >= 85 else
            "good" if score >= 70 else
            "needs_attention" if score >= 50 else
            "high_risk"
        )

        return {
            "readiness_score": score,
            "rating": rating,
            "detainable_deficiencies": sum(
                1 for d in deficiencies if d.severity == "detainable"
            ),
            "total_deficiencies": len(deficiencies),
            "open_ncrs": open_ncrs,
            "months_since_last_psc": round(months_since_inspection, 1),
            "priority_fixes": [
                d.description for d in deficiencies if d.severity == "detainable"
            ]
        }

    def _calculate_cpa_tcpa(self, own: AISTarget,
                              target: AISTarget) -> Tuple[float, float]:
        """Calculate CPA and TCPA using relative motion vectors."""
        # Convert to relative position and velocity
        dx = (target.lon - own.lon) * 60 * math.cos(math.radians(own.lat))
        dy = (target.lat - own.lat) * 60  # in nautical miles

        own_vx = own.sog * math.sin(math.radians(own.cog))
        own_vy = own.sog * math.cos(math.radians(own.cog))
        tgt_vx = target.sog * math.sin(math.radians(target.cog))
        tgt_vy = target.sog * math.cos(math.radians(target.cog))

        rel_vx = tgt_vx - own_vx
        rel_vy = tgt_vy - own_vy
        rel_speed_sq = rel_vx ** 2 + rel_vy ** 2

        if rel_speed_sq < 0.001:
            return (math.sqrt(dx**2 + dy**2), float("inf"))

        tcpa_hours = -(dx * rel_vx + dy * rel_vy) / rel_speed_sq
        tcpa_minutes = tcpa_hours * 60

        cpa_x = dx + rel_vx * tcpa_hours
        cpa_y = dy + rel_vy * tcpa_hours
        cpa = math.sqrt(cpa_x**2 + cpa_y**2)

        return (cpa, tcpa_minutes)

    def _classify_encounter(self, own: AISTarget,
                              target: AISTarget) -> str:
        relative_bearing = self._bearing_to(
            own.lat, own.lon, target.lat, target.lon
        )
        angle_diff = abs(own.cog - target.cog)
        if angle_diff > 180:
            angle_diff = 360 - angle_diff

        if angle_diff > 160:
            return "head_on"
        elif angle_diff < 30:
            return "overtaking"
        elif 10 < relative_bearing < 112.5:
            return "crossing_give_way"
        else:
            return "crossing_stand_on"

    def _colreg_recommendation(self, situation: str,
                                cpa: float, tcpa: float) -> str:
        if situation == "head_on":
            return "Rule 14: Alter course to STARBOARD"
        elif situation == "crossing_give_way":
            return "Rule 15: Alter course to STARBOARD, pass astern of target"
        elif situation == "overtaking":
            return "Rule 13: Keep clear, alter to port or starboard"
        elif situation == "crossing_stand_on":
            if cpa < self.CPA_DANGER_NM:
                return "Rule 17(b): STAND ON vessel must take action NOW"
            return "Rule 17(a): Maintain course and speed, monitor closely"
        return "Monitor situation"

    def _bearing_to(self, lat1, lon1, lat2, lon2) -> float:
        dlon = math.radians(lon2 - lon1)
        lat1r, lat2r = math.radians(lat1), math.radians(lat2)
        x = math.sin(dlon) * math.cos(lat2r)
        y = (math.cos(lat1r) * math.sin(lat2r) -
             math.sin(lat1r) * math.cos(lat2r) * math.cos(dlon))
        return (math.degrees(math.atan2(x, y)) + 360) % 360

    def _detect_ais_anomalies(self, prev: AISTarget,
                                curr: AISTarget) -> List[str]:
        anomalies = []
        dt_hours = (curr.timestamp - prev.timestamp).total_seconds() / 3600
        if dt_hours <= 0:
            return anomalies

        dist = math.sqrt(
            ((curr.lon - prev.lon) * 60 * math.cos(math.radians(curr.lat)))**2
            + ((curr.lat - prev.lat) * 60)**2
        )
        implied_speed = dist / dt_hours if dt_hours > 0 else 0

        if implied_speed > 50:
            anomalies.append(f"Position jump: implied {implied_speed:.0f}kn")
        if abs(curr.sog - prev.sog) > 10 and dt_hours < 0.1:
            anomalies.append(f"Speed jump: {prev.sog}→{curr.sog}kn in {dt_hours*60:.0f}min")
        if dt_hours > 2:
            anomalies.append(f"AIS gap: {dt_hours:.1f} hours without transmission")

        return anomalies

    def _cert_recommendations(self, expired, expiring) -> List[str]:
        actions = []
        if expired:
            actions.append(
                f"CRITICAL: {len(expired)} expired cert(s) — arrange renewal before next port"
            )
        if expiring:
            actions.append(
                f"Schedule renewal for {len(expiring)} cert(s) expiring within 90 days"
            )
        if not expired and not expiring:
            actions.append("All certifications current — no action required")
        return actions
Key insight: AIS anomaly detection catches more than just safety threats. Position jumps and transmission gaps correlate with sanctions evasion (dark fleet operations), illegal fishing, and ship-to-ship transfers. Flagging these patterns proactively protects the operator from compliance risk.

6. ROI Analysis: 20-Vessel Fleet

The real question for any shipping company evaluating AI agents is: what is the return on investment? Below is a detailed breakdown for a mid-size fleet of 20 bulk carriers (Supramax class, ~58,000 DWT each), operating primarily on Atlantic and Pacific trade routes.

Assumptions

Category Improvement Annual Savings (Fleet)
Weather Routing Fuel Savings 3-5% fuel reduction $1,848,000 - $3,080,000
Hull Performance Optimization Timely cleaning saves 8-12% fuel penalty $985,600 - $1,478,400
CII Rating Protection Avoid operational restrictions on 2-3 vessels $720,000 - $1,080,000
Port Turnaround Reduction 0.5-1 day saved per port call $810,000 - $1,620,000
Demurrage Recovery 15% reduction in demurrage paid $360,000 - $540,000
Charter Rate Optimization 2-3% better fixture rates $730,800 - $1,096,200
PSC Detention Avoidance Prevent 1-2 detentions/year $150,000 - $300,000
Engine Maintenance Optimization Condition-based vs calendar-based $400,000 - $600,000
Total Annual Savings $6,004,400 - $9,794,600

Implementation Cost vs. Return

from dataclasses import dataclass
from typing import List

@dataclass
class FleetROIModel:
    """Calculate ROI for AI agent deployment across a shipping fleet."""

    fleet_size: int = 20
    avg_dwt: float = 58000
    fuel_mt_per_day: float = 32
    fuel_cost_usd: float = 550
    sailing_days_year: int = 280
    port_days_year: int = 45
    tce_usd_day: float = 18000
    demurrage_events_year: int = 6
    avg_demurrage_usd: float = 45000

    def calculate_fuel_savings(self, optimization_pct: float = 0.04) -> dict:
        """Weather routing + CII optimization fuel reduction."""
        annual_fuel_per_vessel = self.fuel_mt_per_day * self.sailing_days_year
        annual_fuel_fleet = annual_fuel_per_vessel * self.fleet_size
        savings_mt = annual_fuel_fleet * optimization_pct
        savings_usd = savings_mt * self.fuel_cost_usd
        co2_reduction = savings_mt * 3.114  # MT CO2 per MT fuel
        return {
            "fuel_saved_mt": round(savings_mt, 0),
            "cost_saved_usd": round(savings_usd, 0),
            "co2_reduced_mt": round(co2_reduction, 0),
            "per_vessel_usd": round(savings_usd / self.fleet_size, 0)
        }

    def calculate_hull_optimization(self, cleaning_saves_pct: float = 0.10) -> dict:
        """ROI from timely hull cleaning triggered by performance monitoring."""
        annual_fuel_cost = (self.fuel_mt_per_day * self.sailing_days_year
                            * self.fuel_cost_usd)
        savings_per_vessel = annual_fuel_cost * cleaning_saves_pct * 0.5
        cleaning_cost = 50000   # per vessel per cleaning
        net_per_vessel = savings_per_vessel - cleaning_cost
        return {
            "gross_savings_per_vessel": round(savings_per_vessel, 0),
            "cleaning_cost": cleaning_cost,
            "net_savings_per_vessel": round(net_per_vessel, 0),
            "fleet_net_savings": round(net_per_vessel * self.fleet_size, 0)
        }

    def calculate_port_efficiency(self, days_saved: float = 0.75) -> dict:
        """Revenue from reduced port turnaround time."""
        calls_per_year = self.port_days_year / 3   # avg 3 days per call
        extra_earning_days = days_saved * calls_per_year
        revenue_per_vessel = extra_earning_days * self.tce_usd_day
        return {
            "extra_earning_days_per_vessel": round(extra_earning_days, 1),
            "revenue_per_vessel": round(revenue_per_vessel, 0),
            "fleet_revenue": round(revenue_per_vessel * self.fleet_size, 0)
        }

    def calculate_commercial_gains(self, rate_improvement_pct: float = 0.025,
                                    demurrage_reduction_pct: float = 0.15) -> dict:
        """Charter rate optimization + demurrage reduction."""
        annual_tce = self.tce_usd_day * self.sailing_days_year
        rate_gain = annual_tce * rate_improvement_pct * self.fleet_size
        demurrage_saved = (self.avg_demurrage_usd * self.demurrage_events_year
                           * demurrage_reduction_pct * self.fleet_size)
        return {
            "charter_rate_gain_usd": round(rate_gain, 0),
            "demurrage_saved_usd": round(demurrage_saved, 0),
            "total_commercial_gain": round(rate_gain + demurrage_saved, 0)
        }

    def full_roi_analysis(self) -> dict:
        """Complete ROI model for AI agent fleet deployment."""
        fuel = self.calculate_fuel_savings()
        hull = self.calculate_hull_optimization()
        port = self.calculate_port_efficiency()
        commercial = self.calculate_commercial_gains()

        # Implementation costs
        setup_cost = 250000          # initial integration + customization
        annual_license = 120000      # SaaS platform
        annual_support = 60000       # technical support + training
        total_annual_cost = annual_license + annual_support
        total_year1_cost = setup_cost + total_annual_cost

        total_annual_benefit = (
            fuel["cost_saved_usd"]
            + hull["fleet_net_savings"]
            + port["fleet_revenue"]
            + commercial["total_commercial_gain"]
        )

        roi_year1 = ((total_annual_benefit - total_year1_cost)
                      / total_year1_cost) * 100
        roi_year2 = ((total_annual_benefit - total_annual_cost)
                      / total_annual_cost) * 100
        payback_months = (total_year1_cost / total_annual_benefit) * 12

        return {
            "fleet_size": self.fleet_size,
            "annual_benefits": {
                "fuel_optimization": fuel["cost_saved_usd"],
                "hull_performance": hull["fleet_net_savings"],
                "port_efficiency": port["fleet_revenue"],
                "commercial_gains": commercial["total_commercial_gain"],
                "total": round(total_annual_benefit, 0)
            },
            "costs": {
                "year_1_total": total_year1_cost,
                "annual_recurring": total_annual_cost
            },
            "returns": {
                "roi_year_1_pct": round(roi_year1, 0),
                "roi_year_2_pct": round(roi_year2, 0),
                "payback_months": round(payback_months, 1),
                "net_benefit_year_1": round(
                    total_annual_benefit - total_year1_cost, 0
                ),
                "co2_reduction_mt": fuel["co2_reduced_mt"]
            }
        }

# Run the analysis
model = FleetROIModel(fleet_size=20)
results = model.full_roi_analysis()

print(f"Fleet: {results['fleet_size']} vessels")
print(f"Total Annual Benefits: ${results['annual_benefits']['total']:,.0f}")
print(f"Year 1 Cost: ${results['costs']['year_1_total']:,.0f}")
print(f"Year 1 ROI: {results['returns']['roi_year_1_pct']}%")
print(f"Year 2 ROI: {results['returns']['roi_year_2_pct']}%")
print(f"Payback Period: {results['returns']['payback_months']} months")
print(f"CO2 Reduced: {results['returns']['co2_reduction_mt']:,.0f} MT")
Bottom line: A 20-vessel fleet investing $430,000 in year one (setup + annual costs) can expect $6-10M in annual benefits, yielding a payback period under 1 month and year-2 ROI exceeding 3,000%. Even at conservative estimates using only half the projected savings, the investment pays for itself within the first quarter.

Getting Started: Implementation Roadmap

Deploying AI agents across maritime operations does not require a big-bang approach. Start with the highest-impact, lowest-risk module and expand from there:

  1. Month 1-2: Weather routing and fuel monitoring. Connect AIS and noon report data feeds. Deploy the voyage optimization agent on 3-5 vessels as a pilot. Measure fuel savings against historical baselines.
  2. Month 3-4: Hull performance tracking. Ingest speed-power data from the fleet. Establish clean-hull baselines. Set up automated cleaning recommendations.
  3. Month 5-6: Port operations and commercial agents. Integrate with port management systems. Deploy demurrage calculation automation. Begin charter rate forecasting.
  4. Month 7-8: Safety and compliance modules. Roll out crew certification tracking. Deploy PSC readiness scoring. Add collision avoidance monitoring for high-traffic routes.
  5. Month 9-12: Full integration and optimization. Connect all agents into a unified fleet intelligence platform. Fine-tune models with accumulated operational data. Expand to the full fleet.

The key to success is treating each agent as an advisory system that augments human decision-making rather than replacing it. The master retains full authority on the bridge, the superintendent makes the final call on maintenance scheduling, and the commercial team decides on fixtures. The AI agent provides data-driven recommendations that make those decisions faster and more informed.

Stay Updated on AI Agents for Maritime & Shipping

Get weekly insights on AI automation for logistics, shipping, and supply chain operations.

Subscribe to Newsletter