// PythonWorkerCircuitBreaker.java
package org.egothor.methodatlas.discovery.python;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Tracks Python worker-process restarts and trips an open circuit when the
 * restart rate exceeds safe operating limits.
 *
 * <h2>Policy</h2>
 *
 * <p>
 * The circuit opens when the number of worker restarts within the trailing
 * {@code windowSeconds} sliding window reaches or exceeds {@code maxRestarts}.
 * A restart whose timestamp lies exactly on the window's lower bound still
 * counts as inside the window. Once open, the circuit never closes — the
 * plugin is disabled for the remainder of the scan run.
 * </p>
 *
 * <h2>Thread safety</h2>
 *
 * <p>
 * Every method that reads or writes the breaker state acquires the same
 * {@link ReentrantLock}. The warning emitted when the circuit trips is logged
 * after the lock has been released, so log handlers never run while the lock
 * is held.
 * </p>
 */
final class PythonWorkerCircuitBreaker {

    private static final Logger LOG = Logger.getLogger(PythonWorkerCircuitBreaker.class.getName());

    /** Number of restarts inside the window at which the circuit trips. */
    private final int maxRestarts;

    /** Width of the sliding window. */
    private final Duration window;

    /** Time source; injectable so tests can control "now". */
    private final Clock clock;

    /** Guards {@link #restartTimestamps} and {@link #open}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Restart timestamps in insertion order, oldest first. */
    private final Deque<Instant> restartTimestamps = new ArrayDeque<>();

    /** Set to {@code true} when the circuit trips; never reset. */
    private boolean open;

    /**
     * Creates a circuit breaker with the given limits, using the system UTC
     * clock.
     *
     * @param maxRestarts   maximum number of restarts allowed within the window;
     *                      must be positive
     * @param windowSeconds width of the sliding time window in seconds;
     *                      must be positive
     * @throws IllegalArgumentException if {@code maxRestarts} or
     *                                  {@code windowSeconds} is not positive
     */
    /* default */ PythonWorkerCircuitBreaker(int maxRestarts, int windowSeconds) {
        this(maxRestarts, windowSeconds, Clock.systemUTC());
    }

    /**
     * Creates a circuit breaker with a custom clock for testing.
     *
     * @param maxRestarts   maximum restarts within the window; must be positive
     * @param windowSeconds sliding window width in seconds; must be positive
     * @param clock         clock used to determine current time; must not be
     *                      {@code null}
     * @throws IllegalArgumentException if {@code maxRestarts} or
     *                                  {@code windowSeconds} is not positive
     * @throws NullPointerException     if {@code clock} is {@code null}
     */
    /* default */ PythonWorkerCircuitBreaker(int maxRestarts, int windowSeconds, Clock clock) {
        if (maxRestarts <= 0) {
            throw new IllegalArgumentException("maxRestarts must be positive, got: " + maxRestarts);
        }
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive, got: " + windowSeconds);
        }
        this.maxRestarts = maxRestarts;
        this.window = Duration.ofSeconds(windowSeconds);
        // Fail fast here rather than with an NPE on the first recordRestart() call.
        this.clock = Objects.requireNonNull(clock, "clock");
    }

    /**
     * Returns {@code true} when the circuit is open (plugin must be disabled).
     *
     * @return {@code true} if the circuit has tripped
     */
    /* default */ boolean isOpen() {
        lock.lock();
        try {
            return open;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Records a worker restart event and opens the circuit if the number of
     * restarts inside the sliding window has reached {@code maxRestarts}.
     *
     * <p>
     * Calls made after the circuit has opened are ignored: no timestamp is
     * recorded and nothing is logged.
     * </p>
     */
    /* default */ void recordRestart() {
        int restartsAtTrip;
        lock.lock();
        try {
            if (open) {
                return;
            }
            Instant now = clock.instant();
            Instant windowStart = now.minus(window);
            // Evict entries that have fallen out of the window; the deque is
            // oldest-first, so eviction stops at the first in-window entry.
            while (!restartTimestamps.isEmpty()
                    && restartTimestamps.peekFirst().isBefore(windowStart)) {
                restartTimestamps.pollFirst();
            }
            restartTimestamps.addLast(now);
            if (restartTimestamps.size() < maxRestarts) {
                return;
            }
            open = true;
            restartsAtTrip = restartTimestamps.size();
        } finally {
            lock.unlock();
        }

        // Reached only by the single call that tripped the circuit. Logging is
        // done after unlocking so a slow handler cannot stall isOpen() callers.
        if (LOG.isLoggable(Level.WARNING)) {
            LOG.warning("Python worker pool circuit breaker TRIPPED — "
                    + restartsAtTrip + " restarts within the last "
                    + window.toSeconds() + " s (limit=" + maxRestarts + "). "
                    + "Python scanning is disabled for the remainder of this run. "
                    + "Investigate worker logs for the root cause; "
                    + "increase python.maxConsecutiveRestarts or "
                    + "python.restartWindowSec to raise the threshold.");
        }
    }

    /**
     * Returns the number of restart events currently recorded within the
     * sliding window.
     *
     * <p>
     * While the circuit is closed the count is evaluated against the current
     * clock reading without modifying the recorded timestamps. Once the circuit
     * is open the recorded timestamps no longer change, so the value returned
     * is the count that tripped the circuit.
     * </p>
     *
     * @return restart count within the active window
     */
    /* default */ int restartsInWindow() {
        lock.lock();
        try {
            if (open) {
                return restartTimestamps.size();
            }
            Instant now = clock.instant();
            Instant windowStart = now.minus(window);
            return (int) restartTimestamps.stream()
                    .filter(t -> !t.isBefore(windowStart))
                    .count();
        } finally {
            lock.unlock();
        }
    }
}