들어가며
안녕하세요. 현재 유망한 백엔드 개발자를 꿈꾸는 진기태입니다.
이번 포스팅에서는 AI 설계 생성 기능의 비동기 처리 리팩토링 과정을 공유하고자 합니다.
기존 동기 방식으로 구현했던 기능을 RabbitMQ, Celery을 활용하여 비동기 처리로 전환하여 성능을 크게 개선한 사례를 소개합니다.
동기 처리 vs. 비동기 처리: 문제 인식
동기 방식 구현
초기 구현 시, AI 설계 생성 기능은 동기적으로 작동하여 10회 호출 기준 3개의 정보를 생성하는 데 약 62초 정도 소요되었습니다.
동시 다발적인 요청이 들어오는 상황에서 응답 지연 및 시스템 부하가 발생할 우려가 있었고, 이를 개선하기 위해 비동기 처리 방식으로 전환할 필요성을 느꼈습니다.
문제점
- 높은 응답 지연 시간: 요청마다 순차적으로 처리되어 전체 소요 시간이 늘어남
- 자원 낭비: 동기 처리로 인해 시스템 자원을 비효율적으로 사용
- 사용자 경험 저하: 기능 실행 시간이 길어짐에 따라 사용자 만족도가 하락
비동기 처리 구현: RabbitMQ, Celery 적용
구현 개요
비동기 처리를 위해 RabbitMQ를 메시지 브로커로, Celery를 작업 큐로 활용하였습니다.
주요 구현 단계
1. 메시지 브로커(RabbitMQ) 설정
- RabbitMQ를 설치 및 설정 후, Celery와 연동하여 비동기 작업 큐를 구성했습니다.
- 메시지 큐를 통해 작업 요청을 비동기적으로 분배하고, 처리 결과를 받아올 수 있도록 설계했습니다.
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("config")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request}")
2. Celery 작업 정의 및 구성
- Celery Worker를 구현하여 비동기로 처리해야 할 작업들을 정의했습니다.
- 기존 동기 처리 로직을 Celery 태스크로 전환하여 메시지 큐에서 작업을 받아 처리하도록 변경했습니다.
- 기존의 동기 코드입니다.
@api_view(["POST"])
@csrf_exempt
def create_document(request):
serializer = CreateDocumentSerializer(data=request.data)
if serializer.is_valid():
try:
title = serializer.validated_data.get("title")
content = serializer.validated_data.get("content")
requirements = serializer.validated_data.get("requirements", "No requirements provided")
prompt = f"""
Title: {title}
Content: {content}
requirements: {requirements}
위 내용을 가지고 쳬계적인 문서화를 만들어주세요.
방식은
Title: {title}
Content: {content}
requirements: {requirements} 형식으로 체계적으로 다시 문서화 해주세요.
"""
messages = [{"role": "user", "content": prompt}]
response = client.chat.completions.create(
model = "deepseek-chat",
messages = messages)
result = response.choices[0].message.content
document = Document.objects.create(
title = title,
content = content,
requirements = requirements,
result = result)
return JsonResponse({
"status": "success",
"data": {
"id": document.id,
"response": result
}},
status = status.HTTP_200_OK)
except Exception as e:
return JsonResponse({
"status": "error",
"message": str(e)},
status = status.HTTP_500_INTERNAL_SERVER_ERROR)
else:
return JsonResponse({
"status": "error",
"errors": serializer.errors},
status = status.HTTP_400_BAD_REQUEST)
- Celery Worker를 사용한 비동기 방식 코드입니다.
def call_openai_api(prompt):
api_url = "https://api.openai.com/v1/chat/completions"
api_key = settings.OPENAI_API_KEY
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
payload = {
"model": "gpt-4o", # 정확한 모델 이름으로 변경
"messages": [
{"role": "system", "content": "당신은 전문적인 기술 문서를 작성하는 전문가입니다."},
{"role": "user", "content": prompt}
],
"stream": False
}
response = requests.post(api_url, json=payload, headers=headers)
if response.status_code == 200:
result = response.json()
return result["choices"][0]["message"]["content"]
else:
error_msg = response.json().get("error", "Unknown error occurred.")
raise Exception(f"DeepSeek API 호출 실패: {error_msg}")
@shared_task
def create_diagram(data):
"""
주어진 데이터를 기반으로 Mermaid 형식의 시퀀스 다이어그램을 생성합니다.
"""
channel = "task_updates"
redis_client.publish(channel, "create_diagram 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api를 사용하여 다이어그램 코드 생성
diagram_code = call_openai_api(prompt)
redis_client.publish(channel, "create_diagram 작업 완료")
return diagram_code
except Exception as e:
redis_client.publish(channel, f"create_diagram 작업 실패: {e}")
raise Exception(f"Error generating sequence diagram: {e}")
@shared_task
def create_erd(data):
channel = "task_updates"
redis_client.publish(channel, "create_erd 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api 함수 호출하여 Mermaid 코드 생성
diagram_code = call_openai_api(prompt)
# 작업 완료 알림
redis_client.publish(channel, "create_erd 작업 완료")
return diagram_code
except Exception as e:
# 작업 실패 알림
redis_client.publish(channel, f"create_erd 작업 실패: {e}")
raise
@shared_task
def create_api(data):
channel = "task_updates"
redis_client.publish(channel, "create_api 작업 시작")
prompt = f"""
{data} ...
"""
try:
# call_openai_api 함수 호출하여 Swagger JSON 코드 생성
swagger_json_code = call_openai_api(prompt)
# 작업 완료 알림
redis_client.publish(channel, "create_api 작업 완료")
return swagger_json_code
except Exception as e:
# 작업 실패 알림
redis_client.publish(channel, f"create_api 작업 실패: {e}")
raise
@shared_task
def collect_results(results):
return {
"diagram": results[0],
"erd": results[1],
"api": results[2],
}
- 비동기 처리 흐름 확인

- 이미지와 같이, 기존 동기 처리 시의 흐름과 성능 지표를 먼저 기록한 후, (10회 기준 약 62초)

- 비동기 처리 후 개선된 성능과 시스템 상태를 확인했습니다. (10회 기준 약 34초!)
성능 개선 결과
비동기 처리 적용 후, 10회 호출 기준 동기처리 평균 실행 시간은 34초로, 동기 방식의 약 45% 단축되었습니다.
이를 통해 동시 요청 처리 능력과 시스템 자원 활용 효율성이 크게 향상된 것을 확인할 수 있었습니다.
마무리 및 향후 계획
이번 비동기 처리 리팩토링을 통해 시스템 확장성 향상, 빠른 응답 시간 보장, 그리고 사용자 경험 개선이라는 목표를 달성할 수 있었습니다.
앞으로도 지속적인 모니터링과 성능 최적화를 통해, 보다 많은 요청과 복잡한 작업 처리에 유연하게 대응할 계획입니다.
감사합니다.