Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 55 additions & 26 deletions src/transports/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,51 @@ module.exports = class Connection extends EventEmitter {
this.state = SOCK_DISCONNECTED;
this.last_socket_error = null;
this.socket_events = [];
this.requested_disconnect = false;

this.encoding = 'utf8';
this._encodingIsUtf8 = true;

this.write_queue = [];
this.write_draining = false;
}

isConnected() {
return this.state === SOCK_CONNECTED;
}

_encodeData(line) {
return this._encodingIsUtf8 ?
line + '\r\n' :
iconv.encode(line + '\r\n', this.encoding);
}

_flushWriteQueue() {
if (!this.socket) return;
this.write_draining = false;
while (this.write_queue.length > 0) {
const { data, cb } = this.write_queue.shift();
const ok = this.socket.write(data, cb);
if (!ok) {
this.write_draining = true;
return;
}
}
}

writeLine(line, cb) {
if (this.socket && this.isConnected()) {
if (this.encoding !== 'utf8') {
this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb);
const data = this._encodeData(line);
if (this.write_draining) {
this.write_queue.push({ data, cb });
} else {
this.socket.write(line + '\r\n', cb);
const ok = this.socket.write(data, cb);
if (!ok) {
this.write_draining = true;
}
}
} else {
this.debugOut('writeLine() called when not connected');

if (cb) {
process.nextTick(cb);
}
Expand All @@ -63,6 +90,7 @@ module.exports = class Connection extends EventEmitter {

_unbindEvents() {
this.socket_events.forEach(fn => fn());
this.socket_events = [];
}

connect() {
Expand All @@ -76,6 +104,8 @@ module.exports = class Connection extends EventEmitter {
this.disposeSocket();
this.requested_disconnect = false;
this.incoming_buffer = Buffer.from('');
this.write_queue = [];
this.write_draining = false;

// Include server name (SNI) if provided host is not an IP address
if (!this.getAddressFamily(ircd_host)) {
Expand Down Expand Up @@ -125,16 +155,15 @@ module.exports = class Connection extends EventEmitter {
this._onSocketCreate(options, connection);
}).catch(this.onSocketError.bind(this));
} else {
let socket = null;
if ((options.tls || options.ssl) && options.path) {
socket = this.socket = tls.connect({
this.socket = tls.connect({
path: options.path,
rejectUnauthorized: options.rejectUnauthorized,
key: options.client_certificate && options.client_certificate.private_key,
cert: options.client_certificate && options.client_certificate.certificate,
});
} else if (options.tls || options.ssl) {
socket = this.socket = tls.connect({
this.socket = tls.connect({
servername: sni,
host: ircd_host,
port: ircd_port,
Expand All @@ -145,18 +174,18 @@ module.exports = class Connection extends EventEmitter {
family: this.getAddressFamily(options.outgoing_addr)
});
} else if (options.path) {
socket = this.socket = net.connect({
this.socket = net.connect({
path: options.path
});
} else {
socket = this.socket = net.connect({
this.socket = net.connect({
host: ircd_host,
port: ircd_port,
localAddress: options.outgoing_addr,
family: this.getAddressFamily(options.outgoing_addr)
});
}
this._onSocketCreate(options, socket);
this._onSocketCreate(options, this.socket);
}
}

Expand All @@ -181,6 +210,7 @@ module.exports = class Connection extends EventEmitter {
socket instanceof tls.TLSSocket ? 'secureConnect' : 'connect',
this.onSocketFullyConnected.bind(this)
);
this._bindEvent(socket, 'drain', this._flushWriteQueue.bind(this));
this._bindEvent(socket, 'close', this.onSocketClose.bind(this));
this._bindEvent(socket, 'error', this.onSocketError.bind(this));
this._bindEvent(socket, 'data', this.onSocketData.bind(this));
Expand All @@ -205,13 +235,12 @@ module.exports = class Connection extends EventEmitter {
onSocketClose() {
this.debugOut('socketClose()');
this.state = SOCK_DISCONNECTED;
this.emit('close', this.last_socket_error ? this.last_socket_error : false);
this.emit('close', this.last_socket_error || false);
}

onSocketError(err) {
this.debugOut('socketError() ' + err.message);
this.last_socket_error = err;
// this.emit('error', err);
}

onSocketTimeout() {
Expand All @@ -221,29 +250,28 @@ module.exports = class Connection extends EventEmitter {

onSocketData(data) {
// Buffer incoming data because multiple messages can arrive at once
// without necessarily ending in a new line
this.incoming_buffer = Buffer.concat(
[this.incoming_buffer, data],
this.incoming_buffer.length + data.length
);
// without necessarily ending in a new line.
// Only concat if we have a leftover partial line from a previous chunk.
const buffer = this.incoming_buffer.length > 0 ?
Buffer.concat([this.incoming_buffer, data], this.incoming_buffer.length + data.length) :
data;

let startIndex = 0;

while (true) {
// Search for the next new line in the buffered data
const endIndex = this.incoming_buffer.indexOf(0x0A, startIndex) + 1;
const endIndex = buffer.indexOf(0x0A, startIndex) + 1;

// If this message is partial, keep it in the buffer until more data arrives.
// If startIndex is equal to incoming_buffer.length, that means we reached the end
// If startIndex is equal to buffer.length, that means we reached the end
// of the buffer and it ended on a new line, slice will return an empty buffer.
if (endIndex === 0) {
this.incoming_buffer = this.incoming_buffer.slice(startIndex);
this.incoming_buffer = buffer.slice(startIndex);
break;
}

// Slice a single message delimited by a new line, decode it and emit it out
let line = this.incoming_buffer.slice(startIndex, endIndex);
line = iconv.decode(line, this.encoding);
const line = iconv.decode(buffer.slice(startIndex, endIndex), this.encoding);
this.emit('line', line);

startIndex = endIndex;
Expand Down Expand Up @@ -280,19 +308,17 @@ module.exports = class Connection extends EventEmitter {
}

setEncoding(encoding) {
let encoded_test;

this.debugOut('Connection.setEncoding() encoding=' + encoding);

try {
const testString = 'TEST\r\ntest';

encoded_test = iconv.encode(testString, encoding);
const encoded_test = iconv.encode(testString, encoding);
// This test is done to check if this encoding also supports
// the ASCII charset required by the IRC protocols
// (Avoid the use of base64 or incompatible encodings)
if (encoded_test.toString('ascii') === testString) {
this.encoding = encoding;
this._encodingIsUtf8 = (encoding === 'utf8');
return true;
}
return false;
Expand All @@ -308,5 +334,8 @@ module.exports = class Connection extends EventEmitter {
if (net.isIPv6(addr)) {
return 6;
}
// Returns undefined for hostnames or unset addresses.
// Callers rely on this being falsy (e.g. for SNI detection).
return undefined;
}
};