proxy.core.work.threadless module#

proxy.py#

⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on Network monitoring, controls & Application development, testing, debugging.

copyright
  1. 2013-present by Abhinav Singh and contributors.

license

BSD, see LICENSE for more details.

class proxy.core.work.threadless.Threadless(iid: str, work_queue: proxy.core.work.threadless.T, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None)[source]#

Bases: abc.ABC, Generic[proxy.core.work.threadless.T]

Work executor base class.

Threadless provides an event loop, which is shared across multiple Work instances to handle work.

Threadless takes input a work_klass and an event_queue. work_klass must conform to the Work protocol. Work is received over the event_queue.

When a work is accepted, threadless creates a new instance of work_klass. Threadless will then invoke necessary lifecycle of the Work protocol, allowing work_klass implementation to handle the assigned work.

Example, BaseTcpServerHandler implements Work protocol. It expects a client connection as work payload and hooks into the threadless event loop to handle the client connection.

_abc_impl = <_abc._abc_data object>#
_cleanup(work_id: int) None[source]#
_cleanup_inactive() None[source]#
_create_tasks(work_by_ids: Dict[int, Tuple[List[int], List[int]]]) Set[_asyncio.Task[bool]][source]#
async _run_forever() None[source]#
async _run_once() bool[source]#
async _selected_events() Tuple[Dict[int, Tuple[List[int], List[int]]], bool][source]#

For each work, collects events that they are interested in. Calls select for events of interest.

Returns a 2-tuple containing a dictionary and boolean. Dictionary keys are work IDs and values are 2-tuple containing ready readables & writables.

Returned boolean value indicates whether there is a newly accepted work waiting to be received and queued for processing. This is only applicable when work_queue_fileno returns a valid fd.

_total: int#
async _update_conn_pool_events() None[source]#
async _update_selector() None[source]#
async _update_work_events(work_id: int) None[source]#
_upstream_conn_filenos: Set[int]#
_upstream_conn_pool: Optional['UpstreamConnectionPool']#
async _wait_for_tasks() Set[_asyncio.Task[bool]][source]#
cleanup_inactive_timeout: float#
close_work_queue() None[source]#

Only called if work_queue_fileno returns an integer. If an fd is select-able for work queue, make sure to close the work queue fd now.

create(uid: str, *args: Any) Work[T][source]#
abstract property loop: Optional[asyncio.events.AbstractEventLoop]#
abstract receive_from_work_queue() bool[source]#

Work queue is ready to receive new work.

Receive it and call work_on_tcp_conn.

Return True to tear down the loop.

registered_events_by_work_ids: Dict[int, SelectableEvents]#
run() None[source]#
selector: Optional[selectors.DefaultSelector]#
unfinished: Set['asyncio.Task[bool]']#
wait_timeout: float#
abstract work(*args: Any) None[source]#
abstract work_queue_fileno() Optional[int][source]#

If work queue must be selected before calling receive_from_work_queue then implementation must return work queue fd.

works: Dict[int, 'Work[Any]']#