Memory leak and deadlock during AWS boto3 multithreaded download

This method downloads a single file; you only need to pass the object name and the bucket name.

def download_file(self, object_name: str, bucket_name: str = None, local_directory_path: Path = None, config=None, include_prefix_path: bool = True, minimal_logging: bool = True):
        """Download an S3 object to a local file.

        Parameters
        ----------
        object_name : str
            S3 object key to download
        bucket_name : str, optional
            S3 bucket name; read from the config file when None
        local_directory_path : Path, optional
            Directory where the file needs to be downloaded; read from the config file when None
        config : optional
            Transfer configuration forwarded to the boto3 download call
        include_prefix_path : bool, optional
            Keep the full S3 prefix as sub-folders under the local directory, by default True
        minimal_logging : bool, optional
            Skip the per-file success log message, by default True
        """
        # Creating S3 client with our access key
        if self.s3_client is None:
            self.config_dict = self._config_read_utility(read_aws=True, read_audit=True)

            self.client_config = botocore.config.Config(max_pool_connections=50)
            self.s3_client = boto3.client('s3', aws_access_key_id=self.config_dict['aws_access_key_id'],
                                          aws_secret_access_key=self.config_dict['aws_secret_access_key'],
                                          config=self.client_config)

        #  Reading value from config file if None is given
        if bucket_name is None:
            bucket_name = self.config_dict['source_bucket_name']
        if local_directory_path is None:
            local_directory_path = self.config_dict['download_path']
        # Type check: convert a plain string path to a Path object
        if isinstance(local_directory_path, str):
            local_directory_path = Path(local_directory_path)
        # Make sure the local directory exists;
        # create it if it does not
        if not local_directory_path.exists():
            local_directory_path.mkdir(parents=True, exist_ok=True)
            log.logger_fw.info(f'S3_Download_Files: Local folder created: {str(local_directory_path)}')

        # Downloading file
        try:
            if include_prefix_path:
                file_path = local_directory_path / Path(object_name)
            else:
                file_path = local_directory_path / Path(object_name).name

            # Making the download directory structure based on the object name;
            # exist_ok avoids a race when several threads create the same folder
            os.makedirs(file_path.parent, exist_ok=True)

            # If the file name already exists, append an increasing numeric suffix
            unique_file_idx = 0
            original_stem = file_path.stem
            while os.path.exists(str(file_path)):
                file_path = file_path.parent / \
                    f'{original_stem}_{unique_file_idx}{file_path.suffix}'
                unique_file_idx += 1

            self.s3_client.download_file(
                bucket_name, object_name, str(file_path), Config=config)

            if not minimal_logging:
                msg = f"Download Complete: ['s3://{bucket_name}/{object_name}']"
                log.logger_fw.info(msg)
        except ClientError as e:
            msg = f"Download failed: ['s3://{bucket_name}/{object_name}']"
            log.logger_fw.info(msg)
            return (False, e)
        return True 
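
For reference, a minimal usage sketch of the method above; the class name S3Manager and the key/bucket values are assumptions, since the surrounding class is not shown here.

from pathlib import Path

# Hypothetical instance of the class that owns download_file;
# credentials are read from its config file, as in the code above.
manager = S3Manager()

result = manager.download_file(
    object_name='reports/2023/07/invoice.pdf',    # S3 key to fetch (hypothetical)
    bucket_name='my-source-bucket',               # falls back to the config value when None
    local_directory_path=Path('/tmp/downloads'),  # falls back to the config value when None
    include_prefix_path=True,                     # keep the S3 prefix as local sub-folders
)
# result is True on success, or a (False, ClientError) tuple on failure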

The method further below (after the short sketch that follows) runs a multithreaded batch download driven by a pandas DataFrame. On average it takes 23-29 seconds to download 1000 files, but sometimes the code gets stuck in a deadlock, and I am not able to pinpoint how to solve this issue. There is one more issue: during longer runs this code leaks memory. I try to free memory by running the garbage collector whenever total memory usage crosses 40%, but the memory is not released and keeps piling up.
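
A minimal sketch of the kind of threshold-based cleanup described above; the actual check is not shown here, so psutil is assumed as the source of the memory percentage.

import gc

import psutil  # assumed; the question does not say how memory usage is measured


def collect_if_needed(threshold_percent: float = 40.0) -> bool:
    """Run a garbage-collection pass when system memory usage crosses the threshold."""
    if psutil.virtual_memory().percent > threshold_percent:
        gc.collect()
        return True
    return False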

def multithread_download_batch(self, download_meta_data: pd.DataFrame, object_col_name: str = 'pdf', bucket_name: str = None, local_directory_path: Path = None, unique_col_name: str = None, thread_count: int = 20, include_prefix_path: bool = False, thread_join_average_time: int = None) -> str:
        """
        Use for multithreaded downloading, where you can choose the number of threads based on the system
        configuration, the network, and the AWS bucket configuration. The thread count needs to be tuned by
        trial and error to achieve the best performance: download speed is mostly limited by disk I/O, which
        is slow compared to the network, so pick the thread count that works best for your system.

        Some Motivation toward proper multithreading
        --------------------------------------------
        If I give you a box of work assignments, do I care about when you've taken everything out of the box?

        No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys 
        might still be working on stuff you took out of the box.

        Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with 
        Queue.join will wait until enough task_done calls have been made, not when the queue is empty.

        Link: https://stackoverflow.com/questions/49637086/python-what-is-queue-task-done-used-for

        Real World Analogy Example
        ---------------------------

        The source for asyncio.Queue is pretty short.

        -   the number of unfinished tasks goes up by one when you put to the queue.
        -   it goes down by one when you call task_done().
        -   join() awaits there being no unfinished tasks.
        -   This makes join() useful if and only if you are calling task_done().

        Using the classic bank analogy:

        1.) People come in the doors and get in line; the door is a producer doing a q.put().
        2.) When a teller is idle and a person is in line, they go to the teller window. The teller does a q.get().
        3.) When the teller has finished helping the person, they are ready for the next one. The teller does a q.task_done().
        4.) At 5 p.m., the doors are locked; the door task finishes.
        5.) You wait until both the line is empty and each teller has finished helping the person in front of them: await q.join().
        6.) Then you send the tellers home, who are now all idling with an empty queue: for teller in tellers: teller.cancel().
        7.) Without the task_done(), you cannot know every teller is done with people. You cannot send a teller home while
            they have a person at his or her window.

        Parameters
        ----------
        download_meta_data : pd.DataFrame
            Dataframe with download information
        object_col_name : str, optional
            Column name where object info is stored in df, by default 'pdf'
        bucket_name : str, optional
            Bucket name of AWS Bucket, by default None
        local_directory_path : Path, optional
            Directory path where files need to be downloaded, by default None
        unique_col_name : str, optional
            If set, each row is downloaded into its own sub-folder named after this column's value, by default None
        thread_count : int, optional
            Number of threads that will be used for downloading, by default 20
        include_prefix_path : bool, optional
            Whether to keep the S3 prefix as sub-folders under the local directory, by default False
        thread_join_average_time : int, optional
            Currently unused, by default None

        Returns
        -------
        str
            Returns time taken for downloading this dataframe request
        """

        import queue
        import threading
        
        #  Reading value from config file if None is given
        if bucket_name is None:
            bucket_name = self.config_dict['source_bucket_name']

        # If the local directory path is custom (derived per row)
        custom_local_directory_path_flag = False
        if unique_col_name is not None:
            custom_local_directory_path_flag = True
        else:
            include_prefix_path = True

        # Downloading the current batch of groups
        start_time = time.time()


        # Creating download queue
        download_queue = queue.Queue()

        # Variable denoting the run state of download operation
        running = False

        def download_queue_worker():
            while running:
                try:
                    if (time.time() - start_time) > 60:
                        break

                    # If the queue does not have any element, get() will
                    # raise queue.Empty after the timeout
                    task_parameters = download_queue.get(timeout=0.01)
                    if task_parameters is None:
                        continue

                    try:
                        self.download_file(*task_parameters, include_prefix_path=include_prefix_path)
                    finally:
                        download_queue.task_done()

                except queue.Empty:
                    pass
                except Exception:
                    log.logger_fw.info("Error while processing item")

        # Starting condition for download thread
        running = True

        # Creating Download Threads
        worker_threads = []
        for _ in range(thread_count):
            # Create and manage the thread
            thread = threading.Thread(target=download_queue_worker)
            worker_threads.append(thread)
            
            # Start the thread
            thread.start()
            

        # Populating the download queue
        for idx, row in download_meta_data.iterrows():
            # Setting Local Directory path
            if custom_local_directory_path_flag:
                local_directory_path = self.config_dict['download_path'] / \
                    str(row[unique_col_name])

            # Creating task parameter
            task_parameters = (row[object_col_name],
                               bucket_name, local_directory_path)

            download_queue.put(task_parameters)

        # Waiting for all items to finish processing.
        # .task_done() is used to signal to .join() that the processing is done.
        # NOTE: If you use .join() and don't call .task_done() for every processed item,
        # your script will hang forever.
        download_queue.join()

        # Stopping condition for download thread
        running = False
        
        
        # Close worker threads
        for thread in worker_threads:
            thread.join()
        


        # Free up the memory
        for thread in worker_threads:
            del thread
        del download_queue
        del worker_threads
        

        msg = f"Download Time Taken: {time.time() - start_time} seconds taken\n"
        print(msg)
        return msg
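
For comparison, a minimal, self-contained sketch of the put/get/task_done()/join() pattern described in the docstring above, using one sentinel per worker for shutdown instead of a shared running flag and a wall-clock break; download_one and run_batch are hypothetical names standing in for the real per-file download and batch driver.

import queue
import threading


def download_one(task) -> None:
    """Placeholder for the real per-file download call."""


def run_batch(tasks, thread_count: int = 20) -> None:
    work: queue.Queue = queue.Queue()
    stop = object()  # sentinel: one per worker is enqueued after the real tasks

    def worker() -> None:
        while True:
            item = work.get()
            try:
                if item is stop:
                    return            # exit after the finally block marks the sentinel done
                download_one(item)
            except Exception:
                pass                  # log in real code; never skip task_done()
            finally:
                work.task_done()      # called on every path, success or failure

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(thread_count)]
    for t in threads:
        t.start()

    for task in tasks:
        work.put(task)
    for _ in threads:
        work.put(stop)                # one stop signal per worker

    work.join()                       # every task and sentinel has been marked done
    for t in threads:
        t.join()                      # workers have returned; safe to drop references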


source https://stackoverflow.com/questions/76607515/memory-leak-and-stuck-in-deadlock-during-aws-boto-multithread-download
