From e1cab41415ff4c267b6bf0ed7a3538918fde5f83 Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Mon, 29 Jun 2026 07:39:30 +0000 Subject: [PATCH 1/2] feat: init cgroups constraints --- packages/engine-multi/src/api.ts | 2 + packages/engine-multi/src/api/call-worker.ts | 6 + packages/engine-multi/src/engine.ts | 4 + .../engine-multi/src/test/worker-functions.ts | 11 + packages/engine-multi/src/worker/cgroup.ts | 262 ++++++++++++++++++ packages/engine-multi/src/worker/pool.ts | 54 ++++ .../test/worker/cgroup-enforcement.test.ts | 51 ++++ .../engine-multi/test/worker/cgroup.test.ts | 73 +++++ packages/ws-worker/src/start.ts | 4 + packages/ws-worker/src/util/cli.ts | 20 ++ 10 files changed, 487 insertions(+) create mode 100644 packages/engine-multi/src/worker/cgroup.ts create mode 100644 packages/engine-multi/test/worker/cgroup-enforcement.test.ts create mode 100644 packages/engine-multi/test/worker/cgroup.test.ts diff --git a/packages/engine-multi/src/api.ts b/packages/engine-multi/src/api.ts index 29450b16a..783ac47d0 100644 --- a/packages/engine-multi/src/api.ts +++ b/packages/engine-multi/src/api.ts @@ -58,6 +58,8 @@ const createAPI = async function ( maxWorkers: options.maxWorkers, memoryLimitMb: options.memoryLimitMb || DEFAULT_MEMORY_LIMIT, + cgroupMemoryLimitMb: options.cgroupMemoryLimitMb, + cgroupParent: options.cgroupParent, runTimeoutMs: options.runTimeoutMs, statePropsToRemove: options.statePropsToRemove ?? [ diff --git a/packages/engine-multi/src/api/call-worker.ts b/packages/engine-multi/src/api/call-worker.ts index c3a062407..8a8cbdea5 100644 --- a/packages/engine-multi/src/api/call-worker.ts +++ b/packages/engine-multi/src/api/call-worker.ts @@ -13,6 +13,8 @@ type WorkerOptions = { env?: any; timeout?: number; // ms memoryLimitMb?: number; + cgroupMemoryLimitMb?: number; // hard cgroup v2 memory.max ceiling + cgroupParent?: string; // parent cgroup for per-child leaves proxyStdout?: boolean; // print internal stdout to console }; @@ -27,6 +29,8 @@ export default function initWorkers( env = {}, maxWorkers = 5, memoryLimitMb, + cgroupMemoryLimitMb, + cgroupParent, proxyStdout = false, } = options; @@ -36,6 +40,8 @@ export default function initWorkers( maxWorkers, env, memoryLimitMb, + cgroupMemoryLimitMb, + cgroupParent, proxyStdout, }, logger diff --git a/packages/engine-multi/src/engine.ts b/packages/engine-multi/src/engine.ts index 38467b97c..2dc5ce545 100644 --- a/packages/engine-multi/src/engine.ts +++ b/packages/engine-multi/src/engine.ts @@ -75,6 +75,8 @@ export type EngineOptions = { maxWorkers?: number; memoryLimitMb?: number; stateLimitMb?: number; + cgroupMemoryLimitMb?: number; + cgroupParent?: string; payloadLimitMb?: number; logPayloadLimitMb?: number; repoDir: string; @@ -142,6 +144,8 @@ const createEngine = async ( { maxWorkers: options.maxWorkers, memoryLimitMb: defaultMemoryLimit, + cgroupMemoryLimitMb: options.cgroupMemoryLimitMb, + cgroupParent: options.cgroupParent, proxyStdout: options.proxyStdout, }, options.logger diff --git a/packages/engine-multi/src/test/worker-functions.ts b/packages/engine-multi/src/test/worker-functions.ts index 8a0068827..a56f9d5be 100644 --- a/packages/engine-multi/src/test/worker-functions.ts +++ b/packages/engine-multi/src/test/worker-functions.ts @@ -119,6 +119,17 @@ const tasks = { // Array(1e9).fill('mario') }, + // Allocate memory OUTSIDE the V8 heap. Buffer.alloc lives in native memory, + // so --max-old-space-size can't catch this — only an OS-level limit (a + // cgroup memory.max ceiling) will. Zero-filling forces the pages resident so + // they count against the cgroup's accounting. + blowNativeMemory: async () => { + const chunks = []; + while (true) { + chunks.push(Buffer.alloc(10 * 1024 * 1024, 1)); // 10mb, resident + } + }, + // Some useful code // const stats = v8.getHeapStatistics(); // console.log( diff --git a/packages/engine-multi/src/worker/cgroup.ts b/packages/engine-multi/src/worker/cgroup.ts new file mode 100644 index 000000000..1aec9e33c --- /dev/null +++ b/packages/engine-multi/src/worker/cgroup.ts @@ -0,0 +1,262 @@ +// cgroup v2 memory enforcement for pooled child processes. +// +// The pool already limits the V8 heap via --max-old-space-size, but that does +// not bound native allocations, buffers or overall RSS. On Linux hosts that +// expose a writable cgroup v2 hierarchy we additionally place each child in its +// own leaf cgroup with a hard `memory.max` ceiling, so the kernel OOM-kills a +// runaway run before it can starve its neighbours. +// +// This is strictly best-effort: on non-Linux hosts, cgroup v1, or when we lack +// the permissions/delegation to create cgroups, every function degrades to a +// no-op and the caller falls back to heap-limit-only behaviour. + +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import type { Logger } from '@openfn/logger'; + +const CGROUP_ROOT = '/sys/fs/cgroup'; + +// Leaf cgroup into which we relocate any processes that sit in a cgroup we need +// to delegate the memory controller from (see enableMemoryController). +const LEADER_NAME = 'openfn-leader'; + +// Default parent cgroup under which per-child leaf cgroups are created. +// Must be a path the worker process can write to (typically requires cgroup +// delegation in a container, or running as root). +export const DEFAULT_CGROUP_PARENT = path.join(CGROUP_ROOT, 'openfn'); + +export type CgroupHandle = { + path: string; // absolute path to the leaf cgroup directory + eventsPath: string; // absolute path to the leaf's memory.events file +}; + +// Cache availability per parent so we only probe the filesystem (and warn) once. +const availabilityCache: Record = {}; + +// True if a cgroup interface file (e.g. cgroup.controllers, subtree_control) +// lists the given whitespace-separated token. +const fileLists = (file: string, token: string) => + fs.readFileSync(file, 'utf8').split(/\s+/).includes(token); + +// --------------------------------------------------------------------------- +// Availability & controller delegation (private) +// --------------------------------------------------------------------------- + +// Relocate every process in `dir` into a dedicated leader leaf cgroup, so `dir` +// itself becomes empty and its controllers can be delegated. This is the same +// move systemd/runc make; it includes the worker's own process when `dir` is +// the container's cgroup namespace root. +const moveProcsToLeader = (dir: string, logger: Logger) => { + const leader = path.join(dir, LEADER_NAME); + if (!fs.existsSync(leader)) { + fs.mkdirSync(leader); + } + const procs = fs + .readFileSync(path.join(dir, 'cgroup.procs'), 'utf8') + .trim() + .split('\n') + .filter(Boolean); + for (const pid of procs) { + try { + fs.writeFileSync(path.join(leader, 'cgroup.procs'), pid); + } catch (e) { + // Some processes (e.g. kernel threads) can't be moved; skip them. + logger.debug(`cgroup: could not move pid ${pid} into leader: ${ + (e as Error).message + }`); + } + } +}; + +// Enable the memory controller for the children of `dir` (idempotent). +const enableMemoryController = (dir: string, logger: Logger) => { + const subtree = path.join(dir, 'cgroup.subtree_control'); + if (fileLists(subtree, 'memory')) { + return; + } + try { + fs.writeFileSync(subtree, '+memory'); + } catch (e) { + // EBUSY means `dir` still holds member processes (cgroup v2's "no internal + // processes" rule forbids delegating from a populated cgroup). This is the + // common containerised case where the worker lives in the namespace root: + // move those processes into a leader leaf, then retry the delegation. + if ((e as NodeJS.ErrnoException).code === 'EBUSY') { + logger.debug( + `cgroup: ${dir} is populated; relocating processes to delegate memory` + ); + moveProcsToLeader(dir, logger); + fs.writeFileSync(subtree, '+memory'); + } else { + throw e; + } + } +}; + +// Ensure the parent cgroup exists and delegates the memory controller all the +// way down from the cgroup root, so leaf cgroups created under it can set +// memory.max. Throws on any failure (caught by the caller). +const ensureParent = (parent: string, logger: Logger) => { + const rel = path.relative(CGROUP_ROOT, parent); + if (!rel || rel.startsWith('..') || path.isAbsolute(rel)) { + throw new Error(`cgroup parent ${parent} is not under ${CGROUP_ROOT}`); + } + + // Delegate the memory controller from the root down to (and including) the + // parent, creating each intermediate cgroup as needed. Any process sitting in + // a cgroup we delegate from (notably the worker itself, in the namespace + // root) is relocated to a leader leaf first; processes otherwise only live in + // the leaves below `parent`. + enableMemoryController(CGROUP_ROOT, logger); + let dir = CGROUP_ROOT; + for (const segment of rel.split(path.sep).filter(Boolean)) { + dir = path.join(dir, segment); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir); + } + enableMemoryController(dir, logger); + } +}; + +const detect = (parent: string, logger: Logger): boolean => { + if (os.platform() !== 'linux') { + return false; + } + + // cgroup v2 (the unified hierarchy) exposes cgroup.controllers at the root; + // cgroup v1 does not. + const rootControllers = path.join(CGROUP_ROOT, 'cgroup.controllers'); + if (!fs.existsSync(rootControllers)) { + return false; + } + + try { + if (!fileLists(rootControllers, 'memory')) { + return false; + } + ensureParent(parent, logger); + return true; + } catch (e) { + logger.debug( + `cgroup: setup of parent ${parent} failed: ${(e as Error).message}` + ); + return false; + } +}; + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +// Returns true when cgroup v2 memory enforcement can be used under `parent`. +// Caches the result and warns (once per parent) when unavailable. +export const isCgroupV2Available = ( + parent: string, + logger: Logger +): boolean => { + if (parent in availabilityCache) { + return availabilityCache[parent]; + } + + const available = detect(parent, logger); + availabilityCache[parent] = available; + if (!available) { + logger.warn( + 'cgroup: v2 memory enforcement unavailable on this host; ' + + 'falling back to heap-limit only' + ); + } + return available; +}; + +// Create a leaf cgroup for `pid`, set its hard memory ceiling and move the +// process into it. Returns a handle, or null on any failure (best-effort). +export const createChildCgroup = ( + parent: string, + pid: number, + limitBytes: number, + logger: Logger +): CgroupHandle | null => { + const dir = path.join(parent, `run-${pid}`); + try { + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir); + } + fs.writeFileSync(path.join(dir, 'memory.max'), String(limitBytes)); + // Stop the process escaping the limit via swap. The swap controller may be + // absent, so this is best-effort. + try { + fs.writeFileSync(path.join(dir, 'memory.swap.max'), '0'); + } catch (e) { + // no swap controller; ignore + } + // Moving the pid in must come last: a cgroup with member processes cannot + // have its controllers reconfigured. + fs.writeFileSync(path.join(dir, 'cgroup.procs'), String(pid)); + logger.debug( + `cgroup: constrained process ${pid} to ${limitBytes} bytes at ${dir}` + ); + return { path: dir, eventsPath: path.join(dir, 'memory.events') }; + } catch (e) { + logger.warn( + `cgroup: failed to constrain process ${pid}: ${(e as Error).message}` + ); + return null; + } +}; + +// Returns true if the kernel OOM-killed anything in this cgroup. Used to tell a +// cgroup memory kill (a bare SIGKILL with no V8 message) apart from other crashes. +export const hasOomKill = (handle: CgroupHandle): boolean => { + try { + const content = fs.readFileSync(handle.eventsPath, 'utf8'); + for (const line of content.split('\n')) { + const [key, value] = line.trim().split(/\s+/); + if ( + (key === 'oom_kill' || key === 'oom_group_kill') && + parseInt(value, 10) > 0 + ) { + return true; + } + } + } catch (e) { + // events file gone (cgroup already removed) or unreadable + } + return false; +}; + +// Remove a leaf cgroup. The cgroup can only be removed once empty, so we retry +// a handful of times while the killed process is reaped. Best-effort and +// non-blocking. +export const removeChildCgroup = ( + handle: CgroupHandle | null, + logger: Logger, + attempts = 10 +) => { + if (!handle) { + return; + } + try { + fs.rmdirSync(handle.path); + } catch (e) { + const err = e as NodeJS.ErrnoException; + if ( + attempts > 0 && + (err.code === 'EBUSY' || err.code === 'ENOTEMPTY') + ) { + setTimeout(() => removeChildCgroup(handle, logger, attempts - 1), 50); + return; + } + if (err.code !== 'ENOENT') { + logger.debug(`cgroup: could not remove ${handle.path}: ${err.message}`); + } + } +}; + +// For tests: clear the cached availability probes. +export const _resetAvailabilityCache = () => { + for (const key of Object.keys(availabilityCache)) { + delete availabilityCache[key]; + } +}; diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index 48a268803..c3f3c0e0d 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -13,6 +13,14 @@ import { import { HANDLED_EXIT_CODE } from '../events'; import { Logger } from '@openfn/logger'; import type { PayloadLimits } from './thread/runtime'; +import { + CgroupHandle, + DEFAULT_CGROUP_PARENT, + createChildCgroup, + hasOomKill, + isCgroupV2Available, + removeChildCgroup, +} from './cgroup'; export type PoolOptions = { capacity?: number; // defaults to 5 @@ -20,6 +28,12 @@ export type PoolOptions = { env?: Record; // default environment for workers memoryLimitMb?: number; // --max-old-space-size for child processes + // Hard memory.max ceiling (mb) applied to each child via a cgroup v2 leaf. + // Best-effort: ignored on hosts without a writable cgroup v2 hierarchy. + cgroupMemoryLimitMb?: number; + // Parent cgroup under which per-child leaf cgroups are created. + cgroupParent?: string; + proxyStdout?: boolean; // print internal stdout to console }; @@ -80,6 +94,21 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { // Keep track of all the workers we created const allWorkers: Record = {}; + // cgroup v2 leaf per child (keyed by pid), when memory enforcement is enabled + const cgroupParent = options.cgroupParent || DEFAULT_CGROUP_PARENT; + const cgroupEnabled = + !!options.cgroupMemoryLimitMb && + isCgroupV2Available(cgroupParent, logger); + const cgroups: Record = {}; + + // Tear down a child's leaf cgroup once it has been killed (best-effort). + const cleanupCgroup = (worker: ChildProcess | false) => { + if (worker && worker.pid && cgroups[worker.pid]) { + removeChildCgroup(cgroups[worker.pid], logger); + delete cgroups[worker.pid]; + } + }; + const init = (maybeChild: ChildProcess | false) => { let child: ChildProcess; if (!maybeChild) { @@ -107,6 +136,17 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug('pool: Created new child process', child.pid); allWorkers[child.pid!] = child; + + // Place the child in its own cgroup with a hard memory ceiling. The leaf + // lives for the lifetime of the (reused) child and is removed when it dies. + if (cgroupEnabled && child.pid) { + cgroups[child.pid] = createChildCgroup( + cgroupParent, + child.pid, + options.cgroupMemoryLimitMb! * 1024 * 1024, + logger + ); + } } else { child = maybeChild as ChildProcess; logger.debug('pool: Using existing child process', child.pid); @@ -153,6 +193,17 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { logger.debug(`pool: Worker exited unexpectedly with code ${code}`); clearTimeout(timeout); + // A cgroup memory kill is a bare SIGKILL with no V8 message, so check + // the cgroup's OOM counter before falling back to scraping stderr. + const handle = worker.pid ? cgroups[worker.pid] : null; + if (handle && hasOomKill(handle)) { + killWorker(worker); + // restore a placeholder to the queue + finish(false); + reject(new OOMError()); + return; + } + // Read the stderr stream from the worked to see if this looks like an OOM error const rl = readline.createInterface({ input: worker.stderr!, @@ -265,6 +316,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (worker) { logger.debug('pool: destroying worker ', worker.pid); worker.kill(); + cleanupCgroup(worker); delete allWorkers[worker.pid!]; } }; @@ -305,6 +357,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { const worker = pool.pop(); if (worker) { killPromises.push(waitForWorkerExit(worker)); + cleanupCgroup(worker); delete allWorkers[worker.pid!]; } } @@ -312,6 +365,7 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (immediate) { Object.values(allWorkers).forEach((worker) => { killPromises.push(waitForWorkerExit(worker, 1)); + cleanupCgroup(worker); delete allWorkers[worker.pid!]; }); } diff --git a/packages/engine-multi/test/worker/cgroup-enforcement.test.ts b/packages/engine-multi/test/worker/cgroup-enforcement.test.ts new file mode 100644 index 000000000..aace0eca5 --- /dev/null +++ b/packages/engine-multi/test/worker/cgroup-enforcement.test.ts @@ -0,0 +1,51 @@ +import test from 'ava'; +import os from 'node:os'; +import path from 'node:path'; +import { createMockLogger } from '@openfn/logger'; + +import createPool from '../../src/worker/pool'; + +// These tests prove the kernel actually OOM-kills a runaway run via the +// cgroup's memory.max ceiling. That only works on Linux with a writable +// cgroup v2 hierarchy (e.g. a privileged container), so they are skipped +// everywhere else — including CI on macOS. +// +// docker run --rm -it --privileged -v "$PWD":/kit -w /kit node:24 bash +// corepack enable && pnpm install && pnpm --filter @openfn/engine-multi build +// cd packages/engine-multi && pnpm ava test/worker/cgroup-enforcement.test.ts +const linuxOnly = os.platform() === 'linux' ? test.serial : test.serial.skip; + +const workerPath = path.resolve('dist/test/worker-functions.js'); +const logger = createMockLogger(); + +// Ceiling above Node's baseline RSS but low enough that blowNativeMemory +// crosses it almost immediately. memoryLimitMb is deliberately left unset so +// the V8 heap limit can't be what kills the run — only the cgroup can. +const cgroupMemoryLimitMb = 200; + +linuxOnly( + 'cgroup OOM-kills a run that exceeds memory.max and surfaces OOMError', + async (t) => { + const pool = createPool(workerPath, { cgroupMemoryLimitMb }, logger); + + await t.throwsAsync(() => pool.exec('blowNativeMemory', []), { + name: 'OOMError', + }); + + await pool.destroy(); + } +); + +linuxOnly('pool recovers after a cgroup OOM kill', async (t) => { + const pool = createPool(workerPath, { cgroupMemoryLimitMb }, logger); + + await t.throwsAsync(() => pool.exec('blowNativeMemory', []), { + name: 'OOMError', + }); + + // The pool should restore the dead worker's slot and keep working. + const result = await pool.exec('test', []); + t.is(result, 42); + + await pool.destroy(); +}); diff --git a/packages/engine-multi/test/worker/cgroup.test.ts b/packages/engine-multi/test/worker/cgroup.test.ts new file mode 100644 index 000000000..28a779338 --- /dev/null +++ b/packages/engine-multi/test/worker/cgroup.test.ts @@ -0,0 +1,73 @@ +import test from 'ava'; +import os from 'node:os'; +import { createMockLogger } from '@openfn/logger'; + +import { + DEFAULT_CGROUP_PARENT, + createChildCgroup, + hasOomKill, + isCgroupV2Available, + removeChildCgroup, + _resetAvailabilityCache, +} from '../../src/worker/cgroup'; + +const logger = createMockLogger(); + +const isLinux = os.platform() === 'linux'; + +test.beforeEach(() => { + _resetAvailabilityCache(); +}); + +test('DEFAULT_CGROUP_PARENT lives under the cgroup root', (t) => { + t.is(DEFAULT_CGROUP_PARENT, '/sys/fs/cgroup/openfn'); +}); + +// cgroups don't exist on macOS/Windows, so the whole module must no-op there. +if (!isLinux) { + test('isCgroupV2Available returns false on non-linux hosts', (t) => { + t.false(isCgroupV2Available(DEFAULT_CGROUP_PARENT, logger)); + }); + + test('availability probe is cached and only warns once', (t) => { + const l = createMockLogger('test', { level: 'debug' }); + isCgroupV2Available('/sys/fs/cgroup/test-cache', l); + isCgroupV2Available('/sys/fs/cgroup/test-cache', l); + isCgroupV2Available('/sys/fs/cgroup/test-cache', l); + + const warnings = l._history.filter((h: any) => h[0] === 'warn'); + t.is(warnings.length, 1); + }); +} + +test('createChildCgroup returns null when the parent is not writable', (t) => { + const handle = createChildCgroup( + '/this/path/does/not/exist', + 12345, + 256 * 1024 * 1024, + logger + ); + t.is(handle, null); +}); + +test('hasOomKill returns false when the events file is missing', (t) => { + t.false( + hasOomKill({ + path: '/no/such/cgroup', + eventsPath: '/no/such/cgroup/memory.events', + }) + ); +}); + +test('removeChildCgroup tolerates a null handle', (t) => { + t.notThrows(() => removeChildCgroup(null, logger)); +}); + +test('removeChildCgroup tolerates a non-existent cgroup', (t) => { + t.notThrows(() => + removeChildCgroup( + { path: '/no/such/cgroup', eventsPath: '/no/such/cgroup/memory.events' }, + logger + ) + ); +}); diff --git a/packages/ws-worker/src/start.ts b/packages/ws-worker/src/start.ts index 4e08e2601..f37f8b2f1 100644 --- a/packages/ws-worker/src/start.ts +++ b/packages/ws-worker/src/start.ts @@ -112,6 +112,10 @@ if (args.mock) { repoDir: args.repoDir, memoryLimitMb: args.runMemory, stateLimitMb: args.stateMemory, + // Hard cgroup ceiling sits above the V8 heap limit so GC fires first; + // default to run-memory + 128mb of headroom for native/buffer allocations. + cgroupMemoryLimitMb: args.cgroupMemory ?? (args.runMemory ?? 500) + 128, + cgroupParent: args.cgroupParent, maxWorkers: effectiveCapacity, statePropsToRemove: args.statePropsToRemove, runTimeoutMs: args.maxRunDurationSeconds * 1000, diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index f2850a383..e54e86f6a 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -16,6 +16,8 @@ type Args = { batchLimit?: number; batchLogs: boolean; capacity: number; + cgroupMemory?: number; + cgroupParent?: string; workloops?: string; claimTimeoutSeconds?: number; collectionsUrl?: string; @@ -82,6 +84,8 @@ export default function parseArgs(argv: string[]): Args { WORKER_BATCH_LIMIT, WORKER_BATCH_LOGS, WORKER_CAPACITY, + WORKER_CGROUP_MEMORY_MB, + WORKER_CGROUP_PARENT, WORKER_CLAIM_TIMEOUT_SECONDS, WORKER_COLLECTIONS_URL, WORKER_COLLECTIONS_VERSION, @@ -231,6 +235,16 @@ export default function parseArgs(argv: string[]): Args { 'Maximum memory allocated to a single run, in mb. Env: WORKER_MAX_PAYLOAD_MB', type: 'number', }) + .option('cgroup-memory', { + description: + 'Hard memory ceiling (mb) enforced per run via a cgroup v2 leaf. Linux only; falls back to heap-limit only when unavailable. Defaults to run-memory + 128. Env: WORKER_CGROUP_MEMORY_MB', + type: 'number', + }) + .option('cgroup-parent', { + description: + 'Parent cgroup path under which per-run leaf cgroups are created. Must be writable (cgroup delegation/root). Default /sys/fs/cgroup/openfn. Env: WORKER_CGROUP_PARENT', + type: 'string', + }) .option('max-run-duration-seconds', { alias: 't', @@ -339,6 +353,12 @@ export default function parseArgs(argv: string[]): Args { (WORKER_MAX_STATE_MEMORY_MB ? parseInt(WORKER_MAX_STATE_MEMORY_MB, 10) : undefined), + cgroupMemory: + args.cgroupMemory ?? + (WORKER_CGROUP_MEMORY_MB ? parseInt(WORKER_CGROUP_MEMORY_MB) : undefined), + cgroupParent: setArg(args.cgroupParent, WORKER_CGROUP_PARENT) as + | string + | undefined, payloadMemory: setArg(args.payloadMemory, WORKER_MAX_PAYLOAD_MB, 10), logPayloadMemory: setArg( args.logPayloadMemory, From 9af5b7e138e5ab100cd7bd58bfde536f351ce0de Mon Sep 17 00:00:00 2001 From: Farhan Yahaya Date: Mon, 29 Jun 2026 08:30:44 +0000 Subject: [PATCH 2/2] feat: determine where OOM came from --- packages/engine-multi/src/errors.ts | 9 ++++++++- packages/engine-multi/src/util/serialize-error.ts | 1 + packages/engine-multi/src/worker/pool.ts | 11 ++++++++++- .../test/worker/cgroup-enforcement.test.ts | 3 ++- packages/engine-multi/test/worker/pool.test.ts | 12 +++++++++--- 5 files changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/engine-multi/src/errors.ts b/packages/engine-multi/src/errors.ts index ca88cce83..9ac2e4eef 100644 --- a/packages/engine-multi/src/errors.ts +++ b/packages/engine-multi/src/errors.ts @@ -71,14 +71,21 @@ export class AutoinstallError extends EngineError { } } +// Where an OOM kill came from: +// - 'heap' the V8 heap limit (thread resourceLimits or --max-old-space-size) +// - 'cgroup' the OS cgroup memory.max ceiling (kernel SIGKILL) +export type OOMSource = 'heap' | 'cgroup'; + export class OOMError extends EngineError { severity = 'kill'; name = 'OOMError'; message; + source: OOMSource; - constructor() { + constructor(source: OOMSource = 'heap') { super(); + this.source = source; this.message = `Run exceeded maximum memory usage`; } } diff --git a/packages/engine-multi/src/util/serialize-error.ts b/packages/engine-multi/src/util/serialize-error.ts index 73b8a3e53..48718be73 100644 --- a/packages/engine-multi/src/util/serialize-error.ts +++ b/packages/engine-multi/src/util/serialize-error.ts @@ -4,6 +4,7 @@ export default (error: any) => { name: error.name, type: error.type, subtype: error.subtype, + source: error.source, severity: error.severity || 'crash', }; }; diff --git a/packages/engine-multi/src/worker/pool.ts b/packages/engine-multi/src/worker/pool.ts index c3f3c0e0d..dcda57d08 100644 --- a/packages/engine-multi/src/worker/pool.ts +++ b/packages/engine-multi/src/worker/pool.ts @@ -197,10 +197,14 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { // the cgroup's OOM counter before falling back to scraping stderr. const handle = worker.pid ? cgroups[worker.pid] : null; if (handle && hasOomKill(handle)) { + logger.error( + `pool: worker ${worker.pid} was killed by the OS for exceeding ` + + `its cgroup memory limit (${options.cgroupMemoryLimitMb}mb)` + ); killWorker(worker); // restore a placeholder to the queue finish(false); - reject(new OOMError()); + reject(new OOMError('cgroup')); return; } @@ -214,6 +218,9 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { if (worker.stderr && worker.stderr?.readableLength > 0) { for await (const line of rl) { if (line.match(/JavaScript heap out of memory/)) { + logger.error( + `pool: worker ${worker.pid} exceeded the V8 heap limit` + ); killWorker(worker); // restore a placeholder to the queue finish(false); @@ -301,6 +308,8 @@ function createPool(script: string, options: PoolOptions = {}, logger: Logger) { // @ts-ignore e.severity = evt.error.severity; e.name = evt.error.name; + // @ts-ignore preserve the OOM source ('heap'/'cgroup') across IPC + e.source = evt.error.source; reject(e); finish(worker); diff --git a/packages/engine-multi/test/worker/cgroup-enforcement.test.ts b/packages/engine-multi/test/worker/cgroup-enforcement.test.ts index aace0eca5..7054097c2 100644 --- a/packages/engine-multi/test/worker/cgroup-enforcement.test.ts +++ b/packages/engine-multi/test/worker/cgroup-enforcement.test.ts @@ -28,9 +28,10 @@ linuxOnly( async (t) => { const pool = createPool(workerPath, { cgroupMemoryLimitMb }, logger); - await t.throwsAsync(() => pool.exec('blowNativeMemory', []), { + const err = await t.throwsAsync(() => pool.exec('blowNativeMemory', []), { name: 'OOMError', }); + t.is((err as any).source, 'cgroup'); await pool.destroy(); } diff --git a/packages/engine-multi/test/worker/pool.test.ts b/packages/engine-multi/test/worker/pool.test.ts index 5a3b8d9dc..6c02ccc0d 100644 --- a/packages/engine-multi/test/worker/pool.test.ts +++ b/packages/engine-multi/test/worker/pool.test.ts @@ -179,9 +179,11 @@ test('throw if memory limit is exceeded', async (t) => { try { await pool.exec('blowMemory', [], { memoryLimitMb: 100 }); + t.fail('expected the run to OOM'); } catch (e: any) { t.is(e.message, 'Run exceeded maximum memory usage'); t.is(e.name, 'OOMError'); + t.is(e.source, 'heap'); } }); @@ -200,9 +202,13 @@ test('child process should not have --max-old-space-size when memoryLimitMb is n test('pool recovers after process-level OOM', async (t) => { const pool = createPool(workerPath, { memoryLimitMb: 50 }, logger); - await t.throwsAsync(() => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), { - name: 'OOMError', - }); + const err = await t.throwsAsync( + () => pool.exec('blowMemory', [], { memoryLimitMb: 20 }), + { + name: 'OOMError', + } + ); + t.is((err as any).source, 'heap'); // Pool should still be functional after the OOM const result = await pool.exec('test', [42]);