Sunday, September 1, 2013

맵리듀스(Map-Reduce) 프로그래밍 - 공기정보(Co-occurrence) 구하기 두번째. 상대 빈도 구하기

지난번 포스팅에서 하나의 문장에서 두개의 단어가 동시 출현하는 빈도수를 계산하는 간단한 hadoop streaming 파이썬 프로그램을 소개하였다. 이번 포스팅에서는 이 프로그램이 가지는 문제점을 해결하기 위한 새로운 프로그램을 만들어 보도록 하겠다.

지난번에는 파이썬으로 mapper 프로그램만 작성하고, reducer는 -reduce aggregate라는 옵션으로 내장 reducer를 사용했지만, 이번에는 reducer를 직접 작성해야 한다. 또한, 여러개의 reducer에 분배되는 key를 제어하기 위한 partitioner 옵션도 사용하도록 하겠다.

1. 단순한 co-occur 빈도의 문제점과 해결 방법
예를 들어, '구글'이라는 단어의 앞에 자주 나오는 단어를 알고 싶은 상황이 있다고 치자. 말뭉치에서 구글 앞에 많이 출현한 단어들을 빈도순으로 뽑아 보면 다음과 같은 단어들이 나온다.

단어1-단어2 빈도수
==================
하는-구글 1365
수-구글 664
글-구글 530
다른-구글 525
카테고리의-구글 402
2-구글 394
있습니다-구글 389
0-구글 359
애드센스-구글 324
1-구글 249

우리의 목적은 '구글'이라는 단어와 상관 관계가 높은 단어를 알고 싶은 것인데, '하는', '수', '글', '다른' 이런 단어들은 '원래' 많이 나오는 단어들이다.
이렇게, co-occur 카운트에서 '절대 빈도'는 우리가 원하는 정보가 아니다. 우리는 절대 빈도 뿐만 아니라, '상대 빈도'도 사용하여 문제를 해결하고자 한다.
'하는' 이라는 단어와 '구글' 이라는 단어가 함께 출현하는 절대 빈도는
    f(구글, 하는)
이라고 표현한다. 반면 상대 빈도는
    f(구글 | 하는)
이라고 표현한다. 말로 풀어쓰면 '하는'이라는 단어가 출현한 상황에서(조건에서, 이후에...) '구글'이라는 단어가 출현한 비율을 의미한다. (맞다. 조건부 확률이다!) 계산식은 다음과 같다.
   f(구글|하는) = f(구글, 하는) / f(하는)
즉, 절대 빈도 f(구글, 하는)의 값이 크더라도, '하는'이라는 단어가 원래 많이 나오는 단어라면 상대 빈도 f(구글|하는)의 값은 작은 값이 될 수도 있는 것이다.
여기에 조금 더 욕심을 내서, 상대빈도와 절대빈도를 하이브리드하여 다음의 식을 사용을 할 것이다.
  hybrid_f(구글|하는) = log(f(구글)) * f(구글|하는)
이렇게 하면 같은 상대 빈도 0.9라고 하더라도 기왕이면 '구글'이라는 단어가 많이 출현한 쪽이 더 높은 점수를 받게 된다.

2. Mapper, Reducer 프로그램의 구현
먼저 쉽게 생각할 수 있는 방법은 두개의 Map-Reduce 프로그램을 만드는 것이다. 하나의 MR 프로그램은 f(단어1)를 구하는 프로그램, 두번째 프로그램은 f(단어1, 단어2) 를 구하는 프로그램을 만드는 것이다. 이 방법도 나쁘지는 않지만, 여기서는 하나의 MR 프로그램, 즉, 하나의 Mapper와 하나의 Reducer만으로 상대 빈도 값을 구하는 프로그램을 만들어 보려고 한다.

먼저, 절대 빈도만 구하는 프로그램의 Mapper라면 다음과 같은 key-value 쌍들을 출력할 것이다.
입력 문장: "맵리듀스 프로그래밍 결코 어렵지 않아요"
출력 key-value:
[맵리듀스-프로그래밍   1],
[맵리듀스-결코              1],
[맵리듀스-어렵지          1],
[맵리듀스-않아요          1]
여기에 우리는 단어의 단독 출현 횟수를 동시에 계산하기 위해서 [단어-*     빈도] 라는 특수 목적의 새로운 key-value를 추가한다. 예를 들면,
[맵리듀스-*                   4]
와 같은 식이다.

이를 위한 Mapper.py는 다음과 같다.
아래 프로그램의 출력은 위 설명의 표현 형식과는 약간 다르게 "단어1[탭]단어2[탭]빈도수"의 형식으로 되어있다. 설명의 편의성과 실행의 편의성 차이에 때문이니 양해 바랍니다.^^
"""
relative_count_map.py
"""
import re
import sys

