Source code for asynctradier.clients.streaming_client

import json
from typing import AsyncIterator, Dict, List

import websockets

from asynctradier.common import MarketDataType
from asynctradier.common.market_data import MarketData
from asynctradier.common.order import Order
from asynctradier.utils.webutils import WebUtil


[docs] class StreamingClient: """ A client for streaming market data and order events. Args: session (WebUtil): The session object for making HTTP requests. account_id (str): The ID of the account. token (str): The authentication token. sandbox (bool, optional): Whether to use the sandbox environment. Defaults to False. """ def __init__( self, session: WebUtil, account_id: str, token: str, sandbox: bool = False ) -> None: self.session = session self.account_id = account_id self.token = token self.sandbox = sandbox
[docs] async def get_order(self, order_id: str) -> Order: """ Get an order by its ID. Args: order_id (str): The ID of the order. Returns: Order: The Order object. """ url = f"/v1/accounts/{self.account_id}/orders/{order_id}" params = {"includeTags": "true"} response = await self.session.get(url, params=params) order = response["order"] return Order( **order, )
async def _get_streaming_account_session(self) -> Dict[str, str]: """ Get the streaming account session. Returns: str: The streaming account session. """ url = "/v1/accounts/events/session" response = await self.session.post(url) return response async def _get_streaming_market_data_session(self) -> Dict[str, str]: """ Get the streaming quote session. Returns: str: The streaming quote session. """ url = "/v1/markets/events/session" response = await self.session.post(url) return response
[docs] async def stream_order(self, with_detail: bool = True) -> AsyncIterator[Order]: """ Stream order events. Args: with_detail (bool, optional): Whether to include order details. Defaults to True. """ streaming_session = await self._get_streaming_account_session() uri = streaming_session["stream"]["url"] session_id = streaming_session["stream"]["sessionid"] async with websockets.connect(uri) as websocket: payload = { "events": ["order"], "sessionid": session_id, "excludeAccounts": [], } payload = json.dumps(payload) await websocket.send(payload) while True: response = json.loads(await websocket.recv()) if response["event"] == "heartbeat": continue if response["event"] == "order": if with_detail: order_id = response["id"] yield await self.get_order(order_id) else: yield Order(**response)
[docs] async def stream_market_data( self, symbols: List[str], filters: List[MarketDataType] = None, linebreak: bool = True, valid_only: bool = True, advanced_details: bool = True, ) -> AsyncIterator[MarketData]: """ Streams market data for the given symbols. Args: symbols (List[str]): The list of symbols to stream market data for. filters (List[MarketDataType], optional): The list of market data types to filter. Defaults to None. linebreak (bool, optional): Whether to include linebreaks in the streamed data. Defaults to True. valid_only (bool, optional): Whether to include only valid market data. Defaults to True. advanced_details (bool, optional): Whether to include advanced details in the market data. Defaults to True. Yields: MarketData: The streamed market data. """ streaming_session = await self._get_streaming_market_data_session() uri = "wss://ws.tradier.com/v1/markets/events" session_id = streaming_session["stream"]["sessionid"] if filters is None: filters = [] filters.append(MarketDataType.trade) async with websockets.connect(uri, ssl=True, compression=None) as websocket: payload = { "symbols": symbols, "sessionid": session_id, "linebreak": linebreak, "filter": filters, "validOnly": valid_only, "advancedDetails": advanced_details, } payload = json.dumps(payload) await websocket.send(payload) while True: response = json.loads(await websocket.recv()) yield MarketData(**response)