// xemail / send.js
// command and library to send email
// git clone http://git.nthia.dev/xemail

const TLS = require('./lib/starttls.js')
const { Readable, Writable, pipeline } = require('stream')
const { nextTick } = process
module.exports = Send

/**
 * SMTP client for delivering a single message, built on the TLS base from
 * ./lib/starttls.js. Callable with or without `new`.
 *
 * Construction only records state and queues a handler for the first server
 * reply. Once that reply arrives with a 2xx code the client runs, in order:
 * EHLO, STARTTLS upgrade, EHLO again, MAIL FROM / RCPT TO, DATA, QUIT, destroy.
 *
 * @param {object} [opts] - also forwarded to the TLS base, with `isServer`
 *   defaulting to false.
 * @param {string} opts.from - sender address (required).
 * @param {string|string[]} opts.to - one or more recipients (required).
 * @param {string|Buffer|object} [opts.data] - message content; strings and
 *   buffers are wrapped with Readable.from(), anything else is used as given
 *   (presumably a readable stream).
 * @param {string} [opts.domain] - name announced in EHLO; defaults to the part
 *   of `from` after its first "@".
 * @param {string} [opts.server] - stored as `destination`; defaults to the part
 *   of the first recipient after its first "@".
 * @param {boolean} [opts.rejectClear=true] - refuse to send when the server
 *   does not offer STARTTLS.
 * @param {function} [onfinish] - called as (err) when a step fails and as
 *   (null, sent, code) once the DATA step completes. A QUIT failure after a
 *   successful DATA step produces a second call carrying that error.
 * @throws {Error} when opts.from or opts.to is missing, or opts.to is empty.
 */
function Send(opts, onfinish) {
  if (!(this instanceof Send)) return new Send(opts, onfinish)
  const options = opts || {}
  TLS.call(this, Object.assign({ isServer: false }, options))
  this._onfinish = onfinish ?? noop
  this._data = options.data
  if (typeof this._data === 'string' || Buffer.isBuffer(this._data)) {
    this._data = Readable.from(this._data)
  }
  this._rejectClear = options.rejectClear ?? true
  if (options.from === undefined) throw new Error('required parameter not provided: opts.from')
  if (options.to === undefined) throw new Error('required parameter not provided: opts.to')
  this.from = String(options.from)
  this.to = (Array.isArray(options.to) ? options.to : [options.to]).map(String)
  if (this.to.length === 0) throw new Error('opts.to must contain one or more recipients')
  this.source = options.domain ?? this.from.replace(/^[^@]*@/,'')
  this.destination = options.server ?? this.to[0].replace(/^[^@]*@/,'')
  // incoming bytes are cut into lines and each line is parsed as a reply line
  this._split = split((line,next) => {
    this._writeLine(line, next)
  })
  // reply accumulation state, consumed by _writeLine
  this._lines = []
  this._code = -1
  this._maxLines = 1024
  this._maxLineBytes = this._maxLines*256
  this._lineBytes = 0
  // capability lines from the most recent EHLO reply
  this._cap = null

  const sequence = [
    (next) => this._ehlo(next),
    (next) => this._upgrade(next),
    // capabilities are requested again after the upgrade step
    (next) => this._ehlo(next),
    (next) => this._sendFromTo(next),
    (next) => this._sendData((err,sent,code) => {
      if (err) return next(err)
      this._onfinish(null, sent, code)
      next()
    }),
    (next) => this._sendQuit(next),
    (next) => { this.destroy(); next() },
  ]

  // runs the steps one after another; the first error ends the run
  const start = (i) => {
    if (i >= sequence.length) return
    sequence[i](err => {
      if (err) this._onfinish(err)
      else start(i+1)
    })
  }

  // _cb is the queue of reply handlers; this first entry receives the reply
  // that arrives before any command has been sent.
  this._cb = [
    (err,code,msg) => {
      // there is no `next` at this level: report failures straight to onfinish
      if (err) return this._onfinish(err)
      if (code < 200 || code >= 300) {
        return this._onfinish(new Error('non-2xx greeting response: ' + code + ' ' + msg))
      }
      start(0)
    }
  ]
}
Send.prototype = Object.create(TLS.prototype)

