Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler algorithm #59

Open
ronag opened this issue Jan 17, 2022 · 9 comments
Open

Simpler algorithm #59

ronag opened this issue Jan 17, 2022 · 9 comments

Comments

@ronag
Copy link
Contributor

ronag commented Jan 17, 2022

Couldn't quite get thread-stream running well in our services so we re-implemented something similar. We used a simpler concurrent queue implementation that might be useful for thread-stream as well.

Basically we do it like this.

const MAX_LEN = 8 * 1024
const BUF_END = 256 * MAX_LEN
const BUF_LEN = BUF_END + MAX_LEN

const WRITE_INDEX = 4
const READ_INDEX = 8

const sharedBuffer = new SharedArrayBuffer(BUF_LEN)
const sharedState = new SharedArrayBuffer(128)

const state = new Int32Array(sharedState)
const buffer = Buffer.from(sharedBuffer)


buffer[0] = 31 // Item header
Atomics.store(state, WRITE_INDEX, 1)
Atomics.notify(state, WRITE_INDEX)

// producer

let draining = false
async function drain () {
  draining = true

  let write = Atomics.load(state, WRITE_INDEX)
  let read = Atomics.load(state, READ_INDEX)

  while (write <= read && write + MAX_LEN > read) {
    const { async, value } = Atomics.waitAsync(state, READ_INDEX, read)
    if (async) {
      await value
    }
    read = Atomics.load(state, READ_INDEX)
  }

  draining = false
  emit('drain')
}

send(data) {
  const len = MAX_LEN // or Buffer.byteLength(name)

  let read = Atomics.load(this._state, READ_INDEX)

  while (write < read && write + len > read) {
    Atomics.wait(this._state, READ_INDEX, read)
    read = Atomics.load(state, READ_INDEX)
  }

  write += this._buffer.write(name, this._write)
  buffer[write++] = 31

  if (write > BUF_END) {
   write = 0
  }

  Atomics.store(state, WRITE_INDEX, write)
  Atomics.notify(state, WRITE_INDEX)

  const needDrain = write + MAX_LEN >= BUF_END

  if (needDrain && !draining) {
    draining = true
    drain()
  }

  return !draining
}

// consumer
async function * receive () {
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)

    if (read > BUF_END) {
      read = 0
    }

    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }

    if (write < read) {
      write = BUF_END
    }

    const arr = []
    while (read < write) {
      const idx = buffer.indexOf(31, read)
      arr.push(buffer.toString('utf-8', read, idx))
      read = idx + 1
    }

    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)

    yield* arr
  }
}
@mcollina
Copy link
Member

I don't think you can make the assumption that the consumer is faster.

I would really like to solve those bugs... however I need reproductions.

@ronag
Copy link
Contributor Author

ronag commented Jan 17, 2022

I don't think you can make the assumption that the consumer is faster.

We already kind of do that https://github.com/pinojs/thread-stream/blob/main/index.js#L218. My suggestion is missing 'drain' atm.

I can sort that out as well. I'd like to try applying this to thread-stream but is a bit short on time atm so just wanted to post our progress here n case someone wants to have a take on it-

@ronag
Copy link
Contributor Author

ronag commented Jan 17, 2022

Updated with drain

@mcollina
Copy link
Member

We already kind of do that https://github.com/pinojs/thread-stream/blob/main/index.js#L218. My suggestion is missing 'drain' atm.

This only happens if we are over MAX_STRING which happens only if the consumer is extremely slower than the producer.

Note that you use waitAsync which is not available in Node v14 as far as I am aware.

@ronag

This comment has been minimized.

@mcollina
Copy link
Member

IMHO 90% of the complexity comes when avoiding the blocking on the writer, moreover it's the first requirement of this system. If we are blocking the main thread, we could avoid all this threading and just keep everything in main.

@ronag
Copy link
Contributor Author

ronag commented Jan 17, 2022

IMHO 90% of the complexity comes when avoiding the blocking on the writer, moreover it's the first requirement of this system. If we are blocking the main thread, we could avoid all this threading and just keep everything in main.

Here you go with non blocking writer:

const assert = require('assert')
const EE = require('events')

const WRITE_INDEX = 4
const READ_INDEX = 8

async function* read({ sharedState, sharedBuffer, maxMessageSize }) {
  assert(sharedState)
  assert(sharedBuffer)
  assert(maxMessageSize)

  const state = new Int32Array(sharedState)
  const buffer = Buffer.from(sharedBuffer)
  const bufEnd = buffer.length - maxMessageSize

  let read = 0
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)

    if (read > bufEnd) {
      read = 0
    }

    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }

    if (write < read) {
      write = bufEnd
    }

    const arr = []
    while (read < write) {
      const len = buffer.readInt32LE(read)
      arr.push(buffer.toString('utf-8', read + 4, read + 4 + len))
      read += 4 + len
    }

    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)

    yield* arr
  }
}

class Writer extends EE {
  constructor({ sharedState, sharedBuffer, maxMessageSize }) {
    assert(sharedState && sharedState.byteLength >= 32)
    assert(sharedBuffer)
    assert(maxMessageSize)

    super()

    this._state = new Int32Array(sharedState)
    this._buffer = Buffer.from(sharedBuffer)
    this._write = 0
    this._maxMessageSize = maxMessageSize
    this._bufEnd = this._buffer.length - maxMessageSize
    this._needDrain = false
  }

  write(data) {
    let read = Atomics.load(this._state, READ_INDEX)

    while (this._isFull(read)) {
      Atomics.wait(this._state, READ_INDEX, read)
      read = Atomics.load(this._state, READ_INDEX)
    }

    const written = this._buffer.write(data, this._write + 4)

    this._buffer.writeInt32LE(written, this._write)
    this._write += 4 + written

    assert(this._write + 1 < this._buffer.length)

    if (this._write > this._bufEnd) {
      this._write = 0
    }

    Atomics.store(this._state, WRITE_INDEX, this._write)
    Atomics.notify(this._state, WRITE_INDEX)

    const needDrain = this._isFull(read)
    if (needDrain && !this._needDrain) {
      this._needDrain = true
      this._drain()
    }

    return needDrain
  }

  _isFull(read) {
    return this._write < read && this._write + this._maxMessageSize > read
  }

  async _drain() {
    let read = Atomics.load(this._state, READ_INDEX)
    while (this._isFull(read)) {
      const { async, value } = Atomics.wait(this._state, READ_INDEX, read)
      if (async) {
        await value
      }
      read = Atomics.load(this._state, READ_INDEX)
    }
    this._needDrain = false
    this.emit('drain')
  }
}

module.exports = {
  read,
  Writer,
}

@ronag
Copy link
Contributor Author

ronag commented Jan 17, 2022

We can polyfill Atomics.asyncWait in node 14.

@mcollina
Copy link
Member

There are a couple of edge cases relates to how we handle flushSync() and end() this is not managing. However I'be really happy to see the crappy waitAsync implementation we have here replaced with code similar to yours.

Something that worries me is that given we do not have the reproduction scripts for your problems, we'll make the same mistakes twice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants