All files / sync connection-pool.ts

97.56% Statements 80/82
91.48% Branches 43/47
71.42% Functions 10/14
100% Lines 75/75

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215                                      28x 28x 28x                 391x         391x     391x 391x 391x 391x     391x               18x         18x 18x 1x 1x 1x 1x         18x 1x           17x 1x 1x 1x             16x 16x 16x       6x 6x     10x             10x 10x   10x   18x             15x 15x   13x 13x 12x 12x                   342x 335x 335x     342x 32x 32x       342x             49x 49x 26x   49x             12x             5x 5x 7x 7x 8x 4x   4x     7x 3x   4x                   8x   8x 11x 12x 5x 4x           8x   4x 4x 4x 4x   4x 3x     4x      
import type Database from "better-sqlite3-multiple-ciphers";
import { type ImapConfig, ImapSync } from "./imap-sync.js";
 
/**
 * Bookkeeping record for one IMAP connection tracked by `ConnectionPool`.
 */
export interface PooledConnection {
	/** The IMAP sync instance returned to the caller by `acquire`. */
	sync: ImapSync;
	/** Id of the inbound connector this connection was opened for. */
	connectorId: number;
	/** `Date.now()` timestamp (ms) taken at creation and again on each release. */
	lastUsed: number;
	/** True from acquisition until the connection is released. */
	busy: boolean;
}
 
/**
 * Optional limits for `ConnectionPool`. Any field left out falls back to
 * the module-level default noted on that field.
 */
export interface ConnectionPoolOptions {
	/** Max connections per connector (default: 1) */
	maxPerConnector?: number;
	/** Max total connections across all connectors (default: 10) */
	maxTotal?: number;
	/**
	 * Idle timeout in ms before a connection is closed (default: 5 minutes).
	 * The pool also uses this value as the period of its cleanup timer.
	 */
	idleTimeoutMs?: number;
}
 
/** Fallback cap on connections held for a single connector. */
const DEFAULT_MAX_PER_CONNECTOR = 1;
/** Fallback cap on connections held across every connector. */
const DEFAULT_MAX_TOTAL = 10;
/** Fallback idle lifetime of a connection: five minutes, in milliseconds. */
const DEFAULT_IDLE_TIMEOUT_MS = 300_000;
 
/**
 * Manages a pool of IMAP connections across multiple inbound connectors.
 *
 * Tracks which connections are in use, enforces per-connector and total
 * connection limits, and closes connections that have been idle too long.
 * A released connection is never handed out again: `acquire` discards an
 * idle connection for the connector and always opens a fresh one.
 */
export class ConnectionPool {
	/** Tracked connections grouped by connector id; empty groups are removed. */
	private readonly connections: Map<number, PooledConnection[]> = new Map();
	private readonly db: Database.Database;
	private readonly maxPerConnector: number;
	private readonly maxTotal: number;
	private readonly idleTimeoutMs: number;
	/** Handle of the periodic idle sweep; null once `shutdown` has run. */
	private cleanupTimer: ReturnType<typeof setInterval> | null = null;

	/**
	 * Creates the pool and starts the periodic idle sweep. The sweep runs
	 * every `idleTimeoutMs` and keeps running until `shutdown` is called.
	 *
	 * @param db Database handle passed to every `ImapSync` the pool creates.
	 * @param options Optional limits; omitted fields use the module defaults.
	 */
	constructor(db: Database.Database, options: ConnectionPoolOptions = {}) {
		this.db = db;
		this.maxPerConnector = options.maxPerConnector ?? DEFAULT_MAX_PER_CONNECTOR;
		this.maxTotal = options.maxTotal ?? DEFAULT_MAX_TOTAL;
		this.idleTimeoutMs = options.idleTimeoutMs ?? DEFAULT_IDLE_TIMEOUT_MS;

		// Periodically clean up idle connections
		this.cleanupTimer = setInterval(() => this.evictIdle(), this.idleTimeoutMs);
	}

