RxJS와 함께하는 함수형 리액티브 프로그래밍

🗓 2017-01-03

원글: https://www.sitepoint.com/functional-reactive-programming-rxjs/

이 글은 Moritz Kröger, Bruno Mota와 Vildan Softic의 검수를 받았다. SitePoint의 컨텐트들이 최고가 될 수 있게 도와주는 SitePont의 모든 동료 리뷰어들에게 감사를 표한다.

본론으로 들어가기 전에 한가지 중요한 질문을 던져 본다. "반응형 프로그래밍이란 무엇인가?" 지금 시점에서 가장 일반적인 답변은 "반응형 프로그래밍은 동시성 데이터 스트림을 이용한 프로그래밍"이 될 것이다. 그리고 적잖이 동시성이라는 단어가 비동기로 대체된 경우도 있을 것이다. 하지만 아마 이 글을 읽다 보면 스트림이 비동기일 필요가 없다는 것을 알게 될것이다.

"모든것이 스트림이다"라는 접근 방식은 우리가 직면한 프로그래밍 문제에 직접적으로 적용될 수 있음을 쉽게 알 수 있다. 결국 CPU라는 것도 단지 명령과 데이터로 구성된 스트림을 처리하는 장치일 뿐이니까 말이다. 우리의 목표는 이 스트림을 관찰(Observe)하고 필요한 경우 특정한 데이터로 변환하는 것이다.

반응형 프로그래밍라는 것은 사실 자바스크립트에겐 완전히 새로운 것은 아니다. 이미 프로퍼티 바인딩이라던지 혹은 이벤트 이미터 패턴이나 노드 제이에스의 스트림같은 것들을 다뤄 왔기 때문이다. 가끔 이런 우아한 방법들은 성능의 저하나 지나치게 복잡한 추상 혹은 디버깅상의 문제로 이어지게 될 때가 있다. 보통 이러한 단점들은 새로운 추상 레이어가 주는 이점 덕에 무시되곤 한다. 물론 우리의 작은 예제는 일반적인 애플리케이션을 반영하고 있지는 않지만 가능한 짧고 간결해야 한다.

주저하지 말고 일단 RxJS 가지고 놀아보자. RxJS는 다른 라이브러리들(jQuery 등등)에서 자주 사용하는 기술인 체이닝을 많이 사용한다. 메서드 체이닝에 관한 내용은 SitePoint 에서 확인할 수 있다.(Ruby버전)

스트림의 예

RxJS를 사용해보기 전에 나중에 작업하게 될 몇 가지 예들을 나열해보겠다. 물론 결국엔 반응형 프로그래밍과 스트림에 대한 전반적인 소개로 이어질 것이다.

일반적으로 스트림은 내부와 외부 스트림으로 구분된다. 전자는 인공적으로 우리가 통제 가능한 반면 후자는 외부의 출처로부터 들어온 우리가 통제 불가능한 것이다. 외부 스트림은 우리 코드에서 직접적으로 혹은 간접적으로 발동된다.

보통 스트림들은 우리를 기다려주지 않는다. 우리가 처리 가능하든 불가능하든 발생한다. 예를 들면 도로 위를 달리는 차들을 관찰(Observe) 한다고 할때 차들의 흐름을 다시 시작하게 할 수 없다는 말이다. 스트림은 우리가 스크림을 관찰하거나 혹은 아니거나 와 상관없이 발생한다. Rx 용어로 이것을 Hot observable이라고 부른다. 이와 반대로 Rx에는 Cold observables도 도입했는데 이것은 조금 더 표준적인 이터레이터처럼 동작하고 스트림에서 얻어지는 정보는 각 관찰자에 필요한 항목(item)들로 구성된다.

아래의 이미지는 외부 스트림들의 예를 보여준다. HTTP 요청들과 일반적으로 설정된 웹 훅들이 있고 마찬가지로 마우스나 키보드에 의한 UI 이벤트들도 보인다. 그리고 장치에서 GPS 센서나 가속도 센서 등 여러 센서들에서 데이터를 받을 수도 있다.

img

