Skip to main content

Why is my websocket coroutine not being called in the following code?

I am using alpaca-py, Alpaca's new python package, to create a basic tradebot. My goal is to have the bot make a trade (buy), get data back from Alpaca's webhook regarding whether the order was filled (and some other information), and then make another trade with the same shares (sell). Before attempting to intergrate the webhook, the bot was buying and selling fine. I cannot seem to get a coroutine up and running, however.

I have tried the following:

  1. Move await statements to different areas in the coroutines.
  2. Change the placement of the method and removed async from various methods.
  3. Check Alpaca's documentation. (Unfortunately, alpaca-py launched in 2023 and a lot of their documentation is outdated)
  4. Read the TradingStream code to ensure I am doing everything correctly. All looks good.
  5. Change the asyncio.gather call and run them both as routines. I get the same result.
  6. Add logger statements to the code. This clued me in that my method 'trade_update_handler' isn't being called, as nothing gets printed to the console.
  7. Used 'run()' instead of '_run_forever()' however this causes an error on the webhook side.

I am using Django to run the bot, as I like it's BaseCommand class. I don't think django has anything to do with the issue. Here is my code:

class TradeMaker():
def __init__(self, **kwargs):
    self.paper_bool = kwargs.get('paper_bool', True)
    self.random_bool = kwargs.get('random', True)
    self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
    self.amount = kwargs.get('amount', 40000)
    self.seconds_between = kwargs.get('seconds_between', 4)
    self.log = kwargs.get('log')
    self.trading_client, self.trading_stream, self.account = self.open_client()
    self.trade_update_info = None
    self.order_filled = False
    self.shares_bought = 0
    self.current_symbol = None

def open_client(self):
    trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
    trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
    try:
        account = trading_client.get_account()
    except Exception as e:
        logger.error(f"Exception in login: {e}")
    return trading_client, trading_stream, account

async def trade_update_handler(self, data):
    logger.info('Trade Update called')
    print("Trade Update:", data)
    if data.event == TradeEvent.FILL:
        if data.order.side == OrderSide.BUY:
            self.order_filled = True
            self.shares_bought = data.order.filled_qty
            self.current_symbol = data.order.symbol

async def run_stream(self):
    logger.info('Subscribing to trade updates')
    self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
    logger.info('Preparing stream')
    await self.trading_stream._run_forever()

async def stop_stream(self):
    logger.info('Stopping stream')
    trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
    await trading_stream.stop()

def get_symbol(self):
    if self.random_bool:
        symbol = random.choice(self.symbol_or_symbols)
        return symbol
    else:
        symbol = self.symbol_or_symbols
        return symbol

def buy(self):
    symbol = self.get_symbol()
    market_order_data = MarketOrderRequest(
        symbol=symbol,
        qty=1,
        side=OrderSide.BUY,
        time_in_force=TimeInForce.DAY
    )
    try:
        market_order_buy = self.trading_client.submit_order(
                order_data=market_order_data
        )
    except Exception as e:
        logger.error(f"Failed to buy {symbol}: {e}")
        return None
    return symbol, market_order_buy

def sell(self, symbol):
    symbol = symbol
    shares = self.shares_bought
    
    market_order_data = MarketOrderRequest(
        symbol=symbol,
        qty=250,
        side=OrderSide.SELL,
        time_in_force=TimeInForce.DAY
    )
    try:
        market_order_sell = self.trading_client.submit_order(
                order_data=market_order_data
        )
    except Exception as e:
        logger.error(f"Failed to sell {symbol}: {e}")
        return None
    return market_order_sell

async def make_trades(self):
    market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
    while datetime.datetime.now() < market_close:
        seconds = self.seconds_between
        try:
            symbol, market_order_buy = self.buy()
            print(f"Bought {symbol}: {market_order_buy}")
        except Exception as e:
            logger.error(f"Failed to buy during trade: {e}")
            return None
        while not self.order_filled:
            logger.info('Waiting for order status update')
            await asyncio.sleep(1)
        sleep(seconds)
        try:
            market_order_sell = self.sell(symbol=symbol)
            print(f"Sold {self.current_symbol}: {market_order_sell}")
        except Exception as e:
            logger.error(f"Failed to sell during trade: {e}")
            return None
        self.order_filled = False
        self.shares_bought = 0
        sleep(seconds)
    print('Market closed, shutting down.')

