Component Abstraction
개요
Spring Cloud Stream의 탄생 배경
"왜 Spring Cloud와 메시지 큐가 연결되었는가?"
Spring Cloud
는 클라우드 네이티브 애플리케이션과 마이크로 서비스 개발을 위한 도구 모음이다.
마이크로서비스 간 통신에서 메시지 큐는 핵심 컴포넌트로 비동기 통신을 가능하게 하기때문에 마이크로서비스 아키텍처에서 필요하게 되었다.
또한, 마이크로서비스 환경에서는 서비스간 느슨한 결합(loose coupling)이 중요하기 때문에, 메시지 큐로 서비스 간 비동기 통신을 제공하여 이 문제를 해결하여 분산 시스템 문제를 해결할 수 있다.
Spring Cloud Stream이 만들어진 이유
현대에는 RabbitMQ, Kafka, Amazon SQS 등 다양한 메시지 브로커가 존재하며, 각각 사용법과 개념이 달라 애플리케이션 코드가 특정 기술에 종송되는 문제가 있기 때문에 여러 메시징 기술의 추상화가 필요했다.
Spring Cloud Stream
은 바인더(Binder)라는 개념을 통해 다양한 메시징 시스템을 추상화하였다.
개발자는 기본 메시징 시스템을 알 필요 없이 동일한 코드로 작업하여 일관된 프로그래밍 모델을 제공할 수 있게되었다.
또한 간단한 설정 변경만으로 RabbitMQ에서 Kafka로 또는 반대로 전환이 가능하기 때문에, 기반 기술이 변경되어도 비즈니스 로직 코드는 그대로 유지된다.
SCS Property 구조
╭───────────────────────╮ ╭────────────────────╮
│ spring.cloud.function │ ─→ │ FunctionProperties │
╰───────────────────────╯ ╰────────────────────╯
╭─────────────────────╮ ╭──────────────────────────╮
│ spring.cloud.stream │ ─→ │ BindingServiceProperties │
╰─────────────────────╯ ╰──────────────────────────╯
╭──────────────────────────────╮ ╭───────────────────────────────────────╮
│ spring.cloud.stream.function │ ─→ │ StreamFunctionConfigurationProperties │
╰──────────────────────────────╯ ╰───────────────────────────────────────╯
SCS
에서 제공하는 프로퍼티는 위의 컴포넌트가 대부분이다. SCS
에서는 거의 대부분의 클라우드 스트림 구성을 위 정보를 이용해 진행한다.
예를 들어 SCS
에서 추상화 하고 있는 바인딩 정보는 아래와 같다.
@ConfigurationProperties("spring.cloud.stream")
@JsonInclude(Include.NON_DEFAULT)
public class BindingServiceProperties
implements ApplicationContextAware, InitializingBean {
...
private Map<String, BindingProperties> bindings = new ConcurrentHashMap<>();
...
}
해당 추상화 바인딩 정보는 각 미들웨어(Message Broker)에서 여러 정보로 매핑된다. Rabbit의 경우 Exchange, Kafka의 경우 Topic으로 대응된다.
BindingProperties
즉 BindingProperties
는 연결할 미들웨어의 바인딩 대상이 된다. 위의 코드를 기준으로 바인딩을 비교해 본다면 아래와 같이 대응 된다:
spring:
cloud:
stream:
bindings:
GameResultProducer-out-0: # 채널명
destination: GameResultProducer # 미들웨어 내 바인딩 대상
GameResultConsumer-in-0:
destination: GameResultConsumer
BindingProperties
는 미들웨어 대상(RabbitMQ의 Exchange, Kafka의 Topic)을 추상화한 SCS
의 바인딩 객체이다. 바인딩 대상은 Producer
가 될 수도있고,
Consumer
가 될 수 도있다.
바인딩 정보로서 추상화되며 아래의 내용을 포함한다:
- destination: 바인더가 바인드하는 브로커에서의 물리적인 이름을 의미한다.
RabbitMq
의 경우 Exchange의 이름으로, Kafka의 경우 Topic의 이름으로 정의한다.
- group
- 그룹의 경우 Consumer에게만 적영되며, 소속될 바인딩의 고유한 이름이다. 많은 컨슈머가 같은 그룹 내에서 구독을 공유한다.
- null, 빈 문자열 값은 익명그룹을 나타내며 공유 되지않는다.
- 즉 그룹은 각 미들웨어에서 컨슈머를 묶는 그룹으로 이해하면 된다.
RabbitMQ
의 경유 Queue,Kafka
의 경우 Consumer Group과 연결된다.
- contentType
- 이벤트 내에서 이 바인딩으로 사용될 콘텐츠의 유형을 의미한다. HTTP 스펙에서 사용되는 MIME Type과 동일하다. 기본값은
application/json
- 메세지 헤더에 지정되지 않은 경우 이 바인딩에서 사용될 콘텐츠 유형을 지정한다.
- 이벤트 내에서 이 바인딩으로 사용될 콘텐츠의 유형을 의미한다. HTTP 스펙에서 사용되는 MIME Type과 동일하다. 기본값은
- bindier
- 여러개의 바인더가 사용 가능할 경우 이 바인딩에서 사용할 바인더명
- 예: rabbit
- consumer
- 추가적인 컨슈머 프로퍼티 (
ConsumerProperties
)
- 추가적인 컨슈머 프로퍼티 (
- producer
- 추가적인 프로듀서 프로퍼티 (
ProducerProperties
)
- 추가적인 프로듀서 프로퍼티 (
기본 아키텍쳐 및 컴포넌트의 역할
╭────────────╮ ╭─────────────────────╮
│ Service A │ ←─→ │ Spring Cloud Stream │ ←──╮
╰─(Producer)─╯ ╰─(Abstraction Layer)─╯ ↓
╭──────────────────╮
│ RabbitMQ │
╰─(Message Broker)─╯
╭────────────╮ ╭─────────────────────╮ ↑
│ Service B │ ←─→ │ Spring Cloud Stream │ ←──╯
╰─(Consumer)─╯ ╰─(Abstraction Layer)─╯
Spring Cloud Stream은 마치 레고 블록과 같다. 여러 레고 조각(마이크로서비스)들이 서로 연결될 수 있도록 표준화된 연결 부분(메세지 시스템 추상화)을 제공한다. 이러한 구성을 만들기위해 몇가지 컴포넌트 개념이 있다.
대략적으로 서비스에 설정되는 정보는 다음과 같다:
spring:
cloud:
stream:
bindings:
(...설정 A...)
spring.cloud.stream.bindings
아래에 들어가는 바인딩 설정은 SCS에서 추상화한 함수와 Binder를 잇는 Binding을 의미한다.
또한, spring.cloud.stream
하위에 들어가는 바인딩 설정은 메시징 플랫폼과 위의 Binding을 잇는 Binding을 의미한다.
- BindingServiceProperties:
sping.cloud.stream
내 프로퍼티 정보 - BindingService:
BindingServiceProperties
설정 정보를 이용해 실제 바인딩을 구성 - InputBindingLifecycle:
Bindable
객체를BindingService
로 바인딩을 트리거하고 그 정보를 관리 - BindableFunctionProxyFactory:
Bindable
의 구현체이며 추상화 함수와 바인딩할 수 있는
MessageChannel
메세지 채널은 Spring integration에서 가져온 개념으로, 애플리케이션 내에서 메시지가 이동하는 파이프라인 역할
Binder SPI
Binder Service Provider Interface는 몇개의 인터페이스, 외부 유틸리티 클래스, 외부 미들웨어에 연결하기 위해 연결가능한 메카니즘을 제공하는 발견 전략들로 구성되어있다.
여기서 중요한 점은 SPI는 외부 미들웨어에 입력과 출력을 연결하기 위한 전략인 Binder
라는 것이다. 다음의 목록은 Binder
인터페이스의 정의를 보여준다.
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
default String getBinderIdentity() {
return String.valueOf(this.hashCode());
}
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
C, P
처럼 생산자, 소비자에 대한 설정이 Generic으로 파라미터화 되었으며 여러 확장 지점(Extended Point)을 제공한다:
- 입출력 바인딩 대상 - 메세지를 수신하거나 전송하는 대상(예: Kafka 토픽, RabbitMQ 큐 등)을 바인딩 할수 있는 지점을 의미
- 확장된 소비자 및 생산자 속성 - 특정 Binder 구현(Kafka Binder, RabbitMQ Binder 등)이 타입 안전한 방식으로 지원할 수 있는 추가 속성을 더할 수 있게 해준다는 의미
일반적인 바인더 구현체는 다음의 요소로 구성된다:
Binder
인터페이스를 구현하는 클래스- 메세징 미들웨어와의 연결 인프라를 설정하는
Binder
타입 빈을 생성하는 스프링@Configuration
클래스 - 하나 이상의 바인더 정의를 포함하며 클래스 패스에 위치하는
META-INF/spring.binders
파일:spring.binderskafka:\ org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
SCS LifeCycle
SCS의 경우 프레임워크 레벨에서 다음의 라이프사이클을 구현한다.
Diagram Loading...
실제로 바인딩이 진행되는 흐름은 Spring의 LifeCycle
로 구현되어있다.
이 LifeCycle
은 기본적으로 DefaultLifeCycleProcessor
에 의해 처리되는데, 이는 아래와 같다.
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new TreeMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || isAutoStartupCandidate(beanName, bean)) {
int startupPhase = getPhase(bean);
phases.computeIfAbsent(startupPhase,
phase -> new LifecycleGroup(phase, determineTimeout(phase), lifecycleBeans, autoStartupOnly)
).add(beanName, bean);
}
});
//라이프사이클이 실제 시작 되는 부분
if (!phases.isEmpty()) {
phases.values().forEach(LifecycleGroup::start);
}
}
lifecycleBeans는 클래스패스에 포함된 LifeCycle 하위 구현요소들이며, 이를 시작할지를 결정짓는 부분이다. 포스팅을 하는 시점에서는 위와 같이 14개정도가 포함 되어있었다. 어쨋든 동일한 Phase끼리 그룹화 되어, 각 그룹을 한번에 시작한다.
어쨋든 라이프 사이클 그룹을 시작하면 Bean
을 개별적으로 시작한다. 추상화된 바인딩 정보는 Bindable
이라는 추상화 객체로서 선언된 함수 Bean
을 바인딩한다.
DefaultLifecycleProcessor.java 파일 내용
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
Lifecycle bean = lifecycleBeans.remove(beanName);
if (bean != null && bean != this) {
String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
for (String dependency : dependenciesForBean) {
doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() && (!autoStartupOnly || toBeStarted(beanName, bean))) {
if (logger.isTraceEnabled()) {
logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
}
try {
bean.start(); // 시작
}
catch (Throwable ex) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
}
if (logger.isDebugEnabled()) {
logger.debug("Successfully started bean '" + beanName + "'");
}
}
}
}
또한 위 라이프사이클 빈 중에서 바인딩 관련은 InputBindingLifecycle
, OutputBindingLifecycle
이 있는데 Input
은 Consumer
들의 모든 바인딩 정보를,
Output
은 Function
등의 바인딩 정보를 가지고 있다.
아무튼 위 코드그룹 세번째 코드에 보여진 createAndBindInput(...)
메서드로 바인딩이 구성하는데, 이는 각 함수와 바인딩을 생성을 관리하는 BindableFunctionProxyFactory
이다.
SCS
는 철저하게 추상화된 부분만 관리하며, 실제 연동 및 바인딩은 Binder
구현체에게 맡긴다.
여기까지가 Spring Cloud Stream
이 추상화 하여 유연하게 서드파티 바인더와 연결하는 방법이다.