RDD
RDD는 Resilient Distributed Dataset의 줄임말로 스파크의 기본 데이터 구조이다.
분산 변경 불가능한 객체 모음이며, 스파크의 모든 작업은 새로운 RDD를 만들거나 존재하는 RDD를 변형하거나 결과 계산을 위해 RDD에서 연산하는 것을 표현하고 있다.
Create RDD
RDD를 생성하는 법은 3가지가 있다.
- 직접 생성
- parallelize() 함수를 이용한 RDD 생성
- parallelize(c, numClices=None)
- 기능 : RDD로 구성해줌.
- C : Interables, numSlices : 분할할 갯수
- parallelize(c, numClices=None)
- createDataFrame() 함수를 이용한 RDD 생성
- parallelize() 함수를 이용한 RDD 생성
- 기존데이터를 이용한 생성
- read and load를 이용한 RDD 생성
Example
parallelize()함수를 이용한 RDD 생성
#make RDD and then transform to DF
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sparkContext.parallelize([
(1,2,3,'a b c'),
(4,5,6,'d e f'),
(7,8,9,'g h i')
]).toDF(['col1','col2','col3','col4'])
df.show()
#make RDD
myData = spark.sparkContext.parallelize([
(1,2),
(3,4),
(5,6),
(7,8)
])
myData.collect()
createDataFrame() 함수를 이용한 RDD 생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Employee = spark.createDataFrame([
(1,'joe','70000',1),
(2,'henry','80000',2),
(3,'sam','60000',2)
],
['id','name','sallary','departmentId']
)
Employee.show()
read & load를 이용한 RDD생성
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('com.databricks.spark.csv').\
inferschema='true'\
.load("C:/temp/Advertising.csv",header=True)
df.show()
df.printSchema()
spark operation
spark 연산은 분산 환경에서 데이터를 다루는 데 필요한 지연 처리방식의 transformations과 즉시 실행방식의 actions를 제공한다.
- RDD transformation
- 기존의 RDD에서 새로운 RDD를 생성하는 function이다.
- RDD Action
- 실제의 데이터를 가지고 작업할 떄 사용한다.
- action을 수행하게되면 RDD는 더이상 RDD가 아닌 Non-RDD values로 바뀐다.
rdd.DataFrame vs pandas.DataFrame
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.getOrcCreate()
#Create DataFrame
col_name = ['col1','col2']
list1 = [[1,2,3],[4,5,6]]
dict = {'col1':[1,2,3],
'col2':[4,5,6]}
#pandas
list_df = pd.DataFrame(list1,columns=col_name)
dict_df = pd.DataFrame(dict)
#spark
list_sdf = spark.createDataFrame(list1,col_name)
dict_sdf = spark.createDataFrmae(np.array(list(dict.values())).T.tolist(),list(dict.keys())
#Load DataFrame
#pandas
csv_df = pd.read_scv('Advertising.csv')
#spark
csv_sdf = spark.read.csv(path='homeAdvertising.csv',
header=True,
inferSchema=True)
#Data types
list_df.dtypes
list_sdf.dtypes
'빅데이터' 카테고리의 다른 글
빅데이터_Chapter06_RDD (0) | 2022.05.11 |
---|---|
빅데이터_Chapter05_Spark (0) | 2022.05.02 |
빅데이터_Chapter05_Spark 설치 및 환경설정 (0) | 2022.04.27 |
빅데이터_Chapter04_Hadoop 설치 및 환경설정 (0) | 2022.04.27 |
빅데이터_Chapter03_Java 설치 및 환경설정 (0) | 2022.04.27 |