class Command(BaseCommand):
help = """This bot trades the target stock. If you want it to choose randomly, pass it a list and set the variable random=True
"""
model = None

def add_arguments(self, parser):
    parser.add_argument(
        '--paper',
        type=bool,
        help='Set false to live trade.',
        default=True
    )
    parser.add_argument(
        '--folder',
        type=str,
        help='source folder for files',
        default=''
    )
    parser.add_argument(
        '--symbol',
        type=str,
        help='target symbol, or list of symbols',
        default='AAPL'
    )
    parser.add_argument(
        '--random',
        type=bool,
        help="Set to true if passing a list of symbols to choose randomly from.",
        default=False
    )
    parser.add_argument(
        '--tradevalue',
        type=int,
        help="The amount the bot should trade. e.g. $40000",
        default=40000
    )
    parser.add_argument(
        '--seconds',
        type=int,
        help="The number of seconds the bot should wait between each trade.",
        default=4
    )

def handle(self, **options):
    paper_bool = options['paper']
    random_bool = options['random']
    symbol_or_symbols = options['symbol']
    amount = options['tradevalue']
    seconds_between = options['seconds']
    log = options['folder']
    tm = TradeMaker(
        paper_bool = paper_bool,
        random = random_bool,
        symbol_or_symbols = symbol_or_symbols,
        amount = amount,
        seconds_between = seconds_between,
        log = log
    )
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(
            tm.run_stream(),
            tm.make_trades()
        ))
    except KeyboardInterrupt:
        tm.stop_stream()
        print("Stopped with Interrupt")
    finally:
        tm.stop_stream()
        loop.close()

When I run the command, I get the following output in my terminal (information censored for security):

    python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update

If I run the webhook seprately in another terminal while my bot runs, it works. I can run the following code:

    from alpaca.trading.stream import TradingStream

trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)

async def update_handler(data):
    print(data)

trading_stream.subscribe_trade_updates(update_handler)
trading_stream.run()

It will print out all the data as my bot runs. Why would it work seperately, but not in a coroutine?



source https://stackoverflow.com/questions/75892030/why-is-my-websocket-coroutine-not-being-called-in-the-following-code

Comments

Popular posts from this blog

How to show number of registered users in Laravel based on usertype?

i'm trying to display data from the database in the admin dashboard i used this: <?php use Illuminate\Support\Facades\DB; $users = DB::table('users')->count(); echo $users; ?> and i have successfully get the correct data from the database but what if i want to display a specific data for example in this user table there is "usertype" that specify if the user is normal user or admin i want to user the same code above but to display a specific usertype i tried this: <?php use Illuminate\Support\Facades\DB; $users = DB::table('users')->count()->WHERE usertype =admin; echo $users; ?> but it didn't work, what am i doing wrong? source https://stackoverflow.com/questions/68199726/how-to-show-number-of-registered-users-in-laravel-based-on-usertype

Why is my reports service not connecting?

I am trying to pull some data from a Postgres database using Node.js and node-postures but I can't figure out why my service isn't connecting. my routes/index.js file: const express = require('express'); const router = express.Router(); const ordersCountController = require('../controllers/ordersCountController'); const ordersController = require('../controllers/ordersController'); const weeklyReportsController = require('../controllers/weeklyReportsController'); router.get('/orders_count', ordersCountController); router.get('/orders', ordersController); router.get('/weekly_reports', weeklyReportsController); module.exports = router; My controllers/weeklyReportsController.js file: const weeklyReportsService = require('../services/weeklyReportsService'); const weeklyReportsController = async (req, res) => { try { const data = await weeklyReportsService; res.json({data}) console...

How to split a rinex file if I need 24 hours data

Trying to divide rinex file using the command gfzrnx but getting this error. While doing that getting this error msg 'gfzrnx' is not recognized as an internal or external command Trying to split rinex file using the command gfzrnx. also install'gfzrnx'. my doubt is I need to run this program in 'gfzrnx' or in 'cmdprompt'. I am expecting a rinex file with 24 hrs or 1 day data.I Have 48 hrs data in RINEX format. Please help me to solve this issue. source https://stackoverflow.com/questions/75385367/how-to-split-a-rinex-file-if-i-need-24-hours-data