This method is used for downloading a single file just need to pass object name and bucket name.
def download_file(self, object_name: str, bucket_name: str = None, local_directory_path: Path = None, config=None, include_prefix_path: bool = True, minimal_logging: bool = True):
"""Download an S3 object to a file.
Parameters
----------
bucket_name : str
S3 bucket name
prefix : str
AWS S3 path from where you need to fetch the aws s3 ls dump
local_directory_path : Path
Directory where files need to be downloaded
"""
# Creating S3 client with our access key
if self.s3_client is None:
self.config_dict = self._config_read_utility(read_aws=True, read_audit=True)
self.client_config = botocore.config.Config(max_pool_connections=50)
self.s3_client = boto3.client('s3', aws_access_key_id=self.config_dict['aws_access_key_id'],
aws_secret_access_key=self.config_dict['aws_secret_access_key'],
config=self.client_config)
# Reading value from config file if None is given
if bucket_name is None:
bucket_name = self.config_dict['source_bucket_name']
if local_directory_path is None:
local_directory_path = self.config_dict['download_path']
# Type Check
if type(local_directory_path) == 'str':
local_directory_path = Path(local_directory_path)
# Folder check to make sure the local directory exists
# if local directory does not exists then create
if not local_directory_path.exists():
local_directory_path.mkdir(parents=True, exist_ok=True)
log.logger_fw.info(f'S3_Download_Files: Local folder created: {str(local_directory_path)}')
# Downloading file
try:
if include_prefix_path:
file_path = local_directory_path / Path(object_name)
else:
file_path = local_directory_path / Path(object_name).name
# Makding download directory structure based on object name
if not os.path.exists(str(file_path.parent)):
os.makedirs(file_path.parent)
# Checking if file name already exists or not
unique_file_idx = 0
while os.path.exists(str(file_path)):
file_name = file_path.stem + \
f'_{unique_file_idx}' + file_path.suffix
file_path = file_path.parent / file_name
self.s3_client.download_file(
bucket_name, object_name, str(file_path), config)
if not minimal_logging:
msg = f"Download Complete: ['s3://{bucket_name}/{object_name}']"
log.logger_fw.info(msg)
except ClientError as e:
msg = f"Download failed: ['s3://{bucket_name}/{object_name}']"
return (False, e)
return True
The below method is running multithreading batch download on pandas DataFrame input. Normally it takes 23-29 seconds on average to download 1000 files but sometimes code gets stuck in deadlock. I am not able to pinpoint how to solve this issue. There is one more issue during a longer run this code has a memory leak issue. I am un-allocating all the memory by running a garbage collector if the total memory percentage crosses 40% but still, the memory is not getting clear and it keeps getting piling up
def multithread_download_batch(self, download_meta_data: pd.DataFrame, object_col_name: str = 'pdf', bucket_name: str = None, local_directory_path: Path = None, unique_col_name: str = None, thread_count: int = 20, include_prefix_path: bool = False, thread_join_average_time: int = None) -> str:
"""
Use for Multithread downloading where you can define the number of thread based on the system configuration,
networking and AWS bucket configuration. we need to define proper thread count with trial and error method to
achieve best performance. During Download performance is majorly affected during I/O which are slowest compare
to speed of networking. So based on your system configuration pick best thread count
Some Motivation toward proper multithreading
--------------------------------------------
If I give you a box of work assignments, do I care about when you've taken everything out of the box?
No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys
might still be working on stuff you took out of the box.
Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with
Queue.join will wait until enough task_done calls have been made, not when the queue is empty.
Link: https://stackoverflow.com/questions/49637086/python-what-is-queue-task-done-used-for
Real World Analogy Example
---------------------------
The source for ayncio.queue is pretty short.
- the number of unfinished tasks goes up by one when you put to the queue.
- it goes down by one with you call task_done
- join() awaits there being no unfinished tasks.
- This makes join useful if and only if you are calling task_done().
Using the classic bank analogy:
1.) people come in the doors and get in line; door is a producer doing a q.put()
2.) when a teller is idle and a person is in line, they go to the teller window. teller does a q.get().
3.) When the teller has finished helping the person, they are ready for the next one. teller does a q.task_done()
at 5 p.m., the doors are locked door task finishes
4.) you wait until both the line is empty and each teller has finished helping the person in front of them. await q.join(tellers)
5.) then you send the tellers home, who are now all idling with an empty queue. for teller in tellers: teller.cancel()
6.) Without the task_done(), you cannot know every teller is done with people. You cannot send a teller home while
they have a person at his or her window.
Parameters
----------
download_meta_data : pd.DataFrame
Dataframe with download information
object_col_name : str, optional
Column name where object info is stored in df, by default 'pdf'
bucket_name : str, optional
Bucket name of AWS Bucket, by default None
local_directory_path : Path, optional
Directory path where files need to be download, by default None
unique_col_name : str, optional
If you want ensure unique number for each row download folder, by default None
thread_count : int, optional
Number of thread that will be used for downloading, by default 20
Returns
-------
str
Returns time taken for downloading this dataframe request
"""
import queue
import threading
# Reading value from config file if None is given
if bucket_name is None:
bucket_name = self.config_dict['source_bucket_name']
# If local directory path is custom
custom_local_directory_path_flag = False
if unique_col_name != None:
custom_local_directory_path_flag = True
else:
include_prefix_path = True
# Downloading current batch of gruops
start_time = time.time()
# Creating download queue
download_queue = queue.Queue()
# Variable denoting the run state of download operation
running = False
def download_queue_worker():
while running:
try:
if (time.time() - start_time) > 60:
break
# If queue doesnot have any element it will
# raise queue.Empty exception
task_parameters = download_queue.get(timeout=0.01)
if task_parameters is None:
continue
try:
self.download_file(*task_parameters, include_prefix_path=include_prefix_path)
finally:
download_queue.task_done()
except queue.Empty:
pass
except :
log.logger_fw.info("Error while processing item")
# Starting condition for download thread
running = True
# Creating Download Threads
worker_threads = []
for _ in range(thread_count):
# Create and manage the thread
thread = threading.Thread(target=download_queue_worker)
worker_threads.append(thread)
# Start the thread
thread.start()
# Popuplating the download queue
for idx, row in download_meta_data.iterrows():
# Setting Local Directory path
if custom_local_directory_path_flag:
local_directory_path = self.config_dict['download_path']/ \
str(row[unique_col_name])
# Creating task parameter
task_parameters = (row[object_col_name],
bucket_name, local_directory_path)
download_queue.put(task_parameters)
# Waiting for all items to finsish processing
# .task_done() is used to mark .join() that the processing is done.
# NOTE If you use .join() and don't call .task_done() for every processed item,
# your script will hang forever.
download_queue.join()
# Stopping condition for download thread
running = False
# Close worker threads
for thread in worker_threads:
thread.join()
# Free up the memory
for thread in worker_threads:
del thread
del download_queue
del worker_threads
msg = f"Download Time Taken: {time.time() - start_time} seconds taken\n"
print(msg)
return msg
source https://stackoverflow.com/questions/76607515/memory-leak-and-stuck-in-deadlock-during-aws-boto-multithread-download
Comments
Post a Comment