- Today
- Total
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |
- 팀네이버
- infra
- JWT
- SpringBoot
- 브랜치전략
- 만들면서 배우는 클린 아키텍처
- LazyInitialization
- JPA
- Project
- Java
- spring
- 팀네이버 공채
- container
- EntityTransaction
- docker
- redis
- Kotlin
- chrome80
- 리뷰
- 젠킨스
- websocket
- 후기
- 캐싱전략
- 스프링
- 책
- Spring Security
- network
- 프로젝트
- SPRING JWT
- jenkins
PPAK
Spring Cloud Data Flow 를 사용하며 본문
지난 포스팅 에서 Spring Cloud Data Flow(이하 SCDF) 에 대해 설명했는데 1년이 조금 넘는 시간동안 추가적으로 운영하면서 느낀점을 남기려고 한다.
SCDF에 대해서는 생소할 수 있는데, 마이크로 서비스 기반의 스트리밍, 배치 처리 플랫폼이라 하고 쉽게 이야기하면 Spring Cloud Stream, Spring Clodu Batch/Task 파이프라인을 손쉽게 구성할 수 있는 추상화된 기능을 제공한다. 또한, K8S, Cloud Foundry 에서 손쉽게 배포가 가능하다.
안타깝게도 최근에 올라온 포스팅에 따르면 더이상 오픈소스로 공개하지 않는다고 하는데 여러 내부적인 사유가 있겠지만 눈에 띄었던 것은 아래 문구인데
The vast majority of usage we see for Spring Cloud Data Flow exists within our Tanzu enterprise customers. Open-source usage represents a very small part of the overall adoption today with an equally small contribution of the maintenance provided by the community.
전반적으로 사용자가 적고 동시에 오픈소스 컨트리뷰터 역시 적기 때문에 오픈소스 프로젝트를 운영하기 위한 제약이 리소스 낭비로 여겨져 상용 제품으로 변경한다는 내용이었다. 나도 근 1년 넘게 꾸준히 사용하는 플랫폼임에도 어떠한 의견을 내지 않고 사용했기에 다소 아쉬운 마음이 들었다.
물론 여전히 오픈 소스 버전으로도 최신 버전의 Spring 생태계 연동을 지원하고 아래에 설명하겠지만 기능이 강력하기 때문에 메리트가 있는 기술이라고 생각한다. LTS 는 생각보다 더 오랜 기간 사용된다.
나도 SCDF 를 접하게 된 것은 굉장한 우연이었는데, 처음 속하게 된 팀에서 해당 기술을 사용하고 있었고 1~2달 정도 뒤에 예상치 못한 조직 변동으로 새로운 팀에 합류하게 되었는데 잠깐 사용했던 SCDF 가 유용해 보이기도 했고 직접 더 써보고 싶은 마음에 팀 내에 직접 도입했다.
개요
SCDF 는 Spring Cloud Stream, Spring Cloud Task/Batch 로 작성된 애플리케이션을 손쉽게 파이프라이닝 하고 모니터링할 수 있는 기능을 제공한다. 이와 더불어서 K8S 배포를 지원하고, Kafka 와의 손쉬운 연동을 지원한다. (SCDF에서 Batch Job은 Task 로 래핑되어 실행되기 때문에 본 포스팅에서는 Stream과 Task 관점으로 설명한다)
작성된 설정 정보를 토대로 애플리케이션이 배포되는데 파이프라인을 구성하기 위한 대시보드가 제공된다. 파이프라인은 fan-in, fan-out 의 단방향 구조로 구성된다.
하나의 네모박스(?) 가 빌드된 Stream/Task 애플리케이션이라고 생각하면 된다. Stream 의 경우 위와 같이 파이프라인을 구축하면 자동으로 토픽을 정의하고 데이터 처리 결과(메시지)가 토픽으로 발행된다. Task 의 경우 파이프라인을 구축하면 앞에 위치한 Task 실행이 종료되면 다음 Task 가 실행되는 방식으로 진행된다.
Spring Cloud Data Flow 를 구성하는 인스턴스는 크게 dataflow server, skipper server 로 나뉘고 아래와 같이 역할이 나뉜다.
- Data Flow 서버 (Dataflow Server):
- 스트림 및 태스크를 정의하고 배포, 관리하는 역할.
- 스트림과 태스크의 상태를 추적하고 모니터링 제공.
- 스트림 및 태스크의 실행을 시작하고 중단(스트림의 경우 Skipper 서버로 실행 위임).
- 위 작업을 위한 대시보드 제공.
- Skipper 서버 (Skipper Server):
- 버전 업그레이드 및 롤백과 같은 배포 관련 작업을 수행.
- 서로 다른 환경 간에 애플리케이션 배포 지원.
dataflow server 와 skipper server 는 한번 배포해두면 크게 신경쓸 일은 없다.
배포 방법은 아래 공식 문서나 지난 포스팅 을 보면 조금 도움이 될 것 같다. SCDF 컴포넌트 자체가 구축하는 입장따라 크다면 크고 작다면 작은데, 개인적으로는 한번 구축해서 1년 넘도록 잘 쓰고 있어서 그리 아까운 시간은 아니었던 것 같다.
https://dataflow.spring.io/docs/installation/
도입을 고민하고 있다면 아래 체크리스트를 고려해서 오버 엔지니어링인지 판단하면 좋을 것 같다.
1. 우리 프로젝트는 버전에 민감한가(참고로 최신 버전 SCDF 는 k8s 1.30.x, Spring Boot 3.x 까지 지원한다)
2. Spring Cloud Stream/Batch 둘 다 자주 개발한다.
3. Kafka/RabbitMQ 기반 스트리밍 파이프라인이 필요하다.
4. Stream 파이프라인 버전 관리/롤백/업그레이드가 필요하다.
5. Stream 을 관리하기 위한 Web UI 가 필요하다
6. Container Orchestration 환경에서 Stream 배포가 필요하다
대부분의 플랫폼 성격을 띄는 기술이 그렇듯 SCDF 는 여러 기술과의 연동을 통해 특정 작업을 추상화 해둔 것이 많고, 이러한 부분을 사실 직접 구현할 수 있고, 이것이 귀찮은 일이 아니라면 새로운 기술을 도입할 필요도 없다고 생각한다.
특히 Task 만 개발하는 조직에서는 SCDF 를 쓰지 않더라도 모니터링, 스케줄링을 더 손쉽게 구성할 수 있을 것이다.
운영 팁
버전
Spirng Cloud Data Flow 2.10.3 을 사용하고 있고 메시지큐로 Kafka 를 사용한다.
바인딩
보통 stream 을 운영하면 function binding 을 많이 사용할 것인데 아래와 같이 설정하곤 한다.
- spring.cloud.stream.${bean name}-in-0: ${alias}
- spring.cloud.stream.bindings.${alias}.destination: ${topic}
(source의 경우 ${bean name}-out-0, processor의 경우 in/out 둘 다 작성)
하지만 SCDF 에서는 아래와 같이 설정하고, UI에서 파이프라인을 구성해주기만 하면 자동으로 토픽(destination) 을 정의해준다
단일 모듈 task, stream 관리
보통 Spring Cloud Task, Stream 을 운영할 때 특정 Bean 을 생성하고, 설정을 주입하기 위한 많은 방법이 존재할 것이다.
가령
1. @ConditionalOnProperty 로 특정 Bean 생성 제어
2. Task/Stream별로 실행 가능한 모듈을 생성, 빌드 타임에 특정 모듈을 빌드
목표는 설정 값을 토대로 특정 애플리케이션을 실행하고 싶을 것인데, 나는 기존 팀의 설정을 참고해 아래와 같이 구성했다.
1. 특정 설정 값(task, stream 이름)을 토대로 Enum 을 매핑하고 해당 Enum 을 가진 Bean 이 생성되도록 Condition 추가
2. Stream 작성시에 해당 Enum 을 커스텀 어노테이션과 함께 명시해줍니다
class StreamCondition : Condition {
override fun matches(context: ConditionContext, metadata: AnnotatedTypeMetadata): Boolean {
val targetStreamName = Optional.ofNullable(
context.environment.getProperty("custom.stream.name", String::class.java))
.orElseThrow { RuntimeException("set custom.stream.name") }
val streamName: CustomStreamNames = metadata.annotations
.get(CustomStream::class.java)
.getValue("value", CustomStreamNames::class.java)
.orElseThrow { RuntimeException("set stream name for CustomStream annotation") }
val isMatch = StringUtils.equals(targetStreamName.lowercase(Locale.getDefault()), streamName.name.lowercase(Locale.getDefault()))
if (isMatch) {
logger.info("[StreamCondition] Stream: $streamName is created")
}
return isMatch
}
}
@Configuration
@CustomStream(CustomStreamNames.DEMO_SOURCE)
class DemoSource {
@Bean
fun sendEvents(): Supplier<Message<DemoMessage>> {
return Supplier<Message<DemoMessage>> {
val demoMessage = DemoMessage("test", 10)
MessageBuilder.withPayload(DemoMessage("test", 10))
.setHeader("partitionKey", Random.nextInt())
.build()
}
}
}
또한 스트림마다 설정값이 다를 수 있는데, 특히 consumer, producer 설정이 스트림별로 다르면 특정 시점에 원하는 값만 주입되도록 설정을 해주어야 한다.
그리고 스트림을 처음 개발하는 개발자 입장에서 각종 설정에 대해 모두 이해하고 개발하는 것은 다소 비효율적일 수 있다. 나는 이 값을 타겟 Enum이 활성화될 때 모두 주입할 수 있도록 Enum 을 정의하는 시점에 값을 넣도록 세팅했다.
이러면 스트림에 익숙하지 않은 팀원들도 손쉽게 설정값들을 조정할 수 있고, 운영에 검증된 설정값을 열어둠으로써 사이드이펙트를 최소화 할 수 있다.
enum class StreamNames(
val streamConfig: StreamConfig,
val customConfig: Map<String, Any> = emptyMap()
) {
DEMO_SOURCE(
CustomSourceConfig(
"sendEvents",
fixedDelay = 100,
partitionConfig =
PartitionConfig(
partitionCount = 30,
partitionSelectorNameAndKeyExtractorName = Pair(
"roundRobinSelectorStrategy",
"randomPartitionKeyExtractorStrategy"
)
)
)
)
companion object {
fun getStreamConfigByName(name: String): Map<String, Any> {
val streamNames = valueOf(name)
return streamNames.streamConfig.getConfigMap() + streamNames.customConfig
}
fun getStreamLocalConfigByName(name: String): Map<String, Any> {
val streamNames = valueOf(name)
return streamNames.streamConfig.getLocalConfigMap() + streamNames.customConfig
}
}
}
class StreamEnvironmentPostProcessor : EnvironmentPostProcessor {
private val loader = YamlPropertySourceLoader()
override fun postProcessEnvironment(environment: ConfigurableEnvironment, application: SpringApplication) {
val streamName = Optional.ofNullable(environment.getProperty("custom.stream.name"))
.orElseThrow { RuntimeException("set property custom.stream.name") }
val activeProfile = Optional.ofNullable(environment.getProperty("spring.profiles.active"))
.orElseThrow { RuntimeException("set property spring.profiles.active") }
val streamConfigByName = if (activeProfile.equals("local-processor") && streamName.lowercase().contains("processor")) {
StreamNames.getStreamLocalConfigByName(streamName.uppercase(Locale.getDefault()))
} else {
StreamNames.getStreamConfigByName(streamName.uppercase(Locale.getDefault()))
}
MapPropertySource("stream-resource", streamConfigByName).also { environment.propertySources.addLast(it) }
}
}
로컬 테스트 환경 구성
로컬 환경에서 Stream 테스트를 하는 것은 귀찮(?)다. 특히 binder, destination(토픽) 설정을 생각하는게 번거롭다. 따라서 아래와 같은 워크플로우로 개발 가능하게 설정했다.
1. Stream 컴포넌트(source, processor, sink) 로직 작성
2. 타겟 Stream 설정
3. Run Stream(테스트 시작)
그러기 위해서는 모든 stream 의 function binding 별칭이 input, output 으로 고정되어 있는 SCDF 스펙을 따르면서, 각각의 바인더의 destination 을 위와 같이 잡았다. 단, processor 의 경우 destination 이 2개 존재해야하기 때문에 어쩔 수 없이 local 프로필에 한해서 설정을 분기시켰다.
그리고, server.port=0, 인텔리제이 Build and Run > Modify Options > Allow Multiple Instances 를 활성화하면 새로운 Stream 컴포넌트가 추가되어도 위와 같은 워크플로우로 동일하게 테스트할 수 있도록 했다.
spring:
cloud:
stream:
bindings:
output:
destination: local.topic
inputProcessor:
destination: local.topic
outputSink:
destination: local.topic.outputSink
input:
destination: local.topic.outputSink
배포
배포는 크게 아래와 같은 순서로 이루어진다.
1. Stream 이미지 빌드
2. Dataflow Server로의 이미지 등록 요청
3. 대시보드 내에서 실행
Dataflow Server로의 이미지 등록 요청의 경우 다시 두 가지 방법으로 나뉠 수 있는데
1. Spring Cloud Data Flow Rest Client 를 이용한 이미지 등록
2. 대시보드 내에서 이미지 등록(docker://${image url})
대시보드에서 이미지를 등록할 경우 수동으로 (스트림 이름, 이미지 주소)를 기입해야 하는데, 반복적으로 이미지를 빌드하고 배포하려다 보면 은근 귀찮은 작업이기도 하고 오타가 있으면 배포가 정상적으로 수행되지 않는 문제가 있다.
따라서, SCDF Rest Client 를 지원하는 서버를 통해 CI 단계에서 이미지 빌드가 끝나면 자동으로 대시보드에 등록되도록 했다.
운영
Task의 경우 로직만 작성하면 대시보드에서 k8s Cron Job 을 손쉽게 등록할 수 있고 Task 실행시에 리소스도 제어할 수 있다는 장점이 있다. 그리고 무엇보다 Task 간의 단방향 의존 관계를 복잡한 코드가 아니라 UI로 줄을 이어서 실행하기만 하면 되는 것도 편리하다.
Stream 역시 UI 로 스트림 파이프라인을 관리할 수 있고 모니터링이나, 스케일링(replica)을 UI 레벨에서 수행할 수 있어서 매우 직관적이다. 또, 버전 관리를 자동으로 해주기 때문에 배포가 잘못되었을 때 버튼 하나로 롤백처리까지 가능하다는 장점이 있다.
이 외에 스트림 관련 세부 설정은 스트림별 세부 설정 객체를 생성하고 주입하는 방식으로 운영하고 있다.
class CustomSinkConfig (
private val inputMethodName: String,
private val fixedDelay: Long = 10,
private val autoOffsetReset: String = "latest",
private val metadataAge: Int = 30000,
private val maxPollRecords: Int = 500,
private val maxPollInterval: Long = 300000,
private val concurrency: Int = 3,
): StreamConfig {
companion object {
val DEFAULT_SINK_CONFIG = { outputMethodName: String -> CustomSinkConfig(outputMethodName)}
}
override fun getConfigMap(): Map<String, Any> {
return mapOf(
"spring.cloud.stream.function.bindings.${inputMethodName}-in-0: " to inputName,
"spring.integration.poller.fixed-delay" to "${fixedDelay}ms",
"spring.kafka.consumer.auto-offset-reset" to autoOffsetReset,
"spring.kafka.consumer.properties.metadata.max.age.ms" to metadataAge,
"spring.kafka.consumer.max-poll-records" to maxPollRecords,
"spring.kafka.consumer.properties.max.poll.interval.ms" to maxPollInterval,
"spring.cloud.stream.bindings.${inputName}.consumer.concurrency" to concurrency
)
}
override fun getLocalConfigMap(): Map<String, Any> {
throw UnsupportedOperationException("Not implemented")
}
}
느낀점
단일 애플리케이션에서 멀티스레딩만으로 성능을 극대화하는 데에는 한계가 있다. 처리해야 할 데이터의 양이 많은데 개별 처리가 가능한 경우, 데이터를 잘 분리해 여러 개의 배치 프로세스를 병렬로 실행할 수도 있지만, 이미 잘 구축된 스트림 처리 시스템이 있다면 모든 데이터를 토픽으로 발행하고 파티션을 늘린 뒤, 스트림 프로세스를 병렬로 실행하기만 하면 손쉽게 확장 가능한 병렬 데이터 처리 시스템을 구축할 수 있다. (물론 로그 파일이나, 정산 데이터처럼 한번에 집계되어야 하는 데이터는 Hadoop MapReduce, Spark 가 유리할 수 있으니 상황에 맞게 전환을 고려해야 한다)
내가 SCDF 를 도입할 때에 Spring Cloud Stream 이라는 프레임워크 자체에 대한 이해도가 낮았는데 SCDF를 도입하면서 Stream 데이터를 처리하는 방식에 대한 이해도 높아졌고 SCDF가 왜 나오게 되었는지 직간접적으로 느낄 수 있었다. 그리고 무엇보다 Stream 자체를 여러 유즈케이스에서 설명하는 것처럼 설정과 배포 과정에서 개발자가 신경써주어야 하는 많은 부분을 SCDF 가 담당하기 때문에 이러한 관점에서 SCDF 는 (특히) 스트림을 관리하고 개발하는 개발자들에게 정말 용이한 도구라고 생각한다.
SCDF를 유용하게 쓰고 있기도 했고 노하우를 쌓아 나가는 입장에서 SCDF가 오픈소스 버전을 더이상 지원하지 않는다는 점은 매우 아쉽다😢. 하지만, 여러 제한된 환경으로 인해 아직 SCDF 의 최신 버전조차 사용하지 않고 있기 때문에 당분간(n년간) 큰 변동은 없을거라 생각한다. 개인적인 생각으론 현재 나와 있는 최신 버전 이후로 Spring Cloud Stream(또는 Kafka Binder) 이나 k8s 인터페이스에 변화가 없지 않는 이상 호환은 계속될 것이라고 생각한다. 그래서 최신 버전을 빠르게 따라가지 않아도 되는 프로젝트라면 조심스럽지만 SCDF 도입을 한번 검토해보길 추천한다.
'spring' 카테고리의 다른 글
mulipart/form-data 다루기(feat. feign-client, restTemplate) (1) | 2024.04.15 |
---|---|
Spring Rest Docs (2) | 2023.12.11 |
[Spring/JWT] Access Token 과 Refresh Token 을 어디에 저장하고 어떻게 교환해야 할까? (3) | 2022.08.27 |
[Spring/Spring Boot] 서버 https 적용 (Certbot, Let's Encrypt) (0) | 2022.08.27 |
[Spring/SpringBoot] SpringBoot 로컬 서버 Https 적용 (0) | 2022.08.17 |