SW/클라우드 서비스 아키텍처

Apache Kafka와 Camel을 활용한 데이터 스트림 처리

얇은생각 2024. 9. 24. 07:30
반응형

오늘날의 데이터 중심 환경에서는 실시간 데이터 스트림 처리가 매우 중요해졌습니다. 특히, 금융, 전자상거래, 헬스케어 등 다양한 산업에서 실시간 데이터를 처리하여 빠르게 반응하는 애플리케이션을 구축하는 것이 필수적입니다. 이러한 데이터 처리 요구 사항을 충족시키기 위해 Apache KafkaApache Camel이 널리 사용되고 있습니다.

이 글에서는 Apache KafkaApache Camel을 활용하여 실시간 데이터 스트림을 처리하는 방법을 다루고, 두 기술의 장점과 구현 방법을 소개합니다. 또한, 실습 예제를 통해 Oracle DB에서 데이터를 읽고 Kafka 클러스터로 보내는 방법과, Kafka에서 데이터를 읽어 Oracle DB에 쓰는 과정을 살펴보겠습니다.

 

 

Apache Kafka와 Camel을 활용한 데이터 스트림 처리

 

 

Apache Kafka란?

Apache Kafka는 실시간 데이터를 처리하는 이벤트 스트리밍 플랫폼입니다. LinkedIn에서 처음 개발되었고, 이후 Apache Software Foundation에 의해 오픈 소스로 제공되고 있습니다. Kafka는 대량의 실시간 데이터를 처리하고, 데이터 파이프라인과 스트리밍 애플리케이션, 마이크로서비스를 구축하기 위한 확장성과 내결함성을 갖춘 아키텍처를 제공합니다.

Kafka는 퍼블리시-구독(publish-subscribe) 메시징 모델을 채택하고 있으며, 데이터를 **토픽(topic)**이라는 단위로 분류합니다. **생산자(producer)**는 토픽에 메시지를 전송하고, **소비자(consumer)**는 실시간으로 해당 메시지를 수신합니다. Kafka의 핵심은 확장성내결함성에 있으며, 데이터를 여러 노드에 분산 저장하고 여러 브로커 간에 복제하여 노드 장애가 발생하더라도 데이터를 안정적으로 사용할 수 있습니다.

Kafka의 아키텍처는 주로 브로커(broker), 프로듀서(producer), 컨슈머(consumer), 그리고 **토픽(topic)**과 같은 주요 구성 요소로 이루어져 있습니다.

  • 브로커: 메시지 큐를 관리하며, 메시지의 영속성을 처리합니다.
  • 프로듀서: 데이터를 Kafka 토픽에 게시합니다.
  • 컨슈머: Kafka 토픽에서 데이터를 구독하고 수신합니다.
  • 토픽: 메시지가 게시되고 구독되는 통신 채널입니다.

Kafka는 또한 Kafka ConnectKafka Streams와 같은 API와 도구를 제공하여 데이터를 처리하고 스트리밍 애플리케이션을 쉽게 구축할 수 있습니다. Kafka Connect는 외부 시스템과의 데이터 통합을 위한 파이프라인을 구축하는 데 사용되며, Kafka Streams는 스트리밍 애플리케이션을 구축하는 데 사용되는 고수준 API를 제공합니다.

 

 

Apache Camel이란?

Apache Camel은 다양한 프로토콜과 기술을 연결하여 통합 애플리케이션을 구축할 수 있는 오픈 소스 프레임워크입니다. Camel은 경량의 통합 플랫폼으로, **EIP(Enterprise Integration Patterns)**를 사용하여 서로 다른 시스템 간의 통합을 쉽게 구성할 수 있습니다. Camel의 주요 특징은 유연성확장성에 있으며, 다양한 데이터 소스 및 대상 간의 통합을 위한 라우트(route) 기반의 프로세스를 정의할 수 있습니다.

Camel과 Kafka를 함께 사용하면, 데이터 스트림을 실시간으로 처리하고 다른 시스템과 손쉽게 통합할 수 있습니다.

 

 

Apache Kafka와 Camel을 활용한 데이터 스트림 처리

