SOGM'S Data

SPARK_01 : RDD_2 (mapvalue, reduceByKey) 본문

About Data/Engineering

SPARK_01 : RDD_2 (mapvalue, reduceByKey)

왈왈가부 2022. 2. 6. 20:12

<DATA>

각 열 : stationID(관측소) , entryType(온도 구분) , temperature(섭씨) 정보가 포함된 기상 관측소 데이터 

x[0]: stationID(관측소) , x[2]: entryType(온도 구분) , x[3]: temperature(섭씨)

 

<실습 : 기상 관측소별 최소온도 구하기>

step1 : spark conf 생성

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext.getOrCreate(conf=conf)
#중복 sparkconf 실행 명령어 -> getOrCreate
  • getOrCreate(conf=conf) 명령어의 경우 이미 존재하는 스파크세션이 있는 경우 new one을 만들어주는 작업

* 참조: pyspark.sql.SparkSession.builder.getOrCreate

builder.getOrCreate()

Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.


 

step2 : data 파싱 

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)
    
lines = sc.textFile("file:///SparkCourse/1.RDD/1800.csv")
parsedLines = lines.map(parseLine)
  • temperature의 경우 기존 fields[3]의 값을 화씨로 만들어주는 계산.
    • ( *0.1 은 기존 데이터가 온도의 10를 기록했기때문에 다시 스케일)
  • parsedLines RDD생성 

 

step3 : FILTER , reduceBykey RDD생성

minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
# 온도만 분리 filter 로 tmin(최소온도만 남김)

stationTemps = minTemps.map(lambda x: (x[0], x[2]))
# stationID와 temperature 남김 


minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
  • FILTER로 최솟값 구분자인 TMIN만 구분하여 가져와서 minTemps RDD생성 
  • stationID와 temperature 남긴 stationTemps RDD생성 
  • reduceBykey를 통한 관측소별 최솟값 구하기.

 

step4 : 출력 (  실제 spark 실행되는 부분(=action)  )

results = minTemps.collect();

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
#소수점 2자리 + 화씨 F 출력

 

결과

>>ITE00100554	5.36F
EZE00100082	7.70F

관측소별 화씨온도 5.36, 7.70F 

'About Data > Engineering' 카테고리의 다른 글

SPARK_02 : DATAFRAME_2  (0) 2022.02.28
SPARK_02 : DATAFRAME_1  (0) 2022.02.17
SPARK_01 : RDD_1  (0) 2022.01.11
로컬에 spark 설치후 pyspark 실행 오류  (3) 2021.12.31
[1-4] SPARK RDD + structured data  (0) 2021.12.28
Comments