일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 빅쿼리
- ifkakao2020
- dataframe
- LLM
- SparkSQL
- Big Query
- DataFrame Spark
- 프로그래머스 파이썬
- spark #스파크
- Docker error
- sparkdf
- BigQuery
- PySpark
- 태블로
- spark explode
- spark df
- 데이터 시각화
- 로컬 pyspark
- 언어모델
- 도커
- docker
- 도커오류
- tableau
- 데이터엔지니어링
- 시각화
- 도커exec
- pyspark오류
- 코테
- 빅쿼리 튜닝
- airflow
- Today
- Total
SOGM'S Data
SPARK BFS 구현 본문
슈퍼 히어로들간의 관계 즉, 거리를 SPARK BFS로 구현하는 예제입니다.
기본적인 초기 노드는 다음과 같이 나타납니다.
- 히어로1_ID , ( 히어로2_ID, 히어로3_ID ... 히어로N_ID) , 거리 , 노드의색(방문여부)
= (5983, (2031, 23121, 12313...123) , 9999 , WHITE) * 초기에는 거리를 모르기 때문에 9999로 고정 , WHITE는 방문X
1. BFS전환 함수 converToBFS
#Boilerplate stuff:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)
# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14 #ADAM 3,031 (who?)
# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)
def convertToBFS(line):
fields = line.split()
heroID = int(fields[0])
connections = []
for connection in fields[1:]:
connections.append(int(connection))
color = 'WHITE'
distance = 9999
if (heroID == startCharacterID):
color = 'GRAY'
distance = 0
return (heroID, (connections, distance, color))
- 초기 히어로 값인 스파이더맨(5306) 방문시 color 노드 GRAY 선언 및 거리 0 ,
- hitcounter 생성 -> 누적값 계산기를 통해 우리가 원하는 작업이 끝났는지를 확인
- (heroID, (connections, distance, color)) 형태 생성
2. RDD 생성 함수
def createStartingRdd():
inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
return inputFile.map(convertToBFS)
3. 두 히어로들 간의 분리된 거리를 추적, 방문할때마다 노드 COLOR 업데이트
3-1. The MAPPER 함수
def bfsMap(node):
characterID = node[0]
data = node[1]
connections = data[0]
distance = data[1]
color = data[2]
results = []
#If this node needs to be expanded...
if (color == 'GRAY'):
for connection in connections:
newCharacterID = connection
newDistance = distance + 1
newColor = 'GRAY'
if (targetCharacterID == connection):
hitCounter.add(1)
newEntry = (newCharacterID, ([], newDistance, newColor))
results.append(newEntry)
#We've processed this node, so color it black
color = 'BLACK'
#Emit the input node so we don't lose it.
results.append( (characterID, (connections, distance, color)) )
return results
- targetCharacterID = 'ADAM' 을 만나면 Accumulater hitcounter 1증가함. 드라이버 스크립트로 원하는 사람을 찾았다는 신호를 보냄.
3-2. THE Reducer함수
def bfsReduce(data1, data2):
edges1 = data1[0]
edges2 = data2[0]
distance1 = data1[1]
distance2 = data2[1]
color1 = data1[2]
color2 = data2[2]
distance = 9999
color = color1
edges = []
# See if one is the original node with its connections.
# If so preserve them.
if (len(edges1) > 0):
edges.extend(edges1)
if (len(edges2) > 0):
edges.extend(edges2)
# Preserve minimum distance
if (distance1 < distance):
distance = distance1
if (distance2 < distance):
distance = distance2
# Preserve darkest color
if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
color = color2
if (color1 == 'GRAY' and color2 == 'BLACK'):
color = color2
if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
color = color1
if (color2 == 'GRAY' and color1 == 'BLACK'):
color = color1
return (edges, distance, color)
4. 반복
#Main program here:
iterationRdd = createStartingRdd()
for iteration in range(0, 10):
print("Running BFS iteration# " + str(iteration+1))
# Create new vertices as needed to darken or reduce distances in the
# reduce stage. If we encounter the node we're looking for as a GRAY
# node, increment our accumulator to signal that we're done.
mapped = iterationRdd.flatMap(bfsMap)
# Note that mapped.count() action here forces the RDD to be evaluated, and
# that's the only reason our accumulator is actually updated.
print("Processing " + str(mapped.count()) + " values.")
if (hitCounter.value > 0):
print("Hit the target character! From " + str(hitCounter.value) \
+ " different direction(s).")
break
# Reducer combines data for each character ID, preserving the darkest
# color and shortest path.
iterationRdd = mapped.reduceByKey(bfsReduce)
- bfsMap에 대해 flatMap 함수 실행
- flatMap에 간단한 설명 : https://eprj453.github.io/spark/2021/02/25/Spark-map-flatMap/
[Spark] map, flatMap
map과 flatMap은 spark transformation의 대표적인 연산입니다. 이 둘을 사용해보고 차이점이 무엇인지 살펴보겠습니다. pyspark을 이용합니다.
eprj453.github.io
- targetCharacterID = 'ADAM' 을 만나면 Accumulater hitcounter 1증가함. 드라이버 스크립트로 원하는 사람을 찾았다는 신호를 보냄. 이때 .count() action은 RDD를 evaluated되도록 강제
'About Data > Engineering' 카테고리의 다른 글
SPARK_02 : DATAFRAME_2 (0) | 2022.02.28 |
---|---|
SPARK_02 : DATAFRAME_1 (0) | 2022.02.17 |
SPARK_01 : RDD_2 (mapvalue, reduceByKey) (0) | 2022.02.06 |
SPARK_01 : RDD_1 (0) | 2022.01.11 |
로컬에 spark 설치후 pyspark 실행 오류 (3) | 2021.12.31 |