Spark on SpringBoot
A Spark program built on the Spring Boot framework, running on a cluster.
Basic Configuration
First create a Spring Boot project; all of the configuration below builds on that project.
Versions
- JDK: 8
- SpringBoot: 2.7.18
- Spark: 3.5.4
Configure pom.xml
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Project model version -->
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates -->
<groupId>local.ateng.java.spark</groupId>
<artifactId>spark-cluster</artifactId>
<version>v1.0</version>
<name>spark-cluster</name>
<description>
A Spark program built on the Spring Boot framework, running on a cluster
</description>
<!-- Project properties -->
<properties>
<!-- Default main class -->
<start-class>local.ateng.java.spark.BigdataSparkClusterApplication</start-class>
<java.version>8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.7.18</spring-boot.version>
<maven-compiler.version>3.12.1</maven-compiler.version>
<maven-shade.version>3.5.1</maven-shade.version>
<lombok.version>1.18.36</lombok.version>
<fastjson2.version>2.0.53</fastjson2.version>
<hutool.version>5.8.35</hutool.version>
<spark.version>3.5.4</spark.version>
<hadoop.version>3.3.6</hadoop.version>
<mysql.version>8.0.33</mysql.version>
<postgresql.version>42.7.1</postgresql.version>
</properties>
<!-- Project dependencies -->
<dependencies>
<!-- Spring Boot Web Starter: Spring Boot dependencies for building web applications -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Use Log4j2 as the logging implementation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!-- Spring Boot Starter Test: dependencies for testing Spring Boot applications -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Hutool: a Java utility library providing many handy helper methods -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- Lombok: reduces Java boilerplate code -->
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- High-performance JSON library -->
<!-- https://github.com/alibaba/fastjson2/wiki/fastjson2_intro_cn#0-fastjson-20%E4%BB%8B%E7%BB%8D -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- Spark Hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Hadoop HDFS client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- In cluster mode, the Gson required by Spring Boot conflicts with the Gson shipped with the Hadoop/Spark cluster; the conflict is resolved via shade relocation -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.1</version>
</dependency>
</dependencies>
<!-- Dependency management -->
<dependencyManagement>
<dependencies>
<!-- Spring Boot dependency management -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Repository configuration -->
<repositories>
<!-- Central Repository -->
<repository>
<id>central</id>
<name>Aliyun Central Repository</name>
<url>https://maven.aliyun.com/repository/central</url>
<!--<name>Official Maven Central Repository</name>
<url>https://repo.maven.apache.org/maven2/</url>-->
</repository>
</repositories>
<!-- Build configuration -->
<build>
<finalName>${project.name}-${project.version}</finalName>
<plugins>
<!-- Maven compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- Spring Boot Maven plugin -->
<!--<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>-->
<!-- Maven Shade plugin -->
<!-- https://maven.apache.org/plugins/maven-shade-plugin/shade-mojo.html -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade.version}</version>
<configuration>
<!-- Do not generate dependency-reduced-pom.xml -->
<createDependencyReducedPom>false</createDependencyReducedPom>
<!-- Classifier used when attaching the shaded artifact -->
<shadedClassifierName>shaded</shadedClassifierName>
<!--
    Relocate (rename) class packages to avoid clashes with identically named classes in other libraries;
    this resolves the Gson conflict with Spark
-->
<relocations>
<relocation>
<pattern>com.google.gson</pattern>
<shadedPattern>shaded.com.google.gson</shadedPattern>
</relocation>
</relocations>
<filters>
<!-- Do not copy signature files under META-INF -->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>module-info.class</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.MF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>**/Log4j2Plugins.dat</exclude>
<!--<exclude>META-INF/*.txt</exclude>
<exclude>META-INF/NOTICE</exclude>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/services/java.sql.Driver</exclude>
-->
<!-- Exclude XML configuration files under resources -->
<!--<exclude>*.xml</exclude>-->
</excludes>
</filter>
</filters>
<artifactSet>
<!-- Exclude dependencies -->
<excludes>
<!--<exclude>org.apache.logging.log4j:*</exclude>
<exclude>org.slf4j:*</exclude>-->
</excludes>
</artifactSet>
</configuration>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- Merge Log4j2Plugins.dat files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>Log4j2Plugins.dat</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/additional-spring-configuration-metadata.json</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring-configuration-metadata.json</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.tooling</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>
META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports
</resource>
</transformer>
<transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Specify the default main class -->
<mainClass>${start-class}</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<!-- First resource block -->
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
<!-- Second resource block: filtered resources -->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>application*</include>
<include>bootstrap*.yml</include>
<include>common*</include>
<include>banner*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
</build>
</project>
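The start-class property in the pom points at local.ateng.java.spark.BigdataSparkClusterApplication, which this article does not otherwise show. A minimal sketch of that entry point, assuming a standard Spring Boot bootstrap class:
java
package local.ateng.java.spark;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Minimal sketch of the entry point named by <start-class>;
// the actual class is not shown in this article, so treat it as an assumption.
@SpringBootApplication
public class BigdataSparkClusterApplication {
    public static void main(String[] args) {
        SpringApplication.run(BigdataSparkClusterApplication.class, args);
    }
}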
Copy the Hive configuration file
Copy Hive's hive-site.xml into the resources directory.
Edit the application configuration file
Edit the application.yml configuration file:
yaml
server:
  port: 16012
  servlet:
    context-path: /
spring:
  main:
    web-application-type: none
  application:
    # Resolved at build time by Maven resource filtering (filtering=true for application* in pom.xml)
    name: ${project.artifactId}
Create the logging configuration file
Create the log4j2.xml logging configuration file in the resources directory:
xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} [%t] %-5level %logger{36} - %msg%n" />
</Console>
</Appenders>
<Loggers>
<!-- Set the org.apache.kafka log level to WARN -->
<Logger name="org.apache.kafka" level="warn" additivity="false">
<AppenderRef ref="Console" />
</Logger>
<!-- Set the org.apache.spark log level to WARN -->
<Logger name="org.apache.spark" level="warn" additivity="false">
<AppenderRef ref="Console" />
</Logger>
<!-- Set the org.sparkproject log level to WARN -->
<Logger name="org.sparkproject" level="warn" additivity="false">
<AppenderRef ref="Console" />
</Logger>
<!-- Root log level: INFO -->
<Root level="info">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
Create a Runner
The Runner runs the class and method specified by command-line arguments.
java
package local.ateng.java.spark.runner;
import cn.hutool.extra.spring.SpringUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.lang.reflect.Method;
import java.util.List;
/**
* Runs Spark jobs
*
* @author 孔余
* @email 2385569970@qq.com
* @since 2025-01-25
*/
@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MySparkJobRunner implements ApplicationRunner {
private final ApplicationContext applicationContext;
@Override
public void run(ApplicationArguments args) throws Exception {
        // Read the job to run from the --class/--method command-line options
List<String> classArgs = args.getOptionValues("class");
List<String> methodArgs = args.getOptionValues("method");
if (ObjectUtils.isEmpty(classArgs) || ObjectUtils.isEmpty(methodArgs)) {
System.out.println("请提供参数:--class=xxx --method=xxx");
System.out.println("例如:--class=local.ateng.java.spark.sql.SQLCount --method=run");
int exitCode = SpringApplication.exit(SpringUtil.getApplicationContext(), () -> 1);
System.exit(exitCode);
}
String className = classArgs.get(0);
String methodName = methodArgs.get(0);
System.out.println("运行" + className + "的" + methodName + "方法");
// 使用反射机制调用指定的类和方法
Class<?> clazz = Class.forName(className);
Object instance = applicationContext.getBean(clazz);
Method method = clazz.getMethod(methodName);
method.invoke(instance);
}
}
Write the Spark Code
Create the entity class
java
package local.ateng.java.spark.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* User entity class
*
* @author 孔余
* @email 2385569970@qq.com
* @since 2025-01-24
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MyUser implements Serializable {
private static final long serialVersionUID = 1L;
    /**
     * User ID
     */
    private Long id;
    /**
     * User name
     */
    private String name;
    /**
     * User age
     * Note: Integer is used here, i.e. the age is a whole number.
     */
    private Integer age;
    /**
     * Score
     */
    private BigDecimal score;
    /**
     * User birthday
     * Note: LocalDate is used here for the user's birthday.
     */
    private LocalDate birthday;
    /**
     * User's province
     */
    private String province;
    /**
     * User's city
     */
    private String city;
    /**
     * Creation time
     */
    private LocalDateTime createTime;
}
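As a quick illustration, a hypothetical snippet building one record with the Lombok-generated builder (the class name MyUserExample and all field values are invented):
java
package local.ateng.java.spark.entity;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;

// Hypothetical usage of the Lombok builder; names and values are invented.
public class MyUserExample {
    public static void main(String[] args) {
        MyUser user = MyUser.builder()
                .id(1L)
                .name("Alice")
                .age(25)
                .score(new BigDecimal("92.5"))
                .birthday(LocalDate.of(2000, 1, 1))
                .province("Chongqing")
                .city("Chongqing")
                .createTime(LocalDateTime.now())
                .build();
        // @Data generates toString(), so this prints all fields
        System.out.println(user);
    }
}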
Create the Spark configuration
java
package local.ateng.java.spark.config;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Spark Config
*
* @author 孔余
* @email 2385569970@qq.com
* @since 2025-01-25
*/
@Configuration
public class MySparkConfig {
/**
 * Creates the Spark configuration
 *
 * @return the Spark configuration
*/
@Bean
public SparkConf sparkConf() {
        // Create the Spark configuration
        SparkConf conf = new SparkConf();
        // Set the application name
        conf.setAppName("SparkOnSpring");
        // Enable the Web UI
        conf.set("spark.ui.enabled", "true");
        // Default location of the Hive warehouse
        //conf.set("spark.sql.warehouse.dir", "hdfs://server01:8020/hive/warehouse");
        // Set the master: use spark.master when provided (e.g. by spark-submit), otherwise fall back to local[*]
        String masterValue = conf.get("spark.master", "local[*]");
        conf.setMaster(masterValue);
return conf;
}
/**
 * Creates the Spark context
*
* @param conf
* @return JavaSparkContext
*/
@Bean(destroyMethod = "stop")
public JavaSparkContext sparkContext(SparkConf conf) {
return new JavaSparkContext(conf);
}
/**
 * Creates the SparkSession
*
* @param conf
* @return SparkSession
*/
@Bean(destroyMethod = "stop")
public SparkSession sparkSession(SparkConf conf) {
return SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}
Create an RDD job
java
package local.ateng.java.spark.rdd;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.stereotype.Component;
/**
 * Reads a file from HDFS and counts its lines
*
* @author 孔余
* @email 2385569970@qq.com
* @since 2025-01-24
*/
@Component
@Slf4j
public class RDDCount {
public void run() {
        // Get the JavaSparkContext bean
        JavaSparkContext sc = SpringUtil.getBean("sparkContext", JavaSparkContext.class);
        // Load the HDFS file as an RDD
        JavaRDD<String> textFileRDD = sc.textFile("hdfs://server01:8020/data/my_user.csv");
        // Use the count action to get the number of lines in the RDD
        long count = textFileRDD.count();
        // Print the result
        System.out.println("RDD count result: " + count);
}
}
Create a SQL job
java
package local.ateng.java.spark.sql;
import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;
/**
 * Queries the number of rows
*
* @author 孔余
* @email 2385569970@qq.com
* @since 2025-01-24
*/
@Component
@Slf4j
public class SQLCount {
public void run() {
        // Get the SparkSession bean
        SparkSession spark = SpringUtil.getBean("sparkSession", SparkSession.class);
        // Run the SQL query
        Dataset<Row> ds = spark.sql("SELECT COUNT(*) FROM my_user");
        // Show the result
        System.out.println("SQL count result:");
ds.show();
}
}
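The verification steps in the sections below check /data/spark/output, which presupposes a job that writes output to HDFS; neither job above does. A minimal sketch of such a job, following the same bean pattern (the class name SQLExport and the query are invented):
java
package local.ateng.java.spark.sql;

import cn.hutool.extra.spring.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Component;

/**
 * Hypothetical job that writes aggregated results to HDFS,
 * so that hadoop fs -ls /data/spark/output has something to show.
 */
@Component
@Slf4j
public class SQLExport {
    public void run() {
        // Get the SparkSession bean
        SparkSession spark = SpringUtil.getBean("sparkSession", SparkSession.class);
        // Aggregate rows per province (invented query for illustration)
        Dataset<Row> ds = spark.sql("SELECT province, COUNT(*) AS cnt FROM my_user GROUP BY province");
        // Overwrite the HDFS output directory on each run
        ds.write().mode(SaveMode.Overwrite).csv("hdfs://server01:8020/data/spark/output");
    }
}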
Run the Job
Run from IDEA
Configure the program arguments to run the specified class and method:
java
--class=local.ateng.java.spark.sql.SQLCount --method=run
Package the JAR
Package the code into a JAR with Maven (mvn package).
Spark Standalone
For cluster deployment, see: Install a Spark Cluster
Run the JAR on a Spark Standalone cluster; the example below runs the job that queries the Hive database.
Client mode
client mode suits scenarios where you debug or develop from a local client. Note that the --class and --method flags placed after the JAR are application arguments parsed by MySparkJobRunner, not spark-submit options.
spark-submit --master spark://server01:7077 \
--deploy-mode client \
--total-executor-cores 3 \
spark-cluster-v1.0.jar \
--class=local.ateng.java.spark.sql.SQLCount \
--method=run
- --total-executor-cores: total number of CPU cores the Spark application may use
Verify the run succeeded by checking the write time of the output data:
hadoop fs -ls /data/spark/output
Cluster mode
cluster mode suits production: the cluster manages all resources, and the Driver does not depend on the local environment.
Upload the JAR to HDFS
hadoop fs -mkdir -p /data/spark
hadoop fs -put spark-cluster-v1.0.jar /data/spark
Run the job on the cluster
spark-submit --master spark://server01:7077 \
--deploy-mode cluster \
--total-executor-cores 3 \
hdfs://server01:8020/data/spark/spark-cluster-v1.0.jar \
--class=local.ateng.java.spark.sql.SQLCount \
--method=run
- --total-executor-cores: total number of CPU cores the Spark application may use
Verify the run succeeded by checking the write time of the output data:
hadoop fs -ls /data/spark/output
YARN
For deployment, see: Install and Configure Spark on YARN
Run the JAR on YARN; the example below runs the job that queries the Hive database.
Client mode
client mode suits scenarios where you debug or develop from a local client
spark-submit --master yarn \
--deploy-mode client \
--num-executors 3 \
spark-cluster-v1.0.jar \
--class=local.ateng.java.spark.sql.SQLCount \
--method=run
- --num-executors: number of executors
Verify the run succeeded by checking the write time of the output data:
hadoop fs -ls /data/spark/output
Cluster mode
cluster mode suits production: the cluster manages all resources, and the Driver does not depend on the local environment
spark-submit --master yarn \
--deploy-mode cluster \
--num-executors 3 \
--name "统计Hive数据库数据" \
spark-cluster-v1.0.jar \
--class=local.ateng.java.spark.sql.SQLCount \
--method=run
- --num-executors: number of executors
View the logs
yarn logs -applicationId application_1737640156298_0008 -log_files stdout
Verify the run succeeded by checking the write time of the output data:
hadoop fs -ls /data/spark/output
Java
Set the scope of all Spark-related dependencies to compile (the default) and repackage; the resulting JAR then runs as a plain Java application:
bash
java -server -Xms128m -Xmx1024m \
  -jar spark-cluster-v1.0.jar \
  --class=local.ateng.java.spark.sql.SQLCount \
  --method=run
Kubernetes
The following SparkApplication manifest is provided for reference:
yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-spring-myapp
namespace: ateng-spark
spec:
type: Java
mode: cluster
image: "registry.lingo.local/service/spark:3.5.4"
imagePullPolicy: Always
proxyUser: admin
#mainClass: org.apache.spark.examples.SparkPi
arguments:
- "--class=local.ateng.java.spark.sql.SQLCount"
- "--method=run"
mainApplicationFile: "http://192.168.1.12:9000/test/spark/spark-cluster-v1.0.jar"
sparkVersion: "3.5.4"
sparkUIOptions:
serviceType: NodePort
dynamicAllocation:
enabled: true
initialExecutors: 1
maxExecutors: 5
minExecutors: 1
restartPolicy:
type: Never
driver:
cores: 2
coreLimit: "2"
coreRequest: "1"
memory: "2g"
env:
- name: TZ
value: Asia/Shanghai
labels:
version: 3.5.4
app.kubernetes.io/name: spark-spring-myapp
app.kubernetes.io/component: driver
hostAliases:
- ip: "192.168.1.18"
hostnames:
- "server01"
serviceAccount: spark-operator-spark
executor:
cores: 2
coreLimit: "2"
coreRequest: "1"
memory: "4g"
instances: 1
env:
- name: TZ
value: Asia/Shanghai
labels:
version: 3.5.4
app.kubernetes.io/name: spark-spring-myapp
app.kubernetes.io/component: executor
hostAliases:
- ip: "192.168.1.18"
hostnames:
- "server01"1