
SPARK BFS Implementation

왈왈가부 2022. 6. 12. 22:36

This is an example of using BFS on Spark to compute the relationship between superheroes, i.e., their degree of separation.

The basic initial node looks like this:

-  hero1_ID, ( (hero2_ID, hero3_ID ... heroN_ID), distance, node color (visit status) )

 =  (5983, ([2031, 23121, 12313...123], 9999, 'WHITE'))    * the distance is fixed at 9999 at first because it is unknown; WHITE means not yet visited

 

1. The BFS conversion function convertToBFS

#Boilerplate stuff:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14  #ADAM 3,031 (who?)

# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))

- When the starting hero, Spider-Man (5306), is converted, its node color is set to GRAY and its distance to 0.

- hitCounter is created: an accumulator the driver can check to see whether the work we care about has finished.

- Each input line becomes a pair of the form (heroID, (connections, distance, color)); a couple of sample conversions are sketched below.
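As a quick sanity check, this is what convertToBFS produces for two hypothetical input lines (the IDs other than 5306 and 5983 are made up for illustration):

print(convertToBFS("5983 2031 23121"))
# -> (5983, ([2031, 23121], 9999, 'WHITE'))   an ordinary, unvisited node

print(convertToBFS("5306 1234"))
# -> (5306, ([1234], 0, 'GRAY'))   the start node gets distance 0 and color GRAY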

 

2. The starting-RDD creation function

def createStartingRdd():
    inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)
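If marvel-graph.txt is not available locally, the starting RDD can be faked with sc.parallelize for a quick end-to-end test. The three lines below are made up and only mimic the file's format (a hero ID followed by the IDs of its connections):

def createTestRdd():
    # Hypothetical mini-graph: 5306 (Spider-Man) connected to two fake heroes
    lines = sc.parallelize(["5306 1234 5678", "1234 5306", "5678 5306 1234"])
    return lines.map(convertToBFS)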

 

3. Tracking the degree of separation between the two heroes, updating each node's COLOR as it is visited

3-1. The mapper function

def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

- When one of the expanded connections equals targetCharacterID (14, ADAM 3,031), the accumulator hitCounter is incremented by 1, which signals to the driver script that the target character has been found. A sample run of the mapper is sketched below.
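For example, feeding bfsMap one GRAY node expands each neighbor with distance + 1, then re-emits the original node colored BLACK (the neighbor IDs 1234 and 5678 are made up):

for entry in bfsMap((5306, ([1234, 5678], 0, 'GRAY'))):
    print(entry)
# (1234, ([], 1, 'GRAY'))              expanded neighbor; its connections are unknown here
# (5678, ([], 1, 'GRAY'))
# (5306, ([1234, 5678], 0, 'BLACK'))   the processed input node, re-emitted so it isn't lost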

3-2. The reducer function

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)
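A quick illustration of the reducer combining the original node with one of the GRAY entries bfsMap emitted for the same key: the edge list is preserved, the minimum distance wins, and the darker color wins (made-up IDs again):

original = ([1234, 5678], 9999, 'WHITE')  # the node as loaded from the file
expanded = ([], 1, 'GRAY')                # an entry emitted by bfsMap
print(bfsReduce(original, expanded))
# -> ([1234, 5678], 1, 'GRAY')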

 

4. The iteration loop

#Main program here:
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

- bfsMap is applied to every node via flatMap.

- A short explanation of map vs. flatMap: https://eprj453.github.io/spark/2021/02/25/Spark-map-flatMap/ (see the sketch below).
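In short, map emits exactly one output element per input, while flatMap flattens the returned lists into individual elements; that is why bfsMap, which returns a list of nodes, is paired with flatMap. A minimal illustration:

rdd = sc.parallelize([1, 2, 3])
print(rdd.map(lambda x: [x, x * 10]).collect())      # [[1, 10], [2, 20], [3, 30]] - one list per element
print(rdd.flatMap(lambda x: [x, x * 10]).collect())  # [1, 10, 2, 20, 3, 30] - the lists are flattened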

 


- The mapped.count() call is an action, and it forces the lazy flatMap to actually be evaluated; that is the only reason the accumulator gets updated. After each iteration, the driver reads hitCounter.value to see whether the target character was reached, as demonstrated below.
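A minimal sketch of that laziness, with arbitrary numbers: the accumulator stays at 0 until an action runs.

lazyAcc = sc.accumulator(0)

def tag(x):
    lazyAcc.add(1)   # runs on the executors, but only once an action triggers evaluation
    return x

nums = sc.parallelize([1, 2, 3]).map(tag)
print(lazyAcc.value)   # 0: map is a lazy transformation, nothing has executed yet
nums.count()           # the action forces evaluation
print(lazyAcc.value)   # 3: the accumulator has now been updated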
