0%

spring boot通过注解@EnableScheduling@Scheduled实现的是静态定时任务,不能动态添加、停止、修改等.
本文通过ThreadPoolTaskScheduler实现定时任务动态增删改.

  1. 创建ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(10);
    threadPoolTaskScheduler.setRemoveOnCancelPolicy(true);
    return threadPoolTaskScheduler;
}
  1. 创建一个全局任务记录表
@Data
public class Tasks {
    public static Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();
}
  1. 创建一个任务类
public class MyRunnable implements Runnable {
    private String id;

    public MyRunnable(String id) {
        this.id = id;
    }

    @Override
    public void run() {
        System.out.println("id: " + this.id + " - " + new Date());
    }
}
  1. 通过接口启动/删除/修改定时任务

    任务的元数据信息可以保存在数据库中.

@RestController
public class DynamicTask {
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    // 增加一个带id的定时任务
    @RequestMapping("/start")
    public String startCron(@RequestParam("id") String id) {
        if (Tasks.tasks.containsKey(id)) {
            return "exist";
        }
        ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(id), new CronTrigger("0/5 * * * * *"));
        Tasks.tasks.put(id, future);
        return "start";
    }

    //根据id删除一个定时任务
    @RequestMapping("/stop")
    public String stopCron(@RequestParam("id") String id) {
        if (!Tasks.tasks.containsKey(id)) {
            return "not exist";
        }
        Tasks.tasks.get(id).cancel(true);
        Tasks.tasks.remove(id);
        return "stop";
    }

    //根据id修改定时任务
    @RequestMapping("/changeCron10")
    public String startCron10(@RequestParam("id") String id) {
        stopCron(id);// 先停止,在开启.
        ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(id), new CronTrigger("*/10 * * * * *"));
        Tasks.tasks.put(id, future);
        return "change";
    }
}

开发带参数的restful接口

@RequestParam

// URL路径格式: /trigger?taskId=xxxxxxxxxx
@GetMapping("/trigger")
public String trigger(@RequestParam(value = "taskId") String taskId) {
    tableCompareService.compare(taskId);
    return "ok";
}

@PathVariable

// URL路径格式: /trigger/taskxxxxxxxxxx
@GetMapping("/trigger/{taskId}")
public void demo(@PathVariable(name = "taskId") String taskId) {
    System.out.println("taskId=" + taskId);
}

@RequestBody

// 请求内容转化成对象
@PostMapping(path = "/demo")
public void demo1(@RequestBody Person person) {
    System.out.println(person.toString());
}

RestTemplateConfig.java类

@Configuration
public class RestTemplateConfig {
 
    @Bean
    public RestTemplate restTemplate(ClientHttpRequestFactory factory){
        return new RestTemplate(factory);
    }
 
    // 超时时间自定义
    @Bean
    public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(15000);
        factory.setReadTimeout(5000);
        return factory;
    }
}

使用RestTemplate调用其它接口样例

@Service
public class demoService {
 
    @Autowired
    private RestTemplate restTemplate;
 
    public String get(Integer id){
        return restTemplate.getForObject("http://localhost:8080/user?userId=id",String.class);
    }
}
@GetMapping("getForEntity/{id}")
public User getById(@PathVariable(name = "id") String id) {
    ResponseEntity<User> response = restTemplate.getForEntity("http://localhost/get/{id}", User.class, id);
    User user = response.getBody();
    return user;
}
@RequestMapping("saveUser")
public String save(User user) {
    ResponseEntity<String> response = restTemplate.postForEntity("http://localhost/save", user, String.class);
    String body = response.getBody();
    return body;
}

针对delete put等一些方法没有返回值或者其它问题,可以使用restTemplate.exchange()方法解决.

SpringBoot中异步调用的使用

同步请求是会一直等待服务端相应的,需要返回结果给客户端的;而异步调用服务端会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务服务端后台自己慢慢跑就行,客户端不会关心。

  1. 开启异步调用,在启动类增加@EnableAsync使异步调用@Async注解生效
@EnableAsync
public class DbCompareApplication {
    public static void main(String[] args) {
        SpringApplication.run(DbCompareApplication.class, args);
    }
}
  1. 在需要异步调用的方法,增加注解@Async
