대용량 데이터를 이행할 때는 Airflow가 직접 데이터를 처리하게 하는 것이 아니라, 데이터베이스 엔진 간에 직접 통신하게 하거나 전용 바이너리 툴을 실행하는 '오케스트레이터' 역할을 수행하도록 설계해야 합니다.

 

1. 전용 툴 활용: pg_dump & pg_restore (BashOperator)

PostgreSQL의 고유 도구인 pg_dump와 pg_restore를 활용하는 방법입니다. 데이터를 파일 형태로 덤프한 후 타겟에 적재하므로, 수억 건의 데이터도 가장 안정적으로 이전할 수 있습니다.

  • 특징: 데이터 압축 및 병렬 처리가 가능하며, 인덱스와 스키마 정보를 포함한 완전한 복제가 가능합니다.
  • 준비 사항: Airflow Worker 서버에 postgresql-client가 설치되어 있어야 하며, 덤프 파일을 임시 저장할 디스크 공간이 필요합니다.
Python
 
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# 파일 경로 및 환경 설정
DUMP_FILE_PATH = "/tmp/full_migration.dump"
SOURCE_CONN = "host=source_host user=admin dbname=source_db"
TARGET_CONN = "host=target_host user=admin dbname=target_db"

with DAG(
    'migration_postgres_native_tools',
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['postgres', 'large_scale']
) as dag:

    # 1. 고속 덤프 실행 (-Fc: 커스텀 압축 포맷 사용)
    dump_data = BashOperator(
        task_id='execute_pg_dump',
        bash_command=f"pg_dump {SOURCE_CONN} -Fc > {DUMP_FILE_PATH}",
        env={'PGPASSWORD': 'your_password'}
    )

    # 2. 고속 적재 실행 (-j 4: 4개 스레드 병렬 처리)
    restore_data = BashOperator(
        task_id='execute_pg_restore',
        bash_command=f"pg_restore {TARGET_CONN} -d target_db -j 4 {DUMP_FILE_PATH}",
        env={'PGPASSWORD': 'your_password'}
    )

    # 3. 임시 파일 삭제
    cleanup = BashOperator(
        task_id='remove_dump_file',
        bash_command=f"rm {DUMP_FILE_PATH}"
    )

    dump_data >> restore_data >> cleanup

2. 엔진 간 직접 연결: postgres_fdw (PostgresOperator)

Foreign Data Wrapper(FDW) 기능을 활용하여 타겟 데이터베이스가 소스 데이터베이스를 마치 자신의 테이블처럼 인식하게 만드는 방식입니다.

  • 특징: Airflow 서버의 메모리나 디스크를 거의 사용하지 않습니다. 타겟 DB 엔진이 소스 DB로부터 데이터를 직접 스트리밍 방식으로 가져오기 때문에 대규모 이행에서 가장 선호되는 방식 중 하나입니다.
  • 과정: 확장 모듈 설치 -> 서버 정의 -> 사용자 매핑 -> 외래 테이블 생성 -> 데이터 적재.
Python
 
from airflow.providers.postgres.operators.postgres import PostgresOperator

# 타겟 DB에서 수행될 FDW 설정 및 데이터 이행 쿼리
fdw_query = """
-- 1. FDW 확장 설치 및 서버 설정
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

CREATE SERVER IF NOT EXISTS remote_src 
FOREIGN DATA WRAPPER postgres_fdw 
OPTIONS (host 'source_host', dbname 'source_db', port '5432');

-- 2. 접속 계정 매핑
CREATE USER MAPPING IF NOT EXISTS FOR current_user 
SERVER remote_src 
OPTIONS (user 'admin', password 'your_password');

-- 3. 외래 테이블 정의 (소스 테이블 스키마와 동일해야 함)
CREATE FOREIGN TABLE IF NOT EXISTS src_table_proxy (
    id int,
    data_content text,
    created_at timestamp
) SERVER remote_src OPTIONS (table_name 'original_table');

-- 4. 고속 이행 실행
TRUNCATE TABLE local_target_table;
INSERT INTO local_target_table SELECT * FROM src_table_proxy;
"""

with DAG('migration_postgres_fdw', ...) as dag:
    run_fdw_sync = PostgresOperator(
        task_id='sync_via_fdw',
        postgres_conn_id='postgres_target_conn',
        sql=fdw_query
    )

3. 대용량 처리 시 성능 최적화 핵심 포인트

대규모 이행 시에는 다음 설정들을 병행하여 처리 시간을 단축해야 합니다.

  • 병렬 처리(Parallelism): pg_restore 시 -j 옵션을 사용하여 CPU 코어 수에 맞춰 스레드를 할당하면 속도가 비약적으로 향상됩니다.
  • 인덱스 사후 생성: 데이터를 적재하는 동안 인덱스가 활성화되어 있으면 매 행마다 인덱스 갱신이 일어나 성능이 급격히 저하됩니다. 적재 전 인덱스를 제거하고, 적재 완료 후 다시 생성하는 방식을 권장합니다.
  • UNLOGGED 테이블 활용: 이행 단계에서 타겟 테이블을 일시적으로 UNLOGGED로 설정하면 WAL(Write Ahead Log) 기록을 생략하여 쓰기 속도를 극대화할 수 있습니다. (이행 완료 후 다시 LOGGED로 전환)

[결론]

데이터 규모에 따라 Airflow 운영 전략은 달라져야 합니다. 수십만 건 정도의 데이터는 Python 기반의 PostgresHook으로도 충분하지만, 수백만 건 이상의 대규모 이행은 pg_dump와 같은 네이티브 툴이나 FDW를 이용한 엔진 간 직접 연동 방식을 선택하는 것이 안정성과 성능 면에서 유리합니다.

+ Recent posts