aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/portage/util')
-rw-r--r--lib/portage/util/_async/SchedulerInterface.py9
-rw-r--r--lib/portage/util/_eventloop/asyncio_event_loop.py44
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py28
-rw-r--r--lib/portage/util/socks5.py42
4 files changed, 105 insertions, 18 deletions
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
index 43a42adff..485958491 100644
--- a/lib/portage/util/_async/SchedulerInterface.py
+++ b/lib/portage/util/_async/SchedulerInterface.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Gentoo Authors
+# Copyright 2012-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import gzip
@@ -49,6 +49,13 @@ class SchedulerInterface(SlotObject):
for k in self._event_loop_attrs:
setattr(self, k, getattr(event_loop, k))
+ @property
+ def _loop(self):
+ """
+ Returns the real underlying asyncio loop.
+ """
+ return self._event_loop._loop
+
@staticmethod
def _return_false():
return False
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index b9e96bb20..ee9e4c60e 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,8 +1,9 @@
-# Copyright 2018-2023 Gentoo Authors
+# Copyright 2018-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
import signal
+import threading
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
@@ -14,6 +15,7 @@ except ImportError:
PidfdChildWatcher = None
import portage
+from portage.util import socks5
class AsyncioEventLoop(_AbstractEventLoop):
@@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop):
def __init__(self, loop=None):
loop = loop or _real_asyncio.get_event_loop()
self._loop = loop
- self.run_until_complete = (
- self._run_until_complete
- if portage._internal_caller
- else loop.run_until_complete
- )
+ self.run_until_complete = self._run_until_complete
self.call_soon = loop.call_soon
self.call_soon_threadsafe = loop.call_soon_threadsafe
self.call_later = loop.call_later
self.call_at = loop.call_at
self.is_running = loop.is_running
self.is_closed = loop.is_closed
- self.close = loop.close
+ self.close = self._close
self.create_future = (
loop.create_future
if hasattr(loop, "create_future")
@@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop):
self.get_debug = loop.get_debug
self._wakeup_fd = -1
self._child_watcher = None
+ # Used to drop recursive calls to _close.
+ self._closing = False
+ # Initialized in _run_until_complete.
+ self._is_main = None
if portage._internal_caller:
loop.set_exception_handler(self._internal_caller_exception_handler)
+ def _close(self):
+ """
+ Before closing the main loop, run portage.process.run_exitfuncs()
+ with the event loop running so that anything attached can clean
+ itself up (like the socks5 ProxyManager for bug 925240).
+ """
+ if not (self._closing or self.is_closed()):
+ self._closing = True
+ if self._is_main:
+ self.run_until_complete(self._close_main())
+ self._loop.close()
+ self._closing = False
+
+ async def _close_main(self):
+ # Even though this has an exit hook, invoke it here so that
+ # we can properly wait for it and avoid messages like this:
+ # [ERROR] Task was destroyed but it is pending!
+ if socks5.proxy.is_running():
+ await socks5.proxy.stop()
+
+ portage.process.run_exitfuncs()
+
@staticmethod
def _internal_caller_exception_handler(loop, context):
"""
@@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop):
In order to avoid potential interference with API consumers, this
implementation is only used when portage._internal_caller is True.
"""
+ if self._is_main is None:
+ self._is_main = threading.current_thread() is threading.main_thread()
+
+ if not portage._internal_caller:
+ return self._loop.run_until_complete(future)
+
if self._wakeup_fd != -1:
signal.set_wakeup_fd(self._wakeup_fd)
self._wakeup_fd = -1
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index 22241f335..4eecc46a8 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -325,13 +325,37 @@ def _safe_loop():
def _get_running_loop():
+ """
+ This calls the real asyncio get_running_loop() and wraps that with
+ portage's internal AsyncioEventLoop wrapper. If there is no running
+ asyncio event loop but portage has a reference to another running
+ loop in this thread, then use that instead.
+
+ This behavior enables portage internals to use the real asyncio.run
+ while remaining compatible with internal code that does not use the
+ real asyncio.run.
+ """
+ try:
+ _loop = _real_asyncio.get_running_loop()
+ except RuntimeError:
+ _loop = None
+
with _thread_weakrefs.lock:
if _thread_weakrefs.pid == portage.getpid():
try:
loop = _thread_weakrefs.loops[threading.get_ident()]
except KeyError:
- return None
- return loop if loop.is_running() else None
+ pass
+ else:
+ if _loop is loop._loop:
+ return loop
+ elif _loop is None:
+ return loop if loop.is_running() else None
+
+ # If _loop it not None here it means it was probably a temporary
+ # loop created by asyncio.run, so we don't try to cache it, and
+ # just return a temporary wrapper.
+ return None if _loop is None else _AsyncioEventLoop(loop=_loop)
def _thread_weakrefs_atexit():
diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py
index 6c68ff410..f8fcdf9fc 100644
--- a/lib/portage/util/socks5.py
+++ b/lib/portage/util/socks5.py
@@ -2,15 +2,23 @@
# Copyright 2015-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import asyncio
import errno
import os
import socket
+from typing import Union
+
+import portage
+
+portage.proxy.lazyimport.lazyimport(
+ globals(),
+ "portage.util._eventloop.global_event_loop:global_event_loop",
+)
import portage.data
from portage import _python_interpreter
from portage.data import portage_gid, portage_uid, userpriv_groups
from portage.process import atexit_register, spawn
-from portage.util.futures import asyncio
class ProxyManager:
@@ -57,23 +65,41 @@ class ProxyManager:
**spawn_kwargs,
)
- def stop(self):
+ def stop(self) -> Union[None, asyncio.Future]:
"""
Stop the SOCKSv5 server.
+
+ If there is a running asyncio event loop then asyncio.Future is
+ returned which should be used to wait for the server process
+ to exit.
"""
+ future = None
+ try:
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ loop = None
if self._proc is not None:
self._proc.terminate()
- loop = asyncio.get_event_loop()
- if self._proc_waiter is None:
- self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop)
- if loop.is_running():
- self._proc_waiter.add_done_callback(lambda future: future.result())
+ if loop is None:
+ # In this case spawn internals would have used
+ # portage's global loop when attaching a waiter to
+ # self._proc, so we are obligated to use that.
+ global_event_loop().run_until_complete(self._proc.wait())
else:
- loop.run_until_complete(self._proc_waiter)
+ if self._proc_waiter is None:
+ self._proc_waiter = asyncio.ensure_future(
+ self._proc.wait(), loop=loop
+ )
+ future = asyncio.shield(self._proc_waiter)
+
+ if loop is not None and future is None:
+ future = loop.create_future()
+ future.set_result(None)
self.socket_path = None
self._proc = None
self._proc_waiter = None
+ return future
def is_running(self):
"""