일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- 도커exec
- docker
- DataFrame Spark
- 로컬 pyspark
- Docker error
- 시각화
- 태블로
- spark #스파크
- 빅쿼리
- spark explode
- Big Query
- PySpark
- SparkSQL
- 빅쿼리 튜닝
- 데이터 시각화
- 도커
- 도커오류
- 데이터엔지니어링
- 언어모델
- dataframe
- 코테
- ifkakao2020
- pyspark오류
- 프로그래머스 파이썬
- sparkdf
- spark df
- BigQuery
- airflow
- LLM
- tableau
- Today
- Total
SOGM'S Data
SPARK_02 : DATAFRAME_1 본문
spark 2.0 부터는 RDD 기반의 Dataframe이 지원된다.
기존 DB 언어인 SQL을 사용할 수 있어서 굉장히 편리하다.
기본 SPARK의 구동원리는 RDD와 같다. ( transformation lazy , action시 실제 spark run)
< spark datafame 예제>
1. 원본 데이터 모습. (예시)
2. 필요 모듈 불러오기 및 sparksession 초기화
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
우선 dataframe의 경우 sparksession을 import해주는 데 rdd에서 쓰던 sparkcontext의 세부항목을 좀 더 나열해준 것으로 이해하면된다. builder 사용
3. 데이터 type 지정 및 칼럼 정의
def mapper(line):
fields = line.split(',')
return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
age=int(fields[2]), numFriends=int(fields[3]))
lines = spark.sparkContext.textFile("fakefriends.csv")
people = lines.map(mapper)
기존 csv 파일에 header(칼럼명)이 없어 칼럼명과 type을 지정해준다. 이때 spark.sql 의 row 를 import해준다.
읽은 파일에 mapper함수 적용.
4. cache() 및 viewtable만들기
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")
people RDD를 dataframe으로 생성한다.
그리고 캐시(중요!!) 나중에 action (show(), collect())할 때 cache가 되지 않으면 N번 테이블을 불러와야함.
people이랑 view 테이블도 만들어준다. people은 나중에 sql from절에서 불러와서 자유자재로 사용할 수 있다.
5. 원하는 조건뽑기 ( 조건: 13살 이상 19살 이하 사람만 출력 )
#1#
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
for teen in teenagers.collect():
print(teen)
#2#
schemaPeople.groupBy("age").count().orderBy("age").show()
#1#처럼
from 절에서 people이란 view table을 사용하여 spark sql을 사용할 수도 있으며
#2#처럼
spark df 에서 바로 함수를 적용한 뒤 show()를 해도 가능하다.
*마지막은 spark.stop()을 반드시 하는 습관을 기르자.
< 번외: SQL 대신 자주쓰는 SPARK 메소드>
DF.show()
DF.select("someFieldName")
DF.filter(df("someFieldName")
DF.grouBy(df("someFieldName")).mean()
DF.rdd().map(mapperFunction)
dataset의 경우 scala가 좋음.
DF.show(DF.count())
#전체 열 표시 DF.show(DF.count()) <-> DF.show()와 달리 모든 행을 다보여준다.
'About Data > Engineering' 카테고리의 다른 글
SPARK BFS 구현 (0) | 2022.06.12 |
---|---|
SPARK_02 : DATAFRAME_2 (0) | 2022.02.28 |
SPARK_01 : RDD_2 (mapvalue, reduceByKey) (0) | 2022.02.06 |
SPARK_01 : RDD_1 (0) | 2022.01.11 |
로컬에 spark 설치후 pyspark 실행 오류 (3) | 2021.12.31 |