Issue
PR
Changes
기능 추가
@NoRepositoryBean
public interface ReactiveSortingRepository<T, ID> extends Repository<T, ID> {
// .. 생략
/**
* Returns all entities sorted by the given options with limit.
*
* @param sort the {@link Sort} specification to sort the results by, can be {@link Sort#unsorted()}, must not be
* {@literal null}.
* @param limit the {@link Limit} to limit the results, must not be {@literal null} or negative.
* @return all entities sorted by the given options.
* @throws IllegalArgumentException in case the given {@link Sort} is {@literal null} or {@link Limit} is
* {@literal null} or negative.
*/
default Flux<T> findAll(Sort sort, Limit limit) {
if (limit == null) {
throw new IllegalArgumentException("Limit must not be null");
}
if (limit.isUnlimited()) {
return findAll(sort);
}
final int maxLimit = limit.max();
if (maxLimit < 0) {
throw new IllegalArgumentException("Limit value cannot be negative");
}
if (maxLimit == 0) {
throw new IllegalArgumentException("Limit value cannot be zero");
}
if (maxLimit == Integer.MAX_VALUE) {
return findAll(sort);
}
return findAll(sort).take(maxLimit);
}
}
- limit 매개 변수에 대한 null 체크
- unlimited 및 maxLimit이 정수의 최대 값인 경우, 성능을 위해 findAll() 메서드 호출
- maxLimit이 음수 및 0 값인 경우, 예외 발생
- findAll() 메서드 호출 후, take() 메서드 호출한 값을 반환
테스트 코드 작성
/**
* Unit tests for {@link ReactiveSortingRepository}.
*
* @author YongHwan Kwon
*/
@ExtendWith(MockitoExtension.class)
class ReactiveSortingRepositoryTest {
@Spy ReactiveSortingRepository<Integer, Integer> repository;
@DisplayName("Entity fetching tests")
@Nested
class EntityFetchingTests {
@DisplayName("Return a single entity for a limit of one")
@Test
void shouldReturnOnlyOneEntityForLimitOfOne() {
when(repository.findAll(Sort.unsorted())).thenReturn(Flux.range(1, 20));
final Flux<Integer> results = repository.findAll(Sort.unsorted(), Limit.of(1));
StepVerifier.create(results)
.expectNext(1)
.expectComplete()
.verify();
verify(repository).findAll(Sort.unsorted());
}
@DisplayName("Return ten entities for a limit of ten")
@Test
void shouldReturnTenEntitiesForLimitOfTen() {
when(repository.findAll(Sort.unsorted())).thenReturn(Flux.range(1, 20));
final Flux<Integer> results = repository.findAll(Sort.unsorted(), Limit.of(10));
StepVerifier.create(results)
.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.expectComplete()
.verify();
verify(repository).findAll(Sort.unsorted());
}
@DisplayName("Return all entities with unlimited")
@Test
void shouldReturnAllEntitiesForUnlimited() {
when(repository.findAll(Sort.unsorted())).thenReturn(Flux.range(1, 20));
final Flux<Integer> results = repository.findAll(Sort.unsorted(), Limit.unlimited());
StepVerifier.create(results)
.expectNextSequence(IntStream.rangeClosed(1, 20).boxed().toList())
.expectComplete()
.verify();
verify(repository).findAll(Sort.unsorted());
}
@DisplayName("Return all entities with maximum integer limit")
@Test
void shouldReturnAllEntitiesForLimitOfMaxValue() {
when(repository.findAll(Sort.unsorted())).thenReturn(Flux.range(1, 20));
final Flux<Integer> results = repository.findAll(Sort.unsorted(), Limit.of(Integer.MAX_VALUE));
StepVerifier.create(results)
.expectNextSequence(IntStream.rangeClosed(1, 20).boxed().toList())
.expectComplete()
.verify();
verify(repository).findAll(Sort.unsorted());
}
@DisplayName("Exception tests for invalid limits")
@Nested
class ExceptionTests {
@DisplayName("Throw IllegalArgumentException for null limit")
@Test
void shouldThrowIllegalArgumentExceptionWhenNullLimitIsGiven() {
assertThrows(IllegalArgumentException.class, () -> repository.findAll(Sort.unsorted(), null));
}
@DisplayName("Throw IllegalArgumentException for invalid limit values")
@ParameterizedTest
@ValueSource(ints = { -1, Integer.MIN_VALUE, 0 })
void shouldThrowIllegalArgumentExceptionForInvalidLimits(int limitValue) {
assertThrows(IllegalArgumentException.class, () -> repository.findAll(Sort.unsorted(), Limit.of(limitValue)));
}
}
}
}
- 모든 분기문 테스트
- limit == null
- limit == MIN_VAL
- limit == -1
- limit == 0
- limit == 1
- limit == 10
- limit == MAX_VAL
- limit == Limit.unlimited()
Merge
최종 머지에는 실패했습니다.
메인테이너의 말을 요약하자면 아래와 같습니다.
Limit
을 추가하는 것은ReactiveSortingRepository
가 정렬 기능을 표현하는 인터페이스의 의도와 잘 맞지 않는다take
를 사용하여 기본 메서드를 구현하는 것은 모든 데이터를 완전히 로딩하므로 위험하다- 현재로서는
Limit
을 사용하는 쿼리 파생을 이용하기를 권장한다
분석
리액티브 프로그래밍은 비동기 데이터 스트림과 변화의 전파에 초점을 맞춘 패러다임입니다. 리액티브 리포지토리의 인터페이스 정의 시에는 논블로킹 처리와 백프레셔 대응이 중요합니다. ReactiveSortingRepository
인터페이스는 정렬 기능을 리액티브 맥락에서 제공하기 위해 설계되었습니다.
이러한 맥락에서 Limit
매개변수를 추가하는 것은 인터페이스의 본래 취지와 부합하지 않습니다. 리액티브 리포지토리에서는 결과를 제한하는 동기적인 작업이 잠재적으로 무한한 반응형 스트림의 원칙과 상충할 수 있으므로, 메서드 시그니처에 Limit
매개변수를 포함하는 것을 지양합니다.
take(n)
연산은 무한한 요소 스트림에서 첫 n개의 요소만 가져오는 방식으로 작동합니다. 이는 큰 데이터셋에 적용될 경우 전체 데이터를 메모리에 로드한 후 요청한 수만큼만 처리하고 나머지를 버리기 때문에, 비효율적이고 위험할 수 있습니다. 이러한 이유로, 인터페이스의 기본 메서드에서 take를 사용하는 것은 위험하다고 평가되었습니다.
메인테이너는 대신 쿼리 파생 방식을 권장합니다. 이는 개발자가 메서드를 호출할 때 제한 조건을 명시적으로 설정할 수 있도록 유도하는 방식입니다. 아래는 이를 예시하는 코드입니다:
public interface ReactiveUserRepository extends ReactiveSortingRepository<User, Long> {
Flux<User> findTop10ByName(String name);
}
여기서 findTop10ByName
메서드는 이름을 기준으로 최대 10명의 사용자를 반환하는 쿼리를 자동으로 생성합니다. Spring Data는 사용 중인 데이터베이스에 알맞은 제한 조건을 포함한 쿼리를 내부적으로 생성합니다.
느낀점
이전에 오픈소스 프로젝트에 기여한 경험은 있지만, 주로 번역이나 문서 수정에 그쳤습니다. 인제님의 도움을 받아 스프링 프로젝트에 코드 기여를 시도한 것은 제 개발 커리어에 중요한 이정표가 될 것 같습니다.
현업에서 리액터를 사용해본 경험이 없음에도, 이번 PR 작업과 그에 대한 분석을 통해 리액티브 프로그래밍의 개념과 리액터의 기능에 대한 이해를 깊게 할 수 있었습니다. 이 과정이 향후 더 나은 기술적 결정을 내리는 데 도움이 될 것으로 기대합니다.
추가 질문
인제님의 조언으로 `limit` 매개변수가 해당 인터페이스의 의도 자체에 반한다고 판단을 한 것인지, `take()` 메서드를 사용한 구현의 문제인지, 두가지 모두가 아니라면 메인테이너가 가진 올바른 방향이 무엇인지 알려줄 수 있냐는 질문을 던져봤습니다.
추후.. 답변을 받을 수 있을 것으로 예상됩니다.
참고
1. Flux.handle(BiConsumer<? super T, SynchronousSink<T>> handler):
handle 메서드는 Flux의 각 요소를 개별적으로 처리하는 데 사용됩니다.
현재 요소(t)와 SynchronousSink(sink) 두 매개변수를 받는 BiConsumer를 인자로 받습니다.
SynchronousSink는 handle 함수 내에서 하류(subscriber)와 상호작용할 수 있게 해주는 특별한 객체입니다. 요소를 발출(sink.next(t)), 스트림을 완료(sink.complete()), 혹은 에러를 신호(sink.error(Throwable t))할 수 있습니다.
2. SynchronousSink.requestedFromDownstream():
이 메서드는 하류(subscriber)가 준비된 아이템의 수를 반환합니다. 이는 Project Reactor에 의해 제공되는 백프레셔 지원의 일부입니다.
백프레셔(backpressure) 개념은 발행자(publisher)가 구독자(subscriber)를 압도하지 않도록 데이터 전송량을 제어하는 것과 관련이 있습니다.
3. SynchronousSink.next(T t):
이 메서드는 다음 요소를 하류(subscriber)에게 발출하는 데 사용됩니다.
주어진 핸들러에서는 하류에서 요청이 있는지(requestedFromDownstream() > 0) 확인한 후에 다음 아이템을 발출합니다(sink.next(t)).
하류에서 요청이 없으면, 요소를 발출하지 않음으로써 백프레셔 메커니즘을 존중합니다.
4. Flux.limitRequest(long n):
이 연산자는 상류(publisher)에 대한 요청을 n으로 제한합니다.
상류로부터 요청되는 요소의 수에 대한 연성 경계를 생성하여, 소스 Flux에서 요청되는 요소의 수를 제한하는 데 특히 유용합니다.
주어진 코드 스니펫에서 쿼리로 반환된 결과의 수를 제한하고자 할 때 유용합니다.