PythonWorkerPool.java

package org.egothor.methodatlas.discovery.python;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Manages a pool of long-lived Python worker processes for Python test file
 * scanning.
 *
 * <p>
 * Workers are started on demand when the first scan request arrives and
 * returned to a shared idle queue after each request completes.  Using a pool
 * avoids the Python startup overhead on every file, which would otherwise
 * dominate scan time for large Python projects.  Because workers are started
 * lazily, projects that contain no Python test files never spawn a Python
 * process at all.
 * </p>
 *
 * <h2>Worker lifecycle</h2>
 *
 * <ol>
 * <li>Workers are started one by one on the first {@link #scan} calls, up to
 *     {@code poolSize} total.</li>
 * <li>On each scan request, a worker is borrowed from the idle queue.</li>
 * <li>After a successful response the worker is returned to the queue.</li>
 * <li>On failure, the worker is killed and a replacement is created unless
 *     the circuit breaker has tripped.</li>
 * </ol>
 *
 * <h2>Shutdown</h2>
 *
 * <p>
 * {@link #close()} drains the idle queue, kills all workers, and removes the
 * JVM shutdown hook.  A JVM shutdown hook serves as a backstop.
 * </p>
 */
final class PythonWorkerPool implements Closeable {

    private static final Logger LOG = Logger.getLogger(PythonWorkerPool.class.getName());

    private static final long BORROW_TIMEOUT_MILLIS = 10_000L;

    private final Path scriptPath;
    private final PythonEnvironment pythonEnv;
    private final long workerTimeoutMillis;
    private final PythonWorkerCircuitBreaker circuitBreaker;
    private final BlockingQueue<PythonWorker> idleWorkers;
    private final int poolSize;
    @SuppressWarnings("PMD.DoNotUseThreads")
    private final Thread shutdownHook;

    private final ReentrantLock workerCreationLock = new ReentrantLock();
    // AtomicInteger so the lock-free fast-path read in startWorkerOnDemand observes a
    // coherent value; the increment runs under workerCreationLock.
    private final AtomicInteger nextWorkerIndex = new AtomicInteger();

    /**
     * Creates a worker pool.  No workers are started at construction time.
     *
     * @param scriptPath          path to the extracted {@code py-scanner.py} script
     * @param pythonEnv           Python environment information
     * @param poolSize            maximum number of concurrent workers; must be positive
     * @param workerTimeoutMillis per-request timeout in milliseconds
     * @param circuitBreaker      restart-limit tracker shared with this pool
     */
    /* default */
    @SuppressWarnings("PMD.DoNotUseThreads")
    PythonWorkerPool(Path scriptPath, PythonEnvironment pythonEnv,
            int poolSize, long workerTimeoutMillis, PythonWorkerCircuitBreaker circuitBreaker) {
        this.scriptPath = scriptPath;
        this.pythonEnv = pythonEnv;
        this.poolSize = poolSize;
        this.workerTimeoutMillis = workerTimeoutMillis;
        this.circuitBreaker = circuitBreaker;
        this.idleWorkers = new ArrayBlockingQueue<>(poolSize);

        this.shutdownHook = new Thread(this::shutdownAllWorkers, "py-worker-pool-shutdown");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Scans one Python file by delegating to a pooled worker.
     *
     * @param filePath absolute path of the file to scan
     * @return list of discovered method descriptors; empty when the circuit is
     *         open or a non-recoverable error occurs
     * @throws IOException if borrowing a worker fails with a hard I/O error
     */
    /* default */ List<PythonWorker.MethodDescriptor> scan(Path filePath) throws IOException {
        if (circuitBreaker.isOpen()) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.warning("Python worker pool circuit breaker is open — skipping " + filePath);
            }
            return List.of();
        }

        PythonWorker worker = borrow();
        if (worker == null) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.warning("No Python worker available within " + BORROW_TIMEOUT_MILLIS
                        + " ms — skipping " + filePath);
            }
            return List.of();
        }

        try {
            List<PythonWorker.MethodDescriptor> result = worker.scan(filePath);
            returnWorker(worker);
            return result;
        } catch (PythonWorker.WorkerException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING,
                        "Python worker error scanning " + filePath + " — killing and replacing: "
                        + e.getMessage(), e);
            }
            worker.kill("scan error: " + e.getMessage());
            replaceWorker();
            return List.of();
        }
    }

    /**
     * Shuts down all workers and removes the JVM shutdown hook.  Idempotent.
     */
    @Override
    public void close() {
        shutdownAllWorkers();
        try {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.log(Level.FINE, "Could not remove shutdown hook (JVM shutting down)", e);
            }
        }
    }

    // ── Private helpers ───────────────────────────────────────────────

    private PythonWorker createWorkerUnderLock() throws IOException {
        int index = nextWorkerIndex.getAndIncrement();
        PythonWorker worker = new PythonWorker(scriptPath, pythonEnv, workerTimeoutMillis, index);
        worker.start();
        return worker;
    }

    @SuppressWarnings("PMD.DoNotUseThreads")
    private PythonWorker borrow() {
        startWorkerOnDemand();
        try {
            return idleWorkers.poll(BORROW_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private void startWorkerOnDemand() {
        if (!idleWorkers.isEmpty() || nextWorkerIndex.get() >= poolSize) {
            return;
        }
        workerCreationLock.lock();
        try {
            if (!idleWorkers.isEmpty() || nextWorkerIndex.get() >= poolSize) {
                return;
            }
            PythonWorker worker = createWorkerUnderLock();
            if (!idleWorkers.offer(worker)) {
                if (LOG.isLoggable(Level.WARNING)) {
                    LOG.warning("Idle worker queue full; terminating newly created worker.");
                }
                worker.kill("idle queue full");
            }
        } catch (IOException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING,
                        "Failed to start Python worker on demand (index " + nextWorkerIndex.get() + ")", e);
            }
        } finally {
            workerCreationLock.unlock();
        }
    }

    private void returnWorker(PythonWorker worker) {
        if (!idleWorkers.offer(worker)) {
            worker.kill("pool queue full on return");
        }
    }

    private void replaceWorker() {
        circuitBreaker.recordRestart();
        if (circuitBreaker.isOpen()) {
            return;
        }
        workerCreationLock.lock();
        try {
            PythonWorker replacement = createWorkerUnderLock();
            if (!idleWorkers.offer(replacement)) {
                replacement.kill("pool queue full after replacement");
            }
        } catch (IOException e) {
            if (LOG.isLoggable(Level.WARNING)) {
                LOG.log(Level.WARNING, "Failed to start replacement Python worker", e);
            }
        } finally {
            workerCreationLock.unlock();
        }
    }

    private void shutdownAllWorkers() {
        List<PythonWorker> workers = new ArrayList<>(poolSize);
        idleWorkers.drainTo(workers);
        for (PythonWorker w : workers) {
            w.kill("pool shutdown");
        }
        if (LOG.isLoggable(Level.INFO)) {
            LOG.log(Level.INFO, "Python worker pool shut down ({0} worker(s) stopped)",
                    workers.size());
        }
    }
}