일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 |
Tags
- 빅쿼리 튜닝
- BigQuery
- pyspark오류
- airflow
- PySpark
- ifkakao2020
- 코테
- spark explode
- 태블로
- 시각화
- sparkdf
- dataframe
- 빅쿼리
- 언어모델
- DataFrame Spark
- 프로그래머스 파이썬
- Docker error
- docker
- 도커오류
- 데이터엔지니어링
- 도커
- 도커exec
- 데이터 시각화
- LLM
- Big Query
- spark df
- SparkSQL
- 로컬 pyspark
- spark #스파크
- tableau
Archives
- Today
- Total
SOGM'S Data
SPARK_01 : RDD_2 (mapvalue, reduceByKey) 본문
<DATA>
각 열 : stationID(관측소) , entryType(온도 구분) , temperature(섭씨) 정보가 포함된 기상 관측소 데이터
x[0]: stationID(관측소) , x[2]: entryType(온도 구분) , x[3]: temperature(섭씨)
<실습 : 기상 관측소별 최소온도 구하기>
step1 : spark conf 생성
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext.getOrCreate(conf=conf)
#중복 sparkconf 실행 명령어 -> getOrCreate
- getOrCreate(conf=conf) 명령어의 경우 이미 존재하는 스파크세션이 있는 경우 new one을 만들어주는 작업
* 참조: pyspark.sql.SparkSession.builder.getOrCreate¶
builder.getOrCreate()
Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.
step2 : data 파싱
def parseLine(line):
fields = line.split(',')
stationID = fields[0]
entryType = fields[2]
temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
return (stationID, entryType, temperature)
lines = sc.textFile("file:///SparkCourse/1.RDD/1800.csv")
parsedLines = lines.map(parseLine)
- temperature의 경우 기존 fields[3]의 값을 화씨로 만들어주는 계산.
- ( *0.1 은 기존 데이터가 온도의 10를 기록했기때문에 다시 스케일)
- parsedLines RDD생성
step3 : FILTER , reduceBykey RDD생성
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
# 온도만 분리 filter 로 tmin(최소온도만 남김)
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
# stationID와 temperature 남김
minTemps = stationTemps.reduceByKey(lambda x, y: min(x,y))
- FILTER로 최솟값 구분자인 TMIN만 구분하여 가져와서 minTemps RDD생성
- stationID와 temperature 남긴 stationTemps RDD생성
- reduceBykey를 통한 관측소별 최솟값 구하기.
step4 : 출력 ( 실제 spark 실행되는 부분(=action) )
results = minTemps.collect();
for result in results:
print(result[0] + "\t{:.2f}F".format(result[1]))
#소수점 2자리 + 화씨 F 출력
결과
>>ITE00100554 5.36F
EZE00100082 7.70F
관측소별 화씨온도 5.36, 7.70F
'About Data > Engineering' 카테고리의 다른 글
SPARK_02 : DATAFRAME_2 (0) | 2022.02.28 |
---|---|
SPARK_02 : DATAFRAME_1 (0) | 2022.02.17 |
SPARK_01 : RDD_1 (0) | 2022.01.11 |
로컬에 spark 설치후 pyspark 실행 오류 (3) | 2021.12.31 |
[1-4] SPARK RDD + structured data (0) | 2021.12.28 |
Comments