@Component
public class TestSleep {
    @Async
    public void sleep(){
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("sleep");;
    }
}
  1. Controller调用异步方法
@RestController
public class TaskController {
    @Autowired
    TestSleep testSleep;
    @GetMapping("/test")
    public String test(){
        testSleep.sleep();
        return "ok";
    }
}
  1. 效果
  • 如果不加@Async注解,客户端的调用,需要10秒才能返回结果;
  • 加了@Async注解,客户端的调用立马返回结果,调用的方法在服务端后台运行.
  1. 注意事项:部分情况下@Async注解失效
  • 调用同一个类下注有@Async异步方法
  • 调用的是静态(static)方法
  • 调用(private)私有化方法

下载

配置ssh无密码登录

# 测试ssh localhost, 默认需要密码
[root@node1 hadoop]# ssh localhost
The authenticity of host 'localhost (::1)' can't be established.
ECDSA key fingerprint is SHA256:Ii9RadytomW4X2LEvMQwRxoOTeGgxfNbOgwXrc/wwZI.
ECDSA key fingerprint is MD5:bc:b5:ef:93:e6:fd:7c:cd:a3:4f:a7:f6:4c:24:c7:a7.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
root@localhost's password: 
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
# 测试免密登录,无需密码即可登录成功
[root@node1 hadoop]# ssh localhost
Last failed login: Thu May  7 11:44:07 CST 2020 from localhost on ssh:notty
There was 1 failed login attempt since the last successful login.
Last login: Thu May  7 11:43:04 2020 from 192.168.41.1
[root@node1 ~]# 

配置java环境

#已经部署过了
[root@node1 hadoop]# echo $JAVA_HOME
/usr/local/bin/jdk1.8.0_112

安装hadoop

hadoop-2.7.7.tar.gz拷贝到/root/hadoop目录下并解压.

[root@node1 hadoop]# pwd
/root/hadoop
[root@node1 hadoop]# tar -zxvf hadoop-2.7.7.tar.gz 

/etc/profile或者~/.bash_profile配置环境变量HADOOP_HOME.

export HADOOP_HOME=/root/hadoop/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin

使配置生效

source /etc/profile

配置hadoop

进入$HADOOP_HOME/etc/hadoop目录,配置hadoop-env.sh等。涉及的配置文件如下:

hadoop-2.9.2/etc/hadoop/hadoop-env.sh
hadoop-2.9.2/etc/hadoop/yarn-env.sh
hadoop-2.9.2/etc/hadoop/core-site.xml
hadoop-2.9.2/etc/hadoop/hdfs-site.xml
hadoop-2.9.2/etc/hadoop/mapred-site.xml
hadoop-2.9.2/etc/hadoop/yarn-site.xml
  1. 配置hadoop-env.sh
# The java implementation to use.
export JAVA_HOME=${JAVA_HOME}
  1. 配置yarn-env.sh
#export JAVA_HOME=/home/y/libexec/jdk1.6.0/
  1. 配置core-site.xml
<configuration>
<property>
  <name>fs.default.name</name>
  <value>hdfs://192.168.41.128:9000</value>
  <description>HDFS的URI,文件系统://namenode标识:端口号</description>
</property>

<property>
  <name>hadoop.tmp.dir</name>
  <value>/root/hadoop/tmp</value>
  <description>namenode上本地的hadoop临时文件夹</description>
</property>
</configuration>
  1. 配置hdfs-site.xml
<configuration>
<property>
  <name>dfs.name.dir</name>
  <value>/root/hadoop/name</value>
  <description>namenode上存储hdfs名字空间元数据 </description> 
</property>

<property>
  <name>dfs.data.dir</name>
  <value>/root/hadoop/data</value>
  <description>datanode上数据块的物理存储位置</description>
</property>

<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>副本个数,配置默认是3,应小于datanode机器数量</description>
  </property>
</configuration>
  1. 配置mapred-site.xml
<configuration>
<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
  </property>
</configuration>
  1. 配置yarn-site.xml
<configuration>
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address</name>
  <value>${yarn.resourcemanager.hostname}:8099</value>
