단순히 SELECT해서 INSERT하면 끝날 것 같지만, 실무에서는 데이터의 양, 네트워크 대역폭, 그리고 '멱등성(Idempotency)' 보장 등 고려해야 할 변수가 너무나 많습니다. 오늘은 Apache Airflow 환경에서 MySQL 간 데이터를 이행하는 가장 대표적인 방법 3가지를 상세히 파헤쳐 보겠습니다.


1. 가장 표준적인 방법: GenericTransfer 오퍼레이터

Airflow에서 제공하는 범용 데이터 전송 일꾼입니다. 소스 DB의 연결 정보와 타겟 DB의 연결 정보만 있으면 아주 간단하게 구현됩니다.

  • 동작 원리: 소스 DB에서 데이터를 메모리로 읽어 들인 후, 타겟 DB에 INSERT 문으로 밀어 넣습니다.
  • 장점: 코드가 매우 단순하며, 소스와 타겟의 DB 종류가 달라도 사용 가능합니다.
  • 단점: 대량 데이터 처리 시 Airflow 워커 서버의 메모리에 부하가 걸릴 수 있습니다. (수만 건 이내 권장)
Python
 
from airflow.operators.generic_transfer import GenericTransfer

transfer_task = GenericTransfer(
    task_id='mysql_to_mysql_standard',
    source_conn_id='source_db_conn',      # 소스 MySQL 연결 ID
    destination_conn_id='target_db_conn', # 타겟 MySQL 연결 ID
    destination_table='target_table_name',
    sql="SELECT col1, col2 FROM source_table WHERE base_date = '{{ ds }}'",
    pre_execute_sql="TRUNCATE TABLE target_table_name" # 이행 전 기존 데이터 삭제(멱등성)
)

2. 대량 데이터 처리를 위한 최적: MySqlHook + Bulk Load

데이터가 수백만 건 이상이라면 일반적인 INSERT로는 시간이 너무 오래 걸립니다. 이때는 MySQL의 고속 적재 명령인 LOAD DATA INFILE 방식을 활용해야 합니다.

  • 동작 원리: 소스 데이터를 CSV 파일로 추출(Dump)한 뒤, 타겟 DB에서 파일을 통째로 읽어 적재합니다.
  • DA의 관점: 네트워크 부하를 줄이고 DB 엔진의 성능을 최대치로 활용하는 방식입니다.
Python
 
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

def bulk_migration():
    source_hook = MySqlHook(mysql_conn_id='source_db_conn')
    target_hook = MySqlHook(mysql_conn_id='target_db_conn')
    
    # 1. 데이터를 Pandas나 CSV 형태로 추출
    df = source_hook.get_pandas_df(sql="SELECT * FROM source_table")
    tmp_path = '/tmp/migration_data.csv'
    df.to_csv(tmp_path, index=False, header=False)
    
    # 2. 타겟 DB에 벌크 로드 실행
    target_hook.bulk_load(table='target_table_name', tmp_file=tmp_path)

migration_task = PythonOperator(
    task_id='mysql_to_mysql_bulk',
    python_callable=bulk_migration
)

3. 정교한 가공이 필요할 때: MySqlHook + insert_rows

데이터를 단순히 옮기는 게 아니라, 행(Row)별로 유효성을 검사하거나 특정 로직을 태워야 할 때 사용합니다.

  • 특징: 데이터를 '청크(Chunk)' 단위로 쪼개서 처리할 수 있어 메모리 관리에 효율적입니다.
  • 활용 예시:
  • Python
     
    def chunk_migration():
        source_hook = MySqlHook(mysql_conn_id='source_db_conn')
        target_hook = MySqlHook(mysql_conn_id='target_db_conn')
    
        # 데이터를 한꺼번에 가져오지 않고 커서를 통해 스트리밍
        conn = source_hook.get_conn()
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM source_table")
    
        while True:
            rows = cursor.fetchmany(1000) # 1,000건씩 끊어서 처리
            if not rows:
                break
            # 타겟 DB에 1,000건씩 배치 삽입
            target_hook.insert_rows(table='target_table_name', rows=rows)
    

이행 업무 수행 시 체크리스트 (멱등성 전략)

데이터 이행 중 배치가 실패해서 재실행할 때, 데이터가 중복으로 쌓이면 안 됩니다. DA는 항상 다음의 '멱등성(Idempotency)' 전략 중 하나를 선택해야 합니다.

  1. Truncate & Insert: 이행 시작 전 타겟 테이블을 비우고 전체를 다시 넣는 방식 (가장 확실함).
  2. Delete & Insert: 특정 날짜({{ ds }}) 데이터만 삭제한 후 해당 날짜 데이터만 다시 넣는 방식 (일별 배치에 적합).
  3. Upsert (ON DUPLICATE KEY UPDATE): PK가 겹치면 업데이트하고, 없으면 새로 넣는 방식 (실시간성에 유리).

[맺음말]

데이터 이행은 단순히 '복사-붙여넣기'가 아닙니다. 데이터의 성격과 규모에 따라 GenericTransfer의 간결함, Bulk Load의 속도, Hook의 유연함 중 무엇을 선택할지가 DA의 역량을 결정합니다.

통합 프로젝트의 성공적인 데이터 이행을 위해 우리 팀에 가장 맞는 패턴을 적용해 보시기 바랍니다!

 

+ Recent posts