/**
 * Sequence step: switch the connection to TLS when the server offers it.
 *
 * Looks for an entry starting with "starttls" (case-insensitive) in the
 * capability list stored by the last EHLO. Without one, the step fails when
 * `_rejectClear` is set and otherwise succeeds without doing anything; both
 * outcomes are delivered through nextTick. With one, STARTTLS is sent and on a
 * 2xx reply `this._startTLS()` is called (not defined in this file; presumably
 * supplied by the TLS base).
 *
 * @param {function} next - called as next(err) on failure; on success called
 *   with null (no upgrade) or with no arguments (upgraded).
 */
Send.prototype._upgrade = function (next) {
  const offersTLS = Boolean(this._cap)
    && this._cap.some(entry => /^starttls\b/i.test(entry))

  if (!offersTLS) {
    const outcome = this._rejectClear
      ? new Error('server does not support tls, '
        + 'refusing to send because rejectClear is enabled')
      : null
    return nextTick(next, outcome)
  }

  const onReply = (err, code, msg) => {
    if (err) return next(err)
    const rejected = code < 200 || code >= 300
    if (rejected) {
      const reason = 'non-2xx starttls response: ' + code + ' ' + msg
      return next(new Error(reason))
    }
    this._startTLS()
    next()
  }
  this._send('STARTTLS\r\n', onReply)
}

/**
 * Sequence step: send EHLO with `this.source` and store the reply text, split
 * into lines, as the capability list in `this._cap`.
 *
 * @param {function} next - called with no arguments on a 2xx reply, otherwise
 *   with the transport error or an Error describing the non-2xx reply.
 */
Send.prototype._ehlo = function (next) {
  const onReply = (err, code, msg) => {
    if (err) return next(err)
    const rejected = code < 200 || code >= 300
    if (rejected) {
      const reason = 'non-2xx ehlo response: ' + code + ' ' + msg
      return next(new Error(reason))
    }
    // one capability per reply line
    this._cap = msg.split(/\r?\n/)
    next()
  }
  const command = 'EHLO ' + this.source + '\r\n'
  this._send(command, onReply)
}

/**
 * Sequence step: announce the envelope with one MAIL FROM command followed by
 * one RCPT TO command per recipient, each sent only after the previous one got
 * a 2xx reply.
 *
 * The commands are written as "MAIL FROM:<addr>" / "RCPT TO:<addr>" with no
 * space after the colon, as RFC 5321 requires. Addresses containing CR or LF
 * are refused before anything is sent, because they would let the address
 * text start additional SMTP commands.
 *
 * @param {function} next - called with no arguments once every command was
 *   accepted, otherwise with the first error encountered.
 */
Send.prototype._sendFromTo = function (next) {
  const unsafe = [this.from, ...this.to].find(addr => /[\r\n]/.test(addr))
  if (unsafe !== undefined) {
    return nextTick(next, new Error('address contains a line break: '
      + JSON.stringify(unsafe)))
  }

  // `label` only feeds the error message for a rejected command
  const commands = [
    { label: 'mail from', text: 'MAIL FROM:<' + this.from + '>\r\n' },
    ...this.to.map(addr => ({ label: 'rcpt-to', text: 'RCPT TO:<' + addr + '>\r\n' })),
  ]

  const sendCommand = (i) => {
    if (i >= commands.length) return next()
    const { label, text } = commands[i]
    this._send(text, (err, code, msg) => {
      if (err) return next(err)
      if (code < 200 || code >= 300) {
        return next(new Error('non-2xx ' + label + ' response: ' + code + ' ' + msg))
      }
      sendCommand(i+1)
    })
  }
  sendCommand(0)
}

