0. 기본 개념
- 입력으로 주어진 데이터프레임에 새 컬럼을 추가해 새 데이터프레임을 만드는 함수. 여러 프레임워크에서 transform()이라는 이름의 메서드로 호출할 수 있으며, 그 결과로 데이터프레임이 리턴된다.
- ML 모델도 어떻게 보면 입력으로 주어진 데이터프레임에 ‘예측값’이라는 새 컬럼을 추가해 새 데이터프레임을 만드므로 일종의 transformer라고 볼 수 있다.
2) estimator
- 데이터프레임을 입력으로 받아 ML 모델을 만드는 함수. 여러 프레임워크에서 fit()이라는 이름의 메서드로 호출할 수 있으며, 그 결과로 ML 모델(transformer)이 리턴된다.
- ML Pipeline도 어떻게 보면 데이터프레임을 입력으로 받아 ML 모델을 만드므로 일종의 estimator라고 볼 수 있다.
1. Spark 세션 열기
1
2
3
4
5
| !pip install pyspark py4j
from pyspark.sql import SparkSession
spark1 = SparkSession.builder.appName("app1").getOrCreate()
|
2. 데이터 로드
1
2
3
4
5
6
7
8
| !wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv #Titanic 승객 정보로 생존자를 예측하는 ML 모델을 만들어보자.
data1 = spark1.read.csv('./titanic.csv', header=True, inferSchema=True)
data1.printSchema() #titanic.csv의 schema를 볼 수 있다.
data1.show() #titanic.csv의 내용을 볼 수 있다.
data1.select(['*']).describe().show() #titanic.csv의 각 컬럼의 개수, 평균, 최댓값, 최솟값 같은 정보를 볼 수 있다.
|
3. data cleansing
1) estimator 만들기
1
2
3
4
5
6
| from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, MinMaxScaler
imputer1 = Imputer(strategy='mean', inputCols = ['Age'], outputCols = ['AgeImputed']) #imputer1은, fit() 메서드로 호출하면 '입력으로 받은 데이터프레임의 Age 컬럼의 결측치를 그 컬럼의 평균으로 채운 AgeImputed라는 컬럼을 추가하는 모델'을 리턴하는 estimator이다.
indexer1 = StringIndexer(inputCol = 'Gender', outputCol = 'GenderIndexed')
assembler1 = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
scaler1 = MinMaxScaler(inputCol = 'features', outputCol = 'features_scaled')
|
2) data cleansing
1
| data1_selected = data1.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare']) #이 컬럼들만 모델 훈련에 사용
|
(1) 결측치 채우기
1
2
| imputer1_model = imputer1.fit(data1_selected) #data1_selected라는 데이터프레임을 입력으로 하여 imputer1 estimator를 호출한다. 결과값으로 ML 모델을 리턴받는다.
data1_cleaned = imputer1_model.transform(data1_selected) #imputer1_model에 담긴 모델에 data1_selected라는 데이터프레입을 입력으로 하여 impute 연산이 처리된 새로운 데이터프레임을 리턴받는다.
|
(2) 문자열 인덱싱
1
2
| indexer1_model = indexer1.fit(data1_cleaned)
data1_cleaned = indexer1_model.transform(data1_cleaned)
|
(3) feature vector 만들기
1
| vec1 = assembler1.transform(data1_cleaned)
|
(4) 데이터값 크기를 0과 1사이로 scaling
1
2
| scaler1_model = scaler1.fit(vec1)
vec1 = scaler1_model.transform(vec1)
|
4. binary classification model 만들고 평가해보기
1) train, test set 나누기
1
| train, test = vec1.randomSplit([0.7, 0.3])
|
2) 모델 만들고 예측 결과 얻기
1
2
3
4
5
| from pyspark.ml.classification import LogisticRegression
lr1 = LogisticRegression(featuresCol='features_scaled', labelCol='Survived')
lr1_model = lr1.fit(train)
prediction1 = lr1_model.transform(test)
|
3) 성능평가
1
2
3
4
| from pyspark.ml.evaluation import BinaryClassificationEvaluator
ev1 = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
ev1.evaluate(prediction1)
|
4. data cleansing부터 성능 평가까지 진행하는 ML Pipeline 만들기
1) ML Pipeline 만들기
1
2
3
4
5
| from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
lr1 = LogisticRegression(featuresCol='features_scaled', labelCol='Survived')
pipeline1 = Pipeline(stages=[imputer1, indexer1, assembler1, scaler1, lr1])
|
2) train, test set 나누고 예측 결과 얻고 성능평가
1
2
3
4
5
6
7
8
| #data1_selected = data1.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
train, test = data1_selected.randomSplit([0.7, 0.3])
pipeline1_model = pipeline1.fit(train)
prediction2 = pipeline1_model.transform(test)
ev1.evalutate(prediction2)
|
5. ML Tuning
- 위에서 만든 ML Pipeline을 이용해 하이퍼파라미터를 결정하는 cross validation을 자동으로 수행할 수 있다.
1
2
3
4
5
6
7
8
9
| from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
grid1 = (ParamGridBuilder().addGrid(lr1.maxIter, [1, 5, 10]).build())
cv1 = CrossValidator(estimator=pipeline1, estimatorParamMaps=grid1, evaluator=ev1, numFolds=5)
cv1_model = cv1.fit(train)
prediction3 = cv1_model.transform(test)
ev1.evaluate(prediction3)
|