TIL

6_5.데이터베이스 기반 작업 큐 실습

꿀승 2025. 2. 4. 13:43
728x90
반응형
SMALL

학습내용

  1. 작업 큐의 개념
  2. 테이블 작업 기반 큐 활용 실습

학습정리

1. 작업 큐의 개념

  • 작업을 비동기로 순서대로 저장하고 처리
  • 데이터베이스나 메모리와 같은 저장소에 저장하고 하나씩 작업을 처리하여 상태를 업데이트 하는 방식
  • 필요성
    • 응답성 향상 : 비동기적으로 처리함으로 응답속도 향상
    • 효율적인 요청처리 : 대량의 요청을 큐에 저장하고 순차적으로 처리하여 시스템 과부하 방지
    • 작업 추적 가능 : 작업의 현재 상태와 진행 상황을 추적하여 오류를 방지하고, 관리 용이

2. 테이블 작업 기반 큐 활용 실습

  1. TaskQueue 테이블 및 엔티티 생성

    @Entity
    @Getter
    @DynamicInsert
    @DynamicUpdate
    @NoArgsConstructor
    @Table(name= "task_queue")
    @FieldDefaults(level = AccessLevel.PRIVATE)
    public class TaskQueue {
    
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        Long id;
    
        //taskQueue 이벤트아이디 ex) orderId,refundId 등등
        @Setter
        @Column
        Long eventId;
    
        //taskQueue 타입으로 ORDER,REFUND 등등
        //eventId와 연결해서 해당 이벤트가 확인
        @Column(nullable = false)
        @Enumerated(EnumType.STRING)
        TaskType taskType;
    
        //준비,처리중,완료 등을 나타내는 상태값
        @Setter
        @Column(nullable = false)
        @Enumerated(EnumType.STRING)
        TaskStatus status;
    
        //추가적으로 에러메시지,재시도 횟수, 작업우선순위등 다양한 컬럼 추가가능
    
        @Column(name = "created_at", nullable = false, updatable = false)
        @CreationTimestamp
        LocalDateTime createdAt;
    
        @Column(name = "updated_at")
        @UpdateTimestamp
        LocalDateTime updatedAt;
    
        @Builder
        public TaskQueue(TaskType taskType, TaskStatus status) {
            this.taskType = taskType;
            this.status = status;
        }
    
    }
    
  2. TaskQueueRepository 생성

    @Repository
    public interface TaskQueueRepository extends JpaRepository<TaskQueue,Long> {
        //비관적 락 사용하여 해당 이벤트 조회
        @Lock(LockModeType.PESSIMISTIC_WRITE)
        @Query("SELECT tq FROM TaskQueue tq WHERE tq.id = :id")
        Optional<TaskQueue> findByIdForUpdate(@Param("id") Long id);
    
        @Lock(LockModeType.PESSIMISTIC_WRITE)
        List<TaskQueue> findAllByStatus(TaskStatus status);
    }
    
  3. TaskQueueServie 생성

    //공통서비스로 빼서 작업
    @Service
    @RequiredArgsConstructor
    public class TaskQueueService {
        private final TaskQueueRepository taskQueueRepository;
    
        //해당 메서드는 트랜잭션 X 롤백 되면 안되기 때문에
        public TaskQueue requestQueue(TaskType taskType) {
            TaskQueue taskQueue = TaskQueue.builder()
                    .taskType(taskType)
                    .status(TaskStatus.PENDING)
                    .build();
    
            return taskQueueRepository.save(taskQueue);
        }
    
        //비동기적으로 작업
        //람다를 만들기위한 인자가 consumer
        @Transactional
        public void processQueueById(Long taskQueueId, Consumer<TaskQueue> task) {
            TaskQueue taskQueue = taskQueueRepository.findByIdForUpdate(taskQueueId)
                    .orElseThrow(()->new ServiceException(ServiceExceptionCode.NOT_FOUND_TASK));
    
            updateStatus(taskQueue,TaskStatus.PROCESSING);
    
            task.accept(taskQueue);
    
            updateStatus(taskQueue,TaskStatus.COMPLETED);
        }
    
        private void updateStatus(TaskQueue taskQueue,TaskStatus taskStatus) {
            taskQueue.setStatus(taskStatus);
            taskQueueRepository.flush();
        }
    }
    
  4. 해당 TaskQueue 적용 예시

    @Service
    @RequiredArgsConstructor
    public class OrderService {
      private final TaskQueueService taskQueueService;
      private final TaskQueueRepository taskQueueRepository;
      private final OrderProcessService orderProcessService;
    
      private final OrderRepository orderRepository;
      private final UserRepository userRepository;
    
    //api에서는 해당 Request를 호출
      public void orderRequest(OrderRequest request) {
        TaskQueue taskQueue = taskQueueService.requestQueue(TaskType.ORDER);
        //해당 프로세스는 비동기로 동작하기때문에 성공 실패와 상관없이 api는 응답하고 
        //다른 쓰레드에서 비동기적으로 orderProcess 처리진행
        orderProcess(taskQueue.getId(),request);
      }
    
      @Async
      @Transactional
      public void orderProcess(Long taskQueueId, OrderRequest request) {
        //taskQueue는 processQueueById()에서 만든 taskQueue
        taskQueueService.processQueueById(taskQueueId, (taskQueue)->{
          Order order = save(request.getUserId());
    
          //TaskQueue 타입에 맞는 taskQueue 이벤트 아이디에 해당 아이디 추가
          taskQueue.setEventId(order.getId());
    
          List<OrderItem> orderItems = orderProcessService.saveOrderItems(request, order);
    
          BigDecimal totalPrice = orderProcessService.calculateTotalPrice(orderItems);
          order.setTotalPrice(totalPrice);
        });
      }
    
      //리트라이 로직 대충 틀만 확인
      @Transactional
      public void orderBatchProcess(OrderRequest request) {
        List<TaskQueue> taskQueues = taskQueueRepository.findAllByStatus(TaskStatus.PENDING);
    
        for (TaskQueue taskQueue : taskQueues) {
          orderProcess(taskQueue.getId(),request);
        }
      }
        ... 생략
    }

ps. 이번 시간에는 데이터베이스 기반 작업 큐를 배우고 실습을 진행했는데,
kafka를 써보긴 했지만 이러한 방법이 있었다는 걸 알고 있지 못했는데
이러한 방법도 있다는 걸 알게 되서 신기했고 작업 큐에 대해 정확한 흐름을 다시 한번 파악하게 된 것 같아서 좋았습니다.

728x90
반응형
LIST