Source code for bonzo.server

"""A non-blocking, single-threaded SMTP server."""
import email
import inspect
import socket
import sys

from tornado.escape import to_unicode, utf8
from tornado.ioloop import IOLoop
from tornado.log import app_log, gen_log
from tornado.tcpserver import TCPServer

from bonzo import errors, version

# Line terminator used throughout this module: appended to every reply that
# is written to the client and used as the delimiter when reading commands.
CRLF = '\r\n'


class SMTPServer(TCPServer):
    """A non-blocking, single-threaded SMTP server.

    The server is configured with a single request callback. Every accepted
    connection is wrapped in an :class:`~bonzo.server.SMTPConnection`, which
    invokes the callback with an :class:`~bonzo.server.SMTPRequest` once a
    complete message has been received.

    A simple example server that handles the request with the received
    message:

    .. code:: python

        import asyncio

        from bonzo.server import SMTPServer

        async def handle_request(request):
            do_something_with_the_message(request.message)
            await request.finish_async()

        async def main():
            smtp_server = SMTPServer(handle_request)
            smtp_server.listen(2525)
            await asyncio.Event().wait()

        asyncio.run(main())
    """

    def __init__(self, request_callback, **kwargs):
        """Stores the request callback and initialises the TCP server.

        :param request_callback: callable handed to every connection; it is
            called with the request built from a received message.
        :param kwargs: forwarded unchanged to ``TCPServer.__init__``.
        """
        self.request_callback = request_callback
        TCPServer.__init__(self, **kwargs)

    async def handle_stream(self, stream, address):
        """Serves one accepted client until its command loop ends.

        :param stream: the stream of the accepted connection.
        :param address: the peer address of the accepted connection.
        """
        await SMTPConnection(stream, address, self.request_callback).run()
