대용량 데이터를 이행할 때는 Airflow가 직접 데이터를 처리하게 하는 것이 아니라, 데이터베이스 엔진 간에 직접 통신하게 하거나 전용 바이너리 툴을 실행하는 '오케스트레이터' 역할을 수행하도록 설계해야 합니다.
1. 전용 툴 활용: pg_dump & pg_restore (BashOperator)
PostgreSQL의 고유 도구인 pg_dump와 pg_restore를 활용하는 방법입니다. 데이터를 파일 형태로 덤프한 후 타겟에 적재하므로, 수억 건의 데이터도 가장 안정적으로 이전할 수 있습니다.
- 특징: 데이터 압축 및 병렬 처리가 가능하며, 인덱스와 스키마 정보를 포함한 완전한 복제가 가능합니다.
- 준비 사항: Airflow Worker 서버에 postgresql-client가 설치되어 있어야 하며, 덤프 파일을 임시 저장할 디스크 공간이 필요합니다.
Python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# 파일 경로 및 환경 설정
DUMP_FILE_PATH = "/tmp/full_migration.dump"
SOURCE_CONN = "host=source_host user=admin dbname=source_db"
TARGET_CONN = "host=target_host user=admin dbname=target_db"
with DAG(
'migration_postgres_native_tools',
start_date=datetime(2025, 1, 1),
schedule_interval=None,
catchup=False,
tags=['postgres', 'large_scale']
) as dag:
# 1. 고속 덤프 실행 (-Fc: 커스텀 압축 포맷 사용)
dump_data = BashOperator(
task_id='execute_pg_dump',
bash_command=f"pg_dump {SOURCE_CONN} -Fc > {DUMP_FILE_PATH}",
env={'PGPASSWORD': 'your_password'}
)
# 2. 고속 적재 실행 (-j 4: 4개 스레드 병렬 처리)
restore_data = BashOperator(
task_id='execute_pg_restore',
bash_command=f"pg_restore {TARGET_CONN} -d target_db -j 4 {DUMP_FILE_PATH}",
env={'PGPASSWORD': 'your_password'}
)
# 3. 임시 파일 삭제
cleanup = BashOperator(
task_id='remove_dump_file',
bash_command=f"rm {DUMP_FILE_PATH}"
)
dump_data >> restore_data >> cleanup
2. 엔진 간 직접 연결: postgres_fdw (PostgresOperator)
Foreign Data Wrapper(FDW) 기능을 활용하여 타겟 데이터베이스가 소스 데이터베이스를 마치 자신의 테이블처럼 인식하게 만드는 방식입니다.
- 특징: Airflow 서버의 메모리나 디스크를 거의 사용하지 않습니다. 타겟 DB 엔진이 소스 DB로부터 데이터를 직접 스트리밍 방식으로 가져오기 때문에 대규모 이행에서 가장 선호되는 방식 중 하나입니다.
- 과정: 확장 모듈 설치 -> 서버 정의 -> 사용자 매핑 -> 외래 테이블 생성 -> 데이터 적재.
Python
from airflow.providers.postgres.operators.postgres import PostgresOperator
# 타겟 DB에서 수행될 FDW 설정 및 데이터 이행 쿼리
fdw_query = """
-- 1. FDW 확장 설치 및 서버 설정
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER IF NOT EXISTS remote_src
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'source_host', dbname 'source_db', port '5432');
-- 2. 접속 계정 매핑
CREATE USER MAPPING IF NOT EXISTS FOR current_user
SERVER remote_src
OPTIONS (user 'admin', password 'your_password');
-- 3. 외래 테이블 정의 (소스 테이블 스키마와 동일해야 함)
CREATE FOREIGN TABLE IF NOT EXISTS src_table_proxy (
id int,
data_content text,
created_at timestamp
) SERVER remote_src OPTIONS (table_name 'original_table');
-- 4. 고속 이행 실행
TRUNCATE TABLE local_target_table;
INSERT INTO local_target_table SELECT * FROM src_table_proxy;
"""
with DAG('migration_postgres_fdw', ...) as dag:
run_fdw_sync = PostgresOperator(
task_id='sync_via_fdw',
postgres_conn_id='postgres_target_conn',
sql=fdw_query
)
3. 대용량 처리 시 성능 최적화 핵심 포인트
대규모 이행 시에는 다음 설정들을 병행하여 처리 시간을 단축해야 합니다.
- 병렬 처리(Parallelism): pg_restore 시 -j 옵션을 사용하여 CPU 코어 수에 맞춰 스레드를 할당하면 속도가 비약적으로 향상됩니다.
- 인덱스 사후 생성: 데이터를 적재하는 동안 인덱스가 활성화되어 있으면 매 행마다 인덱스 갱신이 일어나 성능이 급격히 저하됩니다. 적재 전 인덱스를 제거하고, 적재 완료 후 다시 생성하는 방식을 권장합니다.
- UNLOGGED 테이블 활용: 이행 단계에서 타겟 테이블을 일시적으로 UNLOGGED로 설정하면 WAL(Write Ahead Log) 기록을 생략하여 쓰기 속도를 극대화할 수 있습니다. (이행 완료 후 다시 LOGGED로 전환)
[결론]
데이터 규모에 따라 Airflow 운영 전략은 달라져야 합니다. 수십만 건 정도의 데이터는 Python 기반의 PostgresHook으로도 충분하지만, 수백만 건 이상의 대규모 이행은 pg_dump와 같은 네이티브 툴이나 FDW를 이용한 엔진 간 직접 연동 방식을 선택하는 것이 안정성과 성능 면에서 유리합니다.
'Database > 데이터이행' 카테고리의 다른 글
| [Airflow]하이브리드 Executor와 개발 생산성 (0) | 2026.05.27 |
|---|---|
| [Airflow]Executor(실행기) 선택 가이드 (0) | 2026.05.26 |
| PostgreSQL to PostgreSQL 데이터 이행: 메모리 기반의 심플하고 강력한 패턴 (0) | 2026.03.19 |
| [실전 이행] Airflow와 mydumper/myloader 연동: 테라바이트급 MySQL 데이터 이행 전략 (0) | 2026.03.19 |
| Airflow를 활용한 MySQL to MySQL 데이터 이행 가이드: 상황별 3가지 최적 패턴 (0) | 2026.03.19 |