diff --git a/packages/app/src/codec.ts b/packages/app/src/codec.ts index 9640835..5c1753e 100644 --- a/packages/app/src/codec.ts +++ b/packages/app/src/codec.ts @@ -14,6 +14,23 @@ export class Bytes { static decode(input: Uint8Array): string { return decoder.decode(input); } + + static append( + constructor: { new (length: number): T }, + ...arrays: T[] + ) { + let totalLength = 0; + for (const arr of arrays) { + totalLength += arr.length; + } + const result = new constructor(totalLength); + let offset = 0; + for (const arr of arrays) { + result.set(arr, offset); + offset += arr.length; + } + return result; + } } export class Headers { @@ -128,10 +145,44 @@ export class StreamDemuxer extends Queue { // FIXME: we needs to actually do framed reads here since `bytes` may not be a complete message private async start(): Promise { + let contentLength: null | number = null; + let buffer = new Uint8Array(); + for await (const bytes of this) { - const delimited = Bytes.decode(bytes); - const message = JSON.parse(Headers.remove(delimited)) as vsrpc.Message; + buffer = Bytes.append(Uint8Array, buffer, bytes); + + // check if the content length is known + if (null == contentLength) { + // if not, try to match the prefixed headers + const match = Bytes.decode(buffer).match(/^Content-Length:\s*(\d+)\s*/); + if (null == match) continue; + + // try to parse the content-length from the headers + const length = parseInt(match[1]); + if (isNaN(length)) throw new Error("invalid content length"); + + // slice the headers since we now have the content length + buffer = buffer.slice(match[0].length); + + // set the content length + contentLength = length; + } + + // if the buffer doesn't contain a full message; await another iteration + if (buffer.length < contentLength) continue; + + // decode buffer to a string + const delimited = Bytes.decode(buffer); + + // reset the buffer + buffer = buffer.slice(contentLength); + // reset the contentLength + contentLength = null; + + const message = JSON.parse(delimited) as vsrpc.Message; Tracer.server(message); + + // demux the message stream if (vsrpc.Message.isResponse(message) && null != message.id) { this.responses.set(message.id, message); continue;