# 유효 단어 검사 패턴 - 숫자, 알파벳, 한글 자모
pattern = re.compile(ur"^[0-9a-zA-Z\uac00-\ud7a3]+$", re.UNICODE)

# 유효 단어 인지 검사
def is_valid_word(word) :
    if pattern.match(unicode(word, "utf-8")) :
        return True
    else :
        return False

if __name__ == "__main__" :
    # 토큰 구분 delimeter
    delimeter = re.compile(r"[ \t,.()'\"-]*")
    WINDOW_SIZE = 10
 
    for line in sys.stdin :
        words = []
        for word in delimeter.split(line.strip()) :
            if is_valid_word(word) :
                words.append(word)
 
        for i in range(len(words)) :
            sum = 0
            for j in range(1, WINDOW_SIZE) :
                if i + j >= len(words) :
                    break
                # 단어1[탭]단어2[탭]1 출력
                if words[i] != words[i+j] :
                    print "%s\t%s\t1" % (words[i], words[i+j])
                    sum += 1
            if sum > 0 :
                # 단어1[탭]*[탭]sum 출력
                print "%s\t*\t%d" % (words[i], sum)

이렇게 만들어진 Mapper의 key-value 쌍들은 하둡의 프레임웍이 알아서 섞고, 모으고 가지런히 정렬하여 (shuffle&sort) 아래와 같이 예쁜 순서대로 Reducer의 standard input 입력으로 도달하게 된다. (솔직히 말해서, 알아서가 아니고, partitioner 파라미터를 설정해주는 것이 필요한데, 이 설명은 뒤에 한다.)

# Reducer의 입력
하는[탭]*[탭]123451
하는[탭]*[탭]321212
하는[탭]*[탭]231231   ====> 여러 mapper에서 결과가 만들어져서 중복키가 연달아 들어옴
하는[탭]0[탭]223
하는[탭]0[탭]345
하는[탭]0[탭]311
하는[탭]00[탭]16
...
하는[탭]구글[탭]565
하는[탭]구글[탭]425
하는[탭]구글[탭]136
...

이제, 위와 같은 standard input 을 입력으로 받아서, 위에서 언급한 상대빈도와 절대빈도의 hybrid score를 계산하는 Reducer는 다음과 같다.

"""
relative_count_map.py
"""
import re
import sys
import math

"""
이 프로그램은 word1-word2 조합으로 이루어진 키의 입력 순서가 정렬되어 있다는 가정을 하고 있다.
그리고, 모든 word1-word2 맨앞에 word1-*가 반드시 있어야 한다.
또한, 같은 word1은 반드시 하나의 reducer로 모아서 입력되어야 한다.
이런 튜닝은 partitioner의 입력 파라미터로 이루어진다.
"""
if __name__ == "__main__" :
    prev_key = None
    key_count = 0
    for line in sys.stdin :
        word1, word2, val = line.split("\t")
        key = "%s\t%s" % (word1, word2)
        val = int(val)
        # word1-word2 조합 key가 바뀌면 바뀌기 전의 word1-word2에 대해 score 계산
        if prev_key != key :
            if prev_key :
                word1, word2 = prev_key.split("\t")
                if word2 == "*" :
                    word1_sum = key_count
                    print "%s\t%s\t%d" % (word1, word2, word1_sum)
                elif key_count >=1 :
                    hybrid_f = math.log(key_count) * key_count/word1_sum
                    print "%s\t%s\t%d\t%d\t%5.5f" % (word1, word2, key_count, word1_sum, hybrid_f)
            key_count = val
            prev_key = key
        # 동일한 word1-word2가 들어오는 동안은 빈도를 누적
        else :
            key_count += val

    word1, word2 = prev_key.split("\t")
    # 너무 빈도가 낮은 출력은 용량만 차지하고 쓸모가 없으므로 아예 출력을 안한다.
    if key_count >= 5 :
        print "%s\t%s\t%d\t%d\t%5.5f" % (word1, word2, key_count, word1_sum, float(key_count)/word1_sum)

여기까지의 내용을 다시 한번 요약하면, 단어1-단어2의 절대 빈도에, 단어1 만의 빈도를 의미하는 단어1-* 의 값을 이용하여 단어1의 출현 조건 하에서의 상대빈도를 구한다는 것이다.
Mapper에서 문서로부터 빈도를 만들어 출력하고, Reducer에서 빈도를 수집한다. 이런 패턴이 Map-Reduce의 기본 패턴이다.