	/**
	 * Opens a new, connected IMAP sync instance for the given inbound
	 * connector and records it as busy.
	 *
	 * One idle connection of the same connector (if any) is disconnected
	 * first; it is never reused.
	 *
	 * NOTE: the limit checks run before the awaited `connect()` and the new
	 * connection is only recorded afterwards, so overlapping calls do not
	 * count against each other.
	 *
	 * @param connectorId Id of the inbound connector.
	 * @param config IMAP configuration used for the new connection.
	 * @returns The connected sync instance; hand it back via `release`.
	 * @throws Error if the connector is at `maxPerConnector`, or if the pool
	 *   is at `maxTotal` and no idle connection can be evicted. A failure
	 *   from `connect()` is rethrown after force-closing the client.
	 */
	async acquire(connectorId: number, config: ImapConfig): Promise<ImapSync> {
		const connectorConns = this.connections.get(connectorId) ?? [];

		// Discard an idle connection — ImapFlow instances can't be reused after
		// ungraceful shutdown and there's no reliable way to check liveness.
		// Always create a fresh connection for each sync cycle.
		const idleIdx = connectorConns.findIndex((c) => !c.busy);
		if (idleIdx !== -1) {
			const [stale] = connectorConns.splice(idleIdx, 1);
			// Best-effort close: a failed LOGOUT on a dead connection is not actionable.
			stale.sync.disconnect().catch(() => {});
			if (connectorConns.length === 0) {
				this.connections.delete(connectorId);
			}
		}

		// Check per-connector limit
		if (connectorConns.length >= this.maxPerConnector) {
			throw new Error(
				`Connection limit reached for connector ${connectorId} (max: ${this.maxPerConnector})`,
			);
		}

		// Check total limit — evict oldest idle connection if at capacity
		if (this.totalConnections() >= this.maxTotal) {
			const evicted = this.evictOldestIdle();
			if (!evicted) {
				throw new Error(
					`Total connection limit reached (max: ${this.maxTotal}), all connections busy`,
				);
			}
		}

		// Create new connection
		const sync = new ImapSync(config, this.db, connectorId);
		try {
			await sync.connect();
		} catch (err) {
			// Force-close the failed client to release the underlying TCP socket
			// and prevent async teardown events from leaking after the error.
			sync.forceClose();
			throw err;
		}

		const pooled: PooledConnection = {
			sync,
			connectorId,
			lastUsed: Date.now(),
			busy: true,
		};

		// Look the group up again: the map may have changed while awaiting connect().
		const group = this.connections.get(connectorId);
		if (group) {
			group.push(pooled);
		} else {
			this.connections.set(connectorId, [pooled]);
		}

		return sync;
	}

	/**
	 * Marks a connection as idle and refreshes its `lastUsed` timestamp.
	 *
	 * The connection stays tracked (and counts toward the limits) until a
	 * later `acquire`, a capacity eviction, the idle sweep, or `shutdown`
	 * closes it. Unknown connectors or sync instances are ignored.
	 *
	 * @param connectorId Id of the connector the connection was acquired for.
	 * @param sync The instance previously returned by `acquire`.
	 */
	release(connectorId: number, sync: ImapSync): void {
		const connectorConns = this.connections.get(connectorId);
		if (!connectorConns) return;

		const pooled = connectorConns.find((c) => c.sync === sync);
		if (pooled) {
			pooled.busy = false;
			pooled.lastUsed = Date.now();
		}
	}

	/**
	 * Closes all connections and stops the cleanup timer.
	 * Uses force-close to avoid hanging on LOGOUT when connections are
	 * mid-FETCH (e.g., during process shutdown).
	 */
	async shutdown(): Promise<void> {
		if (this.cleanupTimer) {
			clearInterval(this.cleanupTimer);
			this.cleanupTimer = null;
		}

		for (const conns of this.connections.values()) {
			for (const conn of conns) {
				conn.sync.forceClose();
			}
		}

		this.connections.clear();
	}

	/**
	 * Returns the number of connections currently tracked across all
	 * connectors, busy and idle alike.
	 */
	totalConnections(): number {
		let total = 0;
		for (const conns of this.connections.values()) {
			total += conns.length;
		}
		return total;
	}

	/**
	 * Returns the number of tracked connections for a specific connector
	 * (0 when the connector has none).
	 */
	connectorConnections(connectorId: number): number {
		return this.connections.get(connectorId)?.length ?? 0;
	}

	/**
	 * Disconnects and drops every non-busy connection whose `lastUsed` is
	 * more than `idleTimeoutMs` in the past. Busy connections are kept.
	 */
	private evictIdle(): void {
		const now = Date.now();
		for (const [connectorId, conns] of this.connections) {
			const remaining: PooledConnection[] = [];
			for (const conn of conns) {
				if (!conn.busy && now - conn.lastUsed > this.idleTimeoutMs) {
					// Best-effort close; the connection is dropped either way.
					conn.sync.disconnect().catch(() => {});
				} else {
					remaining.push(conn);
				}
			}
			if (remaining.length === 0) {
				this.connections.delete(connectorId);
			} else {
				this.connections.set(connectorId, remaining);
			}
		}
	}

	/**
	 * Evicts the idle connection with the smallest `lastUsed` across all
	 * connectors.
	 *
	 * @returns true if a connection was evicted, false if none is idle.
	 */
	private evictOldestIdle(): boolean {
		let oldest: { connectorId: number; index: number; lastUsed: number } | null = null;

		for (const [connectorId, conns] of this.connections) {
			for (const [index, conn] of conns.entries()) {
				if (conn.busy) continue;
				// Strict `<` keeps the first one found when timestamps tie.
				if (!oldest || conn.lastUsed < oldest.lastUsed) {
					oldest = { connectorId, index, lastUsed: conn.lastUsed };
				}
			}
		}

		if (!oldest) return false;

		const conns = this.connections.get(oldest.connectorId);
		if (!conns) return false;
		const [removed] = conns.splice(oldest.index, 1);
		removed.sync.disconnect().catch(() => {});

		if (conns.length === 0) {
			this.connections.delete(oldest.connectorId);
		}

		return true;
	}
}