Spark 1.6がリリースされたことで、DataSet、DataFrameを試してみました。
まずは、事前に下記をインストールしておきます。(CentOS7にて)
- Java7 1.7.0_80
- Hadoop-2.6.3のインストール※インストールはこちらを参照してください。
- Development Toolsをグループインストール
環境変数の設定
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
VMパラメータを設定します。これをしないと、Sparkのビルド途中でout of memoryで失敗します。
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Spark 1.6のビルドをします。かなり時間がかかりますので、完了まで待ちます。Scalaは、 2.10.4
build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.3 -DskipTests clean package
hadoopデーモンの起動
start-all.sh
DataSetで取り込むAWS IP Rangesをcsv形式で取り組んでおきます。Sparkはjson形式も取り込むことも可能ですが、ip-ranges.jsonそのままだとjson構造が入れ子になっていることもあり、csvにしてからSpark DataSetに取り込みました。
curl https://ip-ranges.amazonaws.com/ip-ranges.json | jq '.prefixes[] | {ip_prefix,region,service}' | jq "[.ip_prefix,.region,.service] | @csv" > ip_ranges.txt
Spark Shellを起動します。
$ spark-1.6.0/bin/spark-shell log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties To adjust logging level use sc.setLogLevel("INFO") Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.6.0 /_/ Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80) Type in expressions to have them evaluated. Type :help for more information. 16/01/17 09:26:46 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 172.16.10.18 instead (on interface enp0s3) 16/01/17 09:26:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Spark context available as sc. SQL context available as sqlContext. scala>
DataFramesスキーマ定義を設定
scala> case class AwsIp(ip_prefix: String, region: String, service: String)
上記で取得したAWS IP rangesのcsvをDataFramesに取り組みます。
scala> val awsip = sc.textFile("/home/hdspark/ip_ranges.txt").map(_.split(",")).map(p => AwsIp(p(0),p(1),p(2))).toDF()
DataFramesスキーマ定義の確認
scala> awsip.printSchema() root |-- ip_prefix: string (nullable = true) |-- region: string (nullable = true) |-- service: string (nullable = true)
regionでグルーピング。東京リージョンのCIDR数は、オレゴンより多いとは。
scala> awsip.groupBy("region").count().show() +--------------+-----+ | region|count| +--------------+-----+ | eu-central-1| 18| | cn-north-1| 10| | us-east-1| 122| |ap-northeast-1| 56| |ap-southeast-1| 50| | us-west-1| 46| | us-west-2| 51| |ap-southeast-2| 34| |ap-northeast-2| 11| | GLOBAL| 35| | us-gov-west-1| 6| | sa-east-1| 29| | eu-west-1| 74| +--------------+-----+
serviceでグルーピング
scala> awsip.groupBy("service").count().show() +--------------------+-----+ | service|count| +--------------------+-----+ |ROUTE53_HEALTHCHECKS| 16| | ROUTE53| 1| | AMAZON| 323| | CLOUDFRONT| 17| | EC2| 185| +--------------------+-----+
regionが「ap-northeast-1」が合致するものを抽出し、その中で、serviceが「AMAZON」のものを抽出することも。
scala> val f1 =awsip.filter(awsip.col("region").equalTo("ap-northeast-1"))
scala> f1.filter(f1.col("service").equalTo("AMAZON")).show() +---------------+--------------+-------+ | ip_prefix| region|service| +---------------+--------------+-------+ | 27.0.0.0g22|ap-northeast-1| AMAZON| | 46.51.224.0g19|ap-northeast-1| AMAZON| | 52.68.0.0g15|ap-northeast-1| AMAZON| | 52.92.60.0g22|ap-northeast-1| AMAZON| | 52.94.9.0g24|ap-northeast-1| AMAZON| | 52.95.30.0g23|ap-northeast-1| AMAZON| | 52.95.34.0g24|ap-northeast-1| AMAZON| | 52.95.56.0g22|ap-northeast-1| AMAZON| | 52.95.243.0g24|ap-northeast-1| AMAZON| |52.95.255.48g28|ap-northeast-1| AMAZON| | 52.192.0.0g15|ap-northeast-1| AMAZON|ml | 52.196.0.0g14|ap-northeast-1| AMAZON| | 54.64.0.0g15|ap-northeast-1| AMAZON| | 54.92.0.0g17|ap-northeast-1| AMAZON| | 54.95.0.0g16|ap-northeast-1| AMAZON| | 54.150.0.0g16|ap-northeast-1| AMAZON| | 54.168.0.0g16|ap-northeast-1| AMAZON| | 54.178.0.0g16|ap-northeast-1| AMAZON| | 54.199.0.0g16|ap-northeast-1| AMAZON| |54.231.224.0g21|ap-northeast-1| AMAZON| +---------------+--------------+-------+ only showing top 20 rows
こんな感じで、フィルタリング機能も充実しています。DataSet 、DataFramesについては下記も参照してください。
http://spark.apache.org/docs/latest/sql-programming-guide.html
in-memoryで動くこともあるが、Hadoopより100x高速なので、これからデータプロセッシングはSparkがお勧め。
2016年内に、2.0のリリースも予定。(Rearchitecting for Mobile Platform、MLLib 2.0)
MLLib 2.0 については、https://issues.apache.org/jira/browse/SPARK-12626も。
Google Dataproc、EMRとも現状、Spark 1.6.0には対応していません(1.5.2が対応している最新版)。Google DataProcは、2015/9、2015/11と新バージョンがリリースしているなので、次は、2016/1に新バージョンがリリース?