0. 설치

(1) PySpark, Py4J

1
!pip install pyspark py4j

(2) Redshift 접속에 필요한 JAR 파일들

- JAR 파일 설치를 위해서는 먼저 구글 Colab에 설치된 파이썬의 버전을 확인해야 한다.

1
!python --version

- 아래와 같이 경로를 이동해 JAR 파일을 설치한다.

1
!cd /usr/local/lib/python3.6.9/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

1. SparkSession 객체 만들기

- SparkSession은 Spark를 사용하고자 할 경우 가장 먼저 선언해야 하는 객체이다. 이 객체가 지원하는 여러 변수, 메서드를 사용하여 Spark를 사용할 수 있다.

1
2
3
4
5
6
from pyspark.sql import SparkSession

spark1 = SparkSession.builder
        .appName("app1") \
        .config("spark.jars", "/usr/local/lib/python3.6.9/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
        .getOrCreate()

2. RDD 다루기

(1) Python의 리스트 같은 변수형을 RDD형으로 변환하기

1
2
list1 = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
rdd1 = spark1.sparkContext.parallelize(list1)

- parallelize() 메서드를 호출한다 하여 곧바로 rdd1 변수에 RDD형으로 변환된 값이 저장되는 게 아님을 유의해야 한다. rdd1의 메서드를 호출하는 문장이 나타날 때 그 메서드를 호출하면서 그때부터 비로소 rdd1 변수에 RDD형으로 변환된 값이 저장된다. (이를 lazy execution이라 한다.)

(2) 서버에 저장돼 있는 RDD형 변수의 내용 가져오기

1
print(rdd1.collect())

- 실제로 서버에 저장돼 있는 RDD형 데이터의 크기는 굉장히 큰 경우가 많으므로, 이와 같은 방식으로 그 내용을 가져오게 하면 굉장히 많은 자원 소모가 있게 된다. 따라서 RDD형 데이터의 크기가 아주 작은 경우가 아닐 때에는 이 코드를 사용할 일은 없다.

3. DataFrame 다루기

(1) DataFrame 생성

  • RDD형 데이터를 DataFrame형으로 변환하기
1
df1 = rdd1.toDF()
  • DB에 접속해 쿼리를 보내 그 쿼리에 해당하는 데이터를 가져와 DataFrame형으로 저장하기
1
2
3
4
5
df2 = spark1.read.format("jdbc").option("url", "jdbc:redshift://host1:1234/db1") \
        .option("dbtable", "table1") \ #이 부분에 table1 대신 SQL 쿼리를 쓸 수 있다.
        .option("user", "guest1").option("password", "guest123") \
        .option("driver", "com.amazon.redshift.jdbc42.Drive") \
        .load()
  • pandas의 DataFrame형 변수를 Spark의 DataFrame형으로 변환하기
1
df3 = spark1.createDataFrame(df_pandas1)

(2) DataFrame에 이름 지정하기

1
df1.createOrReplaceTempView("table1")

(3) DataFrame형 변수로 담아온 데이터에 대하여 SQL 쿼리 사용하기

1
select_df1 = spark1.sql("SELECT * FROM table1") #이 쿼리에서 사용할 테이블 이름은 위 2번에서 지정한 이름을 사용할 수 있다.

(4) DataFrame형 변수의 내용 가져오기

1
print(select_df1.collect()) #RDD형 변수의 내용을 가져오는 메서드와 이름이 같은 메서드를 사용한다.