Using PyCharm to implement Spark SQL. The script below reads a '|'-delimited text file, builds a schema programmatically, registers the result as a temporary view, and queries it with SQL; the commented-out blocks show alternative DataFrame-style queries and how to save the results.
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructField, StringType, StructType
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("app name") \
        .master("local") \
        .getOrCreate()
    sc = spark.sparkContext
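    # Read the local text file and split each line on the '|' delimiter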
    line = sc.textFile("D:\\data\\demo.txt").map(lambda x: x.split('|'))
    # personRdd = line.map(lambda p: Row(id=p[0], name=p[1], age=int(p[2])))
    # personRdd_tmp = spark.createDataFrame(personRdd)
    # personRdd_tmp.show()
    # Build the schema programmatically from a string of field names
    schemaString = "id name age"
    fields = [StructField(field_name, StringType(), nullable=True) for field_name in schemaString.split(" ")]
    schema = StructType(fields)
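    # Wrap each split line in a Row; every field stays a string to match the schema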
    rowRDD = line.map(lambda attributes: Row(attributes[0], attributes[1], attributes[2]))
    peopleDF = spark.createDataFrame(rowRDD, schema)
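    # Register the DataFrame as a temporary view, query it with SQL, and print
    # each row; foreach(print) runs on the executors, so in local mode the
    # output appears in the driver console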
    peopleDF.createOrReplaceTempView("people")
    results = spark.sql("SELECT * FROM people")
    results.rdd.map(lambda attributes: "name: " + attributes[1] + ", age: " + attributes[2]).foreach(print)
    # SQL-style syntax
    # personRdd_tmp.createOrReplaceTempView("person")
    # spark.sql("select * from person where age >= 20 order by age desc limit 2").show()
    # DataFrame method-style syntax
    # personRdd_tmp.select("name").show()
    # personRdd_tmp.select(personRdd_tmp['name'], personRdd_tmp['age'] + 1).show()
    # personRdd_tmp.filter(personRdd_tmp['age'] > 21).show()
    # personRdd_tmp.groupBy("age").count().show()
    # personRdd_tmp.createOrReplaceTempView("people")
    # sqlDF = spark.sql("SELECT * FROM people")
    # sqlDF.show()
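    # A global temp view is shared across SparkSessions and lives in the
    # system database global_temp, so queries must qualify the view name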
    # personRdd_tmp.createGlobalTempView("people")
    # spark.sql("SELECT * FROM global_temp.people").show()
    #
    # spark.newSession().sql("SELECT * FROM global_temp.people").show()
    # Save the results in a specified format
    # people = line.map(lambda p: (p[0], p[1], p[2].strip()))
    # schemaString = "id name age"
    #
    # fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    # # Specify each field's schema directly via StructType
    # schema = StructType(fields)
    # schemaPeople = spark.createDataFrame(people, schema)
    # schemaPeople.createOrReplaceTempView("people")
    # results = spark.sql("SELECT * FROM people")
    # results.write.json("D:\\code\\hadoop\\data\\spark\\day4\\personout.txt")
    # results.write.save("D:\\code\\hadoop\\data\\spark\\day4\\personout1")
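    # (both writes create output directories of part files; save() with no
    # explicit format defaults to Parquet)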
    # results.show()
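    # Release resources when done
    spark.stop()

If the write calls above are uncommented, the saved JSON output can be read back to verify it. A minimal, self-contained sketch (the appName "verify" is arbitrary, and the path assumes the write.json call above ran unchanged):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("verify").master("local").getOrCreate()
# write.json produces a directory of part files; read.json accepts the directory path
df = spark.read.json("D:\\code\\hadoop\\data\\spark\\day4\\personout.txt")
df.show()
spark.stop()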