Memory leak and stuck in deadlock during AWS boto multithread download

This method is used for downloading a single file; you only need to pass the object name and the bucket name.

def download_file(self, object_name: str, bucket_name: str = None, local_directory_path: Path = None, config=None, include_prefix_path: bool = True, minimal_logging: bool = True):
        """Download an S3 object to a file.

        Parameters
        ----------
        bucket_name : str
            S3 bucket name
        prefix : str
            AWS S3 path from where you need to fetch  the aws s3 ls dump
        local_directory_path : Path
            Directory where files need to be downloaded
        """
        # Creating S3 client with our access key
        if self.s3_client is None:
            self.config_dict = self._config_read_utility(read_aws=True, read_audit=True)

            self.client_config = botocore.config.Config(max_pool_connections=50)
            self.s3_client = boto3.client('s3', aws_access_key_id=self.config_dict['aws_access_key_id'],
                                          aws_secret_access_key=self.config_dict['aws_secret_access_key'],
                                          config=self.client_config)

        #  Reading value from config file if None is given
        if bucket_name is None:
            bucket_name = self.config_dict['source_bucket_name']
        if local_directory_path is None:
            local_directory_path = self.config_dict['download_path']
        # Type Check
        if isinstance(local_directory_path, str):
            local_directory_path = Path(local_directory_path)
        # Make sure the local directory exists; create it if it does not
        if not local_directory_path.exists():
            local_directory_path.mkdir(parents=True, exist_ok=True)
            log.logger_fw.info(f'S3_Download_Files: Local folder created: {str(local_directory_path)}')

        # Downloading file
        try:
            if include_prefix_path:
                file_path = local_directory_path / Path(object_name)
            else:
                file_path = local_directory_path / Path(object_name).name

            # Making the download directory structure based on the object name
            file_path.parent.mkdir(parents=True, exist_ok=True)

            # If the file name already exists, append a numeric suffix to make it unique
            unique_file_idx = 0
            original_file_path = file_path
            while os.path.exists(str(file_path)):
                unique_file_idx += 1
                file_name = original_file_path.stem + \
                    f'_{unique_file_idx}' + original_file_path.suffix
                file_path = original_file_path.parent / file_name

            self.s3_client.download_file(
                bucket_name, object_name, str(file_path), Config=config)

            if not minimal_logging:
                msg = f"Download Complete: ['s3://{bucket_name}/{object_name}']"
                log.logger_fw.info(msg)
        except ClientError as e:
            msg = f"Download failed: ['s3://{bucket_name}/{object_name}']"
            log.logger_fw.info(msg)
            return (False, e)
        return True 
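
For reference, a single-file call looks roughly like this (the class that owns download_file and its construction are not shown in the question, so the instance name and values here are hypothetical):

from pathlib import Path

# `s3_helper` stands in for an instance of the class that defines download_file.
result = s3_helper.download_file(
    object_name='reports/2023/invoice.pdf',        # S3 key to fetch
    bucket_name='my-source-bucket',                 # falls back to the config file when None
    local_directory_path=Path('/tmp/downloads'),    # created if it does not exist
    include_prefix_path=True,                       # keep the key's folder structure locally
)
# Returns True on success, or (False, ClientError) on failure.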

The method below runs a multithreaded batch download driven by a pandas DataFrame. It normally takes 23-29 seconds on average to download 1000 files, but sometimes the code gets stuck in a deadlock and I am not able to pinpoint how to solve this. There is a second issue: during longer runs the code leaks memory. I try to free memory by running the garbage collector whenever total memory usage crosses 40%, but the memory is never released and keeps piling up.
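
Roughly, that memory check looks like the following (a sketch only: the helper is not shown in the question, and psutil is assumed for reading the overall memory percentage):

import gc
import psutil

def free_memory_if_needed(threshold_percent: float = 40.0) -> None:
    # Force a garbage collection once total system memory usage crosses the
    # threshold. Note that gc.collect() only reclaims unreachable objects;
    # it cannot release memory that is still referenced (e.g. by live
    # threads or queued task parameters).
    if psutil.virtual_memory().percent > threshold_percent:
        gc.collect()

The batch download method itself: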

def multithread_download_batch(self, download_meta_data: pd.DataFrame, object_col_name: str = 'pdf', bucket_name: str = None, local_directory_path: Path = None, unique_col_name: str = None, thread_count: int = 20, include_prefix_path: bool = False, thread_join_average_time: int = None) -> str:
        """
        Use for multithreaded downloading, where you can set the number of threads based on your system,
        network and AWS bucket configuration. The thread count has to be tuned by trial and error to get the
        best performance: during a download, throughput is mostly limited by disk I/O, which is slower than
        the network, so pick the thread count that suits your system.

        Some Motivation toward proper multithreading
        --------------------------------------------
        If I give you a box of work assignments, do I care about when you've taken everything out of the box?

        No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys 
        might still be working on stuff you took out of the box.

        Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with 
        Queue.join will wait until enough task_done calls have been made, not when the queue is empty.

        Link: https://stackoverflow.com/questions/49637086/python-what-is-queue-task-done-used-for

        Real World Analogy Example
        ---------------------------

        The source for asyncio.Queue is pretty short.

        -   the number of unfinished tasks goes up by one when you put to the queue.
        -   it goes down by one when you call task_done().
        -   join() awaits there being no unfinished tasks.
        -   This makes join() useful if and only if you are calling task_done().

        Using the classic bank analogy:

        1.) people come in the doors and get in line; the door is a producer doing a q.put()
        2.) when a teller is idle and a person is in line, they go to the teller window. The teller does a q.get().
        3.) when the teller has finished helping the person, they are ready for the next one. The teller does a q.task_done()
        4.) at 5 p.m., the doors are locked; the door task finishes
        5.) you wait until both the line is empty and each teller has finished helping the person in front of them. await q.join(tellers)
        6.) then you send the tellers home, who are now all idling with an empty queue. for teller in tellers: teller.cancel()
        7.) without the task_done(), you cannot know every teller is done with people. You cannot send a teller home while
            they have a person at his or her window.

        Parameters
        ----------
        download_meta_data : pd.DataFrame
            Dataframe with download information
        object_col_name : str, optional
            Column name where object info is stored in df, by default 'pdf'
        bucket_name : str, optional
            Bucket name of AWS Bucket, by default None
        local_directory_path : Path, optional
            Directory path where files need to be downloaded, by default None
        unique_col_name : str, optional
            Column whose value is used as a per-row download sub-folder, so each row's files are kept separate, by default None
        thread_count : int, optional
            Number of threads that will be used for downloading, by default 20

        Returns
        -------
        str
            Returns time taken for downloading this dataframe request
        """

        import queue
        import threading
        
        #  Reading value from config file if None is given
        if bucket_name is None:
            bucket_name = self.config_dict['source_bucket_name']

        # If local directory path is custom
        custom_local_directory_path_flag = False
        if unique_col_name is not None:
            custom_local_directory_path_flag = True
        else:
            include_prefix_path = True

        # Downloading current batch of groups
        start_time = time.time()


        # Creating download queue
        download_queue = queue.Queue()

        # Variable denoting the run state of download operation
        running = False

        def download_queue_worker():
            while running:
                try:
                    if (time.time() - start_time) > 60:
                        break

                    # If the queue does not have any element it will
                    # raise a queue.Empty exception
                    task_parameters = download_queue.get(timeout=0.01)
                    if task_parameters is None:
                        continue

                    try:
                        self.download_file(*task_parameters, include_prefix_path=include_prefix_path)
                    finally:
                        download_queue.task_done()

                except queue.Empty:
                    pass
                except Exception:
                    log.logger_fw.info("Error while processing item")

        # Starting condition for download thread
        running = True

        # Creating Download Threads
        worker_threads = []
        for _ in range(thread_count):
            # Create and manage the thread
            thread = threading.Thread(target=download_queue_worker)
            worker_threads.append(thread)
            
            # Start the thread
            thread.start()
            

        # Populating the download queue
        for idx, row in download_meta_data.iterrows():
            # Setting Local Directory path
            if custom_local_directory_path_flag:
                local_directory_path =  self.config_dict['download_path']/ \
                    str(row[unique_col_name])

            # Creating task parameter
            task_parameters = (row[object_col_name],
                               bucket_name, local_directory_path)

            download_queue.put(task_parameters)

        # Waiting for all items to finish processing.
        # .task_done() is used to tell .join() that the processing is done.
        # NOTE: If you use .join() and don't call .task_done() for every processed item,
        # your script will hang forever.
        download_queue.join()

        # Stopping condition for download thread
        running = False
        
        
        # Close worker threads
        for thread in worker_threads:
            thread.join()
        


        # Free up the memory
        for thread in worker_threads:
            del thread
        del download_queue
        del worker_threads
        

        msg = f"Download Time Taken: {time.time() - start_time} seconds taken\n"
        print(msg)
        return msg
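
As a standalone illustration of the task_done()/join() contract the docstring above describes (independent of the S3 code; the queue contents and thread count here are arbitrary):

import queue
import threading

def worker(q: queue.Queue) -> None:
    while True:
        item = q.get()              # blocks until an item is available
        if item is None:            # sentinel: no more work, exit the loop
            q.task_done()
            break
        try:
            print(f"processing {item}")   # real work would go here
        finally:
            q.task_done()           # tell join() this item is fully processed

q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(4)]
for t in threads:
    t.start()

for item in range(10):
    q.put(item)                     # each put() raises the unfinished-task count

q.join()                            # returns once task_done() was called for every put()

for _ in threads:
    q.put(None)                     # one sentinel per worker so each loop exits
for t in threads:
    t.join()                        # workers are now guaranteed to have stopped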


source https://stackoverflow.com/questions/76607515/memory-leak-and-stuck-in-deadlock-during-aws-boto-multithread-download
