bundle.website.builtin.components.websocket.base

Classes

WebSocketBaseComponent

Base websocket component with default params and shared assets.

WebSocketComponentParams

Shared websocket parameters for component instances.

GPXComponentParams

Shared params for GPX websocket components.

GPXWebSocketBaseComponent

Base class for graph-oriented websocket components.

MessageRouter

Typed dispatcher that routes websocket payloads by their type field.

AckMessage

Outgoing keepalive acknowledgement.

ErrorMessage

Outgoing protocol error message.

KeepAliveMessage

Incoming keepalive ping message.

Functions

drain_text(→ None)

Consume and discard incoming text frames until disconnect.

every(→ TaskFactory)

Return a task factory that runs tick periodically, at the interval given by seconds.

keepalive_loop(→ None)

Default keepalive protocol: reply to each incoming keepalive message with a keepalive_ack.

receive_json(→ TaskFactory)

Return a task factory that reads JSON objects and forwards each one to the handle callback.

run_websocket(→ None)

Run one websocket session with composable async tasks.

Package Contents

async bundle.website.builtin.components.websocket.base.drain_text(websocket: fastapi.WebSocket) → None

Consume and discard incoming text frames until disconnect.

bundle.website.builtin.components.websocket.base.every(seconds: float, tick: TaskFactory) → TaskFactory

Return a task factory that runs tick periodically, at the interval given by seconds.
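The periodic behaviour can be illustrated with a minimal asyncio sketch. It is not the library's implementation: every_sketch is a hypothetical stand-in, and it assumes a TaskFactory is a callable that takes the websocket and returns an awaitable, since the alias itself is not shown in this reference. Whether the real function ticks before or after the first wait is also an assumption here.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Assumption: a TaskFactory takes the websocket and returns an awaitable.
TaskFactory = Callable[[object], Awaitable[None]]


def every_sketch(seconds: float, tick: TaskFactory) -> TaskFactory:
    """Hypothetical stand-in for every(): run tick, sleep, repeat."""

    async def loop(websocket: object) -> None:
        while True:
            await tick(websocket)
            await asyncio.sleep(seconds)

    return loop


async def demo() -> int:
    calls = 0

    async def tick(websocket: object) -> None:
        nonlocal calls
        calls += 1

    # No real socket is needed for the demo, so None is passed through.
    task = asyncio.create_task(every_sketch(0.01, tick)(None))
    await asyncio.sleep(0.05)
    task.cancel()  # the periodic loop only stops when it is cancelled
    try:
        await task
    except asyncio.CancelledError:
        pass
    return calls


tick_count = asyncio.run(demo())
```

Because the loop never returns on its own, a factory like this relies on the session runner to cancel it when the socket closes.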

async bundle.website.builtin.components.websocket.base.keepalive_loop(websocket: fastapi.WebSocket) → None

Default keepalive protocol: reply to each incoming keepalive message with a keepalive_ack.
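The exchange can be sketched with plain dictionaries that use the field names from KeepAliveMessage, AckMessage, and ErrorMessage. The helper build_ack_sketch is hypothetical; echoing sent_at back to the client and answering unexpected types with an error message are assumptions, and the packet and byte counters are left out because their semantics are not described here.

```python
import json


def build_ack_sketch(request: dict, received_at: int) -> dict:
    """Hypothetical helper: turn a keepalive payload into a reply payload."""
    if request.get("type") != "keepalive":
        # Assumption: anything else is answered with an error message.
        return {"type": "error", "message": "expected a keepalive message"}
    return {
        "type": "keepalive_ack",
        "sent_at": request.get("sent_at"),  # assumption: echoed unchanged
        "received_at": received_at,
    }


# A client frame as it might arrive on the wire, decoded from JSON.
ping = json.loads('{"type": "keepalive", "sent_at": 1000}')
ack = build_ack_sketch(ping, received_at=1005)
err = build_ack_sketch({"type": "bogus"}, received_at=1005)
```

Timestamp units are not stated in this reference, so the sketch passes the integers through without interpreting them.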

bundle.website.builtin.components.websocket.base.receive_json(handle: collections.abc.Callable[[fastapi.WebSocket, dict], collections.abc.Awaitable[None]]) → TaskFactory

Return a task factory that reads JSON objects and forwards each one to the handle callback.
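A read-and-forward loop of this kind might look like the following sketch. FakeWebSocket and FakeDisconnect are stand-ins for fastapi.WebSocket and its disconnect exception so the example runs without a server, and receive_json_sketch is a hypothetical name. Ending the loop quietly on disconnect is an assumption about the real behaviour.

```python
import asyncio
from collections.abc import Awaitable, Callable


class FakeDisconnect(Exception):
    """Stand-in for the exception raised when the peer disconnects."""


class FakeWebSocket:
    """Minimal stand-in exposing only receive_json()."""

    def __init__(self, incoming: list[dict]) -> None:
        self._incoming = list(incoming)

    async def receive_json(self) -> dict:
        if not self._incoming:
            raise FakeDisconnect()
        return self._incoming.pop(0)


Handler = Callable[[FakeWebSocket, dict], Awaitable[None]]


def receive_json_sketch(handle: Handler):
    """Hypothetical stand-in for receive_json(): read, forward, repeat."""

    async def loop(websocket: FakeWebSocket) -> None:
        try:
            while True:
                payload = await websocket.receive_json()
                await handle(websocket, payload)
        except FakeDisconnect:
            return  # assumption: a disconnect ends the task without error

    return loop


seen: list[dict] = []


async def collect(websocket: FakeWebSocket, payload: dict) -> None:
    seen.append(payload)


socket = FakeWebSocket([{"type": "keepalive"}, {"type": "other"}])
asyncio.run(receive_json_sketch(collect)(socket))
```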

async bundle.website.builtin.components.websocket.base.run_websocket(websocket: fastapi.WebSocket, *task_factories: TaskFactory) → None

Run one websocket session with composable async tasks.

The function accepts the connection, starts all tasks, waits for them to complete, and cancels any tasks that are still running once the socket closes.
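The accept, start, wait, and clean-up sequence can be sketched with asyncio alone. This is an illustration under assumptions, not the library's code: run_websocket_sketch and FakeWebSocket are hypothetical, and the choice to end the session as soon as the first task finishes is a guess at what "waits for completion" means in practice.

```python
import asyncio


class FakeWebSocket:
    """Minimal stand-in exposing only accept()."""

    def __init__(self) -> None:
        self.accepted = False

    async def accept(self) -> None:
        self.accepted = True


async def run_websocket_sketch(websocket, *task_factories) -> None:
    """Hypothetical stand-in for run_websocket()."""
    await websocket.accept()
    tasks = [asyncio.create_task(factory(websocket)) for factory in task_factories]
    if not tasks:
        return
    try:
        # Assumption: the session ends when the first task finishes.
        await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        # return_exceptions=True collects CancelledError instead of raising it.
        await asyncio.gather(*tasks, return_exceptions=True)


events: list[str] = []


async def reader(websocket) -> None:
    await asyncio.sleep(0.02)  # pretend the peer disconnects after 20 ms
    events.append("reader done")


async def ticker(websocket) -> None:
    try:
        while True:
            await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        events.append("ticker cancelled")
        raise


socket = FakeWebSocket()
asyncio.run(run_websocket_sketch(socket, reader, ticker))
```

Awaiting the cancelled tasks before returning lets each one run its own clean-up, which is one reasonable reading of the cancellation cleanup described above.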

class bundle.website.builtin.components.websocket.base.WebSocketBaseComponent(/, **data: Any)

Bases: bundle.website.core.component.Component

Base websocket component with default params and shared assets.

shared_assets: ClassVar[tuple[str, ...]] = ('websocket/base/component.css',)
params: WebSocketComponentParams = None
classmethod shared_asset_paths() → list[str]
classmethod component_asset_paths_for(component_file: str | pathlib.Path, *, asset_filenames: collections.abc.Iterable[str] | None = None) → list[str]
async handle_websocket(websocket: fastapi.WebSocket) → None

Default websocket handler (keepalive protocol).

build_routers()

Attach the component websocket route using the configured endpoint.

class bundle.website.builtin.components.websocket.base.WebSocketComponentParams(/, **data: Any)

Bases: bundle.core.data.Data

Shared websocket parameters for component instances.

endpoint: str = '/ws/default'

class bundle.website.builtin.components.websocket.base.GPXComponentParams(/, **data: Any)

Bases: bundle.website.builtin.components.graphic.threeD.GraphicThreeDComponentParams, bundle.website.builtin.components.websocket.base.component.WebSocketComponentParams

Shared params for GPX websocket components.

graph_id: str = 'gpx'

class bundle.website.builtin.components.websocket.base.GPXWebSocketBaseComponent(/, **data: Any)

Bases: bundle.website.builtin.components.websocket.base.component.WebSocketBaseComponent, bundle.website.builtin.components.graphic.threeD.GraphicThreeDComponent

Base class for graph-oriented websocket components.

params: GPXComponentParams = None

class bundle.website.builtin.components.websocket.base.MessageRouter

Typed dispatcher that routes websocket payloads by their type field.

on(message_type: type[bundle.website.core.ws_messages.MessageT], handler: collections.abc.Callable[[fastapi.WebSocket, bundle.website.core.ws_messages.MessageT], collections.abc.Awaitable[None]]) → MessageRouter

Register a callback for a Data message model.

async dispatch(websocket: fastapi.WebSocket, payload: dict) → None

Deserialize payload and execute the registered typed callback.
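Routing by the type field can be sketched as follows. RouterSketch and the KeepAlive dataclass are hypothetical stand-ins: the real router works with Data message models, whereas this sketch uses a dataclass whose type default serves as the routing key. Raising KeyError for an unregistered type is an assumption, since the reference does not say how unknown payloads are handled.

```python
import asyncio
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class KeepAlive:
    """Stand-in message model; the default of `type` is the routing key."""

    sent_at: Optional[int] = None
    payload: Optional[str] = None
    type: str = "keepalive"


class RouterSketch:
    """Hypothetical stand-in for MessageRouter."""

    def __init__(self) -> None:
        self._routes: dict[str, tuple[type, Any]] = {}

    def on(self, message_type: type, handler: Any) -> "RouterSketch":
        # Read the routing key from the default value of the `type` field.
        key = next(f.default for f in fields(message_type) if f.name == "type")
        self._routes[key] = (message_type, handler)
        return self  # returning self allows chained registration

    async def dispatch(self, websocket: Any, payload: dict) -> None:
        route = self._routes.get(payload.get("type"))
        if route is None:
            # Assumption: unknown types are treated as an error.
            raise KeyError(f"no handler for type {payload.get('type')!r}")
        message_type, handler = route
        await handler(websocket, message_type(**payload))


handled: list[KeepAlive] = []


async def on_keepalive(websocket: Any, message: KeepAlive) -> None:
    handled.append(message)


router = RouterSketch().on(KeepAlive, on_keepalive)
asyncio.run(router.dispatch(None, {"type": "keepalive", "sent_at": 5}))
```

Since dispatch takes a websocket and a dict, a bound method like router.dispatch has the same shape as the handle callback that receive_json expects.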

class bundle.website.builtin.components.websocket.base.AckMessage(/, **data: Any)

Bases: bundle.core.data.Data, bundle.website.core.ws_messages.WebSocketDataMixin

Outgoing keepalive acknowledgement.

type: Literal['keepalive_ack'] = 'keepalive_ack'
sent_at: int | None = None
received_at: int
server_rx_packets: int = 0
server_tx_packets: int = 0
server_rx_bytes: int = 0
server_tx_bytes: int = 0
request_frame_bytes: int = 0
request_payload_bytes: int = 0
ack_frame_bytes: int = 0

class bundle.website.builtin.components.websocket.base.ErrorMessage(/, **data: Any)

Bases: bundle.core.data.Data, bundle.website.core.ws_messages.WebSocketDataMixin

Outgoing protocol error message.

type: Literal['error'] = 'error'
message: str

class bundle.website.builtin.components.websocket.base.KeepAliveMessage(/, **data: Any)

Bases: bundle.core.data.Data, bundle.website.core.ws_messages.WebSocketDataMixin

Incoming keepalive ping message.

type: Literal['keepalive'] = 'keepalive'
sent_at: int | None = None
payload: str | None = None