I'm trying to automate the creation of batch pipelines from a backend MySQL database to BigQuery, and I've run into an issue with my script.
It initialises a DAG with its own class (DagCreator) and then passes it as an instance variable to my MySQLBatchPipeline class.
The tasks, and the order they run in, are defined in methods on my MySQLBatchPipeline instance.
How would I add those tasks and the task list to my existing DAG?
Here's some example code so you understand my query:
import datetime as dt

from airflow import DAG


class DagCreator:
    def __init__(
        self,
        dag_id,
        start_date,
        time_delay: int
    ):
        self.dag_id = dag_id
        self.start_date = start_date
        self.time_delay = time_delay
        print(self.dag_id)

    def DagToReturn(self):
        args = {
            'owner': 'Data Lake',
            'depends_on_past': True,
            'start_date': self.start_date,
            'email_on_failure': False,
            'email_on_retry': False,
            'on_failure_callback': failed_dag_slack_alert,  # must be a callable, not a string; defined elsewhere in my codebase
            'retries': 0,
            'max_active_runs': 1,  # NB: max_active_runs is really a DAG-level argument
            'max_file_size': int(50e6)
        }
        dag_to_return = DAG(
            dag_id=f'{self.dag_id}_batch',
            schedule_interval=f'{self.time_delay} * * * *',  # cron needs five fields; time_delay fills the minute slot
            concurrency=4,
            default_args=args
        )
        return dag_to_return


dag = DagCreator('table_name', dt.datetime(2022, 1, 15), 7)
dag_returned = dag.DagToReturn()  # module-level so the scheduler can discover it
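As I understand it, a task is attached to a DAG either by passing dag= when the operator is instantiated, or by creating it inside a with block on the DAG. A throwaway illustration against the DAG returned above (DummyOperator is just a placeholder, not one of my real operators):

from airflow.operators.dummy_operator import DummyOperator

# option 1: bind the task explicitly with the dag= kwarg
probe_task = DummyOperator(task_id='probe_task', dag=dag_returned)

# option 2: use the DAG as a context manager; tasks created inside
# the block are bound to it automatically
with dag_returned:
    probe_task_2 = DummyOperator(task_id='probe_task_2')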
This is the second class and its methods:
class MySQLBatchPipeline:
    '''
    Class to generate MySQL batch pipelines that store CSVs
    in GCS and then import them into BigQuery
    '''
    export_format = 'CSV'

    def __init__(
        self,
        dag,
        ....

    # lots of methods that return tasks like these
    def uk_source_table_extract_task(self):
        uk_source_table_extract_task = MySqlToGoogleCloudStorageOperator(
            sql=self.queries['new_rows_batch_query'],  # <--- needs testing
            bucket=self.gcs_bucket,
            filename=self.extract_gcs_path,
            approx_max_file_size_bytes=MAX_FILE_SIZE,  # <-- is max_file_size necessary here if it's already in the DAG's default_args?
            mysql_conn_id=self.mysql_connection_id,
            export_format=self.export_format,
            google_cloud_storage_conn_id=self.gcs_connection_id,
            params={
                'export_format': self.export_format,
                'country': 'uk',
                'time_delay': self.time_delay
            },
            task_id='uk_source_table_extract_task',
            dag=self.dag
        )
        return uk_source_table_extract_task

    # this is the workflow I want to add to the existing DAG I've passed in as an instance attribute
    def uk_workflow(self):
        uk_source_table_extract_task.set_downstream(
            [
                uk_table_name_load_task,
                uk_truncate_table_name_staging_task,
                uk_expected_stg_row_count_extract_task
            ]
        )
        uk_truncate_table_name_staging_task.set_downstream(
            uk_table_name_load_task
        )
        # etc etc until the final task
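For reference, here is a cut-down, self-contained version of the pattern I'm going for, with DummyOperator standing in for the real operators (every name in it is made up for illustration):

import datetime as dt

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


class TinyPipeline:
    '''Stand-in for MySQLBatchPipeline: holds a DAG and builds tasks bound to it.'''

    def __init__(self, dag):
        self.dag = dag

    def extract_task(self):
        # dag=self.dag is what attaches the task to the existing DAG
        return DummyOperator(task_id='extract', dag=self.dag)

    def load_task(self):
        return DummyOperator(task_id='load', dag=self.dag)

    def workflow(self):
        # keep references to the tasks the builder methods return, then chain them
        extract = self.extract_task()
        load = self.load_task()
        extract.set_downstream(load)


tiny_dag = DAG(
    dag_id='tiny_batch',
    schedule_interval='0 7 * * *',
    start_date=dt.datetime(2022, 1, 15),
)
TinyPipeline(tiny_dag).workflow()  # tasks are now registered on tiny_dag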
Source: https://stackoverflow.com/questions/70756217/how-to-add-task-list-to-existing-dag