@@ -4,12 +4,14 @@ import com.beust.klaxon.JsonObject
4
4
import jupyter.kotlin.DisplayResult
5
5
import jupyter.kotlin.MimeTypedResult
6
6
import jupyter.kotlin.textResult
7
+ import org.jetbrains.annotations.TestOnly
7
8
import org.jetbrains.kotlin.config.KotlinCompilerVersion
8
9
import java.io.ByteArrayOutputStream
9
10
import java.io.OutputStream
10
11
import java.io.PrintStream
11
12
import java.lang.reflect.InvocationTargetException
12
13
import java.util.concurrent.atomic.AtomicLong
14
+ import kotlin.concurrent.timer
13
15
14
16
enum class ResponseState {
15
17
Ok , Error
@@ -177,47 +179,70 @@ class CapturingOutputStream(private val stdout: PrintStream,
177
179
private val conf : OutputConfig ,
178
180
private val captureOutput : Boolean ,
179
181
val onCaptured : (String ) -> Unit ) : OutputStream() {
180
- val capturedOutput = ByteArrayOutputStream ()
181
- private var time = System .currentTimeMillis ()
182
+ private val capturedLines = ByteArrayOutputStream ()
183
+ private val capturedNewLine = ByteArrayOutputStream ()
182
184
private var overallOutputSize = 0
183
185
private var newlineFound = false
184
186
185
- private fun shouldSend (b : Int ): Boolean {
187
+ private val timer = timer(
188
+ initialDelay = conf.captureBufferTimeLimitMs,
189
+ period = conf.captureBufferTimeLimitMs,
190
+ action = {
191
+ flush()
192
+ })
193
+
194
+ val contents: ByteArray
195
+ @TestOnly
196
+ get() = capturedLines.toByteArray() + capturedNewLine.toByteArray()
197
+
198
+ private fun sendIfNeeded (b : Int ) {
186
199
val c = b.toChar()
187
- newlineFound = newlineFound || c == ' \n ' || c == ' \r '
188
- if (newlineFound && capturedOutput.size() >= conf.captureNewlineBufferSize)
189
- return true
190
- if (capturedOutput.size() >= conf.captureBufferMaxSize)
191
- return true
192
-
193
- val currentTime = System .currentTimeMillis()
194
- if (currentTime - time >= conf.captureBufferTimeLimitMs) {
195
- time = currentTime
196
- return true
200
+ if (c == ' \n ' || c == ' \r ' ) {
201
+ newlineFound = true
202
+ capturedNewLine.writeTo(capturedLines)
203
+ capturedNewLine.reset()
197
204
}
198
- return false
205
+
206
+ val size = capturedLines.size() + capturedNewLine.size()
207
+
208
+ if (newlineFound && size >= conf.captureNewlineBufferSize)
209
+ return flushBuffers(capturedLines)
210
+ if (size >= conf.captureBufferMaxSize)
211
+ return flush()
199
212
}
200
213
201
214
override fun write (b : Int ) {
202
215
++ overallOutputSize
203
216
stdout.write(b)
204
217
205
218
if (captureOutput && overallOutputSize <= conf.cellOutputMaxSize) {
206
- capturedOutput.write(b)
207
- if (shouldSend(b)) {
208
- flush()
209
- }
219
+ capturedNewLine.write(b)
220
+ sendIfNeeded(b)
210
221
}
211
222
}
212
223
213
- override fun flush () {
224
+ private fun resetBuffer (buffer : ByteArrayOutputStream ): String {
225
+ val str = buffer.toString(" UTF-8" )
226
+ buffer.reset()
227
+ return str
228
+ }
229
+
230
+ private fun flushBuffers (vararg buffers : ByteArrayOutputStream ) {
214
231
newlineFound = false
215
- if (capturedOutput.size() > 0 ) {
216
- val str = capturedOutput.toString(" UTF-8" )
217
- capturedOutput.reset()
232
+ val str = buffers.map(this ::resetBuffer).reduce { acc, s -> acc + s }
233
+ if (str.isNotEmpty()) {
218
234
onCaptured(str)
219
235
}
220
236
}
237
+
238
+ override fun flush () {
239
+ flushBuffers(capturedLines, capturedNewLine)
240
+ }
241
+
242
+ override fun close () {
243
+ super .close()
244
+ timer.cancel()
245
+ }
221
246
}
222
247
223
248
fun Any.toMimeTypedResult (): MimeTypedResult ? = when (this ) {
@@ -301,7 +326,6 @@ fun JupyterConnection.evalWithIO(maybeConfig: OutputConfig?, body: () -> EvalRes
301
326
*/
302
327
val stdErr = StringBuilder ()
303
328
with (stdErr) {
304
- forkedError.capturedOutput.toString(" UTF-8" )?.nullWhenEmpty()?.also { appendln(it) }
305
329
val cause = ex.errorResult.cause
306
330
if (cause == null ) appendln(ex.errorResult.message)
307
331
else {
@@ -318,10 +342,10 @@ fun JupyterConnection.evalWithIO(maybeConfig: OutputConfig?, body: () -> EvalRes
318
342
ResponseWithMessage (ResponseState .Error , textResult(" Error!" ), emptyList(), null , stdErr.toString())
319
343
}
320
344
} finally {
345
+ forkedOut.close()
346
+ forkedError.close()
321
347
System .setIn(`in `)
322
348
System .setErr(err)
323
349
System .setOut(out )
324
350
}
325
351
}
326
-
327
- fun String.nullWhenEmpty (): String? = if (this .isBlank()) null else this
0 commit comments