From 50321f107346b4ada00da0dbda052cd636fb1eea Mon Sep 17 00:00:00 2001 From: Dominik Kowalczyk Date: Tue, 24 Mar 2026 22:01:44 +0100 Subject: [PATCH] runner: modernize agent I/O with CompletableFuture and BufferedReader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace busy-wait polling loop in RefereeAgent.getOutput() with CompletableFuture + orTimeout for precise, non-blocking timeouts - Use a dedicated single-thread ExecutorService (AGENT_IO_EXECUTOR) for blocking I/O to avoid ForkJoinPool starvation - Switch to BufferedReader (readLine) instead of raw InputStream reads - Remove unused processStdout InputStream field and abstract getOutputStream() method — only the BufferedReader is needed now - Deduplicate timeout logic: RefereeAgent now delegates to Agent.getOutput() instead of reimplementing it - Clean up unused imports (InputStreamReader, TimeoutException) --- .../codingame/gameengine/runner/Agent.java | 108 ++++++++++-------- .../runner/CommandLinePlayerAgent.java | 26 ++--- .../gameengine/runner/JavaPlayerAgent.java | 14 ++- .../gameengine/runner/RefereeAgent.java | 69 ++--------- 4 files changed, 92 insertions(+), 125 deletions(-) diff --git a/runner/src/main/java/com/codingame/gameengine/runner/Agent.java b/runner/src/main/java/com/codingame/gameengine/runner/Agent.java index 23359a89..23078498 100644 --- a/runner/src/main/java/com/codingame/gameengine/runner/Agent.java +++ b/runner/src/main/java/com/codingame/gameengine/runner/Agent.java @@ -1,10 +1,16 @@ package com.codingame.gameengine.runner; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -15,34 +21,45 @@ abstract class Agent { public static final int AGENT_MAX_BUFFER_SIZE = 10_000; public static final int THRESHOLD_LIMIT_STDERR_SIZE = 4096 * 50; + /** + * Single dedicated thread for all blocking agent I/O — sequential game loop + * never needs more than one. + */ + private static final ExecutorService AGENT_IO_EXECUTOR = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "agent-reader"); + t.setDaemon(true); + return t; + }); + private static Log log = LogFactory.getLog(Agent.class); private OutputStream processStdin; - private InputStream processStdout; + private BufferedReader processStdoutReader; private InputStream processStderr; private int totalStderrBytesSent = 0; private int agentId; - private boolean lastAgentByteIsCarriageReturn = false; private boolean failed = false; private String nickname; private String avatar; public long lastExecutionTimeMs; + public Agent() { } abstract protected OutputStream getInputStream(); - abstract protected InputStream getOutputStream(); + abstract protected BufferedReader getOutputReader(); abstract protected InputStream getErrorStream(); /** - * Initialize an agent given global properties. A call to this function is needed before-all + * Initialize an agent given global properties. A call to this function is + * needed before-all * * @param conf - * Global configuration + * Global configuration */ public void initialize(Properties conf) { this.lastExecutionTimeMs = 0; @@ -54,7 +71,7 @@ public void initialize(Properties conf) { public void execute() { try { this.processStdin = getInputStream(); - this.processStdout = getOutputStream(); + this.processStdoutReader = getOutputReader(); this.processStderr = getErrorStream(); runInputOutput(); } catch (Exception e) { @@ -70,7 +87,7 @@ public void destroy() { * Launch the agent. After the call, agent is ready to process input / output * * @throws Exception - * if an error occurs + * if an error occurs */ protected abstract void runInputOutput() throws Exception; @@ -78,7 +95,7 @@ public void destroy() { * Write 'input' to standard input of agent * * @param input - * an input to write + * an input to write */ public void sendInput(String input) { if (processStdin != null) { @@ -98,57 +115,56 @@ public void sendInput(String input) { * Get the output of an agent * * @param nbLine - * Number of lines wanted + * Number of lines wanted * @param timeout - * Stop reading after timeout milliseconds - * @return the agent output + * Stop reading after timeout milliseconds + * @return the agent output, or null if timed out or reader closed */ public String getOutput(int nbLine, long timeout) { - if (processStdout == null) { + return getOutput(nbLine, timeout, AGENT_MAX_BUFFER_SIZE); + } + + /** + * Read at most maxOutputSize bytes across nbLine lines. + * + * @param nbLine + * Number of lines wanted + * @param timeout + * Stop reading after timeout milliseconds + * @param maxOutputSize + * Maximum number of bytes to read + * @return the agent output, or null if timed out or reader closed + */ + protected final String getOutput(int nbLine, long timeout, int maxOutputSize) { + if (processStdoutReader == null) { return null; } - try { - byte[] tmp = new byte[AGENT_MAX_BUFFER_SIZE]; - int offset = 0; - int nbOccurences = 0; - - long t0 = System.nanoTime(); - - while ((offset < AGENT_MAX_BUFFER_SIZE) && (nbOccurences < nbLine)) { - long current = System.nanoTime(); - if ((current - t0) > (timeout * 1_000_000l)) { - break; - } - - if (processStdout.available() > 0) { - int nbRead = processStdout.read(tmp, offset, 1); - if (nbRead < 0) { - // Should not happen, just in case... + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < nbLine; i++) { + String line = processStdoutReader.readLine(); + if (line == null) { break; } - byte curByte = tmp[offset]; - if (!((curByte == '\n') && lastAgentByteIsCarriageReturn)) { - offset += nbRead; - if ((curByte == '\n') || (curByte == '\r')) { - ++nbOccurences; - } - } - lastAgentByteIsCarriageReturn = curByte == '\r'; - } else { - if ((offset < AGENT_MAX_BUFFER_SIZE) && (nbOccurences < nbLine)) { - Thread.sleep(1); + sb.append(line).append('\n'); + if (sb.length() >= maxOutputSize) { + break; } } + return sb.toString(); + } catch (IOException e) { + return null; } + }, AGENT_IO_EXECUTOR); - return new String(tmp, 0, offset, UTF8); - } catch (IOException e1) { - processStdout = null; - } catch (InterruptedException e) { - // wtf + try { + return future.orTimeout(timeout, TimeUnit.MILLISECONDS).join(); + } catch (CompletionException e) { + future.cancel(true); + return null; } - return null; } /** diff --git a/runner/src/main/java/com/codingame/gameengine/runner/CommandLinePlayerAgent.java b/runner/src/main/java/com/codingame/gameengine/runner/CommandLinePlayerAgent.java index d6b9b745..475f6a78 100644 --- a/runner/src/main/java/com/codingame/gameengine/runner/CommandLinePlayerAgent.java +++ b/runner/src/main/java/com/codingame/gameengine/runner/CommandLinePlayerAgent.java @@ -1,17 +1,19 @@ package com.codingame.gameengine.runner; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Properties; /** - * This class is used in with the GameRunner to add an AI as a player. + * This class is used in with the GameRunner to add an AI as a + * player. */ public class CommandLinePlayerAgent extends Agent { private OutputStream processStdin; - private InputStream processStdout; + private BufferedReader processInputReader; private InputStream processStderr; private String[] commandArray; private Process process; @@ -20,18 +22,18 @@ public class CommandLinePlayerAgent extends Agent { * Creates an Agent for your game, will run the given commandLine at game start * * @param commandLine - * the command line to run + * the command line to run */ public CommandLinePlayerAgent(String commandLine) { super(); this.commandArray = commandLine.split(" "); } - + /** * Creates an Agent for your game, will run the given commandLine at game start * * @param commandArray - * the command array to run + * the command array to run */ public CommandLinePlayerAgent(String[] commandArray) { super(); @@ -44,8 +46,8 @@ protected OutputStream getInputStream() { } @Override - protected InputStream getOutputStream() { - return processStdout; + protected BufferedReader getOutputReader() { + return processInputReader; } @Override @@ -61,21 +63,15 @@ public void initialize(Properties conf) { throw new RuntimeException("Failed to launch " + String.join(" ", commandArray), e); } processStdin = process.getOutputStream(); - processStdout = process.getInputStream(); + processInputReader = process.inputReader(); processStderr = process.getErrorStream(); } - @Override - public String getOutput(int nbLine, long timeout) { - String output = super.getOutput(nbLine, timeout); - return output; - } - /** * Launch the agent. After the call, agent is ready to process input / output * * @throws Exception - * if an error occurs + * if an error occurs */ @Override diff --git a/runner/src/main/java/com/codingame/gameengine/runner/JavaPlayerAgent.java b/runner/src/main/java/com/codingame/gameengine/runner/JavaPlayerAgent.java index 997b6385..e60a02f9 100644 --- a/runner/src/main/java/com/codingame/gameengine/runner/JavaPlayerAgent.java +++ b/runner/src/main/java/com/codingame/gameengine/runner/JavaPlayerAgent.java @@ -1,7 +1,9 @@ package com.codingame.gameengine.runner; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; @@ -34,11 +36,12 @@ public class JavaPlayerAgent extends Agent { private OutputStream processStdin; private InputStream processStdout; + private BufferedReader processStdoutReader; private InputStream processStderr; /** * @param className - * The name of the class to use as a participating AI. + * The name of the class to use as a participating AI. */ public JavaPlayerAgent(String className) { super(); @@ -48,6 +51,7 @@ public JavaPlayerAgent(String className) { try { processStdin = new PipedOutputStream(agentStdin); processStdout = new PipedInputStream(agentStdout, 100_000); + processStdoutReader = new BufferedReader(new InputStreamReader(processStdout, UTF8)); processStderr = new PipedInputStream(agentStderr, 100_000); } catch (IOException e) { throw new RuntimeException("Cannot initialize Player Agent", e); @@ -60,8 +64,8 @@ protected OutputStream getInputStream() { } @Override - protected InputStream getOutputStream() { - return processStdout; + protected BufferedReader getOutputReader() { + return processStdoutReader; } @Override @@ -77,7 +81,7 @@ public void initialize(Properties conf) { * Launch the agent. After the call, agent is ready to process input / output * * @throws Exception - * if an error occurs + * if an error occurs */ @Override protected void runInputOutput() throws Exception { @@ -99,7 +103,7 @@ public void destroy() { if (javaRunnerThread.isAlive()) { // TODO javaRunnerThread.interrupt(); -// javaRunnerThread.destroy(); + // javaRunnerThread.destroy(); javaRunnerThread.stop(); } } diff --git a/runner/src/main/java/com/codingame/gameengine/runner/RefereeAgent.java b/runner/src/main/java/com/codingame/gameengine/runner/RefereeAgent.java index 307a62c7..2fe298c4 100644 --- a/runner/src/main/java/com/codingame/gameengine/runner/RefereeAgent.java +++ b/runner/src/main/java/com/codingame/gameengine/runner/RefereeAgent.java @@ -1,7 +1,9 @@ package com.codingame.gameengine.runner; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; @@ -13,7 +15,6 @@ class RefereeAgent extends Agent { public static final int REFEREE_MAX_BUFFER_SIZE_EXTRA = 100_000; public static final int REFEREE_MAX_BUFFER_SIZE = 30_000; - private boolean lastRefereeByteIsCarriageReturn = false; private PipedInputStream agentStdin = new PipedInputStream(100_000); private PipedOutputStream agentStdout = new PipedOutputStream(); @@ -21,8 +22,9 @@ class RefereeAgent extends Agent { private OutputStream processStdin = null; private InputStream processStdout = null; + private BufferedReader processStdoutReader; private InputStream processStderr = null; - + private Thread thread; public RefereeAgent() { @@ -31,12 +33,13 @@ public RefereeAgent() { try { processStdin = new PipedOutputStream(agentStdin); processStdout = new PipedInputStream(agentStdout, 100_000); + processStdoutReader = new BufferedReader(new InputStreamReader(processStdout, UTF8)); processStderr = new PipedInputStream(agentStderr, 100_000); } catch (IOException e) { throw new RuntimeException("Cannot initialize Referee Agent"); } } - + @Override public void destroy() { if (thread != null) { @@ -44,15 +47,14 @@ public void destroy() { } } - @Override protected OutputStream getInputStream() { return processStdin; } @Override - protected InputStream getOutputStream() { - return processStdout; + protected BufferedReader getOutputReader() { + return processStdoutReader; } @Override @@ -78,58 +80,7 @@ public String getOutput(int nbLine, long timeout) { @Override public String getOutput(int nbLine, long timeout, boolean extraBufferSpace) { - if (processStdout == null) { - return null; - } - int maxBufferSize = extraBufferSpace ? REFEREE_MAX_BUFFER_SIZE_EXTRA : REFEREE_MAX_BUFFER_SIZE; - try { - byte[] tmp = new byte[maxBufferSize]; - int offset = 0; - int nbOccurences = 0; - - long t0 = System.nanoTime(); - - while ((offset < maxBufferSize) && (nbOccurences < nbLine)) { - long current = System.nanoTime(); - if ((current - t0) > (timeout * 1_000_000L)) { - break; - } - - while ((offset < maxBufferSize) && (processStdout.available() > 0) - && (nbOccurences < nbLine)) { - current = System.nanoTime(); - if ((current - t0) > (timeout * 1_000_000L)) { - break; - } - - int nbRead = processStdout.read(tmp, offset, 1); - if (nbRead <= 0) { - break; - } - byte curByte = tmp[offset]; - if (!((curByte == '\n') && lastRefereeByteIsCarriageReturn)) { - offset += nbRead; - if ((curByte == '\n') || (curByte == '\r')) { - ++nbOccurences; - } - } - lastRefereeByteIsCarriageReturn = curByte == '\r'; - } - - if (!((offset < REFEREE_MAX_BUFFER_SIZE) && (nbOccurences < nbLine))) { - break; - } - - try { - Thread.sleep(1); - } catch (Exception e) { - } - } - return new String(tmp, 0, offset, UTF8); - } catch (IOException e) { - e.printStackTrace(); - processStdout = null; - } - return null; + int limit = extraBufferSpace ? REFEREE_MAX_BUFFER_SIZE_EXTRA : REFEREE_MAX_BUFFER_SIZE; + return getOutput(nbLine, timeout, limit); } } \ No newline at end of file