이미지에는 메시지라는 스트림도 포함하고 있는데 메시지는 여러 가지 형태로 나타날 수 있다. 가장 간단한 형태는 우리의 웹사이트와 다른 웹사이트와의 커뮤니케이션이다. 다른 경우는 웹 소켓과 웹 워커와의 통신이 포함된다. 후자의 경우의 코드를 잠깐 살펴보자.

워커의 코드는 바로 아래에 있다. 2부터 10의 10승까지의 소수를 구하는 코드인데 숫자가 발견되게 되면 바로 보고한다.

(function (start, end) {
    var n = start - 1;

    while (n++ < end) {
        var k = Math.sqrt(n);
        var found = false;

        for (var i = 2; !found && i <= k; ++i) {
            found = n % i === 0;
        }

        if (!found) {
            postMessage(n.toString());
        }
    }
})(2, 1e10);

기본적으로, 웹 워커는 아래와 같이 사용된다.(prime.js 파일에 있다고 가정) 간결함을 위해 결과에 대한 확인코드나 웹 워커 지원에 대한 체크 코드는 생략한다.

var worker = new Worker('prime.js');
worker.addEventListener('message', function (ev) {
    var primeNumber = ev.data * 1;
    console.log(primeNumber);
}, false);

웹 워커에 대한 더 자세한 내용과 자바스크립트 멀티 스레딩에 관한 내용은 Parallel.js와 함께하는 자바스크립트 병령처리 에서 찾아볼 수 있다.

위의 예제를 잘 살펴보면 보면, 소수는 양의 정수 사이에서 점근 적 분포를 따른다는 것을 알 수 있다. x에서 무한대까지 x / log(x)의 분포를 구하게 된다. 이 말은 시작할 때 더 많은 숫자가 나타난다는 것을 뜻한다. 아래의 이미지를 보면 조금 더 이해가 수월할 수 있다.(i.e. 단위시간당 소수를 찾아내는 양이 후반보다 초반이 많다)

img

관련은 없지만 사용자가 검색 창에 입력한 사용자의 입력을 봐도 유사점을 찾을 수 있다. 사용자는 검색을 위해 무언인가를 입력할 때 초반에는 빠른 속도로 입력하다가 점점 구체적인 내용이 될수록 각 키 입력 간의 시간차가 벌어지게 된다. 이때 입력에 따라 실시간으로 결과를 제공해 검색의 범위를 좁혀준다면 분명히 좋을 것 같다. 하지만 하나하나의 모든 키 입력(특히 초반에 무의식적으로 빠르게 입력되는 내용이나 충분히 상세화되지 않은 내용)에 반응을 하는 것은 원치 않는다.

위 두 가지의 시나리오에서 정답은 주어진 시간 간격 동안 이전의 이벤트들을 수집하는 것이다. 두 가지 시나리오의 차이점은 구해진 소수는 주어진 시간 간격 후에는 반드시 발생해야 한다는 것이다.(i.e. 일부 소수는 지연되어 발생하더라도..) 대조적으로 검색 창은 일정한 간격 동안 키 입력이 발생하지 않은 경우에만 새 요청을 발생시킨다. 그래서 키 입력이 감지될 때마다 타이머가 리셋된다.

RxJS로 해결하기

Rx는 observable 컬렉션들을 이용해 비동기와 이벤트 기반의 프로그램을 작성하기 위한 라이브러리다. 선언적 구문과 조합성으로 잘 알려져 있으며 쉬운 시간 처리와 오류 모델을 도입했다. 위에서 다루웠던 예제에선 특히 시간 처리에 관한 내용이었는데 RxJS를 이용하면 시간처리 말고도 다양한 이득을 얻을 수 있다.

RxJS의 기본 구성 요소는 생산자인 observables와 소비자인 observers로 구성된다. 이미 두 가지의 observables의 두 가지 유형에 대해서는 언급했었다.

  • Hot observables 는 구독을 하고 있지 않아도 발생된다.(e.g, UI 이벤트)
  • Cold observables 는 구독을 할때만 발생한다. 구독을 다시 시작하면 다시 발생된다.

Cold observables는 일반적으로 RxJS 안에서 사용되도록 변환된 배열이나 단일 값들을 나타낸다. 예를 들어 다음의 코드는 완료하기 전에 하나의 값만 생성하는 Cold observable 객체를 만든다.