class SMTPConnection(object):
    """Handles a connection to an SMTP client, executing SMTP commands.

    This class uses its :attr:`COMMAND` and :attr:`DATA` attributes as a
    simple "enum" to manage the connection state.

    Every line read in the :attr:`COMMAND` state is dispatched to the
    ``command_<verb>`` method of this class; the chunk read in the
    :attr:`DATA` state is turned into an :class:`SMTPRequest` and handed to
    the request callback.
    """

    COMMAND = 0
    """Used to set the state to receive any command."""

    DATA = 1
    """Used to set the state to receive data."""

    def __init__(self, stream, address, request_callback):
        """Binds the connection to its stream and registers for close events.

        :param stream: stream used to talk to the client.
        :param address: peer address; its first item is used as the remote
            IP for ``AF_INET``/``AF_INET6`` sockets.
        :param request_callback: callable invoked with an
            :class:`SMTPRequest` for every received message.
        """
        self.stream = stream
        self.address = address
        self.request_callback = request_callback
        self.__hostname = None
        self.__data = None
        self._close_callback = None
        self._waiting_for_request = None
        self.reset_arguments()
        if self.stream.socket.family in (socket.AF_INET, socket.AF_INET6):
            self.remote_ip = self.address[0]
        else:
            # Unix (or other) socket; fake the remote address
            self.remote_ip = '0.0.0.0'  # pragma: no cover
        self.stream.set_close_callback(self._on_connection_close)

    async def run(self):
        """Runs the SMTP command loop for this connection.

        Sends the ``220`` greeting, then keeps reading up to the delimiter
        returned by the previous :meth:`_handle_line` call until the stream
        is closed. An exception escaping the loop is reported to the client
        if the stream is still open. In every case the stream is closed and
        the per-request state is cleared before returning.
        """
        try:
            await self.write('220 Bonzo SMTP Server %s' % version)
            delimiter = CRLF
            while not self.stream.closed():
                line = await self.stream.read_until(utf8(delimiter))
                delimiter = await self._handle_line(line)
        except Exception as e:
            if not self.stream.closed():
                await self._handle_request_exception(e)
        finally:
            try:
                # Once the loop has ended nothing reads from the stream any
                # more, so do not leave a half-dead socket behind. Closing
                # before clearing the state lets a registered close callback
                # still be notified.
                if not self.stream.closed():
                    self.stream.close()
            finally:
                self._clear_request_state()

    def reset_arguments(self):
        """Resets the mail transaction: state, sender and recipients."""
        self.__state = self.COMMAND
        self.__mail = None
        self.__rcpt = []

    def _clear_request_state(self):
        """Clears the per-request state.

        Resets the transaction, drops the close callback and resolves (then
        forgets) a still pending "request finished" future so that nothing
        keeps waiting on it.
        """
        self.reset_arguments()
        self._close_callback = None
        if (self._waiting_for_request is not None and
                not self._waiting_for_request.done()):
            self._waiting_for_request.set_result(None)
        self._waiting_for_request = None

    def set_close_callback(self, callback):
        """Sets a callback that will be run when the connection is closed."""
        self._close_callback = callback

    def _on_connection_close(self):
        """Runs the close callback once and releases a pending waiter.

        The waiter is resolved even when the close callback raises, so that
        a coroutine blocked in :meth:`_on_data` is never left hanging; the
        callback's exception still propagates to the caller.
        """
        try:
            if self._close_callback is not None:
                callback = self._close_callback
                self._close_callback = None
                callback()
        finally:
            if (self._waiting_for_request is not None and
                    not self._waiting_for_request.done()):
                self._waiting_for_request.set_result(None)

    def close(self):
        """Close the stream."""
        self.stream.close()
        self._clear_request_state()

    async def write(self, chunk):
        """Writes a chunk of output to the stream.

        ``CRLF`` is appended to ``chunk``. Nothing is written when the
        stream is already closed.
        """
        if not self.stream.closed():
            await self.stream.write(utf8(chunk + CRLF))

    async def write_ok(self, message='Ok'):
        """Writes a successful ``250`` status response."""
        await self.write('%s %s' % (250, message))

    def finish(self):
        """Finishes the request by closing the connection."""
        self.close()

    async def _handle_line(self, line):
        """Processes one chunk read from the stream.

        In the :attr:`COMMAND` state the chunk is a command line that is
        dispatched to ``command_<verb>``; in the :attr:`DATA` state it is the
        message body, which is passed to :meth:`_on_data`.

        :param line: raw bytes read from the stream, delimiter included.
        :returns: the delimiter to use for the next read.

        Exceptions are not propagated: they are reported to the client via
        :meth:`_handle_request_exception` and ``CRLF`` is returned.
        """
        try:
            if self.__state == self.COMMAND:
                line = to_unicode(line)[:-2]  # Remove delimiter '\r\n'
                if not line.strip():
                    raise errors.UnrecognisedCommand()
                i = line.find(' ')
                if i < 0:
                    command = line
                    arg = None
                else:
                    command = line[:i]
                    arg = line[i + 1:].strip()
                method = getattr(self, 'command_' + command.lower(), None)
                if not method:
                    raise errors.NotImplementedCommand(command)
                result = method(arg)
                if inspect.isawaitable(result):
                    result = await result
                # A command may return the delimiter for the next read
                # (``command_data`` does); otherwise keep reading lines.
                return result or CRLF
            elif self.__state == self.DATA:
                try:
                    # Remove delimiter '\r\n.\r\n'
                    line = to_unicode(line)[:-5]
                    # Undo dot-stuffing: drop the leading dot of each line.
                    data = [text[1:] if text.startswith('.') else text
                            for text in line.split(CRLF)]
                    await self._on_data('\n'.join(data))
                finally:
                    # The transaction is over whether the message was
                    # accepted or the handler failed. Without this reset a
                    # failure would leave the connection in the DATA state
                    # and the next command line would be parsed as data.
                    self.reset_arguments()
                return CRLF
            else:
                raise errors.InternalConfusion()
        except Exception as e:
            await self._handle_request_exception(e)
            return CRLF

    def _request_summary(self):
        """Returns the request description used in log messages."""
        return ''

    def log_exception(self, typ, value, tb):
        """Logs an exception raised while handling a line.

        :class:`errors.SMTPError` instances are logged as warnings, and only
        when they carry a ``log_message``; any other exception is logged as
        an error together with its traceback.
        """
        if isinstance(value, errors.SMTPError):
            if value.log_message:
                _format = '%d %s' + value.log_message
                args = ([value.status_code, self._request_summary()] +
                        list(value.args))
                gen_log.warning(_format, *args)
        else:
            app_log.error('Uncaught exception %s', self._request_summary(),
                          exc_info=(typ, value, tb))

    async def _handle_request_exception(self, e):
        """Logs ``e`` and reports it to the client.

        Exceptions that are not :class:`errors.SMTPError` are reported as
        :class:`errors.InternalConfusion`.
        """
        # Log the exception that was actually passed in rather than relying
        # on the interpreter's "currently handled" exception.
        self.log_exception(type(e), e, e.__traceback__)
        if not isinstance(e, errors.SMTPError):
            e = errors.InternalConfusion()
        await self.write('%d %s' % (e.status_code, e.message))

    def __getaddr(self, keyword, arg):
        """Extracts the address following ``keyword`` in ``arg``.

        :param keyword: upper-case prefix to look for, e.g. ``'FROM:'``.
        :param arg: the command argument as sent by the client.
        :returns: ``None`` when ``arg`` does not start with ``keyword``
            (case-insensitively); otherwise the stripped remainder with one
            pair of surrounding angle brackets removed. The null address
            ``'<>'`` is returned unchanged.
        """
        address = None
        keylen = len(keyword)
        if arg[:keylen].upper() == keyword:
            address = arg[keylen:].strip()
            if not address:
                pass
            elif (address[0] == '<' and address[-1] == '>' and
                    address != '<>'):
                # Addresses can be in the form <person@dom.com> but watch out
                # for null address, e.g. <>
                address = address[1:-1]
        return address

    async def command_helo(self, arg):
        """Handles the ``HELO`` SMTP command.

        Requires a hostname argument and may only be issued once per
        connection.
        """
        if not arg:
            raise errors.BadArguments('HELO hostname')
        if self.__hostname:
            raise errors.BadSequence('Duplicate HELO/EHLO')
        self.__hostname = arg
        await self.write('250 Hello %s' % self.remote_ip)

    async def command_noop(self, arg):
        """Handles the ``NOOP`` SMTP command."""
        if arg:
            raise errors.BadArguments('NOOP')
        await self.write_ok()

    async def command_quit(self, arg):
        """Handles the ``QUIT`` SMTP command."""
        await self.write('221 Bye')
        self.finish()

    async def command_mail(self, arg):
        """Handles the ``MAIL`` SMTP command.

        Requires a previous ``HELO`` and rejects a second ``MAIL`` within
        the same transaction.
        """
        if not self.__hostname:
            raise errors.BadSequence('Error: need HELO command')
        address = self.__getaddr('FROM:', arg) if arg else None
        if not address:
            raise errors.BadArguments('MAIL FROM:<address>')
        if self.__mail:
            raise errors.BadSequence('Error: nested MAIL command')
        self.__mail = address
        await self.write_ok()

    async def command_rcpt(self, arg):
        """Handles the ``RCPT`` SMTP command.

        Requires a previous ``MAIL``; every accepted address is appended to
        the recipient list of the transaction.
        """
        if not self.__mail:
            raise errors.BadSequence('Error: need MAIL command')
        address = self.__getaddr('TO:', arg) if arg else None
        if not address:
            raise errors.BadArguments('RCPT TO:<address>')
        self.__rcpt.append(address)
        await self.write_ok()

    async def command_rset(self, arg):
        """Handles the ``RSET`` SMTP command."""
        if arg:
            raise errors.BadArguments('RSET')
        self.reset_arguments()
        await self.write_ok()

    async def command_data(self, arg):
        """Handles the ``DATA`` SMTP command.

        Switches the connection to the :attr:`DATA` state and returns the
        end-of-data delimiter to be used for the next read.
        """
        if not self.__rcpt:
            raise errors.BadSequence('Error: need RCPT command')
        if arg:
            raise errors.BadArguments('DATA')
        self.__state = self.DATA
        await self.write('354 End data with <CR><LF>.<CR><LF>')
        return '{0}.{0}'.format(CRLF)

    async def _on_data(self, data):
        """Builds the request for ``data`` and waits until it is finished.

        The request callback is invoked (and awaited if it returns an
        awaitable); afterwards this coroutine waits for the future obtained
        from :meth:`wait_for_request_finish`.
        """
        self.__data = data
        request = SMTPRequest(self, self.remote_ip, 'DATA',
                              hostname=self.__hostname, mail=self.__mail,
                              rcpt=self.__rcpt, data=self.__data)
        # Create the waiter before invoking the callback so that a callback
        # finishing the request immediately is not missed.
        finish_waiter = self.wait_for_request_finish()
        result = self.request_callback(request)
        if inspect.isawaitable(result):
            await result
        await finish_waiter

    async def request_finished(self):
        """Resets the transaction and acknowledges the message."""
        self.reset_arguments()
        await self.write_ok()

    def wait_for_request_finish(self):
        """Returns the future resolved when the current request finishes.

        A new future is created when none exists yet or the previous one is
        already done.
        """
        if (self._waiting_for_request is None or
                self._waiting_for_request.done()):
            loop = IOLoop.current().asyncio_loop
            self._waiting_for_request = loop.create_future()
        return self._waiting_for_request

    def notify_request_finished(self):
        """Resolves the pending "request finished" future, if any."""
        if (self._waiting_for_request is not None and
                not self._waiting_for_request.done()):
            self._waiting_for_request.set_result(None)
