데이터 양이 방대하지 않을 때(Small to Medium), 별도의 외부 툴 없이 Airflow 내부 리소스만으로 PostgreSQL 간 데이터를 이행하는 방법을 공유합니다.
메모리 의존도가 조금 높더라도 코드가 간결하고 관리가 편한 방식을 선호하신다면 이 가이드가 정답이 될 것입니다.
1. 왜 PostgreSQL 간 이행에는 전용 기능을 쓰는가?
동일한 DB 엔진 간의 이행은 데이터 타입 변환 오류가 거의 없고, PostgreSQL 전용 라이브러리인 psycopg2의 성능을 백분 활용할 수 있습니다. 데이터가 메모리에 적재될 정도(보통 수만 건 내외)라면 PostgresHook을 사용하는 것이 가장 유연합니다.
이 코드는 소스 PG에서 데이터를 조회해 타겟 PG로 밀어 넣는 가장 전형적인 '메모리 경유' 방식입니다.
Python
from airflow import DAG
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
# 1. 데이터 이행 로직 함수화
def pg_to_pg_migration(source_conn_id, target_conn_id, source_query, target_table):
# 소스와 타겟의 훅(Hook) 선언
src_hook = PostgresHook(postgres_conn_id=source_conn_id)
tgt_hook = PostgresHook(postgres_conn_id=target_conn_id)
# 1. 소스에서 데이터 추출 (메모리에 로드)
# 데이터 양이 적으므로 get_records로 리스트 형태로 가져옵니다.
print(f"Extracting data from {source_conn_id}...")
records = src_hook.get_records(sql=source_query)
if records:
# 2. 타겟 테이블 비우기 (멱등성 확보를 위한 선택사항)
print(f"Truncating target table: {target_table}")
tgt_hook.run(f"TRUNCATE TABLE {target_table};")
# 3. 타겟에 데이터 적재 (Bulk Insert)
# insert_rows는 내부적으로 데이터를 쪼개서 넣으므로 안정적입니다.
print(f"Loading {len(records)} rows into {target_table}...")
tgt_hook.insert_rows(
table=target_table,
rows=records,
commit_every=1000 # 1000건마다 커밋
)
print("Migration completed successfully!")
else:
print("No data found to migrate.")
# 2. DAG 정의
with DAG(
'pg_to_pg_simple_migration',
start_date=datetime(2025, 1, 1),
schedule_interval=None,
catchup=False,
tags=['postgres', 'migration', 'small_data']
) as dag:
migrate_task = PythonOperator(
task_id='migrate_pg_data',
python_callable=pg_to_pg_migration,
op_kwargs={
'source_conn_id': 'pg_source_default', # UI에서 설정한 ID
'target_conn_id': 'pg_target_default', # UI에서 설정한 ID
'source_query': 'SELECT id, name, updated_at FROM origin_table',
'target_table': 'destination_table'
}
)
3. 이 방식의 장점과 DA의 포인트
- 코드의 간결성: 별도의 .sh 파일이나 외부 툴 설치 없이 Python 파일 하나로 모든 프로세스가 완결됩니다.
- 유연성: get_records로 가져온 데이터를 타겟에 넣기 전, Python 단에서 살짝 가공(Data Cleaning)할 수 있는 여지가 있습니다.
- 보안성: Airflow의 Connections 기능을 그대로 활용하므로, 패스워드가 코드에 노출되지 않습니다.
4. 주의사항 (데이터 양이 조금씩 늘어난다면?)
지금은 데이터가 적어서 이슈가 없지만, 나중에 양이 늘어날 기미가 보인다면 다음 두 가지만 기억하세요.
- insert_rows의 commit_every: 한 번에 너무 많은 데이터를 커밋하면 트랜잭션 로그가 쌓이므로, 적절한 단위(500~1000)로 끊어서 넣는 것이 좋습니다.
- COPY 명령 검토: PostgreSQL은 INSERT보다 COPY 명령어가 훨씬 빠릅니다. 더 높은 성능이 필요하다면 tgt_hook.copy_expert 함수를 사용하는 코드로 업그레이드할 수 있습니다.
[맺음말]
PostgreSQL 간의 데이터 이동은 "심플함이 곧 정답"일 때가 많습니다. 메모리 제약이 없는 규모라면
오늘 소개해 드린 PostgresHook 패턴을 통해 가장 빠르고 유지보수하기 쉬운 파이프라인을 구축해 보세요!
'Database > 데이터이행' 카테고리의 다른 글
| [Airflow]Executor(실행기) 선택 가이드 (0) | 2026.05.26 |
|---|---|
| 대규모 PostgreSQL 데이터 이행을 위한 Airflow 활용 전략: pg_dump와 FDW 패턴 (0) | 2026.03.19 |
| [실전 이행] Airflow와 mydumper/myloader 연동: 테라바이트급 MySQL 데이터 이행 전략 (0) | 2026.03.19 |
| Airflow를 활용한 MySQL to MySQL 데이터 이행 가이드: 상황별 3가지 최적 패턴 (0) | 2026.03.19 |
| 대용량 CLOB 데이터 처리와 Swap Memory: 서버 중단 (0) | 2026.03.19 |