Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | 15x 15x 15x 15x 15x 1x 1x 1x 9x 12x 10x 4x 1x 1x 15x 1x 13x 13x 13x 13x 13x 5x 13x 2x 13x 1x 1x 13x 13x 3x 3x 3x | import { simpleParser } from "mailparser";
import type { FolderInfo, IngestConnector, RawMessage } from "./types.js";
/**
* Payload shape expected from a Cloudflare Email Worker.
*
* The worker receives an EmailMessage, reads the raw RFC 5322 stream,
* base64-encodes it, and POSTs this JSON to Stork's webhook endpoint.
*/
export interface CloudflareEmailPayload {
/** Envelope sender */
from: string;
/** Envelope recipient */
to: string;
/** Raw RFC 5322 message, base64-encoded */
raw: string;
/** Size of the raw message in bytes */
rawSize: number;
}
export interface CloudflareEmailConfig {
/**
* Shared secret for authenticating incoming webhooks.
* The worker must send this in the `Authorization: Bearer <secret>` header.
*/
webhookSecret: string;
}
/**
* IngestConnector backed by Cloudflare Email Workers.
*
* Unlike IMAP (pull-based), this connector is push-based: a Cloudflare Email
* Worker receives mail at the edge and POSTs it to Stork's webhook endpoint.
* Messages are buffered in memory and yielded via fetchMessages().
*
* Typical flow:
* 1. Cloudflare receives email at a routing address
* 2. Email Worker reads the raw stream, base64-encodes it, POSTs to Stork
* 3. Stork's webhook route calls pushMessage() on this connector
* 4. The sync engine calls fetchMessages() to drain the buffer
*/
export class CloudflareEmailIngestConnector implements IngestConnector {
readonly name = "cloudflare-email";
private buffer: RawMessage[] = [];
private nextUid = 1;
private webhookSecret: string;
private connected = false;
constructor(config: CloudflareEmailConfig) {
this.webhookSecret = config.webhookSecret;
}
async connect(): Promise<void> {
this.connected = true;
}
async disconnect(): Promise<void> {
this.connected = false;
}
async listFolders(): Promise<FolderInfo[]> {
// Webhook-received mail lands in a single virtual INBOX
return [
{
path: "INBOX",
name: "Inbox",
delimiter: "/",
flags: [],
},
];
}
async *fetchMessages(_folder: string, sinceUid: number): AsyncIterable<RawMessage> {
for (const msg of this.buffer) {
if (msg.uid > sinceUid) {
yield msg;
}
}
}
/**
* Validates the webhook secret from an incoming request.
* Returns true if the provided secret matches the configured one.
*/
validateSecret(secret: string): boolean {
// Constant-time comparison to prevent timing attacks
if (secret.length !== this.webhookSecret.length) return false;
let result = 0;
for (let i = 0; i < secret.length; i++) {
result |= secret.charCodeAt(i) ^ this.webhookSecret.charCodeAt(i);
}
return result === 0;
}
/**
* Push a message received via webhook into the connector's buffer.
* Called by the webhook route handler after validating the request.
*
* @returns The UID assigned to the buffered message.
*/
async pushMessage(payload: CloudflareEmailPayload): Promise<number> {
const rawBuffer = Buffer.from(payload.raw, "base64");
const parsed = await simpleParser(rawBuffer);
const uid = this.nextUid++;
const fromAddr = parsed.from?.value?.[0];
const toAddrs = parsed.to
? (Array.isArray(parsed.to) ? parsed.to : [parsed.to]).flatMap((addr) =>
addr.value.map((v) => ({
address: v.address ?? "",
name: v.name ?? undefined,
})),
)
: undefined;
const ccAddrs = parsed.cc
? (Array.isArray(parsed.cc) ? parsed.cc : [parsed.cc]).flatMap((addr) =>
addr.value.map((v) => ({
address: v.address ?? "",
name: v.name ?? undefined,
})),
)
: undefined;
const message: RawMessage = {
uid,
messageId: parsed.messageId ?? undefined,
inReplyTo: parsed.inReplyTo ?? undefined,
subject: parsed.subject ?? undefined,
from: fromAddr
? { address: fromAddr.address ?? "", name: fromAddr.name ?? undefined }
: undefined,
to: toAddrs,
cc: ccAddrs,
date: parsed.date ?? undefined,
textBody: parsed.text ?? undefined,
htmlBody: typeof parsed.html === "string" ? parsed.html : undefined,
flags: [],
size: payload.rawSize,
attachments: (parsed.attachments ?? [])
.filter((att) => att.content != null)
.map((att) => ({
filename: att.filename ?? undefined,
contentType:
typeof att.contentType === "string" ? att.contentType : "application/octet-stream",
size: typeof att.size === "number" ? att.size : att.content.length,
contentId: att.contentId ?? undefined,
content: att.content,
})),
};
this.buffer.push(message);
return uid;
}
/**
* Clear all buffered messages up to and including the given UID.
* Called after the sync engine has persisted messages to storage.
*/
acknowledge(upToUid: number): void {
this.buffer = this.buffer.filter((m) => m.uid > upToUid);
}
/** Returns the number of buffered messages not yet consumed. */
get pendingCount(): number {
return this.buffer.length;
}
/** Returns whether the connector is in a connected state. */
get isConnected(): boolean {
return this.connected;
}
}
|