/**
 * Sequence step: send DATA, stream the message from `this._data`, then send
 * the end-of-data marker and wait for the server's verdict.
 *
 * The message is made transparent as RFC 5321 section 4.5.2 describes: a "."
 * at the start of a line (start of the message, or right after CRLF) is
 * doubled, so message content can never be read as the end-of-data marker.
 * The marker itself is ".\r\n" when the content already ended with CRLF (or
 * was empty) and "\r\n.\r\n" otherwise, so no empty line is appended.
 *
 * @param {function} next - called exactly once: next(err) on any failure, or
 *   next(null, true, code) with the reply code of the accepted message.
 */
Send.prototype._sendData = function (next) {
  if (this._data === undefined || this._data === null) {
    // pipeline() would throw from inside a reply handler otherwise
    return nextTick(next, new Error('required parameter not provided: opts.data'))
  }

  const DOT = Buffer.from('.')
  const MID_LINE = 0
  const AFTER_CR = 1
  const LINE_START = 2
  // position within the current line, carried across chunk boundaries
  let state = LINE_START
  let acceptedCode = -1

  // returns `chunk` with every line-leading "." doubled
  const stuff = (chunk) => {
    const parts = []
    let from = 0
    for (let i = 0; i < chunk.length; i++) {
      const byte = chunk[i]
      if (byte === 0x2e && state === LINE_START) {
        parts.push(chunk.subarray(from, i), DOT)
        from = i
      }
      if (byte === 0x0d) state = AFTER_CR
      else if (byte === 0x0a && state === AFTER_CR) state = LINE_START
      else state = MID_LINE
    }
    if (parts.length === 0) return chunk
    parts.push(chunk.subarray(from))
    return Buffer.concat(parts)
  }

  this._send('DATA\r\n', (err, code, msg) => {
    if (err) return next(err)
    if (code < 300 || code >= 400) {
      return next(new Error('non-3xx data start response: ' + code + ' ' + msg))
    }
    const body = new Writable({
      write: (buf, enc, cb) => {
        this._push(stuff(buf))
        cb()
      },
      // runs after the last chunk; the stream finishes (or errors) only once
      // the server has answered the end-of-data marker
      final: (cb) => {
        const terminator = state === LINE_START ? '.\r\n' : '\r\n.\r\n'
        this._send(terminator, (err, code, msg) => {
          if (err) return cb(err)
          if (code < 200 || code >= 300) {
            return cb(new Error('non-2xx data end response: ' + code + ' ' + msg))
          }
          acceptedCode = code
          cb()
        })
      },
    })
    // single exit point: source errors and end-of-data errors both land here
    pipeline(this._data, body, (err) => {
      if (err) next(err)
      else next(null, true, acceptedCode)
    })
  })
}

/**
 * Sequence step: send QUIT and, on a 2xx reply, call `this._end()`, continue
 * the sequence, and then emit "quit" on this object.
 *
 * @param {function} next - called with null on success, otherwise with the
 *   transport error or an Error describing the non-2xx reply.
 */
Send.prototype._sendQuit = function (next) {
  this._send('QUIT\r\n', (err, code, msg) => {
    if (err) return next(err)
    if (code < 200 || code >= 300) {
      return next(new Error('non-2xx quit response: ' + code + ' ' + msg))
    }
    this._end()
    next(null)
    // emitted after next() has run, i.e. after the rest of the sequence started
    this.emit('quit')
  })
}

/**
 * Hands a chunk of received bytes to the line splitter stored in
 * `this._split`.
 *
 * @param {Buffer} buf - received bytes.
 * @param {string} enc - not used.
 * @param {function} cb - passed to the splitter, which calls it once the
 *   chunk has been consumed or failed.
 */
Send.prototype._writeBuf = function (buf, enc, cb) {
  const chunk = buf
  const whenConsumed = cb
  this._split(chunk, whenConsumed)
}

