단순한 Python Operator만으로는 대용량 데이터 이행의 속도와 메모리 이슈를 해결하기 어렵습니다. 이때는 Airflow를 스케줄러로 활용하고, 실제 이행은 MySQL 전용 고속 툴인 mydumper와 myloader에게 맡기는 하이브리드 전략이 필요합니다.

왜 이 방식이 더 빠른지, 그리고 실제 운영 환경에서 바로 복사해 쓸 수 있는 코드는 무엇인지 정리합니다.

1. 왜 mydumper & myloader인가? (vs Airflow Native)

비교 항목 Airflow Native (Python) mydumper & myloader
처리 방식 행(Row) 단위 처리 또는 Pandas 변환 파일 기반 멀티스레드 덤프/로드
속도 상대적으로 느림 (Single Thread 위주) 매우 빠름 (Multi-threaded)
메모리 의존도 매우 높음 (Worker 메모리 부족 위험) 매우 낮음 (OS 버퍼 활용)
안정성 대량 데이터 시 에러 가능성 있음 대규모 마이그레이션 전용으로 설계됨

2. Airflow BashOperator를 통한 실전 이행 패턴

Airflow 서버(Worker)에 mydumper와 myloader가 설치되어 있다면, BashOperator를 사용하여 가장 강력한 성능의 이행 파이프라인을 구축할 수 있습니다.

이 코드는 소스에서 데이터를 덤프한 뒤, 즉시 타겟에 로드하는 일련의 과정을 담고 있습니다.

Python
 
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 운영 환경 설정값
SOURCE_HOST = "source-db-host"
TARGET_HOST = "target-db-host"
DB_USER = "admin"
DB_PASS = "your_password"
DB_NAME = "target_database"
DUMP_PATH = "/tmp/airflow_migration_dump"

default_args = {
    'owner': 'migration_admin',
    'start_date': datetime(2025, 1, 1),
    'retries': 0
}

with DAG(
    'mysql_high_speed_migration',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['migration', 'native_tool']
) as dag:

    # 1. 기존 임시 디렉토리 청소 및 생성
    prepare_dir = BashOperator(
        task_id='prepare_directory',
        bash_command=f'rm -rf {DUMP_PATH} && mkdir -p {DUMP_PATH}'
    )

    # 2. mydumper: 멀티스레드 고속 덤프 (4스레드 예시)
    # -v 3: 상세 로그, -c: 압축, -t 4: 4개 스레드 사용
    dump_data = BashOperator(
        task_id='fast_dump',
        bash_command=f"""
            mydumper \
            -h {SOURCE_HOST} \
            -u {DB_USER} \
            -p {DB_PASS} \
            -B {DB_NAME} \
            -o {DUMP_PATH} \
            -t 4 -c -v 3
        """
    )

    # 3. myloader: 멀티스레드 고속 적재
    # -o: 테이블이 존재하면 덮어쓰기(Overwrite)
    load_data = BashOperator(
        task_id='fast_load',
        bash_command=f"""
            myloader \
            -h {TARGET_HOST} \
            -u {DB_USER} \
            -p {DB_PASS} \
            -B {DB_NAME} \
            -d {DUMP_PATH} \
            -t 4 -o -v 3
        """
    )

    # 4. 완료 후 임시 파일 삭제
    cleanup = BashOperator(
        task_id='cleanup_files',
        bash_command=f'rm -rf {DUMP_PATH}'
    )

    prepare_dir >> dump_data >> load_data >> cleanup

3. 운영 시 주의사항 (Critical Point)

  • 보안 이슈: 위 샘플처럼 패스워드를 bash_command에 직접 노출하는 것은 보안상 위험합니다. 실제 운영 환경에서는 ~/.my.cnf 파일에 접속 정보를 미리 저장해두거나, Airflow의 Variable에서 값을 가져와 환경변수로 넘기는 방식을 권장합니다.
  • 디스크 공간: mydumper는 데이터를 로컬 파일로 생성하므로, Airflow Worker 서버의 /tmp 공간이나 지정한 경로에 충분한 디스크 용량이 있는지 확인해야 합니다.
  • 네트워크 대역폭: 멀티스레드로 데이터를 밀어 넣으면 네트워크 대역폭을 크게 점유할 수 있습니다. 운영 중인 서비스 DB라면 스레드 수(-t)를 적절히 조절해야 합니다.

[맺음말]

순수 Python 기반의 Airflow Operator는 구현이 쉽지만 성능의 한계가 명확합니다. 수백만 건 이상의 대규모 이행이 필요한 통합 프로젝트라면, Native Tool(mydumper)을 Airflow가 지휘하는 방식이 가장 빠르고 안정적인 선택입니다. 툴의 성능을 믿고, Airflow로는 그 흐름만 제어하세요.


 

+ Recent posts