var observable = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();
});

또한 observable 생성 함수에서 로직이 포함된 함수를 리턴할 수도 있다.

observable을 구독(subscribe) 하는 것은 observable의 종류(Cold, Hot)와 상관없이 동일하다. 구독할 때는 두가지 타입 모두 알림을 위한 문법인 onNext, onError 그리고 onComplete 이렇게 세 가지의 함수를 기본적으로 제공할 수 있는데 이중 onNext 콜백은 필수 요소다.

var subscription = observable.subscribe(
    function (value) {
        console.log('Next: %s.', value);
    },
    function (ev) {
        console.log('Error: %s!', ev);
    },
    function () {
        console.log('Completed!');
    }
);

subscription.dispose();

구독을 종료하는 가장 좋은 방법은 dispose 메서드를 이용하는 것이다. 이렇게 하면 필요한 정리 절차를 진행하게 된다. 그렇지 않으면 사용되지 않으면서 가비지 컬렉션도 되지 않는 자원으로 남을 수 있다.

구독하지 않으면 변수 observable 안에 포함 된 observable은 그저 Cold observable일 뿐이다. 그렇지만 publish 메소드를 이용해서 핫 시퀀스(i.e. 의사 구독을 수행)로 변환할 수도 있다.

var hotObservable = observable.publish();

RxJS에 포함된 일부 헬퍼들은 단지 기존 데이터 구조의 변환만 다룬다. 자바스크립트에서는 이들을 세 가지로 구분할 수 있다.

  1. 프로미스 는 한 개의 비동기 결과를 리턴하기 위해 사용된다.
  2. 함수 는 한 개의 결과를 위해 사용되고,
  3. 제너레이터 는 이터레이터를 제공하기 위해 사용된다.

제너레이터는 ES6에서 새로 나왔는데 ES5나 이전 버전의 배열(나쁘진 않지만 한 개의 값으로 취급되어야 한다)을 대체할만한 기능이다.

RxJS는 비동기 다중(반환) 값을 지원하는 데이터 유형을 제공한다. 그래서, 4개의 사분면이 이제 채워진다.

img

이터레이터의에서는 값을 당겨(pull)오는 반면 observables의 값들은 밀어(push) 넣어 진다. 예제로 대뤄질 내용은 이벤트 스트림인데 다음 이벤트를 강제로 발생시킬 수 없고 그저 이벤트 루프에서의 통보(notify)를 기다릴 수 밖에 없다.

var array = [1,2,3,4,5];
var source = Rx.Observable.from(array);

Observables를 생성하고 다루는 대부분의 헬퍼들은 스케쥴러를 적용할 수 있다. 스케줄러는 언제 구독이 시작되고 언제 통보(notifications)가 발행(publish) 될지를 조종한다. 기본 스케줄러가 실 사용시 대부분의 경우에서 잘 동작하기 때문에 여기서는 스케줄에 관해 자세히 다루지는 않을 것이다.

RxJS의 많은 오퍼레이터들은 스로틀이나 인터벌 혹은 딜레이 같은 동시성을 도입한다. 이제 이전의 예제를 다시 한 번 살펴보자 이제 이러한 헬퍼들이 필수적으로 된다.

예제

먼저 소수 발생기부터 살펴보자. 이제 주어진 시간의 결과들을 모와서 UI가 너무 많이 업데이트 되지 않도록 할 것이다.(특히 시작 시점에)

여기서는 앞서 언급했던 인터벌 헬퍼와의 연결을 통해 RxJS의 버퍼 함수를 사용하려고 한다.

결과는 아래의 이미지의 다이어그램이 보여주고 있다. 녹색원은 일정한 간격(소요시간) 후에 발생하고 버퍼는 그 간격 동안 발생한 파란색의 원만큼 수집한다.(역. 파란색원 = 소수 발생)

img

또한 맵을 이용해 데이터를 변환할 수 있다. 예를 들어, 수신된 이벤트 인수를 변형해 수신된 데이터를 숫자로 변환 할 수 있다.

