Implement framed decoding for demuxer

This commit is contained in:
silvanshade 2022-06-18 12:49:28 -06:00
parent dfabf15298
commit 82503d3f28
No known key found for this signature in database

View file

@ -14,6 +14,23 @@ export class Bytes {
static decode(input: Uint8Array): string {
return decoder.decode(input);
}
static append<T extends { length: number; set(arr: T, offset: number): void }>(
constructor: { new (length: number): T },
...arrays: T[]
) {
let totalLength = 0;
for (const arr of arrays) {
totalLength += arr.length;
}
const result = new constructor(totalLength);
let offset = 0;
for (const arr of arrays) {
result.set(arr, offset);
offset += arr.length;
}
return result;
}
}
export class Headers {
@ -128,10 +145,44 @@ export class StreamDemuxer extends Queue<Uint8Array> {
// FIXME: we needs to actually do framed reads here since `bytes` may not be a complete message
private async start(): Promise<void> {
let contentLength: null | number = null;
let buffer = new Uint8Array();
for await (const bytes of this) {
const delimited = Bytes.decode(bytes);
const message = JSON.parse(Headers.remove(delimited)) as vsrpc.Message;
buffer = Bytes.append(Uint8Array, buffer, bytes);
// check if the content length is known
if (null == contentLength) {
// if not, try to match the prefixed headers
const match = Bytes.decode(buffer).match(/^Content-Length:\s*(\d+)\s*/);
if (null == match) continue;
// try to parse the content-length from the headers
const length = parseInt(match[1]);
if (isNaN(length)) throw new Error("invalid content length");
// slice the headers since we now have the content length
buffer = buffer.slice(match[0].length);
// set the content length
contentLength = length;
}
// if the buffer doesn't contain a full message; await another iteration
if (buffer.length < contentLength) continue;
// decode buffer to a string
const delimited = Bytes.decode(buffer);
// reset the buffer
buffer = buffer.slice(contentLength);
// reset the contentLength
contentLength = null;
const message = JSON.parse(delimited) as vsrpc.Message;
Tracer.server(message);
// demux the message stream
if (vsrpc.Message.isResponse(message) && null != message.id) {
this.responses.set(message.id, message);
continue;