@@ -4,8 +4,11 @@ import com.margelo.nitro.NitroModules
44import com.margelo.nitro.core.Promise
55import com.margelo.nitro.core.ArrayBuffer
66import com.margelo.nitro.fs2.utils.Fs2Util
7+ import com.margelo.nitro.fs2.utils.BufferPool
8+ import com.margelo.nitro.fs2.utils.StreamError
79
810import java.io.File
11+ import java.io.IOException
912import java.io.InputStream
1013import java.io.OutputStream
1114import java.io.RandomAccessFile
@@ -34,7 +37,8 @@ class Fs2Stream() : HybridFs2StreamSpec() {
3437 val options : WriteStreamOptions ? ,
3538 var isActive : Boolean = false ,
3639 var position : Long = 0L ,
37- var job : Job ? = null
40+ var job : Job ? = null ,
41+ var hasError : Boolean = false
3842 )
3943
4044 // Stream handle maps
@@ -68,33 +72,64 @@ class Fs2Stream() : HybridFs2StreamSpec() {
6872 // Add reference to RNFSManager and context
6973 private val reactContext = NitroModules .applicationContext!!
7074
75+ // Add buffer pool instance
76+ private val bufferPool = BufferPool ()
77+
7178 // Helper to open InputStream for reading (file or content URI)
7279 private fun openInputStream (path : String , start : Long = 0L): InputStream {
7380 val uri = Fs2Util .getFileUri(path)
74- return if (" content" == uri.scheme) {
75- val input = reactContext.contentResolver.openInputStream(uri)
76- ? : throw Exception (" ENOENT: Could not open input stream for $path " )
77- if (start > 0 ) input.skip(start)
78- input
79- } else {
80- val raf = RandomAccessFile (Fs2Util .getOriginalFilepath(reactContext, path), " r" )
81- raf.seek(start)
82- object : InputStream () {
83- override fun read (): Int = raf.read()
84- override fun read (b : ByteArray , off : Int , len : Int ): Int = raf.read(b, off, len)
85- override fun close () = raf.close()
81+ try {
82+ if (" content" == uri.scheme) {
83+ val input = reactContext.contentResolver.openInputStream(uri)
84+ ? : throw StreamError .NotFound (path)
85+ if (start > 0 ) input.skip(start)
86+ return input
87+ } else {
88+ val filePath = Fs2Util .getOriginalFilepath(reactContext, path)
89+ val file = File (filePath)
90+ if (! file.canRead()) throw StreamError .AccessDenied (path)
91+ val raf = RandomAccessFile (filePath, " r" )
92+ raf.seek(start)
93+ return object : InputStream () {
94+ override fun read (): Int = raf.read()
95+ override fun read (b : ByteArray , off : Int , len : Int ): Int = raf.read(b, off, len)
96+ override fun close () = raf.close()
97+ }
8698 }
99+ } catch (e: SecurityException ) {
100+ throw StreamError .AccessDenied (path)
101+ } catch (e: IOException ) {
102+ throw StreamError .IOError (e.message ? : " I/O error" )
87103 }
88104 }
89105
90106 // Helper to open OutputStream for writing (file or content URI)
91107 private fun openOutputStream (path : String , append : Boolean ): OutputStream {
92108 val uri = Fs2Util .getFileUri(path)
93- return if (" content" == uri.scheme) {
94- reactContext.contentResolver.openOutputStream(uri, if (append) " wa" else " w" )
95- ? : throw Exception (" ENOENT: Could not open output stream for $path " )
96- } else {
97- FileOutputStream (Fs2Util .getOriginalFilepath(reactContext, path), append)
109+ try {
110+ if (" content" == uri.scheme) {
111+ val output = reactContext.contentResolver.openOutputStream(uri, if (append) " wa" else " w" )
112+ ? : throw StreamError .NotFound (path)
113+ return output
114+ } else {
115+ val filePath = Fs2Util .getOriginalFilepath(reactContext, path)
116+ val file = File (filePath)
117+
118+ if (file.exists()) {
119+ if (! file.canWrite()) throw StreamError .AccessDenied (path)
120+ } else {
121+ val parentDir = file.parentFile
122+ if (parentDir != null && ! parentDir.canWrite()) {
123+ throw StreamError .AccessDenied (path)
124+ }
125+ }
126+
127+ return FileOutputStream (filePath, append)
128+ }
129+ } catch (e: SecurityException ) {
130+ throw StreamError .AccessDenied (path)
131+ } catch (e: IOException ) {
132+ throw StreamError .IOError (e.message ? : " I/O error" )
98133 }
99134 }
100135
@@ -105,7 +140,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
105140 return Promise .async {
106141 val file = File (path)
107142 if (! file.exists() || ! file.isFile) {
108- throw Exception ( " ENOENT: File does not exist: $ path" )
143+ throw StreamError . NotFound ( path)
109144 }
110145 val streamId = UUID .randomUUID().toString()
111146 val state = ReadStreamState (file, options)
@@ -148,11 +183,34 @@ class Fs2Stream() : HybridFs2StreamSpec() {
148183 )
149184 }
150185 }
186+ } catch (e: SecurityException ) {
187+ impl.state.hasError = true
188+ writeStreamErrorListeners[streamId]?.invoke(
189+ WriteStreamErrorEvent (
190+ streamId = streamId,
191+ error = StreamError .AccessDenied (state.file.path).message ? : " Access denied" ,
192+ code = null
193+ )
194+ )
195+ } catch (e: IOException ) {
196+ impl.state.hasError = true
197+ writeStreamErrorListeners[streamId]?.invoke(
198+ WriteStreamErrorEvent (
199+ streamId = streamId,
200+ error = StreamError .IOError (e.message ? : " I/O error" ).message ? : " I/O error" ,
201+ code = null
202+ )
203+ )
151204 } catch (e: Exception ) {
205+ impl.state.hasError = true
206+ val error = when (e) {
207+ is StreamError -> e
208+ else -> StreamError .IOError (e.message ? : " Unknown error" )
209+ }
152210 writeStreamErrorListeners[streamId]?.invoke(
153211 WriteStreamErrorEvent (
154212 streamId = streamId,
155- error = e .message ? : " Unknown error" ,
213+ error = error .message ? : " Unknown error" ,
156214 code = null
157215 )
158216 )
@@ -172,12 +230,14 @@ class Fs2Stream() : HybridFs2StreamSpec() {
172230 override fun startReadStream (streamId : String ): Promise <Unit > {
173231 return Promise .async {
174232 val state =
175- readStreams[streamId] ? : throw Exception ( " ENOENT: No such read stream: $ streamId" )
233+ readStreams[streamId] ? : throw StreamError . InvalidStream ( streamId)
176234 if (state.isActive) return @async
177235 state.isActive = true
178- if (state.job == null ) {
236+
237+ // Only create new job if none exists or previous one is completed
238+ if (state.job == null || state.job?.isActive == false ) {
179239 state.job = streamScope.launch {
180- val bufferSize = state.options?.bufferSize ? : 8192
240+ val bufferSize = state.options?.bufferSize ? : BufferPool . DEFAULT_BUFFER_SIZE
181241 val start = state.options?.start ? : 0L
182242 val end = state.options?.end
183243 var position = start
@@ -187,23 +247,24 @@ class Fs2Stream() : HybridFs2StreamSpec() {
187247 try {
188248 state.position = position
189249 openInputStream(state.file.path, start).use { inputStream ->
190- val buffer = ByteArray (bufferSize.toInt())
191- readLoop@ while (true ) {
192- var shouldBreak = false
193- state.pauseMutex.withLock {
250+ var buffer = bufferPool.acquire(bufferSize.toInt())
251+ try {
252+ readLoop@ while (true ) {
253+ // Wait if paused - acquire lock briefly to check, then suspend if needed
254+ state.pauseMutex.withLock {
255+ // Just checking pause state, lock will be released after this block
256+ }
257+
258+ // Perform I/O without holding the lock
194259 val bytesToRead = if (end != null ) {
195260 val remaining = end - position + 1
196- if (remaining <= 0 ) {
197- shouldBreak = true
198- return @withLock
199- }
261+ if (remaining <= 0 ) break @readLoop
200262 minOf(bufferSize.toLong(), remaining).toInt()
201- } else bufferSize
202- val read = inputStream.read(buffer, 0 , bytesToRead.toInt())
203- if (read == - 1 ) {
204- shouldBreak = true
205- return @withLock
206- }
263+ } else bufferSize.toInt()
264+
265+ val read = inputStream.read(buffer, 0 , bytesToRead)
266+ if (read == - 1 ) break @readLoop
267+
207268 val data = buffer.copyOf(read)
208269
209270 readStreamDataListeners[streamId]?.invoke(
@@ -219,6 +280,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
219280 state.position = position
220281 bytesReadTotal + = read
221282 chunk++
283+
222284 readStreamProgressListeners[streamId]?.invoke(
223285 ReadStreamProgressEvent (
224286 streamId = streamId,
@@ -227,12 +289,11 @@ class Fs2Stream() : HybridFs2StreamSpec() {
227289 progress = bytesReadTotal.toDouble() / fileLength.toDouble()
228290 )
229291 )
230- if (end != null && position > end) {
231- shouldBreak = true
232- return @withLock
233- }
292+
293+ if (end != null && position > end) break @readLoop
234294 }
235- if (shouldBreak) break @readLoop
295+ } finally {
296+ bufferPool.release(buffer)
236297 }
237298 }
238299 readStreamEndListeners[streamId]?.invoke(
@@ -242,11 +303,33 @@ class Fs2Stream() : HybridFs2StreamSpec() {
242303 success = true
243304 )
244305 )
306+ } catch (e: SecurityException ) {
307+ val error = StreamError .AccessDenied (state.file.path)
308+ readStreamErrorListeners[streamId]?.invoke(
309+ ReadStreamErrorEvent (
310+ streamId = streamId,
311+ error = error.message ? : " Access denied" ,
312+ code = null
313+ )
314+ )
315+ } catch (e: IOException ) {
316+ val error = StreamError .IOError (e.message ? : " I/O error" )
317+ readStreamErrorListeners[streamId]?.invoke(
318+ ReadStreamErrorEvent (
319+ streamId = streamId,
320+ error = error.message ? : " I/O error" ,
321+ code = null
322+ )
323+ )
245324 } catch (e: Exception ) {
325+ val error = when (e) {
326+ is StreamError -> e
327+ else -> StreamError .IOError (e.message ? : " Unknown error" )
328+ }
246329 readStreamErrorListeners[streamId]?.invoke(
247330 ReadStreamErrorEvent (
248331 streamId = streamId,
249- error = e .message ? : " Unknown error" ,
332+ error = error .message ? : " Unknown error" ,
250333 code = null
251334 )
252335 )
@@ -269,7 +352,9 @@ class Fs2Stream() : HybridFs2StreamSpec() {
269352 val state =
270353 readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
271354 if (! state.isActive) return @async
272- if (! state.pauseMutex.isLocked) state.pauseMutex.lock()
355+
356+ // Use tryLock to avoid deadlock - locks mutex to pause stream
357+ state.pauseMutex.tryLock()
273358 state.isActive = false
274359 }
275360 }
@@ -279,7 +364,15 @@ class Fs2Stream() : HybridFs2StreamSpec() {
279364 val state =
280365 readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
281366 if (state.isActive) return @async
282- if (state.pauseMutex.isLocked) state.pauseMutex.unlock()
367+
368+ // Safely unlock mutex to resume stream - check if locked first
369+ if (state.pauseMutex.isLocked) {
370+ try {
371+ state.pauseMutex.unlock()
372+ } catch (e: IllegalStateException ) {
373+ // Mutex might have been unlocked by another coroutine, ignore
374+ }
375+ }
283376 state.isActive = true
284377 }
285378 }
@@ -308,17 +401,14 @@ class Fs2Stream() : HybridFs2StreamSpec() {
308401 override fun writeToStream (streamId : String , data : ArrayBuffer ): Promise <Unit > {
309402 val copiedBuffer: ArrayBuffer
310403 try {
311- // Create a copy of the ArrayBuffer to ensure we have ownership
312404 copiedBuffer = ArrayBuffer .copy(data)
313405 } catch (e: Exception ) {
314- // If copying fails, reject immediately
315- return Promise .rejected(Exception (" Failed to copy ArrayBuffer: ${e.message} " ))
406+ return Promise .rejected(StreamError .BufferError (" Failed to copy ArrayBuffer: ${e.message} " ))
316407 }
317408
318409 return Promise .async {
319- val impl =
320- writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
321- if (! impl.state.isActive) throw Exception (" EPIPE: Write stream is not active: $streamId " )
410+ val impl = writeStreams[streamId] ? : throw StreamError .InvalidStream (streamId)
411+ if (! impl.state.isActive) throw StreamError .StreamInactive (streamId)
322412 val bytes = copiedBuffer.getBuffer(true ).let { buf ->
323413 if (buf.hasArray()) {
324414 buf.array().copyOfRange(
@@ -330,7 +420,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
330420 }
331421 }
332422 impl.queue.add(WriteRequest (bytes))
333- impl.state.job?.let { if (! it.isActive) throw Exception ( " EPIPE: Write job is not active " ) }
423+ impl.state.job?.let { if (! it.isActive) throw StreamError . StreamInactive (streamId ) }
334424 }
335425 }
336426
@@ -344,17 +434,28 @@ class Fs2Stream() : HybridFs2StreamSpec() {
344434
345435 override fun closeWriteStream (streamId : String ): Promise <Unit > {
346436 return Promise .async {
347- val impl = writeStreams.remove(streamId)
348- ? : throw Exception (" ENOENT: No such write stream: $streamId " )
349- impl.state.job?.cancel()
350- impl.outputStream.close()
437+ val impl = writeStreams.remove(streamId) ? : throw StreamError .InvalidStream (streamId)
438+
439+ // Signal end to prevent new writes and wait for pending writes to complete
440+ impl.state.isActive = false
441+ impl.queue.add(WriteRequest (null , isEnd = true ))
442+
443+ // Wait for background job to finish processing
444+ impl.state.job?.join()
445+
446+ try {
447+ impl.outputStream.close()
448+ } catch (_: Exception ) {
449+ }
450+
351451 writeStreamFinishListeners[streamId]?.invoke(
352452 WriteStreamFinishEvent (
353453 streamId = streamId,
354454 bytesWritten = impl.state.position,
355- success = true
455+ success = ! impl.state.hasError
356456 )
357457 )
458+
358459 writeStreamProgressListeners.remove(streamId)
359460 writeStreamFinishListeners.remove(streamId)
360461 writeStreamErrorListeners.remove(streamId)
@@ -407,7 +508,7 @@ class Fs2Stream() : HybridFs2StreamSpec() {
407508 WriteStreamFinishEvent (
408509 streamId = streamId,
409510 bytesWritten = impl.state.position,
410- success = true
511+ success = ! impl.state.hasError
411512 )
412513 )
413514
0 commit comments