빅데이터

빅데이터_Chapter?_SparkRDD

강용민 2022. 6. 6. 02:04

RDD

RDD는 Resilient Distributed Dataset의 줄임말로 스파크의 기본 데이터 구조이다.

분산 변경 불가능한 객체 모음이며, 스파크의 모든 작업은 새로운 RDD를 만들거나 존재하는 RDD를 변형하거나 결과 계산을 위해 RDD에서 연산하는 것을 표현하고 있다.

 

Create RDD

RDD를 생성하는 법은 3가지가 있다.

  • 직접 생성
    • parallelize() 함수를 이용한 RDD 생성
      • parallelize(c, numClices=None)
        • 기능 : RDD로 구성해줌.
        • C : Interables, numSlices : 분할할 갯수
    • createDataFrame() 함수를 이용한 RDD 생성
  • 기존데이터를 이용한 생성
    • read and load를 이용한 RDD 생성

 

Example

parallelize()함수를 이용한 RDD 생성

#make RDD and then transform to DF
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sparkContext.parallelize([
	(1,2,3,'a b c'),
    	(4,5,6,'d e f'),
   	(7,8,9,'g h i')
    ]).toDF(['col1','col2','col3','col4'])

df.show()

#make RDD
myData = spark.sparkContext.parallelize([
	(1,2),
    	(3,4),
	(5,6),
    	(7,8)
    ])
myData.collect()

createDataFrame() 함수를 이용한 RDD 생성

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Employee = spark.createDataFrame([
	(1,'joe','70000',1),
    	(2,'henry','80000',2),
    	(3,'sam','60000',2)
    ],
    ['id','name','sallary','departmentId']
    )
Employee.show()

read & load를 이용한 RDD생성

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.format('com.databricks.spark.csv').\
	inferschema='true'\
    .load("C:/temp/Advertising.csv",header=True)
    
df.show()
df.printSchema()

 

spark operation

spark 연산은 분산 환경에서 데이터를 다루는 데 필요한 지연 처리방식의 transformations과 즉시 실행방식의 actions를 제공한다.

  • RDD transformation
    • 기존의 RDD에서 새로운 RDD를 생성하는 function이다.
  • RDD Action
    • 실제의 데이터를 가지고 작업할 떄 사용한다.
    • action을 수행하게되면 RDD는 더이상 RDD가 아닌 Non-RDD values로 바뀐다.

 

rdd.DataFrame vs pandas.DataFrame

import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrcCreate()

#Create DataFrame
col_name = ['col1','col2']
list1 = [[1,2,3],[4,5,6]]
dict = {'col1':[1,2,3],
	'col2':[4,5,6]}

#pandas
list_df = pd.DataFrame(list1,columns=col_name)
dict_df = pd.DataFrame(dict)
#spark
list_sdf = spark.createDataFrame(list1,col_name)
dict_sdf = spark.createDataFrmae(np.array(list(dict.values())).T.tolist(),list(dict.keys())

#Load DataFrame
#pandas
csv_df = pd.read_scv('Advertising.csv')
#spark
csv_sdf = spark.read.csv(path='homeAdvertising.csv',
	header=True,
    inferSchema=True)
    
#Data types
list_df.dtypes
list_sdf.dtypes