nthmail / lib / outbox.js
all-in-one smtp+imap with minimal setup
git clone http://git.nthia.dev/nthmail

const { pipeline, Writable } = require('stream')
const Inbox = require('./inbox.js')
const Send = require('./send.js')
const Scheduler = require('./scheduler.js')

module.exports = Outbox

function Outbox(opts) {
  if (!(this instanceof Outbox)) return new Outbox(opts)
  this.box = new Inbox({ maildir: opts.outboxDir })
  this.undelivered = new Inbox({ maildir: opts.undeliveredDir })
  this.inbox = opts.inbox
  this._scheduler = new Scheduler
  this._tls = opts.tls
  this._maxRetries = opts.maxRetries ?? 30
  this._delay = opts.delay || function (n) {
    if (n < 4) return 1000*60*5 // every 5 minutes
    if (n < 7) return 1000*60*15 // every 15 minutes
    return 1000*60*60 // every hour
  }
}

Outbox.prototype.start = function (user, cb) {
  let self = this
  if (!cb) cb = noop
  let errors = []
  self.list(user, (err,files) => {
    if (err) return cb(err)
    ;(function next(i) {
      if (i >= files.length) return cb(errors.length > 0 ? errors : null)
      let file = files[i].file
      self.head(user, file, (err,buf) => {
        if (err) {
          errors.push(err)
          return next(i+1)
        }
        let lines = buf.toString().split('\n')
        let last = new Date, retry = 0
        for (let i = 0; i < lines.length; i++) {
          let line = lines[i]
          if (m = /^last-attempt\s*:\s*([^\r\n]+)/i.exec(line)) {
            last = new Date(m[1].trim())
          } else if (m = /^retry\s*:\s*(-\d{3,}|\d{4,})/i.exec(line)) {
            retry = Number(m[1])
          }
        }
        if (retry < 0) return next(i+1) // deactivated
        let ms = this._delay(retry) - (new Date().valueOf() - last.valueOf())
        self._scheduler.push(ms, () => self.send(user, file))
        next(i+1)
      })
    })(0)
  })
}

Outbox.prototype.send = function (user, file, cb) {
  if (!cb) cb = noop
  this.head(user, file, (err,head) => {
    if (err) return cb(err)
    let lines = head.toString().split(/\r?\n/)
    let to = null, from = null, domain = null, m = null
    for (var i = 0; i < lines.length; i++) {
      let line = lines[i]
      if (m = /^to\s*:\s*(\S+)/i.exec(line)) {
        to = m[1]
      } else if (m = /^from\s*:\s*(\S+)/i.exec(line)) {
        from = m[1]
        domain = m[1].replace(/^[^@]*@/,'')
      }
    }
    let opts = {
      tls: this._tls(),
      to, from, domain,
      data: this.body(user, file),
    }
    new Send(opts, (err,sent,code) => {
      if (sent) {
        console.log(`outgoing success: to=${to} from=${from}`)
        this.remove(user, file, err => {
          if (err) cb(err)
          else cb(null, true)
        })
      } else if (400 <= code && code < 500) { // transient negative completion, retry ok
        console.log(`outgoing fail (transient): code=${code}`)
        this._reschedule(user, file, err => {
          if (err) cb(err)
          else cb(null, false)
        })
      } else if (500 <= code && code < 600) { // permanent negative completion, retry not ok
        console.log(`outgoing fail (permanent): code=${code}`)
        // don't delete the message, leave in outbox instead
        this._move(user, file, 'outbox', 'undelivered', (err) => {
          if (err) cb(err)
          else cb(null, false)
        })
      } else { // something strange
        console.log(`outgoing fail: code=${code}`)
        this._move(user, file, 'outbox', 'undelivered', (err) => {
          if (err) cb(err)
          else cb(null, false)
        })
      }
    })
  })
}

Outbox.prototype.write = function (user) {
  let w = this.box.write(user)
  w.once('finish', () => {
    this.send(user, w.file)
  })
  return w
}

