Latest posts for tag hacks
gitpython: list all files in a git commit
A little gitpython recipe to list the paths of all files in a commit:
#!/usr/bin/python3
import git
from pathlib import Path
import sys
def list_paths(root_tree, path=Path(".")):
    """
    Generate the path of every blob reachable from root_tree.

    Each tree contributes the paths of its own blobs first, then the paths
    found in each of its subtrees, in the order the subtrees are listed.
    Paths are built by joining ``path`` with the names of the trees
    traversed and the name of the blob.
    """
    # Explicit depth-first traversal: the most recently pushed tree is
    # visited next, so subtrees are pushed in reverse to keep their order.
    pending = [(root_tree, path)]
    while pending:
        current, prefix = pending.pop()
        for blob in current.blobs:
            yield prefix / blob.name
        pending.extend(
            (subtree, prefix / subtree.name)
            for subtree in reversed(current.trees)
        )
# Open the repository containing the current directory, looking in parent
# directories if needed.
repo = git.Repo(".", search_parent_directories=True)

# The commit to inspect is named by the first command line argument.
commit = repo.commit(sys.argv[1])

# Print one path per line for every file in the commit's tree.
for file_path in list_paths(commit.tree):
    print(file_path)
It can be a good base, for example, for writing a script that, given two git branches, shows which django migrations are in one and not in the other, without doing any git checkout of the code.
Starting tornado on a random free port
One of the pieces of software I maintain for work is a GUI data browser that uses Tornado as a backend and a web browser as a front-end.
It is quite convenient to start the command and have the browser open automatically on the right URL. It's quite annoying to start the command and be told that the default port is already in use.
I've needed this trick quite often, also when writing unit tests, and it's time I note it down somewhere, so it's easier to find than going through Tornado's unittest code where I found it the first time.
This is how to start Tornado on a free random port:
from tornado.options import define, options
import tornado.netutil
import tornado.httpserver
define("web_port", type=int, default=None, help="listening port for web interface")

application = Application(self.db_url)
if options.web_port is not None:
    # A port was requested explicitly: listen on exactly that one.
    server = tornado.httpserver.HTTPServer(application)
    server.listen(options.web_port)
else:
    # No port requested: bind port 0 on localhost to get a free port, then
    # read back the port number that was actually assigned.
    sockets = tornado.netutil.bind_sockets(0, '127.0.0.1')
    self.web_port = sockets[0].getsockname()[1]
    server = tornado.httpserver.HTTPServer(application)
    server.add_sockets(sockets)
Getting rusage of child processes on python asyncio
I am writing a little application server for microservices written as compiled binaries, and I would like to log execution statistics from getrusage(2).
The application server is written using asyncio, and processes are managed using asyncio subprocesses.
Unfortunately, asyncio uses os.waitpid instead of os.wait4 to reap child processes, so to get rusage information one has to delve into the asyncio innards and provide a custom ChildWatcher implementation. Here's how I did it:
import asyncio
from asyncio.log import logger
from contextlib import contextmanager
import os
class ExtendedResults:
    """
    Holder for the exit information of one child process.

    Both attributes start as None; SafeChildWatcherWithRusage._do_waitpid
    assigns them once the process has been reaped.
    """

    def __init__(self):
        # rusage: resource usage as returned by os.wait4
        # returncode: exit code reported to the child watcher callback
        self.rusage = self.returncode = None
class SafeChildWatcherWithRusage(asyncio.SafeChildWatcher):
    """
    SafeChildWatcher that uses os.wait4 to also get rusage information.

    Call install() once to make this the active child watcher, then wrap the
    handling of each child process in monitor() to collect its results.
    """
    # pid -> ExtendedResults for every process currently inside a monitor()
    # block. Kept on the class so that monitor() can be used without a
    # reference to the installed watcher instance.
    rusage_results = {}

    @classmethod
    @contextmanager
    def monitor(cls, proc):
        """
        Return an ExtendedResults that gets filled when the process exits

        proc is any object with a positive integer ``pid`` attribute. The
        registration is dropped when the ``with`` block ends, whether or not
        the process has been reaped by then.
        """
        assert proc.pid > 0
        pid = proc.pid
        extended_results = ExtendedResults()
        cls.rusage_results[pid] = extended_results
        try:
            yield extended_results
        finally:
            cls.rusage_results.pop(pid, None)

    def _do_waitpid(self, expected_pid):
        """
        Reap expected_pid without blocking and dispatch its exit callback.

        Returns without doing anything if the child is still running.
        Otherwise the return code (and rusage, when available) is stored in
        the ExtendedResults registered for the pid, if any, and the callback
        registered in self._callbacks is invoked with the pid and return
        code. If the child was already reaped elsewhere, return code 255 and
        a rusage of None are reported.
        """
        # The original is in asyncio/unix_events.py; on new python versions, it
        # makes sense to check changes to it and port them here
        # NOTE(review): _compute_returncode, _loop and _callbacks are not
        # defined in this class; presumably they come from the asyncio base
        # watcher -- confirm they exist on the Python version in use.
        assert expected_pid > 0

        try:
            pid, status, rusage = os.wait4(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            # No resource usage can be obtained in this case. Bind the name
            # anyway so that filling the ExtendedResults below does not fail
            # with UnboundLocalError and the callback still runs.
            rusage = None
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        # Publish the results before the callback runs, so that code woken
        # up by the callback already finds them filled in.
        extended_results = self.rusage_results.get(pid)
        if extended_results is not None:
            extended_results.rusage = rusage
            extended_results.returncode = returncode

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)

    @classmethod
    def install(cls):
        """
        Create a watcher, attach it to the current event loop and set it as
        asyncio's child watcher.
        """
        loop = asyncio.get_event_loop()
        child_watcher = cls()
        child_watcher.attach_loop(loop)
        asyncio.set_child_watcher(child_watcher)
