diff options
Diffstat (limited to 'lib/portage/util')
-rw-r--r-- | lib/portage/util/_async/SchedulerInterface.py | 9 | ||||
-rw-r--r-- | lib/portage/util/_eventloop/asyncio_event_loop.py | 44 | ||||
-rw-r--r-- | lib/portage/util/futures/_asyncio/__init__.py | 28 | ||||
-rw-r--r-- | lib/portage/util/socks5.py | 42 |
4 files changed, 105 insertions, 18 deletions
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py index 43a42adff..485958491 100644 --- a/lib/portage/util/_async/SchedulerInterface.py +++ b/lib/portage/util/_async/SchedulerInterface.py @@ -1,4 +1,4 @@ -# Copyright 2012-2021 Gentoo Authors +# Copyright 2012-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import gzip @@ -49,6 +49,13 @@ class SchedulerInterface(SlotObject): for k in self._event_loop_attrs: setattr(self, k, getattr(event_loop, k)) + @property + def _loop(self): + """ + Returns the real underlying asyncio loop. + """ + return self._event_loop._loop + @staticmethod def _return_false(): return False diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py index b9e96bb20..ee9e4c60e 100644 --- a/lib/portage/util/_eventloop/asyncio_event_loop.py +++ b/lib/portage/util/_eventloop/asyncio_event_loop.py @@ -1,8 +1,9 @@ -# Copyright 2018-2023 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import os import signal +import threading import asyncio as _real_asyncio from asyncio.events import AbstractEventLoop as _AbstractEventLoop @@ -14,6 +15,7 @@ except ImportError: PidfdChildWatcher = None import portage +from portage.util import socks5 class AsyncioEventLoop(_AbstractEventLoop): @@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop): def __init__(self, loop=None): loop = loop or _real_asyncio.get_event_loop() self._loop = loop - self.run_until_complete = ( - self._run_until_complete - if portage._internal_caller - else loop.run_until_complete - ) + self.run_until_complete = self._run_until_complete self.call_soon = loop.call_soon self.call_soon_threadsafe = loop.call_soon_threadsafe self.call_later = loop.call_later self.call_at = loop.call_at self.is_running = loop.is_running self.is_closed = loop.is_closed - self.close = loop.close + self.close = self._close self.create_future = ( loop.create_future if hasattr(loop, "create_future") @@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop): self.get_debug = loop.get_debug self._wakeup_fd = -1 self._child_watcher = None + # Used to drop recursive calls to _close. + self._closing = False + # Initialized in _run_until_complete. + self._is_main = None if portage._internal_caller: loop.set_exception_handler(self._internal_caller_exception_handler) + def _close(self): + """ + Before closing the main loop, run portage.process.run_exitfuncs() + with the event loop running so that anything attached can clean + itself up (like the socks5 ProxyManager for bug 925240). + """ + if not (self._closing or self.is_closed()): + self._closing = True + if self._is_main: + self.run_until_complete(self._close_main()) + self._loop.close() + self._closing = False + + async def _close_main(self): + # Even though this has an exit hook, invoke it here so that + # we can properly wait for it and avoid messages like this: + # [ERROR] Task was destroyed but it is pending! + if socks5.proxy.is_running(): + await socks5.proxy.stop() + + portage.process.run_exitfuncs() + @staticmethod def _internal_caller_exception_handler(loop, context): """ @@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop): In order to avoid potential interference with API consumers, this implementation is only used when portage._internal_caller is True. """ + if self._is_main is None: + self._is_main = threading.current_thread() is threading.main_thread() + + if not portage._internal_caller: + return self._loop.run_until_complete(future) + if self._wakeup_fd != -1: signal.set_wakeup_fd(self._wakeup_fd) self._wakeup_fd = -1 diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index 22241f335..4eecc46a8 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -325,13 +325,37 @@ def _safe_loop(): def _get_running_loop(): + """ + This calls the real asyncio get_running_loop() and wraps that with + portage's internal AsyncioEventLoop wrapper. If there is no running + asyncio event loop but portage has a reference to another running + loop in this thread, then use that instead. + + This behavior enables portage internals to use the real asyncio.run + while remaining compatible with internal code that does not use the + real asyncio.run. + """ + try: + _loop = _real_asyncio.get_running_loop() + except RuntimeError: + _loop = None + with _thread_weakrefs.lock: if _thread_weakrefs.pid == portage.getpid(): try: loop = _thread_weakrefs.loops[threading.get_ident()] except KeyError: - return None - return loop if loop.is_running() else None + pass + else: + if _loop is loop._loop: + return loop + elif _loop is None: + return loop if loop.is_running() else None + + # If _loop it not None here it means it was probably a temporary + # loop created by asyncio.run, so we don't try to cache it, and + # just return a temporary wrapper. + return None if _loop is None else _AsyncioEventLoop(loop=_loop) def _thread_weakrefs_atexit(): diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py index 6c68ff410..f8fcdf9fc 100644 --- a/lib/portage/util/socks5.py +++ b/lib/portage/util/socks5.py @@ -2,15 +2,23 @@ # Copyright 2015-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import asyncio import errno import os import socket +from typing import Union + +import portage + +portage.proxy.lazyimport.lazyimport( + globals(), + "portage.util._eventloop.global_event_loop:global_event_loop", +) import portage.data from portage import _python_interpreter from portage.data import portage_gid, portage_uid, userpriv_groups from portage.process import atexit_register, spawn -from portage.util.futures import asyncio class ProxyManager: @@ -57,23 +65,41 @@ class ProxyManager: **spawn_kwargs, ) - def stop(self): + def stop(self) -> Union[None, asyncio.Future]: """ Stop the SOCKSv5 server. + + If there is a running asyncio event loop then asyncio.Future is + returned which should be used to wait for the server process + to exit. """ + future = None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None if self._proc is not None: self._proc.terminate() - loop = asyncio.get_event_loop() - if self._proc_waiter is None: - self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop) - if loop.is_running(): - self._proc_waiter.add_done_callback(lambda future: future.result()) + if loop is None: + # In this case spawn internals would have used + # portage's global loop when attaching a waiter to + # self._proc, so we are obligated to use that. + global_event_loop().run_until_complete(self._proc.wait()) else: - loop.run_until_complete(self._proc_waiter) + if self._proc_waiter is None: + self._proc_waiter = asyncio.ensure_future( + self._proc.wait(), loop=loop + ) + future = asyncio.shield(self._proc_waiter) + + if loop is not None and future is None: + future = loop.create_future() + future.set_result(None) self.socket_path = None self._proc = None self._proc_waiter = None + return future def is_running(self): """ |