Skip to content

Add packet transfer protocol #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
110 changes: 85 additions & 25 deletions core/serial.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,32 +215,50 @@ To add a new serial device, you must add an object to
});
}
}, function(data) { // RECEIEVE DATA
if (!(data instanceof ArrayBuffer)) console.warn("Serial port implementation is not returning ArrayBuffers");
if (Espruino.Config.SERIAL_FLOW_CONTROL) {
var u = new Uint8Array(data);
for (var i=0;i<u.length;i++) {
if (u[i]==17) { // XON
console.log("XON received => resume upload");
flowControlXOFF = false;
if (flowControlTimeout) {
clearTimeout(flowControlTimeout);
flowControlTimeout = undefined;
if (!(data instanceof ArrayBuffer)) console.warn("Serial port implementation is not returning ArrayBuffers")

// Filter incoming data to handle and remove control characters
const filteredData = new Uint8Array(data).filter((v) => {
switch (v) {
case 17: // XON
if (Espruino.Config.SERIAL_FLOW_CONTROL) {
console.log("XON received => resume upload")
flowControlXOFF = false
if (flowControlTimeout) {
clearTimeout(flowControlTimeout)
flowControlTimeout = undefined
}
}
return false

case 19: // XOFF
if (Espruino.Config.SERIAL_FLOW_CONTROL) {
console.log("XOFF received => pause upload")
flowControlXOFF = true
if (flowControlTimeout) clearTimeout(flowControlTimeout)
flowControlTimeout = setTimeout(function () {
console.log(
`XOFF timeout (${FLOW_CONTROL_RESUME_TIMEOUT}s) => resume upload anyway`
)
flowControlXOFF = false
flowControlTimeout = undefined
}, FLOW_CONTROL_RESUME_TIMEOUT)
}
}
if (u[i]==19) { // XOFF
console.log("XOFF received => pause upload");
flowControlXOFF = true;
if (flowControlTimeout)
clearTimeout(flowControlTimeout);
flowControlTimeout = setTimeout(function() {
console.log(`XOFF timeout (${FLOW_CONTROL_RESUME_TIMEOUT}s) => resume upload anyway`);
flowControlXOFF = false;
flowControlTimeout = undefined;
}, FLOW_CONTROL_RESUME_TIMEOUT);
}
return false

case 6: // ACK
emit("ack")
return false

case 21: // NACK
emit("nack")
return false
}
}
if (readListener) readListener(data);

return true
})

if (readListener) readListener(filteredData.buffer)
}, function(error) { // DISCONNECT
currentDevice = undefined;
if (writeTimeout!==undefined)
Expand Down Expand Up @@ -412,6 +430,45 @@ To add a new serial device, you must add an object to
}
};

/**
* Simplified events system.
* @typedef {"close"|"data"|"open"|"error"|"ack"|"nack"|"packet"} PacketEvent
* @typedef {(...any) => void} PacketEventListener
*/

/** @type {Object.<PacketEvent, PacketEventListener} */
var pkListeners = {};

/**
* Act on events using a simplified events listener
* @param {PacketEvent} evt
* @param {PacketEventListener} cb
*/
function on(evt, cb) {
let e = "on" + evt;
if (!pkListeners[e]) pkListeners[e] = [];
pkListeners[e].push(cb);
}

/**
* Emit event on the event handler, will call all registered callbacks for {evt} and pass {data}
* @param {PacketEvent} evt
* @param {...any} data
*/
function emit(evt, ...data) {
let e = "on" + evt;
if (pkListeners[e]) pkListeners[e].forEach(fn => fn(...data));
}

/**
* Remove a {PacketEvent} listener
* @param {PacketEvent} evt
* @param {PacketEventListener} callback
*/
function removeListener(evt, callback) {
let e = "on" + evt;
if (pkListeners[e]) pkListeners[e] = pkListeners[e].filter(fn => fn != callback);
}

// ----------------------------------------------------------
Espruino.Core.Serial = {
Expand Down Expand Up @@ -442,6 +499,9 @@ To add a new serial device, you must add an object to
},
"setBinary": function(isOn) {
sendingBinary = isOn;
}
},

// Packet events system
on, emit, removeListener
};
})();
154 changes: 153 additions & 1 deletion core/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,157 @@
}
}

/**
* Packet types mapped to their wire values
* @typedef {Object} PacketTypes
* @property {number} RESPONSE - Response to an EVAL packet
* @property {number} EVAL - Execute and return the result as RESPONSE packet
* @property {number} EVENT - Parse as JSON and create `E.on('packet', ...)` event
* @property {number} FILE_SEND - Called before DATA, with {fn:"filename",s:123}
* @property {number} DATA - Sent after FILE_SEND with blocks of data for the file
* @property {number} FILE_RECV - Receive a file - returns a series of PT_TYPE_DATA packets, with a final zero length packet to end
*/
const pkTypes = Object.freeze({
RESPONSE: 0,
EVAL: 0x2000,
EVENT: 0x4000,
FILE_SEND: 0x6000,
DATA: 0x8000,
FILE_RECV: 0xA000
})

