@@ -18,7 +18,7 @@ import java.util.UUID
1818import kotlinx.coroutines.*
1919import kotlinx.coroutines.sync.withLock
2020
21- class Fs2Stream (): HybridFs2StreamSpec() {
21+ class Fs2Stream () : HybridFs2StreamSpec() {
2222 // Stream state data classes
2323 private data class ReadStreamState (
2424 val file : File ,
@@ -28,6 +28,7 @@ class Fs2Stream(): HybridFs2StreamSpec() {
2828 var job : Job ? = null ,
2929 val pauseMutex : kotlinx.coroutines.sync.Mutex = kotlinx.coroutines.sync.Mutex (locked = false) // Unlocked = active, Locked = paused
3030 )
31+
3132 private data class WriteStreamState (
3233 val file : File ,
3334 val options : WriteStreamOptions ? ,
@@ -45,15 +46,24 @@ class Fs2Stream(): HybridFs2StreamSpec() {
4546
4647 // Event listener maps (for demonstration, not yet emitting events)
4748 private val readStreamDataListeners = ConcurrentHashMap <String , (ReadStreamDataEvent ) - > Unit > ()
48- private val readStreamProgressListeners = ConcurrentHashMap <String , (ReadStreamProgressEvent ) - > Unit > ()
49+ private val readStreamProgressListeners =
50+ ConcurrentHashMap <String , (ReadStreamProgressEvent ) - > Unit > ()
4951 private val readStreamEndListeners = ConcurrentHashMap <String , (ReadStreamEndEvent ) - > Unit > ()
50- private val readStreamErrorListeners = ConcurrentHashMap <String , (ReadStreamErrorEvent ) - > Unit > ()
51- private val writeStreamProgressListeners = ConcurrentHashMap <String , (WriteStreamProgressEvent ) - > Unit > ()
52- private val writeStreamFinishListeners = ConcurrentHashMap <String , (WriteStreamFinishEvent ) - > Unit > ()
53- private val writeStreamErrorListeners = ConcurrentHashMap <String , (WriteStreamErrorEvent ) - > Unit > ()
52+ private val readStreamErrorListeners =
53+ ConcurrentHashMap <String , (ReadStreamErrorEvent ) - > Unit > ()
54+ private val writeStreamProgressListeners =
55+ ConcurrentHashMap <String , (WriteStreamProgressEvent ) - > Unit > ()
56+ private val writeStreamFinishListeners =
57+ ConcurrentHashMap <String , (WriteStreamFinishEvent ) - > Unit > ()
58+ private val writeStreamErrorListeners =
59+ ConcurrentHashMap <String , (WriteStreamErrorEvent ) - > Unit > ()
5460
5561 // Write stream: queue for incoming writes
56- private data class WriteRequest (val data : ByteArray , val isString : Boolean = false )
62+ private data class WriteRequest (
63+ val data : ByteArray? ,
64+ val isString : Boolean = false ,
65+ val isEnd : Boolean = false
66+ )
5767
5868 // Add reference to RNFSManager and context
5969 private val reactContext = NitroModules .applicationContext!!
@@ -122,18 +132,21 @@ class Fs2Stream(): HybridFs2StreamSpec() {
122132 impl.state.job = streamScope.launch {
123133 var bytesWritten = 0L
124134 try {
125- while (impl.state.isActive ) {
135+ writeLoop@ while (true ) {
126136 val req = impl.queue.take()
127- impl.outputStream.write(req.data)
128- impl.state.position + = req.data.size
129- bytesWritten + = req.data.size
130- writeStreamProgressListeners[streamId]?.invoke(
131- WriteStreamProgressEvent (
132- streamId = streamId,
133- bytesWritten = bytesWritten,
134- lastChunkSize = req.data.size.toLong()
137+ if (req.isEnd) break @writeLoop
138+ req.data?.let { data ->
139+ impl.outputStream.write(data)
140+ impl.state.position + = data.size
141+ bytesWritten + = data.size
142+ writeStreamProgressListeners[streamId]?.invoke(
143+ WriteStreamProgressEvent (
144+ streamId = streamId,
145+ bytesWritten = bytesWritten,
146+ lastChunkSize = data.size.toLong()
147+ )
135148 )
136- )
149+ }
137150 }
138151 } catch (e: Exception ) {
139152 writeStreamErrorListeners[streamId]?.invoke(
@@ -144,7 +157,10 @@ class Fs2Stream(): HybridFs2StreamSpec() {
144157 )
145158 )
146159 } finally {
147- try { impl.outputStream.close() } catch (_: Exception ) {}
160+ try {
161+ impl.outputStream.close()
162+ } catch (_: Exception ) {
163+ }
148164 impl.state.isActive = false
149165 }
150166 }
@@ -155,7 +171,8 @@ class Fs2Stream(): HybridFs2StreamSpec() {
155171 // --- Read Stream Control ---
156172 override fun startReadStream (streamId : String ): Promise <Unit > {
157173 return Promise .async {
158- val state = readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
174+ val state =
175+ readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
159176 if (state.isActive) return @async
160177 state.isActive = true
161178 if (state.job == null ) {
@@ -249,7 +266,8 @@ class Fs2Stream(): HybridFs2StreamSpec() {
249266
250267 override fun pauseReadStream (streamId : String ): Promise <Unit > {
251268 return Promise .async {
252- val state = readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
269+ val state =
270+ readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
253271 if (! state.isActive) return @async
254272 if (! state.pauseMutex.isLocked) state.pauseMutex.lock()
255273 state.isActive = false
@@ -258,7 +276,8 @@ class Fs2Stream(): HybridFs2StreamSpec() {
258276
259277 override fun resumeReadStream (streamId : String ): Promise <Unit > {
260278 return Promise .async {
261- val state = readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
279+ val state =
280+ readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
262281 if (state.isActive) return @async
263282 if (state.pauseMutex.isLocked) state.pauseMutex.unlock()
264283 state.isActive = true
@@ -267,7 +286,8 @@ class Fs2Stream(): HybridFs2StreamSpec() {
267286
268287 override fun closeReadStream (streamId : String ): Promise <Unit > {
269288 return Promise .async {
270- val state = readStreams.remove(streamId) ? : throw Exception (" ENOENT: No such read stream: $streamId " )
289+ val state = readStreams.remove(streamId)
290+ ? : throw Exception (" ENOENT: No such read stream: $streamId " )
271291 state.job?.cancel()
272292 readStreamDataListeners.remove(streamId)
273293 readStreamProgressListeners.remove(streamId)
@@ -278,19 +298,33 @@ class Fs2Stream(): HybridFs2StreamSpec() {
278298
279299 override fun isReadStreamActive (streamId : String ): Promise <Boolean > {
280300 return Promise .async {
281- val state = readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
301+ val state =
302+ readStreams[streamId] ? : throw Exception (" ENOENT: No such read stream: $streamId " )
282303 return @async state.isActive
283304 }
284305 }
285306
286307 // --- Write Stream Control ---
287308 override fun writeToStream (streamId : String , data : ArrayBuffer ): Promise <Unit > {
309+ val copiedBuffer: ArrayBuffer
310+ try {
311+ // Create a copy of the ArrayBuffer to ensure we have ownership
312+ copiedBuffer = ArrayBuffer .copy(data)
313+ } catch (e: Exception ) {
314+ // If copying fails, reject immediately
315+ return Promise .rejected(Exception (" Failed to copy ArrayBuffer: ${e.message} " ))
316+ }
317+
288318 return Promise .async {
289- val impl = writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
319+ val impl =
320+ writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
290321 if (! impl.state.isActive) throw Exception (" EPIPE: Write stream is not active: $streamId " )
291- val bytes = data .getBuffer(true ).let { buf ->
322+ val bytes = copiedBuffer .getBuffer(true ).let { buf ->
292323 if (buf.hasArray()) {
293- buf.array().copyOfRange(buf.arrayOffset() + buf.position(), buf.arrayOffset() + buf.limit())
324+ buf.array().copyOfRange(
325+ buf.arrayOffset() + buf.position(),
326+ buf.arrayOffset() + buf.limit()
327+ )
294328 } else {
295329 ByteArray (buf.remaining()).also { buf.get(it) }
296330 }
@@ -302,43 +336,87 @@ class Fs2Stream(): HybridFs2StreamSpec() {
302336
303337 override fun flushWriteStream (streamId : String ): Promise <Unit > {
304338 return Promise .async {
305- val impl = writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
339+ val impl =
340+ writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
306341 impl.outputStream.flush()
307342 }
308343 }
309344
310345 override fun closeWriteStream (streamId : String ): Promise <Unit > {
311346 return Promise .async {
312- val impl = writeStreams.remove(streamId) ? : throw Exception (" ENOENT: No such write stream: $streamId " )
347+ val impl = writeStreams.remove(streamId)
348+ ? : throw Exception (" ENOENT: No such write stream: $streamId " )
313349 impl.state.job?.cancel()
314350 impl.outputStream.close()
315- writeStreamProgressListeners.remove(streamId)
316- writeStreamFinishListeners.remove(streamId)
317- writeStreamErrorListeners.remove(streamId)
318351 writeStreamFinishListeners[streamId]?.invoke(
319352 WriteStreamFinishEvent (
320353 streamId = streamId,
321354 bytesWritten = impl.state.position,
322355 success = true
323356 )
324357 )
358+ writeStreamProgressListeners.remove(streamId)
359+ writeStreamFinishListeners.remove(streamId)
360+ writeStreamErrorListeners.remove(streamId)
325361 }
326362 }
327363
328364 override fun isWriteStreamActive (streamId : String ): Promise <Boolean > {
329365 return Promise .async {
330- val impl = writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
366+ val impl =
367+ writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
331368 return @async impl.state.isActive
332369 }
333370 }
334371
335372 override fun getWriteStreamPosition (streamId : String ): Promise <Long > {
336373 return Promise .async {
337- val impl = writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
374+ val impl =
375+ writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
338376 return @async impl.state.position
339377 }
340378 }
341379
380+ override fun endWriteStream (streamId : String ): Promise <Unit > {
381+ return Promise .async {
382+ val impl =
383+ writeStreams[streamId] ? : throw Exception (" ENOENT: No such write stream: $streamId " )
384+
385+ // Mark the stream as finished (no more writes)
386+ impl.state.isActive = false
387+
388+ // Enqueue an 'end' marker to unblock the write job
389+ impl.queue.add(WriteRequest (null , isEnd = true ))
390+
391+ // Wait for the background job to finish
392+ impl.state.job?.join()
393+
394+ // Now cleanup (remove from map, close file, emit finish)
395+ writeStreams.remove(streamId)
396+
397+ try {
398+ impl.outputStream.flush()
399+ } catch (_: Exception ) {
400+ }
401+ try {
402+ impl.outputStream.close()
403+ } catch (_: Exception ) {
404+ }
405+
406+ writeStreamFinishListeners[streamId]?.invoke(
407+ WriteStreamFinishEvent (
408+ streamId = streamId,
409+ bytesWritten = impl.state.position,
410+ success = true
411+ )
412+ )
413+
414+ writeStreamProgressListeners.remove(streamId)
415+ writeStreamFinishListeners.remove(streamId)
416+ writeStreamErrorListeners.remove(streamId)
417+ }
418+ }
419+
342420 // --- Event Listener Registration ---
343421 override fun listenToReadStreamData (
344422 streamId : String ,
0 commit comments