3. 각각의 Reducer에 입력될 key를 할당하는 담당자 - Partitioner
이쯤에서 Map-Reduce의 실행 방식을 되돌아 보면, 여러개의 Mapper 프로그램이 key-value형식으로 출력을 하고, 여러개의 Reducer 프로그램이 또한, key-value 형식으로 입력을 받는다. 이때, Mapper의 key와 Reducer의 key는 반드시 일치할 필요는 없다.
여기서 질문. Mapper의 수많은 출력이 어떻게 섞이고 정렬되어 여러개의 Reducer에 분배, 할당 될까? 우리가 보통 코딩을 하지는 않지만(경우에 따라서는 custom으로 만들수도 있는) Partitioner라고 하는 프로그램이 있다.
Default Partitioner는 Mapper의 출력을 그냥 골고루 Reducer에 나눠주는 역할만 한다. 그래서, mapper 출력의 key의 hash값 % Reducer 개수로 계산하여, Reducer를 결정한다. 그러므로, 이 Default Partitioner는 Mapper의 출력 중 같은 key는 같은 Reducer로 할당하는 것까지만 보장된다. Hash값이 같을 태니까.

하지만, 우리가 만든 Reducer의 입력은 다음과 같은 조건이 반드시 지켜져야 한다.
1. 같은 단어1은 반드시 하나의 reducer로 가야 한다. 즉, '검색-구글' key는 1번 reducer로 가는데, '검색-네이버' key는 2번 reducer로 가면 안된다. 단어1이 '검색'인 단어를 모아서 계산해야 하기 때문이다. MR 프로그램은 reducer의 출력이 최종 출력이다. Reducer 이후 단계는 없다.
2. 단어1-단어2  에 대해서 오름 차순 정렬되어 있어야 하고, 그 맨위에 단어1-*가 있어야 한다. 순서가 뒤죽 박죽되면, 위에 소개한 Reducer 파이썬 프로그램은 정상 동작하지 않는다.

Partitioner가 이런 목적대로 동작하기 위해서, 우리는 Hadoop Streaming 실행 파라미터를 다음과 같이 한다.
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.0.4.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D stream.num.map.output.key.fields=2 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D mapred.text.key.comparator.options="-k1 -k2" \ 
-input /input -output /output \
-mapper relative_count_map.py \
-reducer relative_count_reduce.py \
-file relative_count_map.py -file relative_count_reduce.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner


여기서, 주요 옵션에 대한 설명은 다음과 같다.
-D stream.num.map.output.key.fields=2 ---> mapper의 출력은 단어1, 단어2 두필드의 조합키
-D mapred.text.key.partitioner.options=-k1,1----> 단어1 만 partitioning key로 사용하려면 -k1,1로 해주어야 함
-D mapred.text.key.comparator.options="-k1 -k2" \  ---> 정렬은 단어1, 단어2 조합에 대해 해야 한다.
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner   --> Partitioning에 사용할 hadoop-streaming 내장 클래스

옵션에 대한 설명은 몹시 약하지만, 실제 개발시에는 매뉴얼과 구글을 뒤져가며 할 수 밖에 없을 것이다. 본인도 그렇게 찾아낸 옵션들이다.

4. 실행 결과와 결과 요약
이렇게 실행해서 찾아낸 '구글'과 상관 관계가 높은 단어들은 다음과 같다.


단어1-단어2 Hybrid-빈도( = log(f(단어2)) * f(단어2 | 단어1) )
===================================================
구글토그-구글 0.89414
스타트업기업과-구글 0.61769
타케팅으로-구글 0.53517
돈준다-구글 0.47337
다운로드계의-구글 0.44055
첫출시를-구글 0.39816
라이터로도-구글 0.39816
검색엔진만을-구글 0.39816
Megaphone-구글 0.39816
구글어스다운-구글 0.30951

보시는 것처럼, 포스팅 맨 앞에 소개한 단어들과는 다르게 '구글'과 상관 관계가 높은 단어들 만이 높은 점수를 받은 것을 알 수가 있다.

여기까지, 두서 없이 긴 내용을 읽어주신 분들께 감사드린다. 요약하자면,
1. 공기(co-occurrence) 빈도수를 구할 때, 절대 빈도의 대안으로 상대 빈도가 사용된다.
2. 이를 하나의 MR 프로그램에서 해결하기 위한 단어1-* 키를 도입하였다.
3. 각각의 Reducer에 적절한 key값 분배를 위하여 Partitioner 옵션을 설정하였다.
4. 2개의 key를 기준으로 Reducer 입력을 정렬하기 위한 옵션을 설정하였다.