Skip to content

Commit b52f14a

Browse files
committed
feat: file stream improvements
1 parent 3cf595c commit b52f14a

File tree

6 files changed

+597
-99
lines changed

6 files changed

+597
-99
lines changed

android/src/main/java/com/margelo/nitro/fs2/Fs2Stream.kt

Lines changed: 159 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import com.margelo.nitro.NitroModules
44
import com.margelo.nitro.core.Promise
55
import com.margelo.nitro.core.ArrayBuffer
66
import com.margelo.nitro.fs2.utils.Fs2Util
7+
import com.margelo.nitro.fs2.utils.BufferPool
8+
import com.margelo.nitro.fs2.utils.StreamError
79

810
import java.io.File
11+
import java.io.IOException
912
import java.io.InputStream
1013
import java.io.OutputStream
1114
import java.io.RandomAccessFile
@@ -34,7 +37,8 @@ class Fs2Stream() : HybridFs2StreamSpec() {
3437
val options: WriteStreamOptions?,
3538
var isActive: Boolean = false,
3639
var position: Long = 0L,
37-
var job: Job? = null
40+
var job: Job? = null,
41+
var hasError: Boolean = false
3842
)
3943

4044
// Stream handle maps
@@ -68,33 +72,64 @@ class Fs2Stream() : HybridFs2StreamSpec() {
6872
// Add reference to RNFSManager and context
6973
private val reactContext = NitroModules.applicationContext!!
7074

75+
// Add buffer pool instance
76+
private val bufferPool = BufferPool()
77+
7178
// Helper to open InputStream for reading (file or content URI)
7279
private fun openInputStream(path: String, start: Long = 0L): InputStream {
7380
val uri = Fs2Util.getFileUri(path)
74-
return if ("content" == uri.scheme) {
75-
val input = reactContext.contentResolver.openInputStream(uri)
76-
?: throw Exception("ENOENT: Could not open input stream for $path")
77-
if (start > 0) input.skip(start)
78-
input
79-
} else {
80-
val raf = RandomAccessFile(Fs2Util.getOriginalFilepath(reactContext, path), "r")
81-
raf.seek(start)
82-
object : InputStream() {
83-
override fun read(): Int = raf.read()
84-
override fun read(b: ByteArray, off: Int, len: Int): Int = raf.read(b, off, len)
85-
override fun close() = raf.close()
81+
try {
82+
if ("content" == uri.scheme) {
83+
val input = reactContext.contentResolver.openInputStream(uri)
84+
?: throw StreamError.NotFound(path)
85+
if (start > 0) input.skip(start)
86+
return input
87+
} else {
88+
val filePath = Fs2Util.getOriginalFilepath(reactContext, path)
89+
val file = File(filePath)
90+
if (!file.canRead()) throw StreamError.AccessDenied(path)
91+
val raf = RandomAccessFile(filePath, "r")
92+
raf.seek(start)
93+
return object : InputStream() {
94+
override fun read(): Int = raf.read()
95+
override fun read(b: ByteArray, off: Int, len: Int): Int = raf.read(b, off, len)
96+
override fun close() = raf.close()
97+
}
8698
}
99+
} catch (e: SecurityException) {
100+
throw StreamError.AccessDenied(path)
101+
} catch (e: IOException) {
102+
throw StreamError.IOError(e.message ?: "I/O error")
87103
}
88104
}
89105

90106
// Helper to open OutputStream for writing (file or content URI)
91107
private fun openOutputStream(path: String, append: Boolean): OutputStream {
92108
val uri = Fs2Util.getFileUri(path)
93-
return if ("content" == uri.scheme) {
94-
reactContext.contentResolver.openOutputStream(uri, if (append) "wa" else "w")
95-
?: throw Exception("ENOENT: Could not open output stream for $path")
96-
} else {
97-
FileOutputStream(Fs2Util.getOriginalFilepath(reactContext, path), append)
109+
try {
110+
if ("content" == uri.scheme) {
111+
val output = reactContext.contentResolver.openOutputStream(uri, if (append) "wa" else "w")
112+
?: throw StreamError.NotFound(path)
113+
return output
114+
} else {
115+
val filePath = Fs2Util.getOriginalFilepath(reactContext, path)
116+
val file = File(filePath)
117+
118+
if (file.exists()) {
119+
if (!file.canWrite()) throw StreamError.AccessDenied(path)
120+
} else {
121+
val parentDir = file.parentFile
122+
if (parentDir != null && !parentDir.canWrite()) {
123+
throw StreamError.AccessDenied(path)
124+
}
125+
}
126+
127+
return FileOutputStream(filePath, append)
128+
}
129+
} catch (e: SecurityException) {
130+
throw StreamError.AccessDenied(path)
131+
} catch (e: IOException) {
132+
throw StreamError.IOError(e.message ?: "I/O error")
98133
}
99134
}
100135

@@ -105,7 +140,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
105140
return Promise.async {
106141
val file = File(path)
107142
if (!file.exists() || !file.isFile) {
108-
throw Exception("ENOENT: File does not exist: $path")
143+
throw StreamError.NotFound(path)
109144
}
110145
val streamId = UUID.randomUUID().toString()
111146
val state = ReadStreamState(file, options)
@@ -148,11 +183,34 @@ class Fs2Stream() : HybridFs2StreamSpec() {
148183
)
149184
}
150185
}
186+
} catch (e: SecurityException) {
187+
impl.state.hasError = true
188+
writeStreamErrorListeners[streamId]?.invoke(
189+
WriteStreamErrorEvent(
190+
streamId = streamId,
191+
error = StreamError.AccessDenied(state.file.path).message ?: "Access denied",
192+
code = null
193+
)
194+
)
195+
} catch (e: IOException) {
196+
impl.state.hasError = true
197+
writeStreamErrorListeners[streamId]?.invoke(
198+
WriteStreamErrorEvent(
199+
streamId = streamId,
200+
error = StreamError.IOError(e.message ?: "I/O error").message ?: "I/O error",
201+
code = null
202+
)
203+
)
151204
} catch (e: Exception) {
205+
impl.state.hasError = true
206+
val error = when (e) {
207+
is StreamError -> e
208+
else -> StreamError.IOError(e.message ?: "Unknown error")
209+
}
152210
writeStreamErrorListeners[streamId]?.invoke(
153211
WriteStreamErrorEvent(
154212
streamId = streamId,
155-
error = e.message ?: "Unknown error",
213+
error = error.message ?: "Unknown error",
156214
code = null
157215
)
158216
)
@@ -172,12 +230,14 @@ class Fs2Stream() : HybridFs2StreamSpec() {
172230
override fun startReadStream(streamId: String): Promise<Unit> {
173231
return Promise.async {
174232
val state =
175-
readStreams[streamId] ?: throw Exception("ENOENT: No such read stream: $streamId")
233+
readStreams[streamId] ?: throw StreamError.InvalidStream(streamId)
176234
if (state.isActive) return@async
177235
state.isActive = true
178-
if (state.job == null) {
236+
237+
// Only create new job if none exists or previous one is completed
238+
if (state.job == null || state.job?.isActive == false) {
179239
state.job = streamScope.launch {
180-
val bufferSize = state.options?.bufferSize ?: 8192
240+
val bufferSize = state.options?.bufferSize ?: BufferPool.DEFAULT_BUFFER_SIZE
181241
val start = state.options?.start ?: 0L
182242
val end = state.options?.end
183243
var position = start
@@ -187,23 +247,24 @@ class Fs2Stream() : HybridFs2StreamSpec() {
187247
try {
188248
state.position = position
189249
openInputStream(state.file.path, start).use { inputStream ->
190-
val buffer = ByteArray(bufferSize.toInt())
191-
readLoop@ while (true) {
192-
var shouldBreak = false
193-
state.pauseMutex.withLock {
250+
var buffer = bufferPool.acquire(bufferSize.toInt())
251+
try {
252+
readLoop@ while (true) {
253+
// Wait if paused - acquire lock briefly to check, then suspend if needed
254+
state.pauseMutex.withLock {
255+
// Just checking pause state, lock will be released after this block
256+
}
257+
258+
// Perform I/O without holding the lock
194259
val bytesToRead = if (end != null) {
195260
val remaining = end - position + 1
196-
if (remaining <= 0) {
197-
shouldBreak = true
198-
return@withLock
199-
}
261+
if (remaining <= 0) break@readLoop
200262
minOf(bufferSize.toLong(), remaining).toInt()
201-
} else bufferSize
202-
val read = inputStream.read(buffer, 0, bytesToRead.toInt())
203-
if (read == -1) {
204-
shouldBreak = true
205-
return@withLock
206-
}
263+
} else bufferSize.toInt()
264+
265+
val read = inputStream.read(buffer, 0, bytesToRead)
266+
if (read == -1) break@readLoop
267+
207268
val data = buffer.copyOf(read)
208269

209270
readStreamDataListeners[streamId]?.invoke(
@@ -219,6 +280,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
219280
state.position = position
220281
bytesReadTotal += read
221282
chunk++
283+
222284
readStreamProgressListeners[streamId]?.invoke(
223285
ReadStreamProgressEvent(
224286
streamId = streamId,
@@ -227,12 +289,11 @@ class Fs2Stream() : HybridFs2StreamSpec() {
227289
progress = bytesReadTotal.toDouble() / fileLength.toDouble()
228290
)
229291
)
230-
if (end != null && position > end) {
231-
shouldBreak = true
232-
return@withLock
233-
}
292+
293+
if (end != null && position > end) break@readLoop
234294
}
235-
if (shouldBreak) break@readLoop
295+
} finally {
296+
bufferPool.release(buffer)
236297
}
237298
}
238299
readStreamEndListeners[streamId]?.invoke(
@@ -242,11 +303,33 @@ class Fs2Stream() : HybridFs2StreamSpec() {
242303
success = true
243304
)
244305
)
306+
} catch (e: SecurityException) {
307+
val error = StreamError.AccessDenied(state.file.path)
308+
readStreamErrorListeners[streamId]?.invoke(
309+
ReadStreamErrorEvent(
310+
streamId = streamId,
311+
error = error.message ?: "Access denied",
312+
code = null
313+
)
314+
)
315+
} catch (e: IOException) {
316+
val error = StreamError.IOError(e.message ?: "I/O error")
317+
readStreamErrorListeners[streamId]?.invoke(
318+
ReadStreamErrorEvent(
319+
streamId = streamId,
320+
error = error.message ?: "I/O error",
321+
code = null
322+
)
323+
)
245324
} catch (e: Exception) {
325+
val error = when (e) {
326+
is StreamError -> e
327+
else -> StreamError.IOError(e.message ?: "Unknown error")
328+
}
246329
readStreamErrorListeners[streamId]?.invoke(
247330
ReadStreamErrorEvent(
248331
streamId = streamId,
249-
error = e.message ?: "Unknown error",
332+
error = error.message ?: "Unknown error",
250333
code = null
251334
)
252335
)
@@ -269,7 +352,9 @@ class Fs2Stream() : HybridFs2StreamSpec() {
269352
val state =
270353
readStreams[streamId] ?: throw Exception("ENOENT: No such read stream: $streamId")
271354
if (!state.isActive) return@async
272-
if (!state.pauseMutex.isLocked) state.pauseMutex.lock()
355+
356+
// Use tryLock to avoid deadlock - locks mutex to pause stream
357+
state.pauseMutex.tryLock()
273358
state.isActive = false
274359
}
275360
}
@@ -279,7 +364,15 @@ class Fs2Stream() : HybridFs2StreamSpec() {
279364
val state =
280365
readStreams[streamId] ?: throw Exception("ENOENT: No such read stream: $streamId")
281366
if (state.isActive) return@async
282-
if (state.pauseMutex.isLocked) state.pauseMutex.unlock()
367+
368+
// Safely unlock mutex to resume stream - check if locked first
369+
if (state.pauseMutex.isLocked) {
370+
try {
371+
state.pauseMutex.unlock()
372+
} catch (e: IllegalStateException) {
373+
// Mutex might have been unlocked by another coroutine, ignore
374+
}
375+
}
283376
state.isActive = true
284377
}
285378
}
@@ -308,17 +401,14 @@ class Fs2Stream() : HybridFs2StreamSpec() {
308401
override fun writeToStream(streamId: String, data: ArrayBuffer): Promise<Unit> {
309402
val copiedBuffer: ArrayBuffer
310403
try {
311-
// Create a copy of the ArrayBuffer to ensure we have ownership
312404
copiedBuffer = ArrayBuffer.copy(data)
313405
} catch (e: Exception) {
314-
// If copying fails, reject immediately
315-
return Promise.rejected(Exception("Failed to copy ArrayBuffer: ${e.message}"))
406+
return Promise.rejected(StreamError.BufferError("Failed to copy ArrayBuffer: ${e.message}"))
316407
}
317408

318409
return Promise.async {
319-
val impl =
320-
writeStreams[streamId] ?: throw Exception("ENOENT: No such write stream: $streamId")
321-
if (!impl.state.isActive) throw Exception("EPIPE: Write stream is not active: $streamId")
410+
val impl = writeStreams[streamId] ?: throw StreamError.InvalidStream(streamId)
411+
if (!impl.state.isActive) throw StreamError.StreamInactive(streamId)
322412
val bytes = copiedBuffer.getBuffer(true).let { buf ->
323413
if (buf.hasArray()) {
324414
buf.array().copyOfRange(
@@ -330,7 +420,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
330420
}
331421
}
332422
impl.queue.add(WriteRequest(bytes))
333-
impl.state.job?.let { if (!it.isActive) throw Exception("EPIPE: Write job is not active") }
423+
impl.state.job?.let { if (!it.isActive) throw StreamError.StreamInactive(streamId) }
334424
}
335425
}
336426

@@ -344,17 +434,28 @@ class Fs2Stream() : HybridFs2StreamSpec() {
344434

345435
override fun closeWriteStream(streamId: String): Promise<Unit> {
346436
return Promise.async {
347-
val impl = writeStreams.remove(streamId)
348-
?: throw Exception("ENOENT: No such write stream: $streamId")
349-
impl.state.job?.cancel()
350-
impl.outputStream.close()
437+
val impl = writeStreams.remove(streamId) ?: throw StreamError.InvalidStream(streamId)
438+
439+
// Signal end to prevent new writes and wait for pending writes to complete
440+
impl.state.isActive = false
441+
impl.queue.add(WriteRequest(null, isEnd = true))
442+
443+
// Wait for background job to finish processing
444+
impl.state.job?.join()
445+
446+
try {
447+
impl.outputStream.close()
448+
} catch (_: Exception) {
449+
}
450+
351451
writeStreamFinishListeners[streamId]?.invoke(
352452
WriteStreamFinishEvent(
353453
streamId = streamId,
354454
bytesWritten = impl.state.position,
355-
success = true
455+
success = !impl.state.hasError
356456
)
357457
)
458+
358459
writeStreamProgressListeners.remove(streamId)
359460
writeStreamFinishListeners.remove(streamId)
360461
writeStreamErrorListeners.remove(streamId)
@@ -407,7 +508,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
407508
WriteStreamFinishEvent(
408509
streamId = streamId,
409510
bytesWritten = impl.state.position,
410-
success = true
511+
success = !impl.state.hasError
411512
)
412513
)
413514

0 commit comments

Comments
 (0)