PacketConverter class
class PacketConverter {
final _log = new Logger('PacketConverter');
final packets = new ListQueue<List<int>>();
final messages = new ListQueue<List<int>>();
final MAX_DOC_SIZE = 32 * 1024 * 1024;
bool headerMode = true;
int bytesToRead = 4;
List<int> buffer;
int readPos = 0;
List<int> messageBuffer;
int messagesConverted = 0;
final lengthBuffer = new BsonBinary(4);
addPacket(List<int> packet) {
packets.addLast(packet);
//print('addPacket to $this');
if (headerMode) {
handleHeader();
} else {
handleBody();
}
}
handleHeader(){
if (bytesAvailable()>=4) {
headerMode = false;
lengthBuffer.rewind();
readIntoBuffer(lengthBuffer.byteList,0);
int len = lengthBuffer.readInt32();
if (len > MAX_DOC_SIZE) {
throw new MongoDartError('Message length $len over maximum document size');
}
messageBuffer = new List<int>(len);
handleBody();
}
}
handleBody(){
if (bytesAvailable()>=messageBuffer.length-4) {
headerMode = true;
messageBuffer.setRange(0,4,lengthBuffer.byteList);
readIntoBuffer(messageBuffer,4);
messagesConverted++;
messages.addLast(messageBuffer);
handleHeader();
}
}
/// Length of all packets with current read position on first packet subtracted
int bytesAvailable() => packets.fold(- readPos,(value, element) => value + element.length) ;
void readIntoBuffer(List<int> buffer, int pos) {
if(buffer.length - pos > bytesAvailable()) {
// print('$this $buffer $pos');
throw new MongoDartError('Bad state. Read buffer too big');
}
int writePos = pos;
while (writePos < buffer.length) {
writePos += _readPacketIntoBuffer(buffer, writePos);
}
if (writePos < buffer.length) {
throw new MongoDartError('Bad state. Buffer was not written fully');
}
}
int _readPacketIntoBuffer(List<int> buffer, int pos) {
int bytesRead = min(buffer.length - pos,packets.first.length - readPos);
buffer.setRange(pos,pos+bytesRead,packets.first,readPos);
if (readPos + bytesRead == packets.first.length) {
readPos = 0;
packets.removeFirst();
} else {
readPos += bytesRead;
}
return bytesRead;
}
String toString() => 'PacketConverter(readPos: $readPos, headerMode: $headerMode, packets: $packets)';
bool get isClear => this.packets.isEmpty && messages.isEmpty && headerMode;
}
Properties
final lengthBuffer #
final lengthBuffer = new BsonBinary(4)
final MAX_DOC_SIZE #
final MAX_DOC_SIZE = 32 * 1024 * 1024
final messages #
final messages = new ListQueue<List<int>>()
final packets #
final packets = new ListQueue<List<int>>()
Methods
dynamic addPacket(List<int> packet) #
addPacket(List<int> packet) {
packets.addLast(packet);
//print('addPacket to $this');
if (headerMode) {
handleHeader();
} else {
handleBody();
}
}
int bytesAvailable() #
Length of all packets with current read position on first packet subtracted
int bytesAvailable() => packets.fold(- readPos,(value, element) => value + element.length) ;
dynamic handleBody() #
handleBody(){
if (bytesAvailable()>=messageBuffer.length-4) {
headerMode = true;
messageBuffer.setRange(0,4,lengthBuffer.byteList);
readIntoBuffer(messageBuffer,4);
messagesConverted++;
messages.addLast(messageBuffer);
handleHeader();
}
}
dynamic handleHeader() #
handleHeader(){
if (bytesAvailable()>=4) {
headerMode = false;
lengthBuffer.rewind();
readIntoBuffer(lengthBuffer.byteList,0);
int len = lengthBuffer.readInt32();
if (len > MAX_DOC_SIZE) {
throw new MongoDartError('Message length $len over maximum document size');
}
messageBuffer = new List<int>(len);
handleBody();
}
}
void readIntoBuffer(List<int> buffer, int pos) #
void readIntoBuffer(List<int> buffer, int pos) {
if(buffer.length - pos > bytesAvailable()) {
// print('$this $buffer $pos');
throw new MongoDartError('Bad state. Read buffer too big');
}
int writePos = pos;
while (writePos < buffer.length) {
writePos += _readPacketIntoBuffer(buffer, writePos);
}
if (writePos < buffer.length) {
throw new MongoDartError('Bad state. Buffer was not written fully');
}
}