Apache Camel-Kafka 컴포넌트를 사용하면 Camel과 Kafka를 손쉽게 통합하여 데이터 스트림을 처리할 수 있습니다. Camel을 사용하여 Kafka로 데이터를 보내거나 Kafka에서 데이터를 수신하는 애플리케이션을 구축할 수 있으며, 이를 통해 복잡한 데이터 파이프라인을 간단하게 관리할 수 있습니다.

다음 단계는 Kafka와 Camel을 이용해 데이터를 스트리밍하는 방법을 설명합니다.

 

1. Kafka 브로커 준비 및 토픽 생성

Kafka 클러스터를 구축하고, Kafka 브로커를 준비합니다. 그런 다음, 데이터를 송수신할 토픽을 생성합니다. 예를 들어, 데이터를 보낼 토픽을 my-topic으로 설정할 수 있습니다.

 

2. Camel 프로젝트 설정 및 의존성 추가

Camel 프로젝트를 설정하고, Camel-Kafka 컴포넌트를 포함한 필요한 의존성을 추가합니다. Apache Maven 또는 Gradle을 사용하여 Camel과 관련된 라이브러리를 추가할 수 있습니다.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>3.x.x</version> <!-- 해당하는 버전 입력 -->
</dependency>
 
 
 

3. Camel 라우트 정의

이제 Camel 라우트를 정의하여 Oracle DB에서 데이터를 읽어 Kafka로 전송하는 코드를 작성합니다.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.springframework.stereotype.Component;

@Component
public class OracleDBToKafkaRouteBuilder extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        // Oracle DB 설정
        String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl";
        String oracleDBUser = "username";
        String oracleDBPassword = "password";
        String oracleDBTable = "mytable";
        String selectQuery = "SELECT * FROM " + oracleDBTable;

        // Kafka 설정
        String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092";
        String kafkaSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        from("timer:oracleDBPoller?period=5000")
            // Oracle DB에서 데이터 읽기
            .setBody(simple(selectQuery))
            .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword)
            .split(body())  // 데이터를 여러 조각으로 나눔
            // Kafka로 직렬화하여 전송
            .setHeader(KafkaConstants.KEY, simple("${body.id}"))
            .marshal().string(kafkaSerializer)
            .to(kafkaEndpoint);
    }
}
 
 

이 코드는 주기적으로 Oracle DB에서 데이터를 조회하여 Kafka 클러스터로 전송하는 라우트를 정의합니다.

 

 

4. Kafka에서 데이터를 읽어 DB에 쓰기

이제 Kafka에서 데이터를 수신하고 Oracle DB에 쓰는 라우트를 정의합니다.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.springframework.stereotype.Component;

@Component
public class KafkaToOracleDBRouteBuilder extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        // Kafka 설정
        String kafkaEndpoint = "kafka:my-topic?brokers=localhost:9092";
        String kafkaDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

        // Oracle DB 설정
        String oracleDBEndpoint = "jdbc:oracle:thin:@localhost:1521:orcl";
        String oracleDBUser = "username";
        String oracleDBPassword = "password";
        String oracleDBTable = "mytable";

        from(kafkaEndpoint)
            // Kafka에서 데이터 수신
            .unmarshal().string(kafkaDeserializer)
            .split(body().tokenize("\n"))
            // Oracle DB에 데이터 쓰기
            .setBody(simple("INSERT INTO " + oracleDBTable + " VALUES(${body})"))
            .to("jdbc:" + oracleDBEndpoint + "?user=" + oracleDBUser + "&password=" + oracleDBPassword);
    }
}
 
 

이 코드는 Kafka에서 받은 데이터를 Oracle DB 테이블에 삽입하는 라우트를 정의합니다.

 

 

결론

Apache Kafka와 Camel을 활용하면 대규모 실시간 데이터 스트림을 쉽게 처리할 수 있으며, 다양한 시스템 간의 통합을 원활하게 수행할 수 있습니다. Camel의 유연한 라우트 기반 구조와 Kafka의 확장성 있는 아키텍처를 결합하면 복잡한 데이터 파이프라인을 효과적으로 구축할 수 있습니다.

Kafka와 Camel을 활용한 데이터 스트림 처리 기술은 대규모 데이터 트래픽을 처리해야 하는 다양한 산업에서 널리 사용되고 있으며, 앞으로도 실시간 데이터 처리 요구 사항이 증가함에 따라 그 중요성은 더욱 커질 것입니다.

반응형