class SMTPRequest(object):
    """A single SMTP request.

    Bundles what was collected for one message (remote IP, ``HELO``
    hostname, sender, recipients and the raw data) together with the
    connection it arrived on.
    """

    def __init__(self, connection, remote_ip, command, hostname=None,
                 mail=None, rcpt=None, data=None):
        """Stores the request attributes.

        :param connection: connection used to answer the client.
        :param remote_ip: IP address of the client.
        :param command: name of the command that produced the request.
        :param hostname: hostname announced by the client, if any.
        :param mail: sender address, if any.
        :param rcpt: recipient addresses; a falsy value becomes ``[]``.
        :param data: raw message text, if any.
        """
        self.connection = connection
        self.remote_ip = remote_ip
        self.command = command
        self.hostname = hostname
        self.mail = mail
        self.rcpt = rcpt if rcpt else []
        self.data = data

    @property
    def message(self):
        """Returns the received data parsed as an email message.

        The data is parsed with :func:`email.message_from_string`, which
        yields an :class:`email.message.Message`. Parsing happens on first
        access only; the result is cached on the instance.
        """
        if hasattr(self, '_message'):
            return self._message
        self._message = email.message_from_string(self.data)
        return self._message

    async def finish_async(self):
        """Writes a successful response to the connection.

        The connection is told that the request is finished even when
        writing the response fails.
        """
        connection = self.connection
        try:
            await connection.request_finished()
        finally:
            connection.notify_request_finished()

    def finish(self):
        """Writes a successful response to the connection.

        Non-blocking variant: :meth:`finish_async` is scheduled on the
        current ``IOLoop`` instead of being awaited.
        """
        io_loop = IOLoop.current()
        io_loop.spawn_callback(self.finish_async)

    def __repr__(self):
        """Returns the class name followed by the main request attributes."""
        pairs = []
        for name in ('remote_ip', 'hostname', 'mail', 'rcpt'):
            pairs.append("%s='%s'" % (name, getattr(self, name)))
        return '%s (%s)' % (self.__class__.__name__, ', '.join(pairs))