You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

174 lines
5.8 KiB

"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""
# The various modules involved in asyncio need to call the original, unpatched
# standard library APIs to work: socket, select, threading, and so on. We
# therefore don't import them on the module level, since that would involve
# their imports getting patched, and instead delay importing them as much as
# possible. Then, we do a little song and dance in Hub.__init__ below so that
# when they're imported they import the original modules (select, socket, etc)
# rather than the patched ones.
import os
import sys
from eventlet.hubs import hub
from eventlet.patcher import _unmonkey_patch_asyncio_all
def is_available():
"""
Indicate whether this hub is available, since some hubs are
platform-specific.
Python always has asyncio, so this is always ``True``.
"""
return True
class Hub(hub.BaseHub):
"""An Eventlet hub implementation on top of an asyncio event loop."""
def __init__(self):
super().__init__()
# Pre-emptively make sure we're using the right modules:
_unmonkey_patch_asyncio_all()
# The presumption is that eventlet is driving the event loop, so we
# want a new one we control.
import asyncio
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.sleep_event = asyncio.Event()
import asyncio.events
if hasattr(asyncio.events, "on_fork"):
# Allow post-fork() child to continue using the same event loop.
# This is a terrible idea.
asyncio.events.on_fork.__code__ = (lambda: None).__code__
else:
# On Python 3.9-3.11, there's a thread local we need to reset.
# Also a terrible idea.
def re_register_loop(loop=self.loop):
asyncio.events._set_running_loop(loop)
os.register_at_fork(after_in_child=re_register_loop)
def add_timer(self, timer):
"""
Register a ``Timer``.
Typically not called directly by users.
"""
super().add_timer(timer)
self.sleep_event.set()
def _file_cb(self, cb, fileno):
"""
Callback called by ``asyncio`` when a file descriptor has an event.
"""
try:
cb(fileno)
except self.SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
self.sleep_event.set()
def add(self, evtype, fileno, cb, tb, mark_as_closed):
"""
Add a file descriptor of given event type to the ``Hub``. See the
superclass for details.
Typically not called directly by users.
"""
try:
os.fstat(fileno)
except OSError:
raise ValueError("Invalid file descriptor")
already_listening = self.listeners[evtype].get(fileno) is not None
listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
if not already_listening:
if evtype == hub.READ:
self.loop.add_reader(fileno, self._file_cb, cb, fileno)
else:
self.loop.add_writer(fileno, self._file_cb, cb, fileno)
return listener
def remove(self, listener):
"""
Remove a listener from the ``Hub``. See the superclass for details.
Typically not called directly by users.
"""
super().remove(listener)
evtype = listener.evtype
fileno = listener.fileno
if not self.listeners[evtype].get(fileno):
if evtype == hub.READ:
self.loop.remove_reader(fileno)
else:
self.loop.remove_writer(fileno)
def remove_descriptor(self, fileno):
"""
Remove a file descriptor from the ``asyncio`` loop.
Typically not called directly by users.
"""
have_read = self.listeners[hub.READ].get(fileno)
have_write = self.listeners[hub.WRITE].get(fileno)
super().remove_descriptor(fileno)
if have_read:
self.loop.remove_reader(fileno)
if have_write:
self.loop.remove_writer(fileno)
def run(self, *a, **kw):
"""
Start the ``Hub`` running. See the superclass for details.
"""
import asyncio
async def async_run():
if self.running:
raise RuntimeError("Already running!")
try:
self.running = True
self.stopping = False
while not self.stopping:
while self.closed:
# We ditch all of these first.
self.close_one()
self.prepare_timers()
if self.debug_blocking:
self.block_detect_pre()
self.fire_timers(self.clock())
if self.debug_blocking:
self.block_detect_post()
self.prepare_timers()
wakeup_when = self.sleep_until()
if wakeup_when is None:
sleep_time = self.default_sleep()
else:
sleep_time = wakeup_when - self.clock()
if sleep_time > 0:
try:
await asyncio.wait_for(self.sleep_event.wait(), sleep_time)
except asyncio.TimeoutError:
pass
self.sleep_event.clear()
else:
await asyncio.sleep(0)
else:
self.timers_canceled = 0
del self.timers[:]
del self.next_timers[:]
finally:
self.running = False
self.stopping = False
self.loop.run_until_complete(async_run())