如果你会任意一门语言的stream流,没道理不会大数据开发。
【资料图】
俗话说男追女隔座山,女追男隔层纱。如果说零基础学大数据,感觉前面是一座山,那么只要你会java或者任意一门语言的stream流,那大数据就只隔了一层纱。本文以java stream流计算为例,讲解一些基础的spark操作。另一个流行的大数据框架flink同理。
测试数据,以下列分别表示姓名,年龄,部门,职位。
张三,20,研发部,普通员工李四,31,研发部,普通员工李丽,36,财务部,普通员工张伟,38,研发部,经理杜航,25,人事部,普通员工周歌,28,研发部,普通员工
创建一个Employee
类。
@Getter @Setter @AllArgsConstructor @NoArgsConstructor @ToString static class Employee implements Serializable { private String name; private Integer age; private String department; private String level; }}
版本:jdk:1.8spark:3.2.0scala:2.12.15上面的scala版本只是spark框架本身需要依赖到scala。因为scala确实是比较小众的语言,本文还是使用java演示spark代码。
1.map类1.1 java stream mapmap表示一对一操作。将上游数据的一行数据进行任意操作,最终得到操作后的一条数据。这种思想,在java和spark,flink都是一致的。
我们先用java stream演示读取文件,再使用map操作将每行数据映射为Employee
对象。
List list = FileUtils.readLines(new File("f:/test.txt"), "utf-8"); List employeeList = list.stream().map(word -> { List words = Arrays.stream(word.split(",")).collect(Collectors.toList()); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); return employee; }).collect(Collectors.toList()); employeeList.forEach(System.out::println);
转换后的数据:
JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
1.2 spark map首先得到一个SparkSession对象,读取文件,得到一个DataSet弹性数据集对象。
SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();Dataset reader = session.read().text("F:/test.txt");reader.show();
这里的show()就是打印输出当前数据集,它是一个action类的算子。得到结果:
+-----------------------+| value|+-----------------------+|张三,20,研发部,普通员工||李四,31,研发部,普通员工||李丽,36,财务部,普通员工|| 张伟,38,研发部,经理||杜航,25,人事部,普通员工||周歌,28,研发部,普通员工|+-----------------------+
现在我们拿到了基础数据,我们使用map一对一操作,将一行行数据转换为Employee
对象。我们这里不使用lamda表达式,让大家看得更加清晰。这里实现了MapFunction接口里的call方法,每次拿到一行数据,我们这里进行切分,再转换为对象。
需要特别指出的一点是,与后端WEB应用有一个统一异常处理不同的是,大数据应用,特别是流式计算,要保证7*24在线,需要对每个算子进行异常捕获。因为你不知道上游数据清洗到底怎么样,很可能拿到一条脏数据,处理的时候抛出异常,如果没有捕获处理,那么整个应用就会挂掉。
spark的算子分为Transformation和Action两种类型。Transformation会开成一个DAG图,具有lazy延迟性,它只会从一个dataset(rdd/df)转换成另一个dataset(rdd/df),只有当遇到action类的算子才会真正执行。我们今天会演示的算子都是Transformation类的算子。典型的Action算子包括show,collect,save之类的。比如在本地进行show查看结果,或者完成运行后save到数据库,或者HDFS。
spark执行时分为driver和executor。但不是本文的重点,不会展开讲。只需要注意driver端会将代码分发到各个分布式系统的节点executor上,它本身不会参与计算。一般来说,算子外部,如以下示例代码的a处会在driver端执行,b处算子内部会不同服务器上的executor端执行。所以在算子外部定义的变量,在算子内部使用的时候要特别注意!! 不要想当然地以为都是一个main方法里写的代码,就一定会在同一个JVM里。这里涉及到序列化的问题,同时它们分处不同的JVM,使用"=="比较的时候也可能会出问题!!这是一个后端WEB开发转向大数据开发时,这个思想一定要转变过来。简言之,后端WEB服务的分布式是我们自己实现的,大数据的分布式是框架天生帮我们实现的
。
// a 算子外部,driver端Dataset employeeDataset = reader.map(new MapFunction() { @Override public Employee call(Row row) throws Exception { // b 算子内部,executor端 Employee employee = null; try { // gson.fromJson(); 这里使用gson涉及到序列化问题 List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取 exception.printStackTrace(); } return employee; } }, Encoders.bean(Employee.class)); employeeDataset.show();
输出
+---+----------+--------+----+|age|department| level|name|+---+----------+--------+----+| 20| 研发部|普通员工|张三|| 31| 研发部|普通员工|李四|| 36| 财务部|普通员工|李丽|| 38| 研发部| 经理|张伟|| 25| 人事部|普通员工|杜航|| 28| 研发部|普通员工|周歌|
1.2.2 MapPartitionsFunctionspark中 map和mapPartitions有啥区别?map是1条1条处理数据mapPartitions是一个分区一个分区处理数据
后者一定比前者效率高吗?不一定,看具体情况。
这里使用前面 map 一样的逻辑处理。可以看到在call方法里得到的是一个Iterator迭代器,是一批数据。得到一批数据,然后再一对一映射为对象,再以Iterator的形式返回这批数据。
Dataset employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction() { @Override public Iterator call(Iterator iterator) throws Exception { List employeeList = new ArrayList<>(); while (iterator.hasNext()){ Row row = iterator.next(); try { List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取 exception.printStackTrace(); } } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); employeeDataset2.show();
输出结果跟map一样,这里就不贴出来了。
2.flatMap类map和flatMap有什么区别?map是一对一,flatMap是一对多。当然在java stream中,flatMap叫法叫做扁平化。
这种思想,在java和spark,flink都是一致的。
2.1 java stream flatMap以下代码将1条原始数据映射到2个对象上并返回。
List employeeList2 = list.stream().flatMap(word -> { List words = Arrays.stream(word.split(",")).collect(Collectors.toList()); List lists = new ArrayList<>(); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee); Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee2); return lists.stream(); }).collect(Collectors.toList()); employeeList2.forEach(System.out::println);
输出
JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=张三_2, age=20, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=李四_2, age=31, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)JavaStreamDemo.Employee(name=李丽_2, age=36, department=财务部, level=普通员工)JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)JavaStreamDemo.Employee(name=张伟_2, age=38, department=研发部, level=经理)JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)JavaStreamDemo.Employee(name=杜航_2, age=25, department=人事部, level=普通员工)JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)JavaStreamDemo.Employee(name=周歌_2, age=28, department=研发部, level=普通员工)
2.2 spark flatMap这里实现FlatMapFunction的call方法,一次拿到1条数据,然后返回值是Iterator,所以可以返回多条。
Dataset employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction() { @Override public Iterator call(Row row) throws Exception { List employeeList = new ArrayList<>(); try { List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee2); } catch (Exception exception) { exception.printStackTrace(); } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); employeeDatasetFlatmap.show();
输出
+---+----------+--------+------+|age|department| level| name|+---+----------+--------+------+| 20| 研发部|普通员工| 张三|| 20| 研发部|普通员工|张三_2|| 31| 研发部|普通员工| 李四|| 31| 研发部|普通员工|李四_2|| 36| 财务部|普通员工| 李丽|| 36| 财务部|普通员工|李丽_2|| 38| 研发部| 经理| 张伟|| 38| 研发部| 经理|张伟_2|| 25| 人事部|普通员工| 杜航|| 25| 人事部|普通员工|杜航_2|| 28| 研发部|普通员工| 周歌|| 28| 研发部|普通员工|周歌_2|+---+----------+--------+------+
3 groupby类与SQL类似,java stream流和spark一样,groupby对数据集进行分组并在此基础上可以进行聚合函数操作。也可以分组直接得到一组子数据集。
3.1 java stream groupBy按部门分组统计部门人数:
Map map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting())); System.out.println(map);
输出
{财务部=1, 人事部=1, 研发部=4}
3.2 spark groupBy将映射为对象的数据集按部门分组,在此基础上统计部门员工数和平均年龄。
RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");// 统计每个部门有多少员工datasetGroupBy.count().show(); /** * 每个部门的平均年龄 */datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();
输出分别为
+----------+-----+|department|count|+----------+-----+| 财务部| 1|| 人事部| 1|| 研发部| 4|+----------+-----+
+----------+------+|department|avgAge|+----------+------+| 财务部| 36.0|| 人事部| 25.0|| 研发部| 29.25|+----------+------+
3.3 spark groupByKeyspark的groupBy
和groupByKey
的区别,前者在此基础上使用聚合函数得到一个聚合值,后者只是进行分组,不进行任何计算。类似于java stream的:
Map> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));System.out.println(map2);
输出
{财务部=[JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)], 人事部=[JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)], 研发部=[JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理), JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)]}
使用spark groupByKey。先得到一个key-value的一对多的一个集合数据集。这里的call()方法返回的是key,即分组的key。
KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction() { @Override public String call(Employee employee) throws Exception { // 返回分组的key,这里表示根据部门进行分组 return employee.getDepartment(); } }, Encoders.STRING());
再在keyValueGroupedDataset
的基础上进行mapGroups,在call()方法里就可以拿到每个key的所有原始数据。
keyValueGroupedDataset.mapGroups(new MapGroupsFunction() { @Override public Object call(Object key, Iterator iterator) throws Exception { System.out.println("key = " + key); while (iterator.hasNext()){ System.out.println(iterator.next()); } return iterator; } }, Encoders.bean(Iterator.class)) .show(); // 这里的show()没有意义,只是触发计算而已
输出
key = 人事部SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)key = 研发部SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)key = 财务部SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
4 reduce类reduce
的字面意思是:减少;减小;降低;缩小。又叫归约。
它将数据集进行循环,让当前对象
和前一对象
两两进行计算,每次计算得到的结果作为下一次
计算的前一对象
,并最终得到一个对象。假设有5个数据【1,2,3,4,5】,使用reduce进行计算,分别是
比如上面的测试数据集,我要计算各部门年龄总数。使用聚合函数得到的是一个int类型的数字。
4.1 java stream reduceint age = employeeList.stream().mapToInt(e -> e.age).sum();System.out.println(age);//178
使用reduce也可进行上面的计算
int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);System.out.println(age1);// 178
但是我将年龄求和,同时得到一个完整的对象呢?
JavaStreamDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
可以使用reduce将数据集两两循环,将年龄相加,同时返回最后一个遍历的对象。下面代码的pre 代表前一个对象,current 代表当前对象。
/** * pre 代表前一个对象 * current 代表当前对象 */Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> { // 当第一次循环时前一个对象为null if (pre.getAge() == null) { current.setAge(current.getAge()); } else { current.setAge(pre.getAge() + current.getAge()); } return current;});System.out.println(reduceEmployee);
4.2 spark reducespark reduce的基本思想跟java stream是一样的。直接看代码:
Employee datasetReduce = employeeDataset.reduce(new ReduceFunction() { @Override public Employee call(Employee t1, Employee t2) throws Exception { // 不同的版本看是否需要判断t1 == null t2.setAge(t1.getAge() + t2.getAge()); return t2; } }); System.out.println(datasetReduce);
输出
SparkDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
其它常见操作类Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();System.out.println(employee);// SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
同时可以将dataset注册成table,使用更为强大的SQL来进行各种强大的运算。现在SQL是flink的一等公民,spark也不遑多让。这里举一个非常简单的例子。
employeeDataset.registerTempTable("table");session.sql("select * from table where age > 30 order by age desc limit 3").show();
输出
+---+----------+--------+----+|age|department| level|name|+---+----------+--------+----+| 38| 研发部| 经理|张伟|| 36| 财务部|普通员工|李丽|| 31| 研发部|普通员工|李四|+---+----------+--------+----+
employeeDataset.registerTempTable("table");session.sql("select concat_ws(",",collect_set(name)) as names, // group_concat avg(age) as age, department from table where age > 30 group by department order by age desc limit 3").show();
输出
+---------+----+----------+| names| age|department|+---------+----+----------+| 李丽|36.0| 财务部||张伟,李四|34.5| 研发部|+---------+----+----------+
小结本文依据java stream的相似性,介绍了spark里面一些常见的算子操作。本文只是做一个非常简单的入门介绍。如果感兴趣的话,后端的同学可以尝试着操作一下,非常简单,本地不需要搭建环境,只要引入spark 的 maven依赖即可。我把本文的所有代码全部贴在最后面。
java stream 源码:
点击查看代码
import lombok.*;import org.apache.commons.io.FileUtils;import java.io.File;import java.io.IOException;import java.io.Serializable;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Map;import java.util.stream.Collectors;public class JavaStreamDemo { public static void main(String[] args) throws IOException { /** * 张三,20,研发部,普通员工 * 李四,31,研发部,普通员工 * 李丽,36,财务部,普通员工 * 张伟,38,研发部,经理 * 杜航,25,人事部,普通员工 * 周歌,28,研发部,普通员工 */ List list = FileUtils.readLines(new File("f:/test.txt"), "utf-8"); List employeeList = list.stream().map(word -> { List words = Arrays.stream(word.split(",")).collect(Collectors.toList()); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); return employee; }).collect(Collectors.toList()); // employeeList.forEach(System.out::println); List employeeList2 = list.stream().flatMap(word -> { List words = Arrays.stream(word.split(",")).collect(Collectors.toList()); List lists = new ArrayList<>(); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee); Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee2); return lists.stream(); }).collect(Collectors.toList()); // employeeList2.forEach(System.out::println); Map map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting())); System.out.println(map); Map> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment)); System.out.println(map2); int age = employeeList.stream().mapToInt(e -> e.age).sum(); System.out.println(age);// 178 int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b); System.out.println(age1);// 178 /** * pre 代表前一个对象 * current 代表当前对象 */ Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> { if (pre.getAge() == null) { current.setAge(current.getAge()); } else { current.setAge(pre.getAge() + current.getAge()); } return current; }); System.out.println(reduceEmployee); } @Getter @Setter @AllArgsConstructor @NoArgsConstructor @ToString static class Employee implements Serializable { private String name; private Integer age; private String department; private String level; }}
spark的源码:
点击查看代码
import com.google.gson.Gson;import lombok.*;import org.apache.spark.api.java.function.*;import org.apache.spark.sql.*;import java.io.Serializable;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterator;import java.util.List;import java.util.stream.Collectors;/** * @projectName: spark-demo * @package: com.alpha.data * @className: SparkDemo * @author: nyp * @description: TODO * @date: 2023/4/27 9:06 * @version: 1.0 */public class SparkDemo { public static void main(String[] args) { SparkSession session = SparkSession.builder().master("local[*]").getOrCreate(); Dataset reader = session.read().text("F:/test.txt"); // reader.show(); /** * +-----------------------+ * | value| * +-----------------------+ * |张三,20,研发部,普通员工| * |李四,31,研发部,普通员工| * |李丽,36,财务部,普通员工| * |张伟,38,研发部,经理| * |杜航,25,人事部,普通员工| * |周歌,28,研发部,普通员工| * +-----------------------+ */ // 本地演示而已,实际分布式环境,这里的gson涉及到序列化问题 // 算子以外的代码都在driver端运行 // 任何算子以内的代码都在executor端运行,即会在不同的服务器节点上执行 Gson gson = new Gson(); // a 算子外部,driver端 Dataset employeeDataset = reader.map(new MapFunction() { @Override public Employee call(Row row) throws Exception { // b 算子内部,executor端 Employee employee = null; try { // gson.fromJson(); 这里使用gson涉及到序列化问题 List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取 exception.printStackTrace(); } return employee; } }, Encoders.bean(Employee.class)); // employeeDataset.show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 20| 研发部|普通员工|张三| * | 31| 研发部|普通员工|李四| * | 36| 财务部|普通员工|李丽| * | 38| 研发部| 经理|张伟| * | 25| 人事部|普通员工|杜航| * | 28| 研发部|普通员工|周歌| */ Dataset employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction() { @Override public Iterator call(Iterator iterator) throws Exception { List employeeList = new ArrayList<>(); while (iterator.hasNext()){ Row row = iterator.next(); try { List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致任务退出,所以这里要做好异常的抓取 exception.printStackTrace(); } } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); // employeeDataset2.show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 20| 研发部|普通员工|张三| * | 31| 研发部|普通员工|李四| * | 36| 财务部|普通员工|李丽| * | 38| 研发部| 经理|张伟| * | 25| 人事部|普通员工|杜航| * | 28| 研发部|普通员工|周歌| * +---+----------+--------+----+ */ Dataset employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction() { @Override public Iterator call(Row row) throws Exception { List employeeList = new ArrayList<>(); try { List list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee2); } catch (Exception exception) { exception.printStackTrace(); } return employeeList.iterator(); } }, Encoders.bean(Employee.class));// employeeDatasetFlatmap.show(); /** * +---+----------+--------+------+ * |age|department| level| name| * +---+----------+--------+------+ * | 20| 研发部|普通员工| 张三| * | 20| 研发部|普通员工|张三_2| * | 31| 研发部|普通员工| 李四| * | 31| 研发部|普通员工|李四_2| * | 36| 财务部|普通员工| 李丽| * | 36| 财务部|普通员工|李丽_2| * | 38| 研发部| 经理| 张伟| * | 38| 研发部| 经理|张伟_2| * | 25| 人事部|普通员工| 杜航| * | 25| 人事部|普通员工|杜航_2| * | 28| 研发部|普通员工| 周歌| * | 28| 研发部|普通员工|周歌_2| * +---+----------+--------+------+ */ RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department"); // 统计每个部门有多少员工 // datasetGroupBy.count().show(); /** * +----------+-----+ * |department|count| * +----------+-----+ * | 财务部| 1| * | 人事部| 1| * | 研发部| 4| * +----------+-----+ */ /** * 每个部门的平均年龄 */ // datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show(); /** * +----------+--------+ * |department|avg(age)| * +----------+--------+ * | 财务部| 36.0| * | 人事部| 25.0| * | 研发部| 29.25| * +----------+--------+ */ KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction() { @Override public String call(Employee employee) throws Exception { // 返回分组的key,这里表示根据部门进行分组 return employee.getDepartment(); } }, Encoders.STRING()); keyValueGroupedDataset.mapGroups(new MapGroupsFunction() { @Override public Object call(Object key, Iterator iterator) throws Exception { System.out.println("key = " + key); while (iterator.hasNext()){ System.out.println(iterator.next()); } return iterator; /** * key = 人事部 * SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工) * key = 研发部 * SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工) * SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工) * SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理) * SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工) * key = 财务部 * SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工) */ } }, Encoders.bean(Iterator.class)) .show(); // 这里的show()没有意义,只是触发计算而已 Employee datasetReduce = employeeDataset.reduce(new ReduceFunction() { @Override public Employee call(Employee t1, Employee t2) throws Exception { // 不同的版本看是否需要判断t1 == null t2.setAge(t1.getAge() + t2.getAge()); return t2; } }); System.out.println(datasetReduce); Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first(); System.out.println(employee); // SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工) employeeDataset.registerTempTable("table"); session.sql("select * from table where age > 30 order by age desc limit 3").show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 38| 研发部| 经理|张伟| * | 36| 财务部|普通员工|李丽| * | 31| 研发部|普通员工|李四| * +---+----------+--------+----+ */ } @Getter @Setter @AllArgsConstructor @NoArgsConstructor @ToString public static class Employee implements Serializable { private String name; private Integer age; private String department; private String level; }}
spark maven依赖,自行不需要的spark-streaming,kafka依赖去掉。
点击查看代码
8 8 1.8 1.8 2.12.15 3.2.0 UTF-8 org.scala-lang scala-library ${scala.version} org.apache.spark spark-sql_2.12 ${spark.version} org.projectlombok lombok 1.18.2 provided org.apache.spark spark-sql-kafka-0-10_2.12 ${spark.version} com.google.code.gson gson 2.7 mysql mysql-connector-java 5.1.34
标签:
- 上市公司回购本公司股权的流程是什么?刚上市的股票可以买吗?
- 揭开神秘面纱,会stream流就会大数据 焦点日报
- 众安百万医疗续保多少年?众安百万医疗续保保费会变吗?
- 城乡居民养老保险一年大概多少钱?城乡居民养老保险选择什么档次比较好?
- 发现保险公司拒赔还可以怎么操作?保险拒赔可以投诉吗?
- 定投基金分红方式选哪种好?基金分红怎么是亏的呢?
- 新股当日最高涨幅是多少?新股的中签率一般是多少?
- 安逸花征信不好能通过吗?安逸花为什么申请不通过?
- 团体意外伤害保险是什么?团体意外伤害保险怎么投保呢?
- 银行轧差是什么意思?银行的轧差和清算有什么区别?
- 分期乐初始额度是多少?分期乐额度无法使用多久能恢复?
- 银行怎么帮你撤销逾期?信用卡逾期什么样情况构成诈骗罪?
- 解局 | 被谴责的恒大与逾期债务冰山一角
- 车险投保人被保人受益人有什么区别?车险投保人被保人受益人不一致有关系吗?
- 分付可以提现到零钱吗?微信支付分六百多能开通分付吗?
- 分期乐上个人信用报告吗?分期乐逾期多久会被起诉?
- 江苏教育频道回看在哪里 江苏教育频道回看
- 全球热议:一季度全国新设民营企业203.9万户 同比增长10.7%
- 分期买手机可以提前还清吗?分期买手机可以用储蓄卡吗?
- 要挟的意思 要挟
- 每日速递:一季度全国新设民营企业203.9万户 同比增长10.7%
- 银行一般怎么核实收入证明?怎么开贷款收入证明?
- 借呗变成信用贷是好是坏?借呗升级信用贷是什么意思?
- 策略性思考的习惯,让你实现降维打击|世界观热点
- 字母哥成为第四个被黑八的MVP 字母哥体育界没有失败一说有些日子你能成功有些日子则不能(今日/头条)_每日聚焦
- 天天快报!铁路“五一”假期运输今日启动,预计发送旅客1.2亿人次
- 商业汇票是其他货币资金还是应付票据?应付票据会计分录怎么做?
- 环球快看点丨“五一”假期大幅涨价,如何给“狂飙”的酒店房价“刹车”
- 家乐卡贷款条件是什么?陕西信合家乐卡的用法是什么?
- 潘婷护发素推荐女生 潘婷
- 铁路“五一”假期运输今日启动,预计发送旅客1.2亿人次|当前热议
- 全球速讯:易语言技巧 易语言百集教程
- 天天速讯:三次视频合集!有一种安全叫祖国接你回家!
- 【全球热闻】三次视频合集!有一种安全叫祖国接你回家!
- “五一”假期大幅涨价,如何给“狂飙”的酒店房价“刹车”
- 3万亿、2万亿成GDP“分水岭”,这“四巨头”你追我赶
- 当前消息!热火力克雄鹿4-1完成黑八奇迹 雄鹿是第三支首轮出局的夺冠最大热门
- 涉胡鑫宇事件,这些造谣自媒体人被公诉、行拘|消息
- 热火力克雄鹿4-1完成黑八奇迹 雄鹿是第三支首轮出局的夺冠最大热门(今日/头条)
- 统计局:2023年1-3月份全国规模以上工业企业利润下降21.4% 全球观热点
- 每日速读!科学家发现火星近期水活动新证据
- 全球观速讯丨涉胡鑫宇事件,这些造谣自媒体人被公诉、行拘
- 天天讯息:手机论坛手机之家 手机之家中关村在线
- 科学家发现火星近期水活动新证据
- 天天看热讯:视频丨35秒!感受南昌舰火力全开高燃时刻
- 全球热资讯!圣诞节来历和习俗简短 圣诞节来历和习俗
- 【天天速看料】可燃冰的主要成分化学式 可燃冰的主要成分
- 中国广电首批5G合约机正式开售|资讯
- 视频丨35秒!感受南昌舰火力全开高燃时刻
- 查看linux系统时间 如何查看linux系统版本-热门
- 冰岛糯伍古树茶特点 冰岛古树茶的特点
- 简历中兴趣爱好模板 兴趣爱好简历100字
- 【全球新视野】滴滴顺风车app如何注册车主?
- 世界即时:劳动合同过期了没续签有影响吗 劳动合同到期不续签需要提前通知
- 口袋妖怪漆黑的魅影的幻影之塔怎么过_口袋妖怪漆黑的魅影中幻影之塔要怎么走 复制的勿进|焦点播报
- 天天滚动:1035人被授予全国五一劳动奖章 产业工人占比超四成
- 人社部:今年就业形势稳中有升 前3个月城镇新增就业297万人-快看
- 出现这些情况莫慌!“五一”高铁出行实用问答来了|要闻
- 全球看点:中国科学家将开展月球科研站建设研究
- 雅图怎么不能用了 雅图在线怎么打不开了_全球短讯
- 时代金融是学术期刊吗 时代金融是国内核心期刊吗-世界新动态
- 第六届数字中国建设峰会今天开幕-全球头条
- 通过ip地址添加打印机添加不了怎么办 通过ip地址添加打印机-当前聚焦
- 1035人被授予全国五一劳动奖章 产业工人占比超四成-每日消息
- 人参鹿茸酒功效及泡制方法 人参鹿茸泡酒配方大全 环球百事通
- 天天热点!人社部:今年就业形势稳中有升 前3个月城镇新增就业297万人
- 天天亮点!出现这些情况莫慌!“五一”高铁出行实用问答来了
- 环球新动态:福州邮政编码的具体含义 福州邮政编码
- 世界速看:六枝特区:快速推进高标准农田建设 推动农业高质量发展
- 当前资讯!中国科学家将开展月球科研站建设研究
- 国家统计局:3月份工业生产稳步回升,企业利润降幅有所收窄
- 注意!我省专升本志愿填报今日18:00截止!
- 催债短信怎么写发狠一点 催债短信怎么写
- 每日快播:龙口一中美女老师 龙口一中
- 全球动态:第六届数字中国建设峰会今天开幕
- 澳门:4月28日起调整佩戴口罩指引
- 联通供应商自服务平台 中国联通供应商平台 快看
- 一把辛酸泪的佳句赏析 一把辛酸泪
- 31省份常住人口数据出炉,这个省增量居榜首
- 全球最新:31省份常住人口数据出炉,这个省增量居榜首
- 澳门:4月28日起调整佩戴口罩指引-百事通
- 每日简讯:尾盘放量什么意思 尾盘放量下跌意味着什么
- 驻马店市审计局深入部署2023年第二季度灾后恢复重建跟踪审计工作 全球简讯
- 破解开发区建设短板弱项 甘肃力促园区经济整体跃升
- 提升消费帮扶质效
- 强化企业科技创新主体地位
- 【环球速看料】金禾实业:随着国民健康意识不断提升,对健康食品的需求也愈发强烈
- 探索区域公用品牌富农路径|前沿资讯
- 国家统计局:3月工业生产稳步回升 企业利润降幅有所收窄
- 副省长胡云在泸州调研时强调:加力推进现代农业园区和水利工程建设 筑牢粮食安全根基 全球速读
- 市领导带队开展“五一”节前安全检查-天天观天下
- 焦点短讯!视频丨“火卫二” 高清画面传回 像极了“太空土豆”
- 环球看热讯:视频丨“火卫二” 高清画面传回 像极了“太空土豆”
- 淄博被限价的酒店应该得到合理补偿 淄博火出圈劝退信让游客读出了真诚和尊重 全球新动态
- 【全球速看料】北京口岸“五一”出入境将超13万人次
- 全球微动态丨保养头发吃什么食物 保养头发
- 焦点速递!wps兼容office2003 wps与office兼容包
- 我把国宝上交国家免费
- 国家统计局:1—3月份 全国规模以上工业企业实现利润总额15167.4亿元
- 新华全媒+丨第六届数字中国建设成果展在福州开幕