AI Agent for Cleaning Services: Automate Scheduling, Quality Tracking & Client Management

March 28, 2026

The commercial cleaning industry generates over $90 billion annually in the US alone, yet most operators still dispatch crews with spreadsheets, track quality through clipboard checklists, and lose 15-25% of revenue to inefficient routing and unplanned callbacks. A regional company managing 200 contracts typically wastes 2-3 hours per day in unnecessary drive time, misses SLA deadlines on 8-12% of visits, and loses 18-22% of clients annually to service inconsistency.

AI agents built for cleaning operations can reason about interconnected variables that humans struggle to optimize simultaneously: crew skill sets, building access windows, traffic patterns, chemical drying times, equipment availability, and client-specific preferences. Unlike static scheduling software, these agents continuously adapt to cancellations, emergency requests, weather disruptions, and real-time crew availability.

This guide covers six core areas where AI agents transform cleaning operations, with production-ready Python code for each module. Whether you run a 10-person residential crew or a 200-contract commercial operation, these patterns scale to your business.

1. Route & Schedule Optimization

Route optimization for cleaning services is a specialized variant of the Vehicle Routing Problem (VRP) with time windows. Unlike delivery logistics where stops take seconds, cleaning jobs range from 30 minutes to 8 hours, and each site has strict access constraints: office buildings can only be cleaned after 6 PM, medical facilities require daytime cleaning during low-patient hours, and retail spaces need service before opening at 9 AM. The AI agent must solve this as a multi-constraint optimization that minimizes total drive time while respecting every site's access window.

Travel time minimization alone typically saves 18-25% of daily drive time for a regional operator. But the real gains come from frequency optimization: not every site needs the same cleaning schedule. High-traffic lobbies might need daily service while executive floors need only twice weekly. Sensor data from foot traffic counters and IoT-enabled trash bins can trigger cleanings based on actual need rather than fixed schedules, reducing total service hours by 12-20% without impacting quality.

Seasonal demand adjustment is equally critical. Schools need deep cleaning during summer breaks, office buildings spike before holiday parties, and construction cleanup creates unpredictable surges. The agent forecasts these patterns from historical data and pre-positions crews accordingly, while maintaining an emergency dispatch queue for urgent requests like flood cleanup or post-event sanitation.
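The forecasting step can be sketched as a simple seasonal-multiplier model. The function names and the `{year: {month: hours}}` data shape below are illustrative, not part of the module that follows:

```python
import math
from statistics import mean

def seasonal_multipliers(monthly_hours_history):
    """Demand multiplier per month (1-12) from historical billed hours.
    Expects {year: {month: hours}}, e.g. {2024: {6: 1200, 12: 800}}."""
    samples = {m: [] for m in range(1, 13)}
    for year_data in monthly_hours_history.values():
        for month, hours in year_data.items():
            samples[month].append(hours)
    overall = mean(h for vals in samples.values() for h in vals)
    return {m: round(mean(vals) / overall, 2) if vals else 1.0
            for m, vals in samples.items()}

def crews_needed(base_crews, multiplier, buffer_pct=0.15):
    """Scale the baseline crew count by the seasonal multiplier,
    keeping the emergency-dispatch buffer on top."""
    return math.ceil(base_crews * multiplier * (1 + buffer_pct))
```

A June multiplier of 1.25 on a 10-crew baseline yields 15 crews once the 15% emergency buffer is included; months with no history default to a neutral 1.0.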

import math
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime, time

@dataclass
class CleaningSite:
    site_id: str
    name: str
    lat: float
    lon: float
    square_footage: int
    cleaning_hours: float          # estimated time to clean
    access_start: time             # earliest access (e.g., 18:00)
    access_end: time               # latest completion (e.g., 06:00)
    frequency_per_week: int        # contracted visits
    priority: int                  # 1=critical (hospital), 3=flexible
    requires_skills: List[str]     # ["floor_stripping", "biohazard", etc.]
    last_cleaned: Optional[datetime] = None

@dataclass
class CrewUnit:
    crew_id: str
    members: int
    skills: List[str]
    home_base_lat: float
    home_base_lon: float
    shift_start: time
    shift_end: time
    hourly_rate: float
    vehicle_type: str              # "van", "truck"

class RouteScheduleAgent:
    """AI agent for multi-site cleaning route and schedule optimization."""

    DRIVE_SPEED_MPH = 25           # average urban speed
    MAX_OVERTIME_HOURS = 2.0
    OVERTIME_MULTIPLIER = 1.5
    EMERGENCY_BUFFER_PCT = 0.15    # reserve 15% capacity for emergencies

    def __init__(self, sites: List[CleaningSite], crews: List[CrewUnit]):
        self.sites = {s.site_id: s for s in sites}
        self.crews = crews
        self.distance_cache = {}

    def _haversine_miles(self, lat1, lon1, lat2, lon2) -> float:
        R = 3958.8  # Earth radius in miles
        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)
        a = (math.sin(dlat/2)**2 +
             math.cos(math.radians(lat1)) *
             math.cos(math.radians(lat2)) *
             math.sin(dlon/2)**2)
        return R * 2 * math.asin(math.sqrt(a))

    def _travel_time_hours(self, lat1, lon1, lat2, lon2) -> float:
        dist = self._haversine_miles(lat1, lon1, lat2, lon2)
        return dist / self.DRIVE_SPEED_MPH

    def optimize_daily_routes(self, date: datetime) -> Dict[str, dict]:
        """Assign sites to crews and order stops to minimize total drive time."""
        weekday = date.weekday()
        due_sites = self._get_due_sites(date, weekday)

        # Greedy nearest-neighbor assignment; priority and access-window
        # tightness set the order in which sites claim crew capacity
        assignments = {c.crew_id: [] for c in self.crews}
        crew_time = {c.crew_id: 0.0 for c in self.crews}
        crew_pos = {c.crew_id: (c.home_base_lat, c.home_base_lon)
                    for c in self.crews}

        available_capacity = {
            c.crew_id: self._shift_hours(c) * (1 - self.EMERGENCY_BUFFER_PCT)
            for c in self.crews
        }

        # Sort sites by priority then access window tightness
        ranked_sites = sorted(due_sites, key=lambda s: (
            s.priority,
            self._window_flexibility(s)
        ))

        for site in ranked_sites:
            best_crew = None
            best_cost = float("inf")

            for crew in self.crews:
                cid = crew.crew_id
                if not self._crew_qualified(crew, site):
                    continue
                if crew_time[cid] + site.cleaning_hours > available_capacity[cid]:
                    continue

                travel = self._travel_time_hours(
                    *crew_pos[cid], site.lat, site.lon
                )
                cost = travel + site.cleaning_hours * 0.1  # slight job-time weight
                if cost < best_cost:
                    best_cost = cost
                    best_crew = crew

            if best_crew:
                cid = best_crew.crew_id
                travel = self._travel_time_hours(
                    *crew_pos[cid], site.lat, site.lon
                )
                assignments[cid].append({
                    "site": site,
                    "travel_hours": round(travel, 2),
                    "clean_hours": site.cleaning_hours
                })
                crew_time[cid] += travel + site.cleaning_hours
                crew_pos[cid] = (site.lat, site.lon)

        # Calculate totals
        total_drive = sum(
            sum(s["travel_hours"] for s in stops)
            for stops in assignments.values()
        )
        total_clean = sum(
            sum(s["clean_hours"] for s in stops)
            for stops in assignments.values()
        )

        return {
            "date": date.isoformat(),
            "assignments": {
                cid: {
                    "stops": len(stops),
                    "drive_hours": round(sum(s["travel_hours"] for s in stops), 2),
                    "clean_hours": round(sum(s["clean_hours"] for s in stops), 2),
                    "route": [s["site"].name for s in stops]
                }
                for cid, stops in assignments.items() if stops
            },
            "total_drive_hours": round(total_drive, 2),
            "total_clean_hours": round(total_clean, 2),
            "efficiency_pct": round(
                total_clean / (total_clean + total_drive) * 100, 1
            ) if (total_clean + total_drive) > 0 else 0,
            "unassigned_sites": len(ranked_sites) - sum(
                len(s) for s in assignments.values()
            )
        }

    def trigger_sensor_based_clean(self, site_id: str,
                                    foot_traffic: int,
                                    trash_level_pct: float) -> dict:
        """Decide if a site needs unscheduled cleaning based on sensor data."""
        site = self.sites[site_id]
        days_since = (datetime.now() - site.last_cleaned).days if site.last_cleaned else 7
        urgency_score = (
            foot_traffic / 1000 * 0.4 +
            trash_level_pct * 0.35 +
            days_since / 7 * 0.25
        )
        return {
            "site_id": site_id,
            "urgency_score": round(urgency_score, 2),
            "trigger_clean": urgency_score > 0.7,
            "reason": (
                "high_traffic" if foot_traffic > 2000
                else "trash_full" if trash_level_pct > 0.85
                else "overdue" if days_since > 5
                else "normal"
            )
        }

    def _get_due_sites(self, date, weekday) -> List[CleaningSite]:
        due = []
        for site in self.sites.values():
            interval = 7 / max(site.frequency_per_week, 1)
            if site.last_cleaned:
                days_since = (date - site.last_cleaned).days
                if days_since >= interval - 0.5:
                    due.append(site)
            else:
                due.append(site)
        return due

    def _shift_hours(self, crew: CrewUnit) -> float:
        start = crew.shift_start.hour + crew.shift_start.minute / 60
        end = crew.shift_end.hour + crew.shift_end.minute / 60
        if end < start:
            end += 24
        return end - start

    def _crew_qualified(self, crew: CrewUnit, site: CleaningSite) -> bool:
        return all(skill in crew.skills for skill in site.requires_skills)

    def _window_flexibility(self, site: CleaningSite) -> float:
        start = site.access_start.hour + site.access_start.minute / 60
        end = site.access_end.hour + site.access_end.minute / 60
        if end < start:
            end += 24
        return end - start - site.cleaning_hours

Key insight: Sensor-triggered cleaning based on foot traffic and trash levels reduces total service hours by 12-20% compared to fixed schedules while maintaining higher average cleanliness scores. The ROI on IoT sensors (about $50-80 per unit) pays back within 2-3 months through eliminated unnecessary visits.
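The payback claim is easy to sanity-check; the per-visit cost in the comment below is an assumed figure, not data from the module above:

```python
def sensor_payback_months(sensor_cost, visits_avoided_per_month, cost_per_visit):
    """Months until a sensor's cost is recovered through skipped visits."""
    monthly_savings = visits_avoided_per_month * cost_per_visit
    return round(sensor_cost / monthly_savings, 1)

# A $65 sensor that eliminates one $35 visit per month pays back in ~1.9 months
```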

2. Quality Inspection & Tracking

Quality control is the single biggest differentiator in commercial cleaning. A missed trash can or streaky window might seem minor, but compounded across hundreds of visits it becomes the reason clients switch providers. Traditional quality tracking relies on supervisors conducting spot checks on 5-10% of jobs. An AI agent with visual inspection capabilities can assess every single job through before/after photo comparison, checklist verification, and IoT sensor validation.

The visual inspection pipeline works by having crews photograph each room or zone at job completion. The agent compares these against reference images and a trained scoring model that evaluates floor shine, surface cleanliness, trash removal, and restroom supply levels. Combined with IoT sensors that track air freshener levels, soap dispenser fill, and paper towel stock, the system produces an objective quality score for every visit without requiring a supervisor on-site.
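One way the reference-image comparison can work, sketched with stand-in feature vectors: an upstream vision model embeds each photo, and cosine similarity to the site's reference image becomes the zone score. The embedding step is assumed here, not shown:

```python
import math

def photo_similarity(candidate, reference):
    """Cosine similarity between two feature vectors (as produced by
    an upstream vision model; the vectors here are stand-ins)."""
    dot = sum(a * b for a, b in zip(candidate, reference))
    norm_c = math.sqrt(sum(a * a for a in candidate))
    norm_r = math.sqrt(sum(b * b for b in reference))
    return dot / (norm_c * norm_r) if norm_c and norm_r else 0.0

def zone_score(candidate, reference):
    """Map similarity onto the 0-100 scale the scoring agent expects."""
    return round(max(photo_similarity(candidate, reference), 0.0) * 100, 1)
```

An identical embedding scores 100; an orthogonal one scores 0, with negative similarities clamped to zero.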

SLA compliance monitoring ties quality scores to contractual obligations. Most commercial contracts specify response times for urgent requests, minimum inspection scores, and maximum callback rates. The agent tracks all three in real-time, flags contracts approaching SLA breach thresholds, and correlates deficiency patterns with specific crews, times of day, or building zones to identify root causes before they become client complaints.

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import statistics

@dataclass
class InspectionPhoto:
    photo_id: str
    zone: str                      # "lobby", "restroom_2f", "office_3a"
    timestamp: datetime
    is_before: bool                # True=before cleaning, False=after
    quality_scores: Dict[str, float] = field(default_factory=dict)

@dataclass
class ChecklistItem:
    item_id: str
    zone: str
    task: str                      # "vacuum_carpet", "empty_trash", etc.
    completed: bool
    completion_time: Optional[datetime] = None
    verified_by_sensor: bool = False

@dataclass
class SLAContract:
    client_id: str
    max_response_hours: float      # emergency response SLA
    min_quality_score: float       # 0-100 scale
    max_callback_rate_pct: float   # e.g., 5%
    penalty_per_breach_usd: float

class QualityInspectionAgent:
    """AI agent for cleaning quality scoring, SLA monitoring, and deficiency tracking."""

    QUALITY_WEIGHTS = {
        "floor_cleanliness": 0.25,
        "surface_dust": 0.20,
        "trash_removal": 0.20,
        "restroom_sanitation": 0.20,
        "supply_levels": 0.15
    }

    def __init__(self):
        self.inspection_history = {}  # {site_id: [inspections]}
        self.deficiency_log = []
        self.callback_log = {}        # {site_id: [dates]}

    def score_inspection(self, site_id: str,
                          photos: List[InspectionPhoto],
                          checklist: List[ChecklistItem]) -> dict:
        """Score a cleaning job from photos and checklist completion."""
        # Photo-based scoring
        after_photos = [p for p in photos if not p.is_before]
        photo_scores = {}
        for photo in after_photos:
            for metric, score in photo.quality_scores.items():
                if metric not in photo_scores:
                    photo_scores[metric] = []
                photo_scores[metric].append(score)

        weighted_photo_score = 0
        for metric, weight in self.QUALITY_WEIGHTS.items():
            scores = photo_scores.get(metric, [75])
            weighted_photo_score += statistics.mean(scores) * weight

        # Checklist completion rate
        total_items = len(checklist)
        completed = sum(1 for item in checklist if item.completed)
        sensor_verified = sum(1 for item in checklist if item.verified_by_sensor)
        checklist_pct = (completed / total_items * 100) if total_items else 100

        # Combined score: 60% photo quality + 30% checklist + 10% sensor verification
        sensor_pct = (sensor_verified / max(total_items, 1)) * 100
        overall = (
            weighted_photo_score * 0.60 +
            checklist_pct * 0.30 +
            sensor_pct * 0.10
        )

        # Track deficiencies
        deficiencies = []
        for item in checklist:
            if not item.completed:
                deficiencies.append({
                    "zone": item.zone,
                    "task": item.task,
                    "timestamp": datetime.now().isoformat()
                })
                self.deficiency_log.append({
                    "site_id": site_id,
                    "zone": item.zone,
                    "task": item.task,
                    "date": datetime.now().date().isoformat()
                })

        result = {
            "site_id": site_id,
            "overall_score": round(overall, 1),
            "photo_score": round(weighted_photo_score, 1),
            "checklist_completion_pct": round(checklist_pct, 1),
            "sensor_verification_pct": round(sensor_pct, 1),
            "deficiencies": deficiencies,
            "grade": self._score_to_grade(overall)
        }

        if site_id not in self.inspection_history:
            self.inspection_history[site_id] = []
        self.inspection_history[site_id].append(result)

        return result

    def check_sla_compliance(self, site_id: str,
                              contract: SLAContract,
                              window_days: int = 30) -> dict:
        """Monitor SLA compliance and predict breach risk."""
        history = self.inspection_history.get(site_id, [])
        recent = history[-window_days:] if history else []  # approximates one visit/day

        # Quality SLA
        avg_score = statistics.mean(
            [i["overall_score"] for i in recent]
        ) if recent else 100
        quality_compliant = avg_score >= contract.min_quality_score

        # Callback rate SLA
        callbacks = self.callback_log.get(site_id, [])
        cutoff = datetime.now() - timedelta(days=window_days)
        recent_callbacks = [d for d in callbacks if d > cutoff]
        total_visits = len(recent) if recent else 1
        callback_rate = (len(recent_callbacks) / total_visits) * 100
        callback_compliant = callback_rate <= contract.max_callback_rate_pct

        # Breach risk prediction (trending)
        if len(recent) >= 5:
            last_5 = [i["overall_score"] for i in recent[-5:]]
            trend = (last_5[-1] - last_5[0]) / max(last_5[0], 1) * 100
            breach_risk = "high" if trend < -5 else "medium" if trend < 0 else "low"
        else:
            breach_risk = "unknown"

        penalties = 0
        if not quality_compliant:
            penalties += contract.penalty_per_breach_usd
        if not callback_compliant:
            penalties += contract.penalty_per_breach_usd

        return {
            "site_id": site_id,
            "client_id": contract.client_id,
            "quality_avg": round(avg_score, 1),
            "quality_target": contract.min_quality_score,
            "quality_compliant": quality_compliant,
            "callback_rate_pct": round(callback_rate, 1),
            "callback_target_pct": contract.max_callback_rate_pct,
            "callback_compliant": callback_compliant,
            "breach_risk": breach_risk,
            "estimated_penalties_usd": penalties,
            "data_points": len(recent)
        }

    def analyze_deficiency_trends(self, window_days: int = 90) -> dict:
        """Identify recurring deficiency patterns across all sites."""
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = [d for d in self.deficiency_log
                  if d["date"] > cutoff.date().isoformat()]

        by_task = {}
        by_zone = {}
        for d in recent:
            by_task[d["task"]] = by_task.get(d["task"], 0) + 1
            by_zone[d["zone"]] = by_zone.get(d["zone"], 0) + 1

        top_tasks = sorted(by_task.items(), key=lambda x: -x[1])[:5]
        top_zones = sorted(by_zone.items(), key=lambda x: -x[1])[:5]

        return {
            "period_days": window_days,
            "total_deficiencies": len(recent),
            "top_missed_tasks": [
                {"task": t, "count": c} for t, c in top_tasks
            ],
            "top_problem_zones": [
                {"zone": z, "count": c} for z, c in top_zones
            ],
            "recommendation": self._deficiency_recommendation(top_tasks)
        }

    def _score_to_grade(self, score: float) -> str:
        if score >= 95: return "A+"
        if score >= 90: return "A"
        if score >= 85: return "B+"
        if score >= 80: return "B"
        if score >= 70: return "C"
        return "D"

    def _deficiency_recommendation(self, top_tasks) -> str:
        if not top_tasks:
            return "No significant deficiency patterns detected"
        worst = top_tasks[0][0]
        if "restroom" in worst.lower():
            return f"Schedule restroom-specific refresher training — '{worst}' is the most missed task"
        if "floor" in worst.lower() or "vacuum" in worst.lower():
            return f"Check equipment condition for floor care — '{worst}' failures may indicate worn pads or filters"
        return f"Add '{worst}' as a mandatory photo-verify checkpoint in the mobile app"

Key insight: Companies that implement photo-verified quality scoring see callback rates drop from 8-12% to under 3% within 60 days. The visibility alone changes crew behavior: when every room is photographed and scored, thoroughness improves even before any management intervention.

3. Workforce Management

Cleaning services face workforce challenges that most industries do not: high turnover rates averaging 200-400% annually, split shifts spanning early morning and late evening, and a mix of full-time, part-time, and on-call workers who must be matched to sites based on certifications, language skills, and security clearances. An AI agent that manages scheduling, performance tracking, and training compliance can reduce labor costs by 10-15% while improving service consistency.

Shift scheduling must comply with labor laws that vary by state and municipality: maximum consecutive hours, mandatory rest periods, overtime thresholds, and predictive scheduling laws that require advance notice. The agent builds schedules that satisfy all legal constraints while minimizing overtime costs and matching crew skills to site requirements. When an employee calls in sick, the agent instantly identifies qualified replacements based on proximity, availability, and overtime status, sending dispatch notifications within minutes rather than the typical 30-60 minute scramble.

Employee performance tracking goes beyond simple attendance. The agent calculates a cost per clean for each crew by combining labor hours, drive time, supply usage, and callback rates. This reveals which crews are genuinely efficient versus which ones rush through jobs and generate callbacks. Combined with quality scores from the inspection module, management can identify top performers for retention bonuses and underperformers for targeted training.

import math
import statistics
from dataclasses import dataclass
from datetime import datetime, time, timedelta
from typing import List, Dict, Optional

@dataclass
class Employee:
    employee_id: str
    name: str
    role: str                      # "cleaner", "lead", "supervisor"
    skills: List[str]              # ["floor_care", "biohazard", "window"]
    certifications: Dict[str, datetime]  # {cert_name: expiry_date}
    hourly_rate: float
    overtime_rate: float
    max_weekly_hours: float
    home_lat: float
    home_lon: float
    security_clearances: List[str]  # ["government", "healthcare"]

@dataclass
class ShiftRecord:
    employee_id: str
    date: datetime
    clock_in: datetime
    clock_out: datetime
    site_id: str
    quality_score: float
    callbacks: int
    supplies_cost: float

class WorkforceManagementAgent:
    """AI agent for cleaning crew scheduling, performance, and compliance."""

    OVERTIME_THRESHOLD_WEEKLY = 40.0
    MIN_REST_BETWEEN_SHIFTS = 8.0    # hours
    MAX_CONSECUTIVE_DAYS = 6
    CERT_EXPIRY_WARNING_DAYS = 30

    def __init__(self, employees: List[Employee]):
        self.employees = {e.employee_id: e for e in employees}
        self.shift_history = {}      # {employee_id: [ShiftRecord]}
        self.absence_history = {}    # {employee_id: [dates]}

    def ingest_shift(self, record: ShiftRecord):
        eid = record.employee_id
        if eid not in self.shift_history:
            self.shift_history[eid] = []
        self.shift_history[eid].append(record)

    def calculate_cost_per_clean(self, employee_id: str,
                                  window_days: int = 30) -> dict:
        """Calculate true cost per clean including labor, drive, supplies, callbacks."""
        history = self.shift_history.get(employee_id, [])
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = [s for s in history if s.date > cutoff]

        if not recent:
            return {"employee_id": employee_id, "data": "insufficient"}

        emp = self.employees[employee_id]
        total_hours = sum(
            (s.clock_out - s.clock_in).total_seconds() / 3600 for s in recent
        )
        regular_hours = min(total_hours, self.OVERTIME_THRESHOLD_WEEKLY * (window_days / 7))
        overtime_hours = max(0, total_hours - regular_hours)

        labor_cost = (regular_hours * emp.hourly_rate +
                      overtime_hours * emp.overtime_rate)
        supplies_cost = sum(s.supplies_cost for s in recent)
        callback_cost = sum(s.callbacks for s in recent) * emp.hourly_rate * 1.5
        total_cost = labor_cost + supplies_cost + callback_cost
        num_cleans = len(recent)

        avg_quality = statistics.mean([s.quality_score for s in recent])
        callback_rate = sum(s.callbacks for s in recent) / max(num_cleans, 1) * 100

        return {
            "employee_id": employee_id,
            "name": emp.name,
            "period_days": window_days,
            "total_cleans": num_cleans,
            "cost_per_clean": round(total_cost / max(num_cleans, 1), 2),
            "labor_cost": round(labor_cost, 2),
            "supplies_cost": round(supplies_cost, 2),
            "callback_cost": round(callback_cost, 2),
            "avg_quality_score": round(avg_quality, 1),
            "callback_rate_pct": round(callback_rate, 1),
            "overtime_hours": round(overtime_hours, 1),
            "efficiency_rank": self._rank_employee(employee_id, recent)
        }

    def find_replacement(self, site_id: str,
                          required_skills: List[str],
                          shift_date: datetime,
                          shift_start: time,
                          shift_end: time) -> List[dict]:
        """Find available qualified replacements for a sick call."""
        candidates = []

        for emp in self.employees.values():
            # Skill check
            if not all(s in emp.skills for s in required_skills):
                continue

            # Weekly hours check
            week_hours = self._weekly_hours(emp.employee_id, shift_date)
            shift_hours = self._time_diff_hours(shift_start, shift_end)
            if week_hours + shift_hours > emp.max_weekly_hours:
                continue

            # Rest period check
            last_shift = self._last_shift_end(emp.employee_id, shift_date)
            if last_shift:
                rest = (datetime.combine(shift_date, shift_start) - last_shift)
                if rest.total_seconds() / 3600 < self.MIN_REST_BETWEEN_SHIFTS:
                    continue

            # Consecutive days check
            consec = self._consecutive_days(emp.employee_id, shift_date)
            if consec >= self.MAX_CONSECUTIVE_DAYS:
                continue

            overtime = max(0, week_hours + shift_hours - self.OVERTIME_THRESHOLD_WEEKLY)
            cost = (shift_hours - overtime) * emp.hourly_rate + overtime * emp.overtime_rate

            candidates.append({
                "employee_id": emp.employee_id,
                "name": emp.name,
                "cost": round(cost, 2),
                "overtime_hours": round(overtime, 1),
                "distance_from_home": "nearby",  # placeholder until site coordinates are wired in
                "quality_avg": self._avg_quality(emp.employee_id)
            })

        candidates.sort(key=lambda c: (c["overtime_hours"], -c["quality_avg"], c["cost"]))
        return candidates[:5]

    def check_certifications(self) -> List[dict]:
        """Flag employees with expiring or expired certifications."""
        alerts = []
        now = datetime.now()
        warning_cutoff = now + timedelta(days=self.CERT_EXPIRY_WARNING_DAYS)

        for emp in self.employees.values():
            for cert, expiry in emp.certifications.items():
                if expiry < now:
                    alerts.append({
                        "employee_id": emp.employee_id,
                        "name": emp.name,
                        "certification": cert,
                        "expiry": expiry.isoformat(),
                        "status": "expired",
                        "action": "Remove from certified-required sites immediately"
                    })
                elif expiry < warning_cutoff:
                    alerts.append({
                        "employee_id": emp.employee_id,
                        "name": emp.name,
                        "certification": cert,
                        "expiry": expiry.isoformat(),
                        "status": "expiring_soon",
                        "action": f"Schedule renewal — {(expiry - now).days} days remaining"
                    })

        return sorted(alerts, key=lambda a: a["expiry"])

    def predict_absences(self, month: int) -> dict:
        """Predict absence rates by month from historical patterns."""
        monthly_rates = {}
        for eid, dates in self.absence_history.items():
            for d in dates:
                m = d.month
                monthly_rates[m] = monthly_rates.get(m, 0) + 1

        total_employees = len(self.employees)
        predicted_rate = monthly_rates.get(month, 0) / max(total_employees, 1)
        peak_months = sorted(monthly_rates, key=monthly_rates.get, reverse=True)[:3]

        return {
            "month": month,
            "predicted_absence_rate": round(predicted_rate * 100, 1),
            "peak_absence_months": peak_months,
            "recommended_float_staff": math.ceil(total_employees * predicted_rate * 0.15),
            "action": "Increase on-call pool" if month in peak_months else "Standard staffing"
        }

    def _weekly_hours(self, eid, ref_date) -> float:
        records = self.shift_history.get(eid, [])
        week_start = ref_date - timedelta(days=ref_date.weekday())
        week_end = week_start + timedelta(days=7)
        return sum(
            (r.clock_out - r.clock_in).total_seconds() / 3600
            for r in records if week_start <= r.date < week_end
        )

    def _last_shift_end(self, eid, ref_date) -> Optional[datetime]:
        records = self.shift_history.get(eid, [])
        past = [r for r in records if r.date.date() < ref_date.date()]
        # Take the latest clock-out in case records arrive out of order
        return max(past, key=lambda r: r.clock_out).clock_out if past else None

    def _consecutive_days(self, eid, ref_date) -> int:
        records = self.shift_history.get(eid, [])
        dates = {r.date.date() for r in records}
        count = 0
        d = ref_date.date() - timedelta(days=1)
        while d in dates:
            count += 1
            d -= timedelta(days=1)
        return count

    def _time_diff_hours(self, start: time, end: time) -> float:
        s = start.hour + start.minute / 60
        e = end.hour + end.minute / 60
        return e - s if e > s else (24 - s + e)

    def _avg_quality(self, eid) -> float:
        records = self.shift_history.get(eid, [])
        if not records: return 0
        return round(statistics.mean([r.quality_score for r in records]), 1)

    def _rank_employee(self, eid, recent) -> str:
        if not recent: return "unranked"
        avg_q = statistics.mean([r.quality_score for r in recent])
        cb_rate = sum(r.callbacks for r in recent) / len(recent)
        if avg_q >= 90 and cb_rate < 0.05: return "top_performer"
        if avg_q >= 80 and cb_rate < 0.10: return "solid"
        if avg_q < 70 or cb_rate > 0.15: return "needs_improvement"
        return "average"

Key insight: Cost-per-clean analysis typically reveals a 3-5x spread between your most and least efficient crews. The difference is rarely speed alone: high-callback crews cost 40-60% more per job when you factor in return trips, client dissatisfaction, and supervisor time spent handling complaints.
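The per-job overhead can be made concrete with expected-cost arithmetic; the dollar figures in the example comment are assumed for illustration:

```python
def expected_cost_per_job(labor_cost, callback_rate, return_trip_cost,
                          admin_cost_per_callback=25.0):
    """Per-job cost once probabilistic callback overhead is included.
    callback_rate is a fraction, e.g. 0.12 for 12%."""
    overhead = callback_rate * (return_trip_cost + admin_cost_per_callback)
    return round(labor_cost + overhead, 2)

# e.g. expected_cost_per_job(90, 0.15, 180, 60) -> 126.0
```

Soft costs like client dissatisfaction and supervisor time fold into the per-callback admin figure, which is why the spread widens beyond what labor hours alone suggest.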

4. Client Management & Retention

Client acquisition in commercial cleaning costs 5-7x more than retention, yet the average regional operator loses 18-22% of contracts annually. Most churn is preventable: it stems from accumulated small dissatisfactions that go unaddressed until the client quietly solicits bids from competitors. An AI agent that monitors satisfaction signals, automates professional communication, and predicts churn risk can cut annual attrition by 30-50%.

Contract management goes beyond tracking renewal dates. The agent analyzes each client's service history to identify upsell opportunities: a client who consistently requests extra window cleaning might benefit from adding it to the base contract at a discount. Pricing optimization considers square footage, cleaning frequency, scope complexity, time-of-day constraints, and local competitor rates. The agent models the price elasticity for each client segment, finding the point where margin increases without triggering competitive bids.
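The elasticity search described above can be sketched as an expected-margin comparison; the churn-risk function is an assumed input that would come from the segment model, not something defined in this article's code:

```python
def best_price_increase(monthly_value, candidate_pcts, churn_risk_fn):
    """Pick the increase (in percent) that maximizes expected annual margin
    gain, trading added revenue against the risk of losing the contract.
    churn_risk_fn maps an increase pct to the added churn probability."""
    best_pct, best_gain = 0.0, 0.0
    annual_value = monthly_value * 12
    for pct in candidate_pcts:
        added_revenue = annual_value * pct / 100
        risk = churn_risk_fn(pct)
        expected = added_revenue * (1 - risk) - annual_value * risk
        if expected > best_gain:
            best_pct, best_gain = pct, round(expected, 2)
    return best_pct, best_gain
```

With a quadratic risk curve (risk = pct² / 1000), a $5,000/month contract peaks at a 4% increase; larger bumps are eaten by the chance of triggering competitive bids.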

Communication automation transforms the client experience. After every cleaning, the agent can generate a service report with before/after photos, checklist completion data, and any notes from the crew. Monthly summaries aggregate quality scores, track SLA compliance, and highlight improvements. This level of transparency builds trust and makes it significantly harder for competitors to displace your service on price alone.
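
A minimal post-clean report generator might look like the sketch below. Field names and the plain-text format are illustrative; a production version would attach photos and pull scores from the quality-tracking module:

```python
from datetime import datetime

def build_service_report(site_name: str, checklist: dict,
                         crew_notes: str = "") -> str:
    """Render a plain-text post-clean summary from checklist results."""
    done = sum(1 for v in checklist.values() if v)
    lines = [
        f"Service Report - {site_name}",
        f"Date: {datetime.now():%Y-%m-%d}",
        f"Checklist: {done}/{len(checklist)} tasks completed",
    ]
    lines += [f"  [{'x' if v else ' '}] {task}" for task, v in checklist.items()]
    if crew_notes:
        lines.append(f"Notes: {crew_notes}")
    return "\n".join(lines)
```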


from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import statistics

@dataclass
class CleaningContract:
    client_id: str
    company_name: str
    contract_start: datetime
    contract_end: datetime
    monthly_value: float
    square_footage: int
    frequency_per_week: int
    scope: List[str]               # ["general", "windows", "carpet", "restroom"]
    auto_renew: bool
    last_price_increase: Optional[datetime] = None

@dataclass
class ClientInteraction:
    client_id: str
    date: datetime
    interaction_type: str          # "complaint", "compliment", "request", "inquiry"
    severity: int                  # 1-5, 5=critical
    resolved: bool
    resolution_hours: Optional[float] = None

class ClientManagementAgent:
    """AI agent for cleaning client retention, pricing, and churn prediction."""

    CHURN_RISK_THRESHOLD = 0.65
    UPSELL_QUALITY_THRESHOLD = 85
    PRICE_INCREASE_COOLDOWN_DAYS = 365

    def __init__(self):
        self.contracts = {}
        self.interactions = {}
        self.quality_scores = {}    # {client_id: [scores]}

    def register_contract(self, contract: CleaningContract):
        self.contracts[contract.client_id] = contract

    def log_interaction(self, interaction: ClientInteraction):
        cid = interaction.client_id
        if cid not in self.interactions:
            self.interactions[cid] = []
        self.interactions[cid].append(interaction)

    def predict_churn(self, client_id: str) -> dict:
        """Predict churn probability from satisfaction signals."""
        contract = self.contracts.get(client_id)
        if not contract:
            return {"error": "client not found"}

        interactions = self.interactions.get(client_id, [])
        recent_90d = [i for i in interactions
                      if i.date > datetime.now() - timedelta(days=90)]

        # Signal weights
        complaints = [i for i in recent_90d if i.interaction_type == "complaint"]
        compliments = [i for i in recent_90d if i.interaction_type == "compliment"]
        avg_severity = statistics.mean([c.severity for c in complaints]) if complaints else 0
        unresolved = sum(1 for c in complaints if not c.resolved)

        quality_scores = self.quality_scores.get(client_id, [])
        recent_quality = quality_scores[-10:] if quality_scores else [85]
        quality_trend = (
            (recent_quality[-1] - recent_quality[0]) / max(recent_quality[0], 1)
            if len(recent_quality) >= 2 else 0
        )

        days_to_renewal = (contract.contract_end - datetime.now()).days
        tenure_months = (datetime.now() - contract.contract_start).days / 30

        # Churn score: 0 = no risk, 1 = certain churn
        churn_score = (
            min(len(complaints) * 0.15, 0.45) +
            min(avg_severity * 0.06, 0.30) +
            min(unresolved * 0.10, 0.20) +
            max(-quality_trend * 2, 0) * 0.15 +
            (0.10 if days_to_renewal < 60 and not contract.auto_renew else 0) -
            min(len(compliments) * 0.05, 0.15) -
            min(tenure_months * 0.005, 0.10)
        )
        churn_score = max(0, min(1, churn_score))

        actions = []
        if churn_score > 0.7:
            actions.append("Schedule executive account review within 1 week")
            actions.append("Prepare retention offer: 5-10% discount or scope upgrade")
        elif churn_score > 0.4:
            actions.append("Increase supervisor inspection frequency")
            actions.append("Send satisfaction survey")
        if unresolved > 0:
            actions.append(f"Resolve {unresolved} open complaints immediately")

        return {
            "client_id": client_id,
            "company": contract.company_name,
            "churn_probability": round(churn_score, 2),
            "risk_level": (
                "critical" if churn_score > 0.7
                else "high" if churn_score > 0.5
                else "medium" if churn_score > 0.3
                else "low"
            ),
            "complaints_90d": len(complaints),
            "unresolved_complaints": unresolved,
            "quality_trend": round(quality_trend * 100, 1),
            "days_to_renewal": days_to_renewal,
            "contract_monthly_value": contract.monthly_value,
            "annual_revenue_at_risk": round(contract.monthly_value * 12, 0),
            "recommended_actions": actions
        }

    def identify_upsell(self, client_id: str) -> dict:
        """Find upsell opportunities based on service history and quality."""
        contract = self.contracts.get(client_id)
        interactions = self.interactions.get(client_id, [])
        quality = self.quality_scores.get(client_id, [])

        if not contract:
            return {"error": "client not found"}

        avg_quality = statistics.mean(quality[-10:]) if quality else 0
        # Frequent ad-hoc requests signal unmet demand for expanded scope
        # (a production system would tag each request by service type)
        recent_requests = [i for i in interactions
                           if i.interaction_type == "request"]

        opportunities = []

        # Scope expansion
        all_scopes = ["general", "windows", "carpet", "restroom", "kitchen",
                      "exterior", "pressure_wash", "floor_stripping"]
        missing = [s for s in all_scopes if s not in contract.scope]
        for scope in missing[:3]:
            est_value = contract.monthly_value * 0.15
            opportunities.append({
                "type": "scope_expansion",
                "service": scope,
                "estimated_monthly_value": round(est_value, 0),
                "supporting_requests": len(recent_requests),
                "pitch": f"Add {scope} service for comprehensive coverage"
            })

        # Frequency increase
        if avg_quality >= self.UPSELL_QUALITY_THRESHOLD and contract.frequency_per_week < 5:
            freq_value = contract.monthly_value * 0.25
            opportunities.append({
                "type": "frequency_increase",
                "current": contract.frequency_per_week,
                "proposed": contract.frequency_per_week + 1,
                "estimated_monthly_value": round(freq_value, 0),
                "pitch": "Increase frequency for consistently higher cleanliness"
            })

        return {
            "client_id": client_id,
            "current_monthly_value": contract.monthly_value,
            "opportunities": opportunities,
            "total_upsell_potential": round(
                sum(o["estimated_monthly_value"] for o in opportunities), 0
            )
        }

    def optimize_pricing(self, client_id: str,
                          market_rate_per_sqft: float) -> dict:
        """Recommend pricing adjustments based on cost, market, and retention risk."""
        contract = self.contracts.get(client_id)
        if not contract:
            return {"error": "client not found"}

        current_rate = contract.monthly_value / max(contract.square_footage, 1)
        market_gap_pct = ((market_rate_per_sqft - current_rate)
                          / max(current_rate, 0.01)) * 100

        churn = self.predict_churn(client_id)
        churn_risk = churn["churn_probability"]

        # Honor the cooldown between price increases, then scale the increase
        # to the market gap: aggressive only when churn risk is low, moderate
        # when the gap is large but risk is middling
        in_cooldown = (
            contract.last_price_increase is not None and
            (datetime.now() - contract.last_price_increase).days
            < self.PRICE_INCREASE_COOLDOWN_DAYS
        )

        if in_cooldown:
            increase_pct = 0
            action = "cooldown_hold"
        elif market_gap_pct > 5 and churn_risk < 0.3:
            increase_pct = min(market_gap_pct * 0.5, 8)
            action = "increase"
        elif market_gap_pct > 10 and churn_risk < 0.5:
            increase_pct = min(market_gap_pct * 0.3, 5)
            action = "moderate_increase"
        elif churn_risk > 0.6:
            increase_pct = 0
            action = "hold_or_discount"
        else:
            increase_pct = 0
            action = "maintain"

        new_monthly = contract.monthly_value * (1 + increase_pct / 100)

        return {
            "client_id": client_id,
            "current_monthly": contract.monthly_value,
            "current_rate_sqft": round(current_rate, 4),
            "market_rate_sqft": round(market_rate_per_sqft, 4),
            "market_gap_pct": round(market_gap_pct, 1),
            "churn_risk": round(churn_risk, 2),
            "recommended_action": action,
            "increase_pct": round(increase_pct, 1),
            "new_monthly": round(new_monthly, 0),
            "annual_revenue_impact": round((new_monthly - contract.monthly_value) * 12, 0)
        }
Key insight: Churn prediction models for cleaning services achieve highest accuracy when they weight unresolved complaints and quality score trends over raw complaint counts. A single unresolved complaint is a stronger churn signal than three complaints that were promptly addressed. Proactive outreach when churn probability exceeds 0.5 recovers 40-60% of at-risk accounts.

5. Inventory & Supply Chain

Chemical and supply costs represent 8-15% of revenue for most cleaning companies, yet inventory management is often an afterthought. Crews over-dilute expensive disinfectants (wasting product), under-dilute cheap cleaners (reducing effectiveness), and equipment breaks down mid-shift because maintenance is tracked on a whiteboard if it is tracked at all. An AI agent that monitors consumption rates, enforces dilution ratios, and automates reordering can cut supply costs by 15-25% while ensuring consistent product quality.

Equipment maintenance scheduling prevents the costly disruption of mid-shift breakdowns. Floor scrubbers, carpet extractors, and backpack vacuums all have operating-hour thresholds for pad replacement, filter changes, and motor servicing. The agent tracks cumulative hours per unit and schedules maintenance during off-peak periods. Condition monitoring through simple IoT sensors (vibration, suction pressure) can predict failures 1-2 weeks before they occur.
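
In its simplest form, the failure-prediction idea reduces to fitting a trend on recent condition readings. This is a sketch assuming one sensor reading per day; real deployments would use more robust filtering than a raw least-squares slope:

```python
def days_until_threshold(scores: list, threshold: float = 60.0):
    """Project when a declining condition score crosses the service
    threshold, via a simple least-squares slope over daily readings."""
    n = len(scores)
    if n < 2:
        return None
    mean_x = (n - 1) / 2
    mean_y = sum(scores) / n
    slope = (sum((x - mean_x) * (y - mean_y) for x, y in enumerate(scores))
             / sum((x - mean_x) ** 2 for x in range(n)))
    if slope >= 0:
        return None  # not degrading; nothing to predict
    return (scores[-1] - threshold) / -slope

# A scrubber losing 2 points/day from 90 hits the threshold in 11 days
days_until_threshold([90, 88, 86, 84, 82])  # 11.0
```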

Green product compliance is increasingly a contract requirement, particularly for government buildings, healthcare facilities, and LEED-certified offices. The agent tracks which products carry Green Seal, EPA Safer Choice, or equivalent certifications, ensures compliant products are dispatched to sites that require them, and flags when a supplier substitutes a non-certified alternative.

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Optional

@dataclass
class CleaningProduct:
    product_id: str
    name: str
    category: str                  # "disinfectant", "glass", "floor", "degreaser"
    cost_per_unit: float           # cost per gallon/liter
    dilution_ratio: float          # e.g., 1:64 = 0.015625
    coverage_sqft_per_unit: float  # diluted coverage
    green_certified: bool
    certifications: List[str]      # ["green_seal", "epa_safer_choice"]
    vendor: str
    lead_time_days: int

@dataclass
class EquipmentUnit:
    unit_id: str
    equipment_type: str            # "floor_scrubber", "vacuum", "carpet_extractor"
    purchase_date: datetime
    total_operating_hours: float
    maintenance_interval_hours: float
    last_maintenance: datetime
    condition_score: float         # 0-100 from IoT sensors
    assigned_crew: Optional[str] = None

@dataclass
class UsageRecord:
    product_id: str
    site_id: str
    date: datetime
    quantity_used: float           # gallons/liters
    dilution_actual: float         # actual dilution ratio applied
    square_footage_cleaned: int

class InventorySupplyAgent:
    """AI agent for cleaning supply tracking, equipment maintenance, and ordering."""

    DILUTION_TOLERANCE_PCT = 15     # acceptable deviation from spec
    REORDER_BUFFER_DAYS = 7         # order this many days before stockout
    EQUIPMENT_WARNING_HOURS = 50    # warn this many hours before maintenance due

    def __init__(self, products: List[CleaningProduct],
                 equipment: List[EquipmentUnit]):
        self.products = {p.product_id: p for p in products}
        self.equipment = {e.unit_id: e for e in equipment}
        self.usage_history = {}     # {product_id: [UsageRecord]}
        self.inventory_levels = {}  # {product_id: quantity_on_hand}

    def log_usage(self, record: UsageRecord):
        pid = record.product_id
        if pid not in self.usage_history:
            self.usage_history[pid] = []
        self.usage_history[pid].append(record)

    def check_dilution_compliance(self, product_id: str,
                                   window_days: int = 30) -> dict:
        """Monitor dilution ratios and flag over/under-dilution."""
        product = self.products[product_id]
        records = self.usage_history.get(product_id, [])
        cutoff = datetime.now() - timedelta(days=window_days)
        recent = [r for r in records if r.date > cutoff]

        if not recent:
            return {"product_id": product_id, "data": "insufficient"}

        spec_ratio = product.dilution_ratio
        tolerance = spec_ratio * (self.DILUTION_TOLERANCE_PCT / 100)

        violations = []
        costs_actual = []
        costs_optimal = []

        for r in recent:
            deviation_pct = ((r.dilution_actual - spec_ratio)
                             / max(spec_ratio, 0.001)) * 100
            cost_actual = r.quantity_used * product.cost_per_unit
            cost_optimal = (r.square_footage_cleaned
                            / max(product.coverage_sqft_per_unit, 1)
                            * product.cost_per_unit)
            costs_actual.append(cost_actual)
            costs_optimal.append(cost_optimal)

            if abs(r.dilution_actual - spec_ratio) > tolerance:
                violations.append({
                    "site_id": r.site_id,
                    "date": r.date.isoformat(),
                    "actual_ratio": r.dilution_actual,
                    "spec_ratio": spec_ratio,
                    "deviation_pct": round(deviation_pct, 1),
                    "type": "over_diluted" if r.dilution_actual > spec_ratio else "under_diluted"
                })

        waste_usd = sum(costs_actual) - sum(costs_optimal)

        return {
            "product_id": product_id,
            "product_name": product.name,
            "period_days": window_days,
            "total_uses": len(recent),
            "violations": len(violations),
            "violation_rate_pct": round(len(violations) / max(len(recent), 1) * 100, 1),
            "cost_actual": round(sum(costs_actual), 2),
            "cost_optimal": round(sum(costs_optimal), 2),
            "waste_usd": round(max(waste_usd, 0), 2),
            "worst_violations": violations[:5],
            "recommendation": (
                "Install auto-dilution dispensers at high-violation sites"
                if len(violations) > len(recent) * 0.3
                else "Schedule dilution training refresher"
                if violations else "Compliance within acceptable range"
            )
        }

    def schedule_equipment_maintenance(self) -> List[dict]:
        """Generate maintenance schedule based on hours and condition."""
        schedule = []

        for unit in self.equipment.values():
            interval = unit.maintenance_interval_hours

            # Hours until the next service threshold
            hours_remaining = interval - (unit.total_operating_hours % interval)

            # Condition-based adjustment
            if unit.condition_score < 60:
                urgency = "immediate"
                hours_remaining = 0
            elif unit.condition_score < 75:
                urgency = "soon"
                hours_remaining = min(hours_remaining, 20)
            elif hours_remaining < self.EQUIPMENT_WARNING_HOURS:
                urgency = "scheduled"
            else:
                urgency = "normal"
                continue  # skip if not due soon

            schedule.append({
                "unit_id": unit.unit_id,
                "type": unit.equipment_type,
                "total_hours": unit.total_operating_hours,
                "hours_until_due": round(hours_remaining, 0),
                "condition_score": unit.condition_score,
                "urgency": urgency,
                "assigned_crew": unit.assigned_crew,
                "action": (
                    "Pull from service immediately"
                    if urgency == "immediate"
                    else f"Schedule maintenance within {int(hours_remaining)} operating hours"
                )
            })

        return sorted(schedule, key=lambda s: s["hours_until_due"])

    def auto_reorder(self, daily_usage_avg: Optional[Dict[str, float]] = None) -> List[dict]:
        """Calculate reorder needs based on consumption rate and lead time."""
        orders = []

        for pid, product in self.products.items():
            on_hand = self.inventory_levels.get(pid, 0)
            records = self.usage_history.get(pid, [])

            # Calculate daily consumption rate
            if daily_usage_avg and pid in daily_usage_avg:
                daily_rate = daily_usage_avg[pid]
            elif records:
                recent_30 = [r for r in records
                             if r.date > datetime.now() - timedelta(days=30)]
                daily_rate = (sum(r.quantity_used for r in recent_30)
                              / max(len(set(r.date.date() for r in recent_30)), 1))
            else:
                continue

            days_of_stock = on_hand / max(daily_rate, 0.01)
            reorder_point = daily_rate * (product.lead_time_days + self.REORDER_BUFFER_DAYS)

            if on_hand <= reorder_point:
                order_qty = daily_rate * 30  # 30-day supply
                orders.append({
                    "product_id": pid,
                    "product_name": product.name,
                    "vendor": product.vendor,
                    "on_hand": round(on_hand, 1),
                    "daily_rate": round(daily_rate, 2),
                    "days_of_stock": round(days_of_stock, 1),
                    "order_quantity": round(order_qty, 1),
                    "estimated_cost": round(order_qty * product.cost_per_unit, 2),
                    "lead_time_days": product.lead_time_days,
                    "green_certified": product.green_certified,
                    "urgency": "critical" if days_of_stock < 3 else "standard"
                })

        return sorted(orders, key=lambda o: o["days_of_stock"])

    def green_compliance_check(self, site_requirements: Dict[str, List[str]]) -> dict:
        """Verify green product compliance for certified buildings."""
        violations = []
        compliant_sites = 0
        total_sites = len(site_requirements)

        for site_id, required_certs in site_requirements.items():
            site_violations = []
            for pid, product in self.products.items():
                usage = self.usage_history.get(pid, [])
                used_at_site = [r for r in usage if r.site_id == site_id]
                if used_at_site and not any(
                    cert in product.certifications for cert in required_certs
                ):
                    site_violations.append({
                        "product": product.name,
                        "required_certs": required_certs,
                        "product_certs": product.certifications
                    })

            if site_violations:
                violations.append({
                    "site_id": site_id,
                    "violations": site_violations
                })
            else:
                compliant_sites += 1

        return {
            "total_sites": total_sites,
            "compliant_sites": compliant_sites,
            "compliance_rate_pct": round(compliant_sites / max(total_sites, 1) * 100, 1),
            "violations": violations
        }
Key insight: Dilution ratio monitoring alone typically recovers 8-12% of chemical spend. Over-dilution wastes expensive concentrates, while under-dilution reduces cleaning effectiveness and triggers callbacks. Auto-dilution dispensers ($200-400 per unit) combined with AI monitoring pay for themselves within 1-2 months on high-volume sites.
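
The dispenser payback claim is simple arithmetic; the figures below are hypothetical site numbers, not benchmarks:

```python
def dispenser_payback_months(unit_cost: float,
                             monthly_chemical_spend: float,
                             recovery_rate: float = 0.10) -> float:
    """Months for an auto-dilution dispenser to pay for itself
    out of recovered chemical spend."""
    return unit_cost / (monthly_chemical_spend * recovery_rate)

# A $300 dispenser at a site spending $2,000/month on chemicals,
# assuming a 10% waste-recovery rate:
months = dispenser_payback_months(300, 2000)  # 1.5 months
```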

6. ROI Analysis for Regional Cleaning Company (200 Contracts)

A regional cleaning company managing 200 commercial contracts operates 15-25 crews, covers a metro area with a 50-80 mile radius, and generates $3-6M in annual revenue. At this scale, even small percentage improvements compound into substantial savings. The following model quantifies the return on deploying AI agents across all five operational areas, using conservative estimates validated against industry benchmarks.

Implementation costs include the AI platform subscription, IoT sensors for high-value sites, integration with existing scheduling and payroll systems, and staff training. The model assumes a phased 6-month rollout starting with route optimization (fastest payback) and ending with client retention analytics (highest long-term value). All benefit estimates use the conservative end of documented ranges.

The compound effect matters: route optimization reduces drive time, which frees capacity for additional contracts, which increases revenue without proportional headcount growth. Quality tracking reduces callbacks, which improves client satisfaction, which reduces churn, which reduces acquisition spending. Each module amplifies the others.
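
The churn half of that compounding is easy to see in isolation. A sketch with hypothetical figures, ignoring new sales so only retention of today's book is measured:

```python
def book_value(contracts: int, monthly_value: float,
               annual_churn: float, years: int) -> float:
    """Annual revenue in year `years` from today's book of business,
    assuming constant churn and no new contract wins."""
    return contracts * (1 - annual_churn) ** years * monthly_value * 12

# Hypothetical: 200 contracts at $2,000/month, churn cut from 20% to 13%.
# The retention gap grows to roughly $700K/year by year 3.
gap_y3 = book_value(200, 2000, 0.13, 3) - book_value(200, 2000, 0.20, 3)
```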


class CleaningCompanyROIModel:
    """ROI model for AI agent deployment across a 200-contract cleaning operation."""

    def __init__(self, num_contracts: int = 200, num_crews: int = 20,
                 avg_monthly_contract: float = 2000,
                 annual_revenue: float = 4_800_000):
        self.num_contracts = num_contracts
        self.num_crews = num_crews
        self.avg_monthly = avg_monthly_contract
        self.annual_revenue = annual_revenue

    def route_efficiency_savings(self) -> dict:
        """Route optimization: reduce drive time and fuel costs."""
        daily_drive_hours_before = 2.5          # per crew
        drive_reduction_pct = 0.22              # 22% reduction (conservative)
        fuel_cost_per_hour = 18                 # fuel + vehicle wear
        working_days = 260

        hours_saved_daily = daily_drive_hours_before * drive_reduction_pct * self.num_crews
        annual_hours_saved = hours_saved_daily * working_days
        fuel_savings = annual_hours_saved * fuel_cost_per_hour
        labor_savings = annual_hours_saved * 22  # avg hourly rate

        # Freed capacity enables additional contracts (assumes roughly one
        # visit per contract per week, so weekly contract value ~ per-clean value)
        hours_per_clean = 2.5
        extra_cleans_weekly = (hours_saved_daily * 5) / hours_per_clean
        extra_revenue = extra_cleans_weekly * 52 * (self.avg_monthly / 4.33)

        return {
            "category": "Route Optimization",
            "drive_hours_saved_annually": round(annual_hours_saved, 0),
            "fuel_savings_usd": round(fuel_savings, 0),
            "labor_savings_usd": round(labor_savings, 0),
            "extra_capacity_revenue_usd": round(extra_revenue, 0),
            "total_benefit_usd": round(fuel_savings + labor_savings + extra_revenue, 0)
        }

    def labor_optimization_savings(self) -> dict:
        """Workforce management: reduce overtime, callbacks, and turnover costs."""
        annual_labor_cost = self.annual_revenue * 0.55
        overtime_before_pct = 0.12
        overtime_reduction = 0.40               # 40% overtime reduction
        # the trailing 0.5 counts only the premium (time-and-a-half) portion of OT pay
        overtime_savings = annual_labor_cost * overtime_before_pct * overtime_reduction * 0.5

        # Callback reduction
        callbacks_per_month = self.num_contracts * 0.10  # 10% callback rate
        callback_reduction = 0.65                        # 65% fewer callbacks
        callback_cost = 85                               # per callback
        callback_savings = callbacks_per_month * callback_reduction * callback_cost * 12

        # Turnover reduction
        annual_turnover_rate_before = 2.5                # 250%
        turnover_reduction = 0.20                        # reduce by 20%
        cost_per_replacement = 2500
        employees = self.num_crews * 3                   # 3 per crew avg
        turnover_savings = (employees * annual_turnover_rate_before *
                            turnover_reduction * cost_per_replacement)

        return {
            "category": "Labor Optimization",
            "overtime_savings_usd": round(overtime_savings, 0),
            "callback_savings_usd": round(callback_savings, 0),
            "turnover_savings_usd": round(turnover_savings, 0),
            "total_benefit_usd": round(
                overtime_savings + callback_savings + turnover_savings, 0
            )
        }

    def client_retention_savings(self) -> dict:
        """Client management: reduce churn and increase upsell revenue."""
        current_churn_rate = 0.20                # 20% annual
        churn_reduction = 0.35                   # reduce churn by 35%
        retained_contracts = self.num_contracts * current_churn_rate * churn_reduction
        retained_revenue = retained_contracts * self.avg_monthly * 12

        # Acquisition cost avoided
        acquisition_cost = 3500                  # per new contract
        acquisition_savings = retained_contracts * acquisition_cost

        # Upsell revenue
        upsell_rate = 0.08                       # 8% of clients accept upsell
        upsell_value = self.avg_monthly * 0.20   # 20% contract increase
        upsell_revenue = self.num_contracts * upsell_rate * upsell_value * 12

        # Pricing optimization
        underpriced_pct = 0.15
        avg_increase = 0.05
        pricing_revenue = (self.num_contracts * underpriced_pct *
                           self.avg_monthly * avg_increase * 12)

        return {
            "category": "Client Retention & Growth",
            "retained_revenue_usd": round(retained_revenue, 0),
            "acquisition_savings_usd": round(acquisition_savings, 0),
            "upsell_revenue_usd": round(upsell_revenue, 0),
            "pricing_optimization_usd": round(pricing_revenue, 0),
            "total_benefit_usd": round(
                retained_revenue + acquisition_savings +
                upsell_revenue + pricing_revenue, 0
            )
        }

    def inventory_savings(self) -> dict:
        """Supply chain: reduce waste, prevent stockouts, optimize ordering."""
        annual_supply_cost = self.annual_revenue * 0.10
        dilution_waste_recovery = annual_supply_cost * 0.10
        bulk_ordering_savings = annual_supply_cost * 0.08
        equipment_downtime_savings = self.num_crews * 12 * 350  # 12 incidents/yr * cost

        return {
            "category": "Inventory & Supply Chain",
            "dilution_savings_usd": round(dilution_waste_recovery, 0),
            "ordering_savings_usd": round(bulk_ordering_savings, 0),
            "equipment_savings_usd": round(equipment_downtime_savings, 0),
            "total_benefit_usd": round(
                dilution_waste_recovery + bulk_ordering_savings +
                equipment_downtime_savings, 0
            )
        }

    def full_roi_analysis(self) -> dict:
        """Complete ROI analysis with costs and payback period."""
        route = self.route_efficiency_savings()
        labor = self.labor_optimization_savings()
        client = self.client_retention_savings()
        inventory = self.inventory_savings()

        # Implementation costs
        platform_annual = 24_000           # AI platform subscription
        iot_sensors = 15_000               # one-time: sensors for top 50 sites
        integration = 20_000               # one-time: system integration
        training = 8_000                   # one-time: staff training
        ongoing_maintenance = 6_000        # annual

        total_year1_cost = platform_annual + iot_sensors + integration + training + ongoing_maintenance
        total_annual_cost = platform_annual + ongoing_maintenance

        total_annual_benefit = (
            route["total_benefit_usd"] +
            labor["total_benefit_usd"] +
            client["total_benefit_usd"] +
            inventory["total_benefit_usd"]
        )

        roi_year1 = ((total_annual_benefit - total_year1_cost)
                      / total_year1_cost) * 100
        roi_year2 = ((total_annual_benefit - total_annual_cost)
                      / total_annual_cost) * 100
        payback_months = (total_year1_cost / max(total_annual_benefit, 1)) * 12

        return {
            "company_profile": {
                "contracts": self.num_contracts,
                "crews": self.num_crews,
                "annual_revenue": self.annual_revenue
            },
            "annual_benefits": {
                "route_optimization": route["total_benefit_usd"],
                "labor_optimization": labor["total_benefit_usd"],
                "client_retention": client["total_benefit_usd"],
                "inventory_supply": inventory["total_benefit_usd"],
                "total": round(total_annual_benefit, 0)
            },
            "costs": {
                "year_1_total": total_year1_cost,
                "annual_recurring": total_annual_cost
            },
            "returns": {
                "roi_year_1_pct": round(roi_year1, 0),
                "roi_year_2_pct": round(roi_year2, 0),
                "payback_months": round(payback_months, 1),
                "net_benefit_year_1": round(
                    total_annual_benefit - total_year1_cost, 0
                )
            }
        }

# Run the analysis
model = CleaningCompanyROIModel(
    num_contracts=200, num_crews=20,
    avg_monthly_contract=2000, annual_revenue=4_800_000
)
results = model.full_roi_analysis()

print(f"Company: {results['company_profile']['contracts']} contracts, "
      f"${results['company_profile']['annual_revenue']:,.0f} revenue")
print(f"\nAnnual Benefits Breakdown:")
for k, v in results["annual_benefits"].items():
    if k != "total":
        print(f"  {k}: ${v:,.0f}")
print(f"  TOTAL: ${results['annual_benefits']['total']:,.0f}")
print(f"\nYear 1 Cost: ${results['costs']['year_1_total']:,.0f}")
print(f"Year 1 ROI: {results['returns']['roi_year_1_pct']}%")
print(f"Year 2 ROI: {results['returns']['roi_year_2_pct']}%")
print(f"Payback: {results['returns']['payback_months']} months")
Bottom line: A 200-contract cleaning company investing $73,000 in year one can expect $320,000-$780,000 in annual benefits even under heavily discounted assumptions (the model above, taken at face value, projects closer to $1.5M), yielding a payback period of 1-3 months and year-2 ROI exceeding 900%. Route optimization and labor management deliver the fastest returns, while client retention compounds value over years as reduced churn eliminates the constant treadmill of replacing lost accounts.

Getting Started: Implementation Roadmap

Deploying AI agents across a cleaning operation works best as a phased rollout, starting with the modules that deliver the fastest measurable ROI:

  1. Month 1-2: Route optimization. Connect your scheduling system and crew GPS data. Deploy the route agent on 5 crews as a pilot. Measure drive time reduction against the previous month's baseline.
  2. Month 2-3: Quality inspection. Roll out photo documentation and checklist tracking through the crew mobile app. Establish quality score baselines for all sites. Install IoT sensors at your top 20 highest-value contracts.
  3. Month 3-4: Workforce management. Integrate with payroll and HR systems. Deploy cost-per-clean tracking and automated replacement dispatch. Begin certification tracking.
  4. Month 4-5: Inventory and supply chain. Set up product usage logging and dilution monitoring. Configure auto-reorder thresholds. Deploy equipment maintenance scheduling.
  5. Month 5-6: Client management and retention. Activate churn prediction on all contracts. Launch automated service reports. Begin pricing optimization analysis on contracts up for renewal.

The key to adoption is making the AI agent a tool that helps crews do their jobs better rather than a surveillance system. Route optimization means less driving and more predictable shifts. Quality tracking means fewer callbacks and fewer angry supervisor calls. When the frontline team sees the benefit, adoption follows naturally.

The AI Agent Playbook

Step-by-step templates, SOUL.md frameworks, and security checklists for deploying AI agents in any service business.

Get the Playbook — $19