Outbox.prototype.read = function (user, file) {
  return this.box.read(user, file)
}
Outbox.prototype.head = function (user, file, cb) {
  return this.box.head(user, file, cb)
}
Outbox.prototype.body = function (user, file) {
  return this.box.body(user, file)
}
Outbox.prototype.list = function (user, cb) {
  this.box.list(user, cb)
}
Outbox.prototype.remove = function (user, file, cb) {
  this.box.remove(user, file, cb)
}
Outbox.prototype._reschedule = function (user, file, cb) {
  if (!cb) cb = noop
  this.head(user, file, (err,buf) => {
    if (err) return cb(err)
    let m, lines = bufferLines(buf)
    let lastIx = -1, last = null, retry = 0
    for (let i = 0; i < lines.length; i++) {
      let line = lines[i].toString()
      if (m = /^last-attempt\s*:\s*([^\r\n]+)/i.exec(line)) {
        lastIx = i
        last = new Date(m[1])
      } else if (m = /^retry\s*:\s*(-\d{3,}|\d{4,})/i.exec(line)) {
        retry = Number(m[1])
        if (retry < 0) return cb(null, 'deactivated')
        retry++
        if (retry > this._maxRetries) {
          return this._move(user, file, 'outbox', 'undelivered', (err) => {
            if (err) cb(err)
            else cb(null, 'undelivered')
          })
        } else {
          lines[i].write(String(retry).padStart(4,'0'), m[0].length-4)
          let k = /\r\n$/.test(line) ? 2 : 1
          lines[i].write(Array(buf.length-m[0].length-k).fill(' ').join(''), m[0].length)
        }
      }
    }
    buf = Buffer.concat(lines)
    this.box.rawWrite(user, file, buf, 0, buf.length, 0, (err) => {
      if (err) return cb(err)
      if (lastIx >= 0) {
        let ms = this._delay(retry) - (new Date().valueOf() - last.valueOf())
        console.log(
          'retry=', retry,
          'delay=', this._delay(retry),
          'last=', last,
          'last.valueOf()=', last.valueOf(),
          'new Date().valueOf()=', new Date().valueOf()
        )
        let date = new Date(new Date().valueOf() + ms)
        lines[lastIx].write(date.toISOString(), m[0].length - m[1].length)
        let k = /\r\n$/.test(lines[lastIx]) ? 2 : 1
        lines[lastIx].write(Array(buf.length-m[0].length-k).fill(' ').join(''), m[0].length)
        console.log('SCHEDULER.push',ms,user,file)
        ms = Math.max(1000,ms)
        this._scheduler.push(ms, () => this.send(user, file))
      }
      cb(null)
    })
  })
}

Outbox.prototype.close = function () {
  this._scheduler.close()
}

Outbox.prototype._move = function (user, file, src, dst, cb) {
  let srcFile = null, dstFile = null
  if (src === 'inbox') srcFile = this.inbox.path(user, file)
  if (src === 'outbox') srcFile = this.box.path(user, file)
  if (src === 'undelivered') srcFile = this.undelivered.path(user, file)
  if (dst === 'inbox') dstFile = this.inbox.path(user, file)
  if (dst === 'outbox') dstFile = this.box.path(user, file)
  if (dst === 'undelivered') dstFile = this.undelivered.path(user, file)
  if (src === dst || srcFile === dstFile) {
    process.nextTick(cb, new Error('src is same as dst in move'))
  } else if (!srcFile) {
    process.nextTick(cb, new Error('unsupported src type in move: ' + src))
  } else if (!dstFile) {
    process.nextTick(cb, new Error('unsupported dst type in move: ' + dst))
  } else {
    fs.rename(srcFile, dstFile, cb)
  }
}

function bufferLines(buf) {
  let buffers = []
  let i = 0, j = 0
  for (; i < buf.length; i++) {
    if (buf[i] === 0x0a) {
      buffers.push(buf.slice(j,i+1))
      j = i+1
    }
  }
  if (j < buf.length) buffers.push(buf.slice(j))
  return buffers
}

function noop() {}