</property>
</configuration>

启动hadoop

  1. 格式化hdfs文件系统
bin/hadoop namenode -format
  1. 启动namenode
sbin/hadoop-daemon.sh start namenode
  1. 启动datanode
sbin/hadoop-daemon.sh start datanode
  1. 启动yarn
sbin/start-yarn.sh
  1. 验证
    查看logs/目录下是否有错误日志,通过jps命令查看后台进程.
[root@node1 hadoop-2.7.7]# jps
17152 NodeManager
17920 Jps
16721 DataNode #数据节点
16866 ResourceManager
62190 HMaster # Hbase
16623 NameNode #名称节点
  1. 查看UI

提交MapReduce作业

bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar pi 2 100

hdfs使用

  1. 创建一个目录
hadoop fs -mkdir /test
  1. 上传一个文件到指定目录
hadoop fs -put README.txt /test

或者

hadoop fs -moveFromLocal README.txt /test
  1. 查看目录下文件
[root@node1 hadoop-2.7.7]# hadoop fs -ls /test

更多命令参考

Hadoop版本支持矩阵:

  • S=支持
  • X=不支持
  • NT=未测试
HBase的-1.1.x中 HBase的-1.2.x的 HBase的-1.3.x的 HBase的-2.0.x版本
Hadoop-2.0.x-alpha X X X X
Hadoop-2.1.0-beta X X X X
Hadoop-2.2.0 NT X X X
Hadoop-2.3.x NT X X X
Hadoop-2.4.x S S S X
Hadoop-2.5.x S S S X
Hadoop-2.6.0 X X X X
Hadoop-2.6.1+ NT S S S
Hadoop-2.7.0 X X X X
Hadoop-2.7.1+ NT S S S
Hadoop-2.8.0 X X X X
Hadoop-2.8.1 X X X X
Hadoop-3.0.0 NT NT NT NT

建议使用 Hadoop 2.x:Hadoop 2.x 速度更快,包括短路读取功能,这将有助于提高您的 HBase 随机读取配置文件;Hadoop 2.x 还包括重要的 bug 修复,可以改善您的整体 HBase 体验;HBase 不支持使用早期版本的 Hadoop 运行;有关特定于不同 HBase 版本的要求,请参见下表;Hadoop 3.x 仍处于早期访问版本中,尚未被 HBase 社区对生产用例进行充分测试。

命令 使用方法 说明
ls hadoop fs -ls 返回文件详细信息或者目录列表
lsr hadoop fs -lsr 递归返回文件详细信息或者目录列表,类似ls -R
cat hadoop fs -cat URI 返回文件内容
chgrp hadoop fs -chgrp [-R] GROUP URI 改变文件所属组
chmod hadoop fs -chmod [-R] 改变文件的权限
chown hadoop fs -chown [-R] 改变文件拥有者
put hadoop fs -put 上传文件
copyFromLocal hadoop fs -copyFromLocal URI 上传文件
moveFromLocal hadoop fs -moveFromLocal 上传文件
get hadoop fs -get [-ignorecrc] [-crc] 下载文件
copyToLocal hadoop fs -copyToLocal [-ignorecrc] [-crc] 下载文件
cp hadoop fs -cp URI 复制文件
du hadoop fs -du URI 显示所有文件大小
dus hadoop fs -dus 显示文件大小
expunge hadoop fs -expunge 清空回收站
getmerge hadoop fs -getmerge [addnl]
mkdir hadoop fs -mkdir 创建目录
mv hadoop fs -mv URI 移动
rm hadoop fs -rm URI 删除非空目录和文件
rmr hadoop fs -rmr 递归删除
setrep hadoop fs -setrep [-R] 改变文件副本数
stat hadoop fs -stat URI 返回统计信息
tail hadoop fs -tail [-f] URI 返回文件尾部1K字节内容
test hadoop fs -test -[ezd] URI -e检查文件是否存在,-z检查文件是否为空,-d检查文件是否是目录
text hadoop fs -text 将文件输出和为文本格式,允许的格式zip TextRecordInputStream
touchz hadoop fs -touchz 创建一个空文件