/**
* Creates a new packet for transfer using the packet protocol
* @param {number} pkType The packet type being sent, from `PacketTypes`
* @param {string} data Data to be appended to the end of the packet (max length 8191 bytes)
* @returns {string}
*/
function createPacket(pkType, data) {

// Check the packet type is one of the known types
if (!Object.hasOwn(pkTypes, pkType)) throw new Error(`'pkType' '${pkType}' not one of ${Object.keys(pkTypes)}`);

// Check the data is a string type and length is in bounds
if (typeof data !== 'string') throw new Error("data must be a String");
if (data.length <= 0 || data.length > 0x1FFF) throw new Error('data length is out of bounds, max 8191 bytes');

// Create packet heading using packet type and data length
const heading = pkTypes[pkType] | data.length

return String.fromCharCode(
16, // DLE (Data Link Escape)
1, // SOH (Start of Heading)
(heading >> 8) &0xFF, // Upper byte of heading
heading & 0xFF // Lower byte of heading
) + data; // Data blob
}

/**
* Take an input buffer and look for the initial control characters and then attempt to parse a
* complete data packet from the buffer. Any complete packet is sent via `emit("packet")` and then
* stripped from `buffer` modifiying it.
* @param {Uint8Array} buffer
*/
function parsePacketsFromBuffer(buffer) {

// Find DLE
const dle = buffer.findIndex(v => v === 0x10)
if (dle < 0) return

// Check for SOH
if (buffer.at(dle + 1) !== 0x1) {
console.log("DLE not followed by SOH")
return
}

// Check there's still space for headers
if (buffer.at(dle + 2) === undefined || buffer.at(dle + 3) === undefined) return
const upper = buffer.at(dle + 2)
const lower = buffer.at(dle + 3)

// Parse heading from 2 bytes after control headers
const heading = new Number(upper << 8) | new Number(lower)
const pkLen = heading & 0x1FFF
const pkTyp = heading & 0xE000

// Ignoring heading bytes, check if there's enough bytes in the buffer to satisfy pkLen
if (buffer.length < dle + 4 + pkLen) return

const packet = buffer.subarray(dle, dle + 4 + pkLen)
Espruino.Core.Serial.emit('packet', pkTyp, packet.subarray(4, packet.length))

buffer.fill(undefined, dle, dle + packet.length)

return
}

/**
* Send a packet
* @param {number} pkType
* @param {string} data
* @param {() => void} callback
*/
function sendPacket(pkType, data, callback) {

function onAck() {
// TODO: What do we actually need to do in the event of an ack
// tidy()
// callback()
}

function onNack(err) {
tidy()
callback(err)
}

function onPacket(rxPkType, data) {
const packetData = String.fromCharCode(...data)
console.log('onPacket', rxPkType, packetData)

// TODO: Depending on the rx type and tx type match up packet types, wait for x number of data
if (pkType === pkTypes.EVAL && rxPkType === pkTypes.RESPONSE) {
tidy()
callback(data)
}
}

// Tidy up the event listeners from this packet task
function tidy() {
Espruino.Core.Serial.removeListener("ack", onAck)
Espruino.Core.Serial.removeListener("nack", onNack)
Espruino.Core.Serial.removeListener("packet", onPacket)
}

// Attach event handlers for this packet event
Espruino.Core.Serial.on("ack", onAck)
Espruino.Core.Serial.on("nack", onNack)
Espruino.Core.Serial.on("packet", onPacket)

// Write packet to serial port
Espruino.Core.Serial.write(createPacket(pkType, data), undefined, function () {
// TODO: Add 1 sec timeout

let dataBuffer = new Uint8Array()

// Each time data comes in, expand the buffer and add the new data to it
// TODO: This seems problematic if there are subsequent/concurrent calls
Espruino.Core.Serial.startListening((data) => {
const newBuffer = new Uint8Array(data)

const tempBuffer = new Uint8Array(dataBuffer.length + newBuffer.length)
tempBuffer.set(dataBuffer, 0)
tempBuffer.set(newBuffer, dataBuffer.length)

dataBuffer = tempBuffer

// Now we've added more data to the buffer, try to parse out any packets
parsePacketsFromBuffer(dataBuffer)
})
})
}

/**
* Download a file - storageFile or normal file
* @param {string} fileName Path to file to download
* @param {(content?: string) => void} callback Call back with contents of file, or undefined if no content
*/
Expand Down Expand Up @@ -1116,6 +1266,7 @@ while (d!==undefined) {console.log(btoa(d));d=f.read(${CHUNKSIZE});}
countBrackets : countBrackets,
getEspruinoPrompt : getEspruinoPrompt,
executeExpression : function(expr,callback) { executeExpression(expr,callback,{exprPrintsResult:false}); },
executeExpressionV2: function(expr,callback) { sendPacket("EVAL",expr,callback); /* TODO: Callback and parseRJSON */ },
executeStatement : function(statement,callback) { executeExpression(statement,callback,{exprPrintsResult:true}); },
downloadFile : downloadFile, // (fileName, callback)
getUploadFileCode : getUploadFileCode, //(fileName, contents);
Expand Down Expand Up @@ -1143,6 +1294,7 @@ while (d!==undefined) {console.log(btoa(d));d=f.read(${CHUNKSIZE});}
asUTF8Bytes : asUTF8Bytes,
isASCII : isASCII,
btoa : btoa,
atob : atob
atob : atob,
createPacket : createPacket
};
}());
Loading