var worker = new Worker('prime.js');
var observable = Rx.Observable.fromEvent(worker, 'message')
    .map(function (ev) { return ev.data * 1; })
    .buffer(Rx.Observable.interval(500))
    .where(function (x) { return x.length > 0; })
    .map(function (x) { return x.length; });

fromEvent 함수는 표준 이벤트 이미터 패턴을 이용해 모든 오브젝트로부터 observable를 생성한다. 버퍼는 길이가 0인 배열을 리턴할 수 있으니 where 함수를 이용해 길이가 0인 스트림은 제외한다. 마지막으로 이 예제에서는 단 소수의 생성 횟수에만 관심이 있으니 map을 이용해 버퍼의 개수를 얻어 온다.

다른 예제는 일정한 시간 후에 요청을 시작하도록 제한해야 하는 검색 쿼리 상자다. 이런 시나리오에서는 사용할 수 있는 함수가 두가지가 있는데 throttle 함수는 지정된 타임 윈도우동안 실행을 지연하는데 첫 요청만 수행하고 지연하고 debounce 함수는 타임 윈도우가 흐른 후 마지막에 한번 수행해준다. 그 타임 윈도우 역시 각 펑션에 따라 일정하게 미뤄진다.

여기서는 아래의 다이어그램이 보여주는 동작이 필요하다 그래서 여기서는 디바운스 매커니즘을 사용할 것이다.

img

이전의 결과들은 모두 버리고 마지막 타임 윈도우가 끝나기 직전 마지막 것만 취하게 된다. 입력 필드에 id 쿼리가 있다고 한다면 아래의 코드를 사용할 수 있다.

var q = document.querySelector('#query');
var observable = Rx.Observable.fromEvent(q, 'keyup')
    .debounce(300)
    .map(function (ev) { return ev.target.value; })
    .where(function (text) { return text.length >= 3; })
    .distinctUntilChanged()
    .map(searchFor)
    .switch()
    .where(function (obj) { return obj !== undefined; });

위 코드에서는 타임 윈도우의 시간을 300ms로 정했다. 또한 이전 쿼리하고는 다른 최소 3개의 문자로 제한한다. 이렇게 하면 텍스트를 지우거나 수정할 때의 불필요한 요청을 제거할 수 있다.

전체 표현 중에는 두 가지의 중요한 부분이 있다. 하나는 searchFor 함수를 이용해 쿼리 텍스트를 요청으로 변환하는 부분이고 나머지 하나는 switch 함수다. switch 함수는 네스트된 observable들을 리턴하는 함수를 받아 가장 최근의 observable 시퀀스에서만 값을 생성한다.

요청을 만드는 함수는 아래와 같이 정의된다.

function searchFor(text) {
    var xhr = new XMLHttpRequest();
    xhr.open('GET', apibaseUrl + '?q=' + text, true);
    xhr.send();
    return Rx.Observable.fromEvent(xhr, 'load').map(function (ev) {
        var request = ev.currentTarget;

        if (request.status === 200) {
            var response = request.responseText;
            return JSON.parse(response);
        }
    });
}

switch함수에 where함수를 연결하는 이유는 네스트된 observable의 잘못된 요청으로 undefined가 될 수 있기 때문이다.

결론

RxJS는 javascript에서 즐거운 반응형 프로그래밍을 할 수 있게 해준다. 대안으로 유사하게 동작하는 Bacon.js도 있다. 그래도 RxJS의 가장 좋은 점은 바로 Rx라는 점 그 자체다. 많은 플랫폼에서 사용 가능하며 이로 인해서 다른 언어, 플랫폼 혹은 시스템으로 쉽게 전환할 수 있다. 그리고 갈결하고 조합 가능한 일련의 메서드의 사용으로 반응형 프로그래밍의 개념을 통합한다. 또한 DOM 과의 상호 작용을 단순화하는 RxJS-DOM과 같은 몇 가지 매우 유용한 확장들도 존재한다.

Where do you see RxJS shine?

♥ Support writer ♥
with kakaopay

크리에이티브 커먼즈 라이선스이 저작물은 크리에이티브 커먼즈 저작자표시-비영리-변경금지 4.0 국제 라이선스에 따라 이용할 수 있습니다.

© Sungho Kim2023