단순히 SELECT해서 INSERT하면 끝날 것 같지만, 실무에서는 데이터의 양, 네트워크 대역폭, 그리고 '멱등성(Idempotency)' 보장 등 고려해야 할 변수가 너무나 많습니다. 오늘은 Apache Airflow 환경에서 MySQL 간 데이터를 이행하는 가장 대표적인 방법 3가지를 상세히 파헤쳐 보겠습니다.
1. 가장 표준적인 방법: GenericTransfer 오퍼레이터
Airflow에서 제공하는 범용 데이터 전송 일꾼입니다. 소스 DB의 연결 정보와 타겟 DB의 연결 정보만 있으면 아주 간단하게 구현됩니다.
- 동작 원리: 소스 DB에서 데이터를 메모리로 읽어 들인 후, 타겟 DB에 INSERT 문으로 밀어 넣습니다.
- 장점: 코드가 매우 단순하며, 소스와 타겟의 DB 종류가 달라도 사용 가능합니다.
- 단점: 대량 데이터 처리 시 Airflow 워커 서버의 메모리에 부하가 걸릴 수 있습니다. (수만 건 이내 권장)
Python
from airflow.operators.generic_transfer import GenericTransfer
transfer_task = GenericTransfer(
task_id='mysql_to_mysql_standard',
source_conn_id='source_db_conn', # 소스 MySQL 연결 ID
destination_conn_id='target_db_conn', # 타겟 MySQL 연결 ID
destination_table='target_table_name',
sql="SELECT col1, col2 FROM source_table WHERE base_date = '{{ ds }}'",
pre_execute_sql="TRUNCATE TABLE target_table_name" # 이행 전 기존 데이터 삭제(멱등성)
)
2. 대량 데이터 처리를 위한 최적: MySqlHook + Bulk Load
데이터가 수백만 건 이상이라면 일반적인 INSERT로는 시간이 너무 오래 걸립니다. 이때는 MySQL의 고속 적재 명령인 LOAD DATA INFILE 방식을 활용해야 합니다.
- 동작 원리: 소스 데이터를 CSV 파일로 추출(Dump)한 뒤, 타겟 DB에서 파일을 통째로 읽어 적재합니다.
- DA의 관점: 네트워크 부하를 줄이고 DB 엔진의 성능을 최대치로 활용하는 방식입니다.
Python
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator
def bulk_migration():
source_hook = MySqlHook(mysql_conn_id='source_db_conn')
target_hook = MySqlHook(mysql_conn_id='target_db_conn')
# 1. 데이터를 Pandas나 CSV 형태로 추출
df = source_hook.get_pandas_df(sql="SELECT * FROM source_table")
tmp_path = '/tmp/migration_data.csv'
df.to_csv(tmp_path, index=False, header=False)
# 2. 타겟 DB에 벌크 로드 실행
target_hook.bulk_load(table='target_table_name', tmp_file=tmp_path)
migration_task = PythonOperator(
task_id='mysql_to_mysql_bulk',
python_callable=bulk_migration
)
3. 정교한 가공이 필요할 때: MySqlHook + insert_rows
데이터를 단순히 옮기는 게 아니라, 행(Row)별로 유효성을 검사하거나 특정 로직을 태워야 할 때 사용합니다.
- 특징: 데이터를 '청크(Chunk)' 단위로 쪼개서 처리할 수 있어 메모리 관리에 효율적입니다.
- 활용 예시:
-
Python
def chunk_migration(): source_hook = MySqlHook(mysql_conn_id='source_db_conn') target_hook = MySqlHook(mysql_conn_id='target_db_conn') # 데이터를 한꺼번에 가져오지 않고 커서를 통해 스트리밍 conn = source_hook.get_conn() cursor = conn.cursor() cursor.execute("SELECT * FROM source_table") while True: rows = cursor.fetchmany(1000) # 1,000건씩 끊어서 처리 if not rows: break # 타겟 DB에 1,000건씩 배치 삽입 target_hook.insert_rows(table='target_table_name', rows=rows)
이행 업무 수행 시 체크리스트 (멱등성 전략)
데이터 이행 중 배치가 실패해서 재실행할 때, 데이터가 중복으로 쌓이면 안 됩니다. DA는 항상 다음의 '멱등성(Idempotency)' 전략 중 하나를 선택해야 합니다.
- Truncate & Insert: 이행 시작 전 타겟 테이블을 비우고 전체를 다시 넣는 방식 (가장 확실함).
- Delete & Insert: 특정 날짜({{ ds }}) 데이터만 삭제한 후 해당 날짜 데이터만 다시 넣는 방식 (일별 배치에 적합).
- Upsert (ON DUPLICATE KEY UPDATE): PK가 겹치면 업데이트하고, 없으면 새로 넣는 방식 (실시간성에 유리).
[맺음말]
데이터 이행은 단순히 '복사-붙여넣기'가 아닙니다. 데이터의 성격과 규모에 따라 GenericTransfer의 간결함, Bulk Load의 속도, Hook의 유연함 중 무엇을 선택할지가 DA의 역량을 결정합니다.
통합 프로젝트의 성공적인 데이터 이행을 위해 우리 팀에 가장 맞는 패턴을 적용해 보시기 바랍니다!
'Database > 데이터이행' 카테고리의 다른 글
| PostgreSQL to PostgreSQL 데이터 이행: 메모리 기반의 심플하고 강력한 패턴 (0) | 2026.03.19 |
|---|---|
| [실전 이행] Airflow와 mydumper/myloader 연동: 테라바이트급 MySQL 데이터 이행 전략 (0) | 2026.03.19 |
| 대용량 CLOB 데이터 처리와 Swap Memory: 서버 중단 (0) | 2026.03.19 |
| 1,000개 테이블 이행, Apache Airflow & MyDumper 실전 구축 가이드 (0) | 2026.03.19 |
| 통합 프로젝트 환경에서의 데이터 거버넌스 수립 및 통제 전략 (0) | 2026.03.15 |