728x90
반응형
SMALL
학습내용
- 작업 큐의 개념
- 테이블 작업 기반 큐 활용 실습
학습정리
1. 작업 큐의 개념
- 작업을 비동기로 순서대로 저장하고 처리
- 데이터베이스나 메모리와 같은 저장소에 저장하고 하나씩 작업을 처리하여 상태를 업데이트 하는 방식
- 필요성
- 응답성 향상 : 비동기적으로 처리함으로 응답속도 향상
- 효율적인 요청처리 : 대량의 요청을 큐에 저장하고 순차적으로 처리하여 시스템 과부하 방지
- 작업 추적 가능 : 작업의 현재 상태와 진행 상황을 추적하여 오류를 방지하고, 관리 용이
2. 테이블 작업 기반 큐 활용 실습
TaskQueue 테이블 및 엔티티 생성
@Entity @Getter @DynamicInsert @DynamicUpdate @NoArgsConstructor @Table(name= "task_queue") @FieldDefaults(level = AccessLevel.PRIVATE) public class TaskQueue { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) Long id; //taskQueue 이벤트아이디 ex) orderId,refundId 등등 @Setter @Column Long eventId; //taskQueue 타입으로 ORDER,REFUND 등등 //eventId와 연결해서 해당 이벤트가 확인 @Column(nullable = false) @Enumerated(EnumType.STRING) TaskType taskType; //준비,처리중,완료 등을 나타내는 상태값 @Setter @Column(nullable = false) @Enumerated(EnumType.STRING) TaskStatus status; //추가적으로 에러메시지,재시도 횟수, 작업우선순위등 다양한 컬럼 추가가능 @Column(name = "created_at", nullable = false, updatable = false) @CreationTimestamp LocalDateTime createdAt; @Column(name = "updated_at") @UpdateTimestamp LocalDateTime updatedAt; @Builder public TaskQueue(TaskType taskType, TaskStatus status) { this.taskType = taskType; this.status = status; } }
TaskQueueRepository 생성
@Repository public interface TaskQueueRepository extends JpaRepository<TaskQueue,Long> { //비관적 락 사용하여 해당 이벤트 조회 @Lock(LockModeType.PESSIMISTIC_WRITE) @Query("SELECT tq FROM TaskQueue tq WHERE tq.id = :id") Optional<TaskQueue> findByIdForUpdate(@Param("id") Long id); @Lock(LockModeType.PESSIMISTIC_WRITE) List<TaskQueue> findAllByStatus(TaskStatus status); }
TaskQueueServie 생성
//공통서비스로 빼서 작업 @Service @RequiredArgsConstructor public class TaskQueueService { private final TaskQueueRepository taskQueueRepository; //해당 메서드는 트랜잭션 X 롤백 되면 안되기 때문에 public TaskQueue requestQueue(TaskType taskType) { TaskQueue taskQueue = TaskQueue.builder() .taskType(taskType) .status(TaskStatus.PENDING) .build(); return taskQueueRepository.save(taskQueue); } //비동기적으로 작업 //람다를 만들기위한 인자가 consumer @Transactional public void processQueueById(Long taskQueueId, Consumer<TaskQueue> task) { TaskQueue taskQueue = taskQueueRepository.findByIdForUpdate(taskQueueId) .orElseThrow(()->new ServiceException(ServiceExceptionCode.NOT_FOUND_TASK)); updateStatus(taskQueue,TaskStatus.PROCESSING); task.accept(taskQueue); updateStatus(taskQueue,TaskStatus.COMPLETED); } private void updateStatus(TaskQueue taskQueue,TaskStatus taskStatus) { taskQueue.setStatus(taskStatus); taskQueueRepository.flush(); } }
해당 TaskQueue 적용 예시
@Service @RequiredArgsConstructor public class OrderService { private final TaskQueueService taskQueueService; private final TaskQueueRepository taskQueueRepository; private final OrderProcessService orderProcessService; private final OrderRepository orderRepository; private final UserRepository userRepository; //api에서는 해당 Request를 호출 public void orderRequest(OrderRequest request) { TaskQueue taskQueue = taskQueueService.requestQueue(TaskType.ORDER); //해당 프로세스는 비동기로 동작하기때문에 성공 실패와 상관없이 api는 응답하고 //다른 쓰레드에서 비동기적으로 orderProcess 처리진행 orderProcess(taskQueue.getId(),request); } @Async @Transactional public void orderProcess(Long taskQueueId, OrderRequest request) { //taskQueue는 processQueueById()에서 만든 taskQueue taskQueueService.processQueueById(taskQueueId, (taskQueue)->{ Order order = save(request.getUserId()); //TaskQueue 타입에 맞는 taskQueue 이벤트 아이디에 해당 아이디 추가 taskQueue.setEventId(order.getId()); List<OrderItem> orderItems = orderProcessService.saveOrderItems(request, order); BigDecimal totalPrice = orderProcessService.calculateTotalPrice(orderItems); order.setTotalPrice(totalPrice); }); } //리트라이 로직 대충 틀만 확인 @Transactional public void orderBatchProcess(OrderRequest request) { List<TaskQueue> taskQueues = taskQueueRepository.findAllByStatus(TaskStatus.PENDING); for (TaskQueue taskQueue : taskQueues) { orderProcess(taskQueue.getId(),request); } } ... 생략 }
ps. 이번 시간에는 데이터베이스 기반 작업 큐를 배우고 실습을 진행했는데,
kafka를 써보긴 했지만 이러한 방법이 있었다는 걸 알고 있지 못했는데
이러한 방법도 있다는 걸 알게 되서 신기했고 작업 큐에 대해 정확한 흐름을 다시 한번 파악하게 된 것 같아서 좋았습니다.
728x90
반응형
LIST
'TIL' 카테고리의 다른 글
7_2.트랜잭션 전파 옵션과 DB Read/Write 분리 (0) | 2025.02.07 |
---|---|
7_1.트랜잭션 개념과 선언적/프로그래밍 방식 비교 (0) | 2025.02.04 |
6_4.CAP 이론과 일관성 전략 (0) | 2025.02.04 |
6_3.DB 락 메커니즘 이해와 실습 (0) | 2025.02.02 |
6_2.트랜잭션 격리 수준 이해 및 실습 (0) | 2025.02.01 |