Spark 2.0 Basic Exercises
Spark Core Basic Exercises

Initializing Spark

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

IntelliJ IDEA

val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("file:/home/training/test.txt")
val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
words.collect().foreach(println)

Using the Shell

$ ./bin/spark-shell --master local[4]
$ ./bin/spark-shell --master local[4] --jars code.jar

Resilient Distributed Datasets (RDDs)

val distFile = sc.textFile("data.txt")

RDD Operations

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

WordCount

// Read from the local filesystem
val rdd = sc.textFile("/home/training/software/spark/examples/src/main/resources/people.txt")
// or read the input from HDFS instead
val rdd = sc.textFile("hdfs://localhost:9000/testspark.txt")
rdd.cache()
val wordcount = rdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
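The snippets above are written for the spark-shell, where sc already exists. As a minimal sketch of how the same WordCount looks as a standalone application (the object name, app name, and input path here are illustrative assumptions, not from the original; in practice the master is usually supplied via spark-submit rather than hard-coded):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // App name and master are placeholders for this sketch
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val counts = sc.textFile("file:/home/training/test.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}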
Spark-Submit in Practice

spark-submit --class org.apache.spark.examples.SparkPi --executor-memory 256m --total-executor-cores 2 /home/training/software/spark/examples/jars/spark-examples_2.11-2.0.1.jar 200
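The command above relies on the default master. The same job can be pointed at a specific cluster by adding the --master flag; the URL below is a placeholder, not a value from the original:

spark-submit --class org.apache.spark.examples.SparkPi \
  --master spark://master-host:7077 \
  --executor-memory 256m \
  --total-executor-cores 2 \
  /home/training/software/spark/examples/jars/spark-examples_2.11-2.0.1.jar 200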
Accumulators

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
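A caveat worth remembering: Spark only guarantees exactly-once accumulator updates for updates made inside actions (such as the foreach above); updates made inside transformations may be re-applied if a task or stage is re-executed. A small sketch of the lazy-evaluation side of this (the accumulator name and sample data are illustrative):

val parseErrors = sc.longAccumulator("parse errors")
val data = sc.parallelize(Seq("1", "2", "x"))
val parsed = data.map { s =>
  // Updating an accumulator inside a transformation: nothing happens
  // until an action runs, and retries may re-apply the update
  try s.toInt catch { case _: NumberFormatException => parseErrors.add(1); -1 }
}
parsed.count()               // the action that actually triggers the updates
println(parseErrors.value)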
Spark SQL Basic Operations

DataFrame Operations

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

import spark.implicits._
val df = spark.read.json("file:/home/training/software/spark/examples/src/main/resources/people.json")
df.show()

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary views live in the system database global_temp
spark.sql("SELECT * FROM global_temp.people").show()
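Any SQL the Spark parser supports can be run against the temporary view registered above; as a small illustrative query (the predicate is an assumption, not from the original):

spark.sql("SELECT name FROM people WHERE age > 21").show()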
DataSets

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
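Because a Dataset is typed, filters and maps can be written against the Person fields directly rather than through column expressions. A minimal sketch with illustrative sample data (not from the original):

val ds = Seq(Person("Ann", 25), Person("Bob", 17)).toDS()
ds.filter(_.age >= 18).map(_.name).show()
// Expected to print a single row: Ann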
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a DataFrame
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
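When the result's column types are known, the Row-based access above can also be replaced by converting the DataFrame to a typed Dataset. A hedged sketch (it relies on spark.implicits._ already being imported, and on name/age being string and long as built above):

val teenagers = teenagersDF.as[(String, Long)]
teenagers.map { case (name, age) => name + " is " + age }.show()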
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

Data Sources

Generic Load/Save Functions
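The source is cut off after these headings. As a hedged sketch of the generic load/save pattern they introduce in Spark 2.0 (the file paths follow the Spark example data and are assumptions here; the default data source is parquet unless spark.sql.sources.default says otherwise):

// Load and save with the default data source (parquet)
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

// A format other than the default can be named explicitly
val peopleJsonDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleJsonDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")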