TypeScriptWorkerPool.java

package org.egothor.methodatlas.discovery.typescript;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Manages a pool of long-lived Node.js worker processes for TypeScript
 * file scanning.
 *
 * <p>
 * Workers are started on demand when the first scan request arrives and
 * returned to a shared idle queue after each request completes.  Using a pool
 * avoids the Node.js startup overhead (typically 100–300 ms) on every file,
 * which would otherwise dominate scan time for large TypeScript projects.
 * Because workers are started lazily, projects that contain no TypeScript or
 * JavaScript test files never spawn a Node.js process at all.
 * </p>
 *
 * <h2>Worker lifecycle</h2>
 *
 * <ol>
 * <li>Workers are started one by one on the first {@link #scan} calls, up to
 *     {@code poolSize} live (started and not yet killed) workers.</li>
 * <li>On each scan request, a worker is borrowed from the idle queue.</li>
 * <li>After a successful response the worker is returned to the queue.</li>
 * <li>On failure ({@link TypeScriptWorker.WorkerException}, an I/O error, or
 *     any other exception thrown while the worker is handling the request),
 *     the worker is killed, a restart event is recorded in the
 *     {@link WorkerCircuitBreaker}, and a fresh worker is created to replace
 *     it — unless the circuit has already tripped.  A killed worker always
 *     frees its pool slot, so a failed replacement can later be made up by an
 *     on-demand start.</li>
 * </ol>
 *
 * <h2>Circuit-breaker integration</h2>
 *
 * <p>
 * The pool delegates restart tracking to a shared {@link WorkerCircuitBreaker}.
 * When the circuit opens, no new workers are created.  Scan requests after
 * that point receive an empty result and a warning is logged once per request.
 * </p>
 *
 * <h2>Shutdown</h2>
 *
 * <p>
 * {@link #close()} drains the idle queue, kills all idle workers, and removes
 * the JVM shutdown hook registered at construction time.  Callers (typically
 * the {@link TypeScriptTestDiscovery}) must call {@code close()} at the end of
 * the scan run.  A JVM shutdown hook serves as a backstop in case
 * {@code close()} is not reached (e.g. an abrupt exit from a test framework).
 * </p>
 *
 * <h2>Thread safety</h2>
 *
 * <p>
 * The pool itself is thread-safe: the {@link BlockingQueue} synchronises
 * worker borrowing and returning, worker creation is serialised by a
 * {@link ReentrantLock}, and the live-worker count is an {@link AtomicInteger}.
 * The {@link WorkerCircuitBreaker} is also thread-safe.  Worker instances
 * returned by {@link #borrow(Path)} must be used by at most one thread at a time.
 * </p>
 */
final class TypeScriptWorkerPool implements Closeable {

    private static final Logger LOG = Logger.getLogger(TypeScriptWorkerPool.class.getName());

    /** Milliseconds to wait for a worker to become available in the idle queue. */
    private static final long BORROW_TIMEOUT_MILLIS = 10_000L;

    private final Path bundlePath;
    private final NodeEnvironment nodeEnv;
    private final long workerTimeoutMillis;
    private final WorkerCircuitBreaker circuitBreaker;
    private final BlockingQueue<TypeScriptWorker> idleWorkers;
    private final int poolSize;
    @SuppressWarnings("PMD.DoNotUseThreads")
    private final Thread shutdownHook;

    /**
     * Serialises worker creation so that concurrent on-demand starts and
     * replacements cannot push {@link #liveWorkers} above {@link #poolSize}.
     */
    private final ReentrantLock workerCreationLock = new ReentrantLock();

    /**
     * Number of workers ever constructed; used only to give each worker a
     * distinct index.  Incremented exclusively under {@link #workerCreationLock}.
     * Failed starts and replacements also consume an index, so this is NOT a
     * measure of pool occupancy — {@link #liveWorkers} is.
     */
    private final AtomicInteger nextWorkerIndex = new AtomicInteger();

    /**
     * Number of successfully started workers that the pool has not yet killed,
     * whether idle or currently borrowed.  Compared against {@link #poolSize}
     * to decide whether another worker may be started.  Incremented under
     * {@link #workerCreationLock}; decremented by {@link #discardWorker}.  It is
     * an {@link AtomicInteger} so the lock-free fast-path read in
     * {@link #startWorkerOnDemand(Path)} observes a coherent value.
     */
    private final AtomicInteger liveWorkers = new AtomicInteger();

    /**
     * Creates a worker pool.
     *
     * <p>
     * No workers are started at construction time.  The first worker is
     * started on demand when {@link #scan} is called for the first time.
     * This means that projects containing no TypeScript or JavaScript test
     * files never spawn a Node.js process.
     * </p>
     *
     * @param bundlePath           path to the verified bundle JS file
     * @param nodeEnv              Node.js environment information
     * @param poolSize             maximum number of concurrent workers; must be positive
     *                             (the backing {@link ArrayBlockingQueue} rejects
     *                             a non-positive capacity)
     * @param workerTimeoutMillis  per-request timeout in milliseconds
     * @param circuitBreaker       restart-limit tracker shared with this pool
     */
    /* default */
    @SuppressWarnings("PMD.DoNotUseThreads")
    TypeScriptWorkerPool(Path bundlePath, NodeEnvironment nodeEnv,
            int poolSize, long workerTimeoutMillis, WorkerCircuitBreaker circuitBreaker) {
        this.bundlePath = bundlePath;
        this.nodeEnv = nodeEnv;
        this.poolSize = poolSize;
        this.workerTimeoutMillis = workerTimeoutMillis;
        this.circuitBreaker = circuitBreaker;
        this.idleWorkers = new ArrayBlockingQueue<>(poolSize);

        // Register a JVM shutdown hook as a safety net in case close() is never called.
        this.shutdownHook = new Thread(this::shutdownAllWorkers, "ts-worker-pool-shutdown");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Scans one TypeScript file by delegating to a pooled worker.
     *
     * <p>
     * The worker is borrowed from the idle queue, the request is sent, and
     * the worker is returned after a successful response.  On any failure the
     * worker is killed and replaced if the circuit is still closed.  A
     * {@link TypeScriptWorker.WorkerException} is handled as a scan-file error
     * (logged here, empty result returned) rather than aborting the run; other
     * exceptions propagate to the caller after the worker has been cleaned up.
     * </p>
     *
     * @param filePath      absolute path of the file to scan
     * @param functionNames test-function call names
     * @param allowedRoot   scan root for permission sandboxing; passed to
     *                      the worker on restart; may be {@code null}
     * @return list of discovered method descriptors; empty when the circuit is
     *         open, no worker became available in time, or the worker reported
     *         an error
     * @throws IOException if the worker fails with a hard I/O error
     */
    /* default */ List<TypeScriptWorker.MethodDescriptor> scan(
            Path filePath, List<String> functionNames, Path allowedRoot) throws IOException {

        if (circuitBreaker.isOpen()) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.warning("TypeScript worker pool circuit breaker is open — "
                        + "skipping " + filePath);
            }
            return List.of();
        }

        TypeScriptWorker worker = borrow(allowedRoot);
        if (worker == null) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.warning("No TypeScript worker available within " + BORROW_TIMEOUT_MILLIS
                        + " ms — skipping " + filePath);
            }
            return List.of();
        }

        // The borrowed worker must either go back to the idle queue (success)
        // or be killed and replaced (any failure).  Handling this in finally
        // ensures an I/O or runtime exception cannot leak the Node.js process
        // and permanently shrink the pool.
        boolean succeeded = false;
        String failureReason = "unexpected scan failure";
        try {
            List<TypeScriptWorker.MethodDescriptor> result =
                    worker.scan(filePath, functionNames);
            succeeded = true;
            return result;
        } catch (TypeScriptWorker.WorkerException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING,
                        "TypeScript worker error scanning " + filePath + " — "
                        + "killing and replacing worker: " + e.getMessage(), e);
            }
            failureReason = "scan error: " + e.getMessage();
            return List.of();
        } finally {
            if (succeeded) {
                returnWorker(worker);
            } else {
                discardWorker(worker, failureReason);
                replaceWorker(allowedRoot);
            }
        }
    }

    /**
     * Shuts down all idle workers and removes the JVM shutdown hook.
     *
     * <p>
     * This method is idempotent and safe to call from any thread.
     * </p>
     */
    @Override
    public void close() {
        shutdownAllWorkers();
        try {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // JVM is already shutting down; hook removal is not possible.
            if (LOG.isLoggable(Level.FINE)) {
                LOG.log(Level.FINE, "Could not remove shutdown hook (JVM shutting down)", e);
            }
        }
    }

    // -------------------------------------------------------------------------
    // Private helpers
    // -------------------------------------------------------------------------

    /**
     * Creates and starts a worker, counting it as live only once it has
     * started successfully.
     * <p><b>Caller must hold {@link #workerCreationLock}.</b></p>
     *
     * @param allowedRoot scan root forwarded to the worker's permission flag
     * @return the started worker
     * @throws IOException if the worker process cannot be started
     */
    private TypeScriptWorker createWorkerUnderLock(Path allowedRoot) throws IOException {
        int index = nextWorkerIndex.getAndIncrement();
        TypeScriptWorker worker = new TypeScriptWorker(bundlePath, nodeEnv,
                workerTimeoutMillis, index);
        worker.start(allowedRoot);
        // Only a successfully started worker occupies a pool slot; a failed
        // start must not reduce the pool's capacity.
        liveWorkers.incrementAndGet();
        return worker;
    }

    /**
     * Borrows an idle worker, starting one on demand if the pool has fewer than
     * {@link #poolSize} live workers, then waiting up to
     * {@link #BORROW_TIMEOUT_MILLIS} ms.
     *
     * @param allowedRoot scan root forwarded to a newly started worker's permission flag
     * @return an idle worker, or {@code null} on timeout or interruption
     */
    @SuppressWarnings("PMD.DoNotUseThreads")
    private TypeScriptWorker borrow(Path allowedRoot) {
        startWorkerOnDemand(allowedRoot);
        try {
            return idleWorkers.poll(BORROW_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Starts one new worker and places it in the idle queue if the queue is
     * currently empty and fewer than {@link #poolSize} workers are live.
     * Concurrent callers are serialised by {@link #workerCreationLock}; the
     * condition is rechecked inside the lock to avoid starting duplicate workers.
     * A start failure is logged and leaves the slot free for a later attempt.
     *
     * @param allowedRoot scan root forwarded to the new worker's permission flag
     */
    private void startWorkerOnDemand(Path allowedRoot) {
        if (!idleWorkers.isEmpty() || liveWorkers.get() >= poolSize) {
            return; // fast path: worker already available or pool is full
        }
        workerCreationLock.lock();
        try {
            if (!idleWorkers.isEmpty() || liveWorkers.get() >= poolSize) {
                return; // another thread already started one
            }
            TypeScriptWorker worker = createWorkerUnderLock(allowedRoot);
            if (!idleWorkers.offer(worker)) {
                discardWorker(worker, "pool queue full after start");
            }
        } catch (IOException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                // Still holding the lock, so no other thread has advanced the
                // counter since the failed worker took its index.
                LOG.log(Level.WARNING,
                        "Failed to start TypeScript worker on demand (index "
                        + (nextWorkerIndex.get() - 1) + ")", e);
            }
        } finally {
            workerCreationLock.unlock();
        }
    }

    /**
     * Returns a worker to the idle queue, killing it if the queue is full.
     *
     * @param worker the healthy worker being returned
     */
    private void returnWorker(TypeScriptWorker worker) {
        if (!idleWorkers.offer(worker)) {
            // Queue full (should not happen in a properly sized pool), kill the surplus.
            discardWorker(worker, "pool queue full on return");
        }
    }

    /**
     * Kills a worker and releases its pool slot, even if the kill itself fails.
     *
     * @param worker the worker to remove from the pool
     * @param reason human-readable reason forwarded to {@code kill}
     */
    private void discardWorker(TypeScriptWorker worker, String reason) {
        try {
            worker.kill(reason);
        } finally {
            liveWorkers.decrementAndGet();
        }
    }

    /**
     * Records a restart event in the circuit breaker and, if the circuit is
     * still closed and the pool has a free slot, starts a replacement worker
     * and puts it in the idle queue.
     *
     * @param allowedRoot scan root for the replacement worker's permission flag
     */
    private void replaceWorker(Path allowedRoot) {
        circuitBreaker.recordRestart();
        if (circuitBreaker.isOpen()) {
            return; // circuit just tripped; don't create more workers
        }
        workerCreationLock.lock();
        try {
            if (liveWorkers.get() >= poolSize) {
                return; // a concurrent on-demand start already filled the freed slot
            }
            TypeScriptWorker replacement = createWorkerUnderLock(allowedRoot);
            if (!idleWorkers.offer(replacement)) {
                discardWorker(replacement, "pool queue full after replacement");
            }
        } catch (IOException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed to start replacement TypeScript worker", e);
            }
        } finally {
            workerCreationLock.unlock();
        }
    }

    /** Drains the idle queue and kills every worker that was in it. */
    private void shutdownAllWorkers() {
        List<TypeScriptWorker> workers = new ArrayList<>(poolSize);
        idleWorkers.drainTo(workers);
        for (TypeScriptWorker w : workers) {
            discardWorker(w, "pool shutdown");
        }
        if (LOG.isLoggable(Level.INFO)) {
            LOG.log(Level.INFO, "TypeScript worker pool shut down ({0} worker(s) stopped)",
                    workers.size());
        }
    }
}