proxy.core.work package#

Subpackages#

Submodules#

Module contents#

proxy.py#

⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on Network monitoring, controls & Application development, testing, debugging.

copyright
  1. 2013-present by Abhinav Singh and contributors.

license

BSD, see LICENSE for more details.

class proxy.core.work.BaseLocalExecutor(*args: Any, **kwargs: Any)[source]#

Bases: proxy.core.work.threadless.Threadless[proxy.common.backports.NonBlockingQueue]

A threadless executor implementation which uses a queue to receive new work.

_abc_impl = <_abc._abc_data object>#
property loop: Optional[asyncio.events.AbstractEventLoop]#
receive_from_work_queue() bool[source]#

Work queue is ready to receive new work.

Receive it and call work_on_tcp_conn.

Return True to tear down the loop.

abstract work(*args: Any) None[source]#
work_queue_fileno() Optional[int][source]#

If work queue must be selected before calling receive_from_work_queue then implementation must return work queue fd.

class proxy.core.work.BaseRemoteExecutor(*args: Any, **kwargs: Any)[source]#

Bases: proxy.core.work.threadless.Threadless[multiprocessing.connection.Connection]

A threadless executor implementation which receives work over a connection.

_abc_impl = <_abc._abc_data object>#
close_work_queue() None[source]#

Only called if work_queue_fileno returns an integer. If an fd is select-able for work queue, make sure to close the work queue fd now.

property loop: Optional[asyncio.events.AbstractEventLoop]#
receive_from_work_queue() bool[source]#

Work queue is ready to receive new work.

Receive it and call work_on_tcp_conn.

Return True to tear down the loop.

abstract work(*args: Any) None[source]#
work_queue_fileno() Optional[int][source]#

If work queue must be selected before calling receive_from_work_queue then implementation must return work queue fd.

class proxy.core.work.Threadless(iid: str, work_queue: proxy.core.work.threadless.T, flags: argparse.Namespace, event_queue: Optional[EventQueue] = None)[source]#

Bases: abc.ABC, Generic[proxy.core.work.threadless.T]

Work executor base class.

Threadless provides an event loop, which is shared across multiple Work instances to handle work.

Threadless takes input a work_klass and an event_queue. work_klass must conform to the Work protocol. Work is received over the event_queue.

When a work is accepted, threadless creates a new instance of work_klass. Threadless will then invoke necessary lifecycle of the Work protocol, allowing work_klass implementation to handle the assigned work.

Example, BaseTcpServerHandler implements Work protocol. It expects a client connection as work payload and hooks into the threadless event loop to handle the client connection.

_abc_impl = <_abc._abc_data object>#
_cleanup(work_id: int) None[source]#
_cleanup_inactive() None[source]#
_create_tasks(work_by_ids: Dict[int, Tuple[List[int], List[int]]]) Set[_asyncio.Task[bool]][source]#
async _run_forever() None[source]#
async _run_once() bool[source]#
async _selected_events() Tuple[Dict[int, Tuple[List[int], List[int]]], bool][source]#

For each work, collects events that they are interested in. Calls select for events of interest.

Returns a 2-tuple containing a dictionary and boolean. Dictionary keys are work IDs and values are 2-tuple containing ready readables & writables.

Returned boolean value indicates whether there is a newly accepted work waiting to be received and queued for processing. This is only applicable when work_queue_fileno returns a valid fd.

async _update_conn_pool_events() None[source]#
async _update_selector() None[source]#
async _update_work_events(work_id: int) None[source]#
async _wait_for_tasks() Set[_asyncio.Task[bool]][source]#
close_work_queue() None[source]#

Only called if work_queue_fileno returns an integer. If an fd is select-able for work queue, make sure to close the work queue fd now.

create(uid: str, *args: Any) Work[T][source]#
abstract property loop: Optional[asyncio.events.AbstractEventLoop]#
abstract receive_from_work_queue() bool[source]#

Work queue is ready to receive new work.

Receive it and call work_on_tcp_conn.

Return True to tear down the loop.

run() None[source]#
abstract work(*args: Any) None[source]#
abstract work_queue_fileno() Optional[int][source]#

If work queue must be selected before calling receive_from_work_queue then implementation must return work queue fd.

class proxy.core.work.ThreadlessPool(flags: argparse.Namespace, executor_klass: Type[T], event_queue: Optional[EventQueue] = None)[source]#

Bases: object

Manages lifecycle of threadless pool and delegates work to them using a round-robin strategy.

Example usage:

with ThreadlessPool(flags=...) as pool:
    while True:
        time.sleep(1)

If necessary, start multiple threadless pool with different work classes.

_shutdown_workers() None[source]#

Pop a running threadless worker and clean it up.

_start_worker(index: int) None[source]#

Starts a threadless worker.

setup() None[source]#

Setup threadless processes.

shutdown() None[source]#

Shutdown threadless processes.

class proxy.core.work.Work(work: proxy.core.work.work.T, flags: argparse.Namespace, event_queue: Optional[proxy.core.event.queue.EventQueue] = None, uid: Optional[str] = None, upstream_conn_pool: Optional[UpstreamConnectionPool] = None)[source]#

Bases: abc.ABC, Generic[proxy.core.work.work.T]

Implement Work to hook into the event loop provided by Threadless process.

_abc_impl = <_abc._abc_data object>#
abstract static create(*args: Any) proxy.core.work.work.T[source]#

Implementations are responsible for creation of work objects from incoming args. This helps keep work core agnostic to creation of externally defined work class objects.

async get_events() Dict[int, int][source]#

Return sockets and events (read or write) that we are interested in.

async handle_events(_readables: List[int], _writables: List[int]) bool[source]#

Handle readable and writable sockets.

Return True to shutdown work.

initialize() None[source]#

Perform any resource initialization.

is_inactive() bool[source]#

Return True if connection should be considered inactive.

publish_event(event_name: int, event_payload: Dict[str, Any], publisher_id: Optional[str] = None) None[source]#

Convenience method provided to publish events into the global event queue.

run() None[source]#

run() method is not used by Threadless. It’s here for backward compatibility with threaded mode where work class is started as a separate thread.

shutdown() None[source]#

Implementation must close any opened resources here and call super().shutdown().

proxy.core.work.delegate_work_to_pool(worker_pid: int, work_queue: connection.Connection, work_lock: multiprocessing.synchronize.Lock, conn: socket.socket, addr: Optional[HostPort], unix_socket_path: Optional[str] = None) None[source]#

Utility method to delegate a work to threadless executor pool.

proxy.core.work.start_threaded_work(flags: argparse.Namespace, conn: socket.socket, addr: Optional[HostPort], event_queue: Optional[proxy.core.event.queue.EventQueue] = None, publisher_id: Optional[str] = None) Tuple[Work[T], threading.Thread][source]#

Utility method to start a work in a new thread.