/**
 * Parses one reply line and, when a reply is complete or broken, hands the
 * outcome to the oldest queued reply handler in `this._cb`.
 *
 * - "<code>-<text>" is a continuation line: the text is accumulated. Going
 *   over `_maxLines` lines or `_maxLineBytes` bytes of text fails the reply.
 * - "<code><whitespace><text>" is the last line: the handler receives
 *   (null, code, text) where text is all accumulated lines joined with CRLF.
 * - Anything else fails the reply with a "no code in response line" error.
 *
 * In every case the accumulation state is cleared before the handler runs,
 * and `next` is called afterwards so the splitter continues with the
 * following line.
 *
 * @param {Buffer} buf - one line without its terminating LF.
 * @param {function} next - called with no arguments when the line is handled.
 */
Send.prototype._writeLine = function (buf, next) {
  const line = buf.toString()

  // clears the accumulated reply and settles the oldest pending handler
  const settle = (err, code, msg) => {
    this._lines = []
    this._lineBytes = 0
    this._code = -1
    const cb = this._cb.shift() ?? noop
    if (err) cb(err)
    else cb(null, code, msg)
  }

  const m = /^(\d+)([-\s])(.*)/.exec(line)
  if (m === null) {
    settle(new Error('no code in response line: ' + line))
  } else {
    this._code = Number(m[1])
    this._lines.push(m[3])
    this._lineBytes += Buffer.byteLength(m[3])
    if (m[2] !== '-') {
      settle(null, this._code, this._lines.join('\r\n'))
    } else if (this._lines.length > this._maxLines) {
      settle(new Error('too many lines in reply'))
    } else if (this._lineBytes > this._maxLineBytes) {
      settle(new Error('too many bytes in reply'))
    }
  }
  next()
}

/**
 * Queues `cb` as the handler for the next unanswered reply, then writes `msg`
 * out through `this._push`.
 *
 * @param {string|Buffer} msg - bytes to write, including any line ending.
 * @param {function} cb - reply handler; _writeLine calls it as (err) or
 *   (null, code, text).
 */
Send.prototype._send = function (msg, cb) {
  const pendingReplies = this._cb
  // the handler is queued before the bytes are written
  pendingReplies.push(cb)
  this._push(msg)
}

/**
 * Does nothing. Stands in wherever a callback is optional and was not given.
 */
function noop() {
  // intentionally empty
}

/**
 * Creates a stateful line splitter.
 *
 * The returned `write(buf, cb)` cuts `buf` at every LF (0x0a) and passes each
 * line, without the LF, to `f(line, next)`. The following line is only handed
 * over once `f` calls `next()`; `next(err)` aborts the chunk with `cb(err)`.
 * Bytes after the last LF are kept and prefixed to the first line of the next
 * chunk. More than 8192 kept bytes fail the chunk with "line too long".
 *
 * @param {function} f - line consumer, called as f(line, next).
 * @returns {function} write(buf, cb); `cb` is called once per chunk, with an
 *   error or with no arguments.
 */
function split(f) {
  // pieces of an unfinished line carried over from earlier chunks
  let pending = []
  let pendingLength = 0
  return function write(buf, cb) {
    ;(function next(i) {
      const j = buf.indexOf(0x0a, i)
      if (j < 0) return done(i)
      let line = buf.subarray(i, j)
      if (pending.length > 0) {
        line = Buffer.concat([...pending, line])
        pending = []
        pendingLength = 0
      }
      f(line, (err) => {
        if (err) cb(err)
        else next(j+1)
      })
    })(0)
    function done(i) {
      // keep everything after the last LF, including a single trailing byte
      if (i < buf.length) {
        pending.push(buf.subarray(i))
        pendingLength += buf.length - i
        if (pendingLength > 8192) {
          return cb(new Error('line too long'))
        }
      }
      cb()
    }
  }
}