Why is my websocket coroutine not being called in the following code?

I am using alpaca-py, Alpaca's new Python package, to create a basic tradebot. My goal is to have the bot make a trade (buy), get data back from Alpaca's webhook regarding whether the order was filled (and some other information), and then make another trade with the same shares (sell). Before I attempted to integrate the webhook, the bot was buying and selling fine. However, I cannot get the handler coroutine to run.

I have tried the following:

  1. Moved the await statements to different places in the coroutines.
  2. Changed the placement of the method and removed async from various methods.
  3. Checked Alpaca's documentation. (Unfortunately, alpaca-py launched in 2023 and a lot of their documentation is outdated.)
  4. Read the TradingStream code to ensure I am doing everything correctly. All looks good.
  5. Changed the asyncio.gather call and ran them both as routines. I get the same result.
  6. Added logger statements to the code. This clued me in that my method 'trade_update_handler' isn't being called, as nothing gets printed to the console.
  7. Used 'run()' instead of '_run_forever()'; however, this causes an error on the webhook side.
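
Regarding item 7, one general asyncio behavior may be relevant. This is a minimal sketch that assumes (nothing in this post confirms it) that the synchronous 'run()' starts its own event loop with `asyncio.run`. The names `stream_forever` and `blocking_run` are hypothetical stand-ins, not alpaca-py functions:

```python
import asyncio


async def stream_forever():
    # Hypothetical stand-in for a long-running stream coroutine.
    await asyncio.sleep(0)
    return "streamed"


def blocking_run():
    # Hypothetical stand-in for a synchronous wrapper that starts its own loop.
    coro = stream_forever()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # discard the coroutine so no "never awaited" warning is emitted
        raise


async def main():
    # Calling the synchronous wrapper from inside a running loop fails.
    try:
        blocking_run()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "no error"


result = asyncio.run(main())
print(result)
```

If the assumption holds, a wrapper like this can only be called from synchronous code, which would be consistent with 'run()' working in a standalone script but failing inside a coroutine.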

I am using Django to run the bot, as I like its BaseCommand class. I don't think Django has anything to do with the issue. Here is my code:

class TradeMaker():
    def __init__(self, **kwargs):
        self.paper_bool = kwargs.get('paper_bool', True)
        self.random_bool = kwargs.get('random', True)
        self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
        self.amount = kwargs.get('amount', 40000)
        self.seconds_between = kwargs.get('seconds_between', 4)
        self.log = kwargs.get('log')
        self.trading_client, self.trading_stream, self.account = self.open_client()
        self.trade_update_info = None
        self.order_filled = False
        self.shares_bought = 0
        self.current_symbol = None

    def open_client(self):
        trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        try:
            account = trading_client.get_account()
        except Exception as e:
            logger.error(f"Exception in login: {e}")
        return trading_client, trading_stream, account

    async def trade_update_handler(self, data):
        logger.info('Trade Update called')
        print("Trade Update:", data)
        if data.event == TradeEvent.FILL:
            if data.order.side == OrderSide.BUY:
                self.order_filled = True
                self.shares_bought = data.order.filled_qty
                self.current_symbol = data.order.symbol

    async def run_stream(self):
        logger.info('Subscribing to trade updates')
        self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
        logger.info('Preparing stream')
        await self.trading_stream._run_forever()

    async def stop_stream(self):
        logger.info('Stopping stream')
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        await trading_stream.stop()

    def get_symbol(self):
        if self.random_bool:
            symbol = random.choice(self.symbol_or_symbols)
            return symbol
        else:
            symbol = self.symbol_or_symbols
            return symbol

    def buy(self):
        symbol = self.get_symbol()
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=1,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_buy = self.trading_client.submit_order(
                order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to buy {symbol}: {e}")
            return None
        return symbol, market_order_buy

    def sell(self, symbol):
        symbol = symbol
        shares = self.shares_bought

        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=250,
            side=OrderSide.SELL,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_sell = self.trading_client.submit_order(
                order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to sell {symbol}: {e}")
            return None
        return market_order_sell

    async def make_trades(self):
        market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
        while datetime.datetime.now() < market_close:
            seconds = self.seconds_between
            try:
                symbol, market_order_buy = self.buy()
                print(f"Bought {symbol}: {market_order_buy}")
            except Exception as e:
                logger.error(f"Failed to buy during trade: {e}")
                return None
            while not self.order_filled:
                logger.info('Waiting for order status update')
                await asyncio.sleep(1)
            sleep(seconds)
            try:
                market_order_sell = self.sell(symbol=symbol)
                print(f"Sold {self.current_symbol}: {market_order_sell}")
            except Exception as e:
                logger.error(f"Failed to sell during trade: {e}")
                return None
            self.order_filled = False
            self.shares_bought = 0
            sleep(seconds)
        print('Market closed, shutting down.')

class Command(BaseCommand):
    help = """This bot trades the target stock. If you want it to choose randomly, pass it a list and set the variable random=True
    """
    model = None

    def add_arguments(self, parser):
        parser.add_argument(
            '--paper',
            type=bool,
            help='Set false to live trade.',
            default=True
        )
        parser.add_argument(
            '--folder',
            type=str,
            help='source folder for files',
            default=''
        )
        parser.add_argument(
            '--symbol',
            type=str,
            help='target symbol, or list of symbols',
            default='AAPL'
        )
        parser.add_argument(
            '--random',
            type=bool,
            help="Set to true if passing a list of symbols to choose randomly from.",
            default=False
        )
        parser.add_argument(
            '--tradevalue',
            type=int,
            help="The amount the bot should trade. e.g. $40000",
            default=40000
        )
        parser.add_argument(
            '--seconds',
            type=int,
            help="The number of seconds the bot should wait between each trade.",
            default=4
        )

    def handle(self, **options):
        paper_bool = options['paper']
        random_bool = options['random']
        symbol_or_symbols = options['symbol']
        amount = options['tradevalue']
        seconds_between = options['seconds']
        log = options['folder']
        tm = TradeMaker(
            paper_bool = paper_bool,
            random = random_bool,
            symbol_or_symbols = symbol_or_symbols,
            amount = amount,
            seconds_between = seconds_between,
            log = log
        )
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.gather(
                tm.run_stream(),
                tm.make_trades()
            ))
        except KeyboardInterrupt:
            tm.stop_stream()
            print("Stopped with Interrupt")
        finally:
            tm.stop_stream()
            loop.close()
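
A side note on the `handle` method above: `stop_stream` is declared with `async def`, but it is called as `tm.stop_stream()` without `await` and without being handed to the loop. The following sketch, standard library only, shows what a plain call to a coroutine function does. Here `stop_stream` is a hypothetical stand-in for the method above, not Alpaca code:

```python
import asyncio

calls = []


async def stop_stream():
    # Hypothetical stand-in for an async cleanup method.
    calls.append("stopped")


# A plain call only creates a coroutine object; the body has not run yet.
pending = stop_stream()
calls_after_plain_call = list(calls)
pending.close()  # discard it explicitly so no "never awaited" warning is emitted

# Driving the coroutine on an event loop actually executes the body.
asyncio.run(stop_stream())
calls_after_run = list(calls)

print(calls_after_plain_call)  # []
print(calls_after_run)         # ['stopped']
```

This is probably unrelated to the handler never firing, but it suggests the stop logic in the `except` and `finally` branches never executes as written.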

When I run the command, I get the following output in my terminal (information censored for security):

    python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update

If I run the webhook separately in another terminal while my bot runs, it works. I can run the following code:

    from alpaca.trading.stream import TradingStream

trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)

async def update_handler(data):
    print(data)

trading_stream.subscribe_trade_updates(update_handler)
trading_stream.run()

It will print out all the data as my bot runs. Why would it work separately, but not in a coroutine?
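
One difference between the two setups may be worth ruling out. In the standalone script the stream has the event loop to itself, while in the bot it shares the loop with `make_trades`, which calls the blocking `sleep(seconds)` and the synchronous `submit_order`. The sketch below uses only the standard library to show how a blocking call inside one coroutine affects another coroutine scheduled with `asyncio.gather`. `heartbeat` and `worker` are hypothetical stand-ins, and this is not claimed to be the confirmed cause of the missing updates:

```python
import asyncio
import time


async def heartbeat(log, ticks):
    # Hypothetical stand-in for a stream consumer that needs regular loop time.
    for _ in range(ticks):
        log.append("tick")
        await asyncio.sleep(0.01)


async def worker(log, blocking):
    await asyncio.sleep(0.015)  # let the heartbeat start first
    log.append("work-start")
    if blocking:
        time.sleep(0.1)           # blocks the whole event loop
    else:
        await asyncio.sleep(0.1)  # yields control to other coroutines
    log.append("work-end")


async def run(blocking):
    log = []
    await asyncio.gather(heartbeat(log, 10), worker(log, blocking))
    start = log.index("work-start")
    end = log.index("work-end")
    # Number of heartbeat ticks that ran while the worker was "sleeping".
    return log[start:end].count("tick")


ticks_during_blocking = asyncio.run(run(blocking=True))
ticks_during_async = asyncio.run(run(blocking=False))
print(ticks_during_blocking, ticks_during_async)
```

The first count is zero by construction, because nothing else can run while `time.sleep` holds the loop. The second depends on timing but should be greater than zero. For blocking calls that cannot be made awaitable, `asyncio.to_thread` (Python 3.9+) is one way to keep them off the loop.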



source https://stackoverflow.com/questions/75892030/why-is-my-websocket-coroutine-not-being-called-in-the-following-code