To use it:
from .hacks import SafeChildWatcherWithRusage
SafeChildWatcherWithRusage.install()
...
@coroutine
def run(self, *args, **kw):
    """
    Run a subprocess to completion, recording timing, return code and rusage.

    args and kw are passed to asyncio.create_subprocess_exec, except that
    stdin, stdout and stderr are always replaced with pipes. On return,
    self.started, self.ended, self.returncode and self.rusage are set.
    """
    # All three standard streams are piped, overriding any caller value.
    kw.update(
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    self.started = time.time()
    self.proc = yield from asyncio.create_subprocess_exec(*args, **kw)

    from .hacks import SafeChildWatcherWithRusage

    # Keep the process registered with the watcher until it has been waited
    # for, so that its rusage is captured when it is reaped.
    with SafeChildWatcherWithRusage.monitor(self.proc) as results:
        stream_handlers = (
            self.write_stdin(self.proc.stdin),
            self.read_stdout(self.proc.stdout),
            self.read_stderr(self.proc.stderr),
        )
        yield from asyncio.tasks.gather(*stream_handlers)
        self.returncode = yield from self.proc.wait()

    self.rusage = results.rusage
    self.ended = time.time()
Serving debian-distributed javascript libraries in Tornado
Debian conveniently distributes JavaScript libraries, and expects packaged software to use them rather than embedding its own copy.
Here is a convenient custom StaticFileHandler for Tornado that looks for the Debian-distributed versions of JavaScript libraries, and falls back to the vendored versions if they are not found:
from tornado import web
import pathlib
class StaticFileHandler(web.StaticFileHandler):
    """
    StaticFileHandler that allows overriding paths in the static directory with
    system provided versions

    A request whose first path component names a directory under
    SYSTEM_ASSET_PATH is served from there; everything else is served from
    the regular static directory.
    """
    SYSTEM_ASSET_PATH = pathlib.Path("/usr/share/javascript")

    @classmethod
    def get_absolute_path(cls, root, path):
        """
        Map a path relative to the static root to an absolute filesystem path.

        Returns a normalized absolute path as a string. The result may lie
        outside both allowed directories (for example when ``path`` contains
        ``..``): validate_absolute_path is responsible for rejecting it.
        """
        import os

        parts = pathlib.PurePath(path).parts
        if parts and cls.SYSTEM_ASSET_PATH.joinpath(parts[0]).is_dir():
            # If that asset directory exists in the system, look for things in
            # there. Collapse any ".." lexically, so that the containment
            # check in validate_absolute_path sees the real target; symlinks
            # are deliberately not resolved.
            return os.path.abspath(cls.SYSTEM_ASSET_PATH.joinpath(path))

        # Else go ahead with the default static dir
        return super().get_absolute_path(root, path)

    def validate_absolute_path(self, root, absolute_path):
        """
        Rewrite of tornado's validate_absolute_path not to raise an error for
        paths in /usr/share/javascript/

        Returns the path to serve as a string, or None after issuing a
        redirect for a directory requested without a trailing slash.

        Raises HTTPError 403 if the path is outside both the static root and
        SYSTEM_ASSET_PATH or is not a regular file, and 404 if it does not
        exist.
        """
        import os

        # Normalize both paths before comparing them: a relative root would
        # never match an absolute path, and an unnormalized path such as
        # /usr/share/javascript/../../etc/passwd would pass a plain prefix
        # comparison while pointing outside the allowed directories.
        root = pathlib.Path(os.path.abspath(root))
        absolute_path = pathlib.Path(os.path.abspath(absolute_path))

        system_parts = self.SYSTEM_ASSET_PATH.parts
        is_system_root = absolute_path.parts[:len(system_parts)] == system_parts
        is_static_root = absolute_path.parts[:len(root.parts)] == root.parts

        if not is_system_root and not is_static_root:
            raise web.HTTPError(403, "%s is not in root static directory or system assets path",
                                self.path)

        if absolute_path.is_dir() and self.default_filename is not None:
            # need to look at the request.path here for when path is empty
            # but there is some prefix to the path that was already
            # trimmed by the routing
            if not self.request.path.endswith("/"):
                self.redirect(self.request.path + "/", permanent=True)
                return
            absolute_path = absolute_path.joinpath(self.default_filename)

        if not absolute_path.exists():
            raise web.HTTPError(404)

        if not absolute_path.is_file():
            raise web.HTTPError(403, "%s is not a file", self.path)

        return str(absolute_path)
This is how to use it:
class DebianApplication(tornado.web.Application):
    """
    Application that serves static files with the custom StaticFileHandler,
    unless the caller passes its own ``static_handler_class`` setting.
    """

    def __init__(self, *args, **settings):
        from .static import StaticFileHandler

        # Only provide a default: an explicit caller choice is kept.
        if "static_handler_class" not in settings:
            settings["static_handler_class"] = StaticFileHandler
        super().__init__(*args, **settings)
And from HTML it's simply a matter of matching the first path component to what is used by Debian's packages under /usr/share/javascript:
<link rel="stylesheet" href="{{static_url('bootstrap4/css/bootstrap.min.css')}}">
<script src="{{static_url('jquery/jquery.min.js')}}"></script>
<script src="{{static_url('popper.js/umd/popper.min.js')}}"></script>
<script src="{{static_url('bootstrap4/js/bootstrap.min.js')}}"></script>
I find it quite convenient: this way I can start writing prototype code without worrying about fetching javascript libraries to bundle.
I only need to start worrying about it if I need to deploy outside of Debian, or to old stable versions of Debian that don't contain the required JavaScript dependencies. In that case, I just cp -r from a working /usr/share/javascript into Tornado's static directory, and I'm done.
Python hacks: opening a compressed mailbox
Python's mailbox.mbox is not good at opening compressed mailboxes:
>>> import mailbox
>>> print(len(mailbox.mbox("/tmp/test.mbox")))
9
>>> print(len(mailbox.mbox("/tmp/test.mbox.gz")))
0
>>> print(len(mailbox.mbox("/tmp/test1.mbox.xz")))
0
For a prototype rewrite of the MIA team's Echelon (the engine behind mia-query), I needed to scan compressed mailboxes, and I had to work around this limitation.
Here is the alternative mailbox.mbox
implementation:
import lzma
import gzip
import bz2
import mailbox
class StreamMbox(mailbox.mbox):
    """
    mailbox.mbox does not support opening a stream, which is sad.

    This is a subclass that works around it. It is read-only: flush() raises
    NotImplementedError.
    """

    def __init__(self, fd: BinaryIO, factory=None, create: bool = True):
        """
        Wrap an already open binary stream as an mbox mailbox.

        fd is the stream to read messages from.
        # NOTE(review): presumably fd must support seek()/tell(), as the
        # inherited mbox parsing is written for regular files -- confirm.
        factory is stored as the mailbox message factory.
        create is accepted for signature compatibility with mailbox.mbox and
        is ignored, since there is no path to create.
        """
        # Do not call parent __init__, just redo everything here to be able to
        # open a stream. This will need to be re-reviewed for every new version
        # of python's stdlib.

        # Mailbox constructor
        self._path = None
        self._factory = factory

        # _singlefileMailbox constructor
        self._file = fd
        self._toc = None
        self._next_key = 0
        self._pending = False       # No changes require rewriting the file.
        self._pending_sync = False  # No need to sync the file
        self._locked = False
        self._file_length = None    # Used to record mailbox size

        # mbox constructor
        self._message_factory = mailbox.mboxMessage

    def flush(self):
        """Always raise NotImplementedError: the mailbox cannot be written."""
        raise NotImplementedError("StreamMbox is a readonly class")

    def close(self):
        """
        Close the underlying stream.

        The inherited close() flushes first, which always raises in this
        read-only class; since there is never anything to write back, skip
        the flush and only release the lock (if held) and close the stream.
        """
        try:
            if self._locked:
                self.unlock()
        finally:
            self._file.close()
class UsageExample:
    """
    Example of scanning a mailbox file that may be compressed, choosing the
    way to open it from the file name suffix.
    """
    # File suffix -> function used to open a file with that suffix.
    DECOMPRESS = {
        ".xz": lzma.open,
        ".gz": gzip.open,
        ".bz2": bz2.open,
    }

    @classmethod
    def scan(cls, path: Path) -> Generator[ScannedEmail, None, None]:
        """
        Open the mailbox at path in binary mode and generate what scan_fd
        produces for it.

        Files whose suffix is not listed in DECOMPRESS are opened as they are.
        """
        opener = cls.DECOMPRESS.get(path.suffix)
        if opener is None:
            opener = open
        with opener(path.as_posix(), "rb") as stream:
            yield from cls.scan_fd(path, stream)

    @classmethod
    def scan_fd(cls, path: Path, fd: BinaryIO) -> Generator[ScannedEmail, None, None]:
        """
        Iterate the messages of the mailbox readable from fd.

        The per-message processing is elided in this example.
        """
        for msg in StreamMbox(fd):
            ...