SW/Java

Java 배압(Backpressure) : 개념, 실제 사례, 구현

얇은생각 2023. 12. 20. 07:30
반응형

배압은 데이터 생산과 소비량의 균형을 유지하여 시스템 과부하를 방지합니다. 자바의 Flow API는 애플리케이션에서 효과적인 배압 구현을 가능하게 합니다.

특히 데이터 스트림을 다룰 때, 배압은 소프트웨어 개발에서 중요한 개념입니다. 데이터 생산과 소비율 사이의 균형을 유지하는 제어 메커니즘을 말합니다. 이 글에서는 배압의 개념과 그 중요성, 실제 사례, 그리고 자바 코드를 이용한 구현 방법에 대해 알아보겠습니다.

 

 

Java 배압(Backpressure) : 개념, 실제 사례, 구현

 

 

배압에 대한 이해

데이터 스트리밍과 관련된 시스템에서 데이터 생산 속도가 소비 속도를 초과할 수 있는 경우에 사용되는 방법이 바로 배압입니다. 이러한 불균형은 자원 고갈로 인한 데이터 손실이나 시스템 충돌을 초래할 수 있습니다. 배압은 소비자가 더 많은 데이터에 대한 준비가 되었을 때 생산자에게 신호를 보내 소비자의 부담을 방지할 수 있습니다.

 

 

배압의 중요성

배압 관리가 없는 시스템에서는 소비자가 유입되는 데이터를 처리하는 데 어려움을 겪을 수 있으며, 이는 느린 처리, 메모리 문제 및 심지어 충돌로 이어질 수 있습니다. 개발자는 배압을 구현함으로써 무거운 부하에서도 애플리케이션이 안정적이고 응답성이 높으며 효율적으로 유지되도록 보장할 수 있습니다.

 

실제 사례

비디오 스트리밍 서비스

넷플릭스, 유튜브, 훌루와 같은 플랫폼은 사용자의 기기와 네트워크가 유입되는 데이터 스트림을 처리할 수 있도록 보장하는 동시에 역압을 활용하여 고품질의 비디오 콘텐츠를 제공합니다. 적응형 비트레이트 스트리밍(ABS)은 사용자의 네트워크 조건과 기기 기능에 따라 비디오 스트림 품질을 동적으로 조정하여 과도한 데이터로 인한 잠재적 문제를 완화합니다.

 

트래픽 관리

역압은 고속도로의 교통관리와 비슷합니다. 한꺼번에 너무 많은 차가 고속도로에 진입하면 정체가 발생해 속도가 느려지고 통행시간이 늘어납니다. 고속도로로 향하는 차량의 흐름을 교통신호기나 램프미터를 이용하면 정체를 줄이고 최적의 속도를 유지할 수 있습니다.

 

자바에서의 배압 구현

자바는 Java 9에 도입된 Flow API를 통해 배압 처리를 위한 내장된 메커니즘을 제공합니다. Flow API Reactive Streams 사양을 지원하므로 개발자들이 배압을 효과적으로 처리할 수 있는 시스템을 만들 수 있습니다.

자바의 Flow API를 사용하는 간단한 생산자-소비자 시스템의 예는 다음과 같습니다:

import java.util.concurrent.*;
import java.util.concurrent.Flow.*;

public class BackpressureExample {

    public static void main(String[] args) throws InterruptedException {
        // Create a custom publisher
        CustomPublisher<Integer> publisher = new CustomPublisher<>();

        // Create a subscriber and register it with the publisher
        Subscriber<Integer> subscriber = new Subscriber<>() {
            private Subscription subscription;
            private ExecutorService executorService = Executors.newFixedThreadPool(4);

            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("Received: " + item);
                executorService.submit(() -> {
                    try {
                        Thread.sleep(1000); // Simulate slow processing
                        System.out.println("Processed: " + item);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscription.request(1);
                });
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Error: " + throwable.getMessage());
                executorService.shutdown();
            }

            @Override
            public void onComplete() {
                System.out.println("Completed");
                executorService.shutdown();
            }
        };

        publisher.subscribe(subscriber);

        // Publish items
        for (int i = 1; i <= 10; i++) {
            publisher.publish(i);
        }

        // Wait for subscriber to finish processing and close the publisher
        Thread.sleep(15000);
        publisher.close();
    }
}

 

class CustomPublisher<T> implements Publisher<T> {
    private final SubmissionPublisher<T> submissionPublisher;

    public CustomPublisher() {
        this.submissionPublisher = new SubmissionPublisher<>();
    }

    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        submissionPublisher.subscribe(subscriber);
    }

    public void publish(T item) {
        submissionPublisher.submit(item);
    }

    public void close() {
        submissionPublisher.close();
    }
}

 

 

이 예제에서는 기본 제공된 제출 게시자를 감싸는 사용자 지정 게시자 클래스를 만듭니다.

Custom Publisher는 특정 비즈니스 로직 또는 외부 소스를 기반으로 데이터를 생성하도록 추가로 사용자 지정할 수 있습니다.

수신된 항목을 ExecutorService를 사용하여 병렬로 처리하도록 Subscriber 구현이 수정되었습니다. 이를 통해 Subscriber는 더 많은 양의 데이터를 더 효율적으로 처리할 수 있습니다. 이제 onComplete() 메서드가 ExecutorService를 종료하여 제대로 정리할 수 있습니다.

onError() 메서드에서도 오류 처리가 향상됩니다. 이 경우, 오류가 발생하면 resource를 해제하기 위해 executorService를 종료합니다.

 

 

결론

배압은 데이터 스트리밍 시스템을 관리하기 위한 필수 개념으로, 소비자가 들어오는 데이터를 부담 없이 처리할 수 있도록 보장합니다. 개발자는 배압 기술을 이해하고 구현함으로써 보다 안정적이고 효율적이며 신뢰성 있는 애플리케이션을 만들 수 있습니다. 자바의 Flow API는 개발자들이 반응형 프로그래밍의 잠재력을 최대한 활용할 수 있도록 배압 인식 시스템을 구축하기 위한 훌륭한 기반을 제공합니다.

반응형