all-in-one smtp+imap with minimal setup
git clone http://git.nthia.dev/nthmail
const { pipeline, Writable } = require('stream')
const Inbox = require('./inbox.js')
const Send = require('./send.js')
const Scheduler = require('./scheduler.js')
module.exports = Outbox
function Outbox(opts) {
if (!(this instanceof Outbox)) return new Outbox(opts)
this.box = new Inbox({ maildir: opts.outboxDir })
this.undelivered = new Inbox({ maildir: opts.undeliveredDir })
this.inbox = opts.inbox
this._scheduler = new Scheduler
this._tls = opts.tls
this._maxRetries = opts.maxRetries ?? 30
this._delay = opts.delay || function (n) {
if (n < 4) return 1000*60*5 // every 5 minutes
if (n < 7) return 1000*60*15 // every 15 minutes
return 1000*60*60 // every hour
}
}
Outbox.prototype.start = function (user, cb) {
let self = this
if (!cb) cb = noop
let errors = []
self.list(user, (err,files) => {
if (err) return cb(err)
;(function next(i) {
if (i >= files.length) return cb(errors.length > 0 ? errors : null)
let file = files[i].file
self.head(user, file, (err,buf) => {
if (err) {
errors.push(err)
return next(i+1)
}
let lines = buf.toString().split('\n')
let last = new Date, retry = 0
for (let i = 0; i < lines.length; i++) {
let line = lines[i]
if (m = /^last-attempt\s*:\s*([^\r\n]+)/i.exec(line)) {
last = new Date(m[1].trim())
} else if (m = /^retry\s*:\s*(-\d{3,}|\d{4,})/i.exec(line)) {
retry = Number(m[1])
}
}
if (retry < 0) return next(i+1) // deactivated
let ms = this._delay(retry) - (new Date().valueOf() - last.valueOf())
self._scheduler.push(ms, () => self.send(user, file))
next(i+1)
})
})(0)
})
}
Outbox.prototype.send = function (user, file, cb) {
if (!cb) cb = noop
this.head(user, file, (err,head) => {
if (err) return cb(err)
let lines = head.toString().split(/\r?\n/)
let to = null, from = null, domain = null, m = null
for (var i = 0; i < lines.length; i++) {
let line = lines[i]
if (m = /^to\s*:\s*(\S+)/i.exec(line)) {
to = m[1]
} else if (m = /^from\s*:\s*(\S+)/i.exec(line)) {
from = m[1]
domain = m[1].replace(/^[^@]*@/,'')
}
}
let opts = {
tls: this._tls(),
to, from, domain,
data: this.body(user, file),
}
new Send(opts, (err,sent,code) => {
if (sent) {
console.log(`outgoing success: to=${to} from=${from}`)
this.remove(user, file, err => {
if (err) cb(err)
else cb(null, true)
})
} else if (400 <= code && code < 500) { // transient negative completion, retry ok
console.log(`outgoing fail (transient): code=${code}`)
this._reschedule(user, file, err => {
if (err) cb(err)
else cb(null, false)
})
} else if (500 <= code && code < 600) { // permanent negative completion, retry not ok
console.log(`outgoing fail (permanent): code=${code}`)
// don't delete the message, leave in outbox instead
this._move(user, file, 'outbox', 'undelivered', (err) => {
if (err) cb(err)
else cb(null, false)
})
} else { // something strange
console.log(`outgoing fail: code=${code}`)
this._move(user, file, 'outbox', 'undelivered', (err) => {
if (err) cb(err)
else cb(null, false)
})
}
})
})
}
Outbox.prototype.write = function (user) {
let w = this.box.write(user)
w.once('finish', () => {
this.send(user, w.file)
})
return w
}
Outbox.prototype.read = function (user, file) {
return this.box.read(user, file)
}
Outbox.prototype.head = function (user, file, cb) {
return this.box.head(user, file, cb)
}
Outbox.prototype.body = function (user, file) {
return this.box.body(user, file)
}
Outbox.prototype.list = function (user, cb) {
this.box.list(user, cb)
}
Outbox.prototype.remove = function (user, file, cb) {
this.box.remove(user, file, cb)
}
Outbox.prototype._reschedule = function (user, file, cb) {
if (!cb) cb = noop
this.head(user, file, (err,buf) => {
if (err) return cb(err)
let m, lines = bufferLines(buf)
let lastIx = -1, last = null, retry = 0
for (let i = 0; i < lines.length; i++) {
let line = lines[i].toString()
if (m = /^last-attempt\s*:\s*([^\r\n]+)/i.exec(line)) {
lastIx = i
last = new Date(m[1])
} else if (m = /^retry\s*:\s*(-\d{3,}|\d{4,})/i.exec(line)) {
retry = Number(m[1])
if (retry < 0) return cb(null, 'deactivated')
retry++
if (retry > this._maxRetries) {
return this._move(user, file, 'outbox', 'undelivered', (err) => {
if (err) cb(err)
else cb(null, 'undelivered')
})
} else {
lines[i].write(String(retry).padStart(4,'0'), m[0].length-4)
let k = /\r\n$/.test(line) ? 2 : 1
lines[i].write(Array(buf.length-m[0].length-k).fill(' ').join(''), m[0].length)
}
}
}
buf = Buffer.concat(lines)
this.box.rawWrite(user, file, buf, 0, buf.length, 0, (err) => {
if (err) return cb(err)
if (lastIx >= 0) {
let ms = this._delay(retry) - (new Date().valueOf() - last.valueOf())
console.log(
'retry=', retry,
'delay=', this._delay(retry),
'last=', last,
'last.valueOf()=', last.valueOf(),
'new Date().valueOf()=', new Date().valueOf()
)
let date = new Date(new Date().valueOf() + ms)
lines[lastIx].write(date.toISOString(), m[0].length - m[1].length)
let k = /\r\n$/.test(lines[lastIx]) ? 2 : 1
lines[lastIx].write(Array(buf.length-m[0].length-k).fill(' ').join(''), m[0].length)
console.log('SCHEDULER.push',ms,user,file)
ms = Math.max(1000,ms)
this._scheduler.push(ms, () => this.send(user, file))
}
cb(null)
})
})
}
Outbox.prototype.close = function () {
this._scheduler.close()
}
Outbox.prototype._move = function (user, file, src, dst, cb) {
let srcFile = null, dstFile = null
if (src === 'inbox') srcFile = this.inbox.path(user, file)
if (src === 'outbox') srcFile = this.box.path(user, file)
if (src === 'undelivered') srcFile = this.undelivered.path(user, file)
if (dst === 'inbox') dstFile = this.inbox.path(user, file)
if (dst === 'outbox') dstFile = this.box.path(user, file)
if (dst === 'undelivered') dstFile = this.undelivered.path(user, file)
if (src === dst || srcFile === dstFile) {
process.nextTick(cb, new Error('src is same as dst in move'))
} else if (!srcFile) {
process.nextTick(cb, new Error('unsupported src type in move: ' + src))
} else if (!dstFile) {
process.nextTick(cb, new Error('unsupported dst type in move: ' + dst))
} else {
fs.rename(srcFile, dstFile, cb)
}
}
function bufferLines(buf) {
let buffers = []
let i = 0, j = 0
for (; i < buf.length; i++) {
if (buf[i] === 0x0a) {
buffers.push(buf.slice(j,i+1))
j = i+1
}
}
if (j < buf.length) buffers.push(buf.slice(j))
return buffers
}
function noop() {}