0%

依赖

<!--mqtt相关依赖-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

配置

  • application.yml
spring:
  mqtt:
    #tcp://127.0.0.1:1883,tcp://192.168.60.133:1883
    url: tcp://192.168.3.27:1883
    username: root
    password: 123456
    clientId: client-test
    cleanSession: false
    qos: 2
    retained: false
    connectionTimeout: 60
    keepAliveInterval: 30
    subscribeTopics:
      - test/#
      - topic/+
      - top/+/iic
  • MqttConfig.java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @ClassName: MqttConfig
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 14:37
 * @Version: v1.0
 **/
@ConfigurationProperties(prefix = "spring.mqtt")
@Data
@Configuration
public class MqttConfig {
    private String url;
    private String username;
    private String password;
    private String clientId;
    ///是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),
    // 客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
    //设置为true表示每次连接服务器都是以新的身份
    private boolean cleanSession = false;
    //QoS 0,最多交付一次。
    //QoS 1,至少交付一次。
    //QoS 2,只交付一次。
    private int qos = 2;
    //发布者发布消息时,如果 Retained 标记被设置为 true,则该消息即是 MQTT 中的保留消息(Retained Message)。
    // MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
    private boolean retained = true;
    // 连接超时
    private int connectionTimeout = 60;
    // 设置心跳时间 单位为秒
    private int keepAliveInterval = 30;
    // 默认订阅的主题
    private List<String> subscribeTopics;
}

连接/发布/订阅/重连

  • MqttService.java
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * @ClassName: MqttService
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 14:39
 * @Version: v1.0
 **/
@Slf4j
@Service
public class MqttService {
    @Autowired
    MqttConfig config;
    private MqttClient client;

    @PostConstruct
    public void init() {
        connect();
    }

    public void connect() {
        try {
            client = new MqttClient(config.getUrl(), config.getClientId(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(config.isCleanSession());
            // 设置自动重连
            options.setAutomaticReconnect(true);
            options.setUserName(config.getUsername());
            options.setPassword(config.getPassword().toCharArray());
            options.setConnectionTimeout(config.getConnectionTimeout());
            options.setKeepAliveInterval(config.getKeepAliveInterval());
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
//            options.setWill("willTopic", (config.getClientId() + "与服务器断开连接").getBytes(), 0, false);
            client.setCallback(new MqttCallBack(config, client));
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void send(String topic, String message) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(config.getQos());
        mqttMessage.setRetained(config.isRetained());
        mqttMessage.setPayload(message.getBytes());
        MqttTopic mqttTopic = client.getTopic(topic);
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
            log.info("消息ID: {}, topic: {}, message {} 已发送.", token.getMessageId(), topic, message);
        } catch (MqttException e) {
            log.info("topic: {} message {} 发送异常.", topic, message);
            e.printStackTrace();
        }
    }
}
  • MqttCallBack.java
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @ClassName: MqttCallBack
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2023/4/13 15:04
 * @Version: v1.0
 **/
@Slf4j
@Data
public class MqttCallBack implements MqttCallbackExtended {
    private MqttConfig config;
    private MqttClient client;

    public MqttCallBack(MqttConfig config, MqttClient client) {
        this.config = config;
        this.client = client;
    }

    @Override
    public void connectionLost(Throwable throwable) {
        log.error("{} 断开连接.", config.getClientId());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        log.info("收到 {} 消息 {}.", topic, msg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("消息ID: {} 发送成功.", token.getMessageId());
    }

    @Override
    public void connectComplete(boolean b, String url) {
        log.info("MQTT {} 连接 {} 成功.", config.getClientId(), url);
        try {
            if (client.isConnected()) {
                for (String topic : config.getSubscribeTopics()) {
                    client.subscribe(topic, config.getQos());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

说明

通过配置实现redis连接单机还是哨兵.

实现

  • application.yml
spring:
  redis:
    #  引入的自定义配置告知是使用单机还是哨兵:standalone / sentinel / cluster
    type: ${REDIS_TYPE:sentinel}
    port: ${REDIS_PORT:6379}
    host: ${REDIS_HOST:192.168.3.27}
    password: ${REDIS_PASSWORD}
    database: ${REDIS_DATABASE:8}
    sentinel:
      master: ${REDIS_SENTINEL_MASTER:mymaster}
      nodes: ${REDIS_SENTINEL_NODES:192.168.3.17:26379,192.168.3.19:26379,192.168.3.20:26379}
      password: ${REDIS_SENTINEL_PASSWORD}
  • RedisConfig.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * redis 配置
 */
@Configuration
@Slf4j
public class RedisConfig {
    @Value("${spring.redis.type}")
    private String type;
    @Autowired
    RedisProperties properties;

    @Bean
    public RedisTemplate<String, String> redisTemplate(@Autowired RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(keySerializer());
        redisTemplate.setHashKeySerializer(keySerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        redisTemplate.setHashValueSerializer(valueSerializer());
        return redisTemplate;
    }

    private RedisSerializer<String> keySerializer() {
        return new StringRedisSerializer();
    }

    //使用Jackson序列化器
    private StringRedisSerializer valueSerializer() {
        return new StringRedisSerializer();
    }

    // 根据配置的type创建RedisConnectionFactory
    @Bean
    public RedisConnectionFactory connectionFactory() throws Exception {
        switch (type) {
            case "standalone": {
                RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();
                redisConfig.setHostName(properties.getHost());
                redisConfig.setPort(properties.getPort());
                redisConfig.setPassword(properties.getPassword());
                redisConfig.setDatabase(properties.getDatabase());
                return new LettuceConnectionFactory(redisConfig);
            }
            case "sentinel": {
                RedisProperties.Sentinel sentinel = properties.getSentinel();
                RedisSentinelConfiguration redisConfig = new RedisSentinelConfiguration();
                redisConfig.master(sentinel.getMaster());
                redisConfig.setSentinelPassword(sentinel.getPassword());
                redisConfig.setDatabase(properties.getDatabase());
                for (String node : sentinel.getNodes()) {
                    String split[] = node.split(":");
                    redisConfig.addSentinel(new RedisNode(split[0], Integer.parseInt(split[1])));
                }
                return new LettuceConnectionFactory(redisConfig);
            }
            case "cluster": {
                // 集群的不常用,暂未实现,可以参考 https://www.zhuqiaolun.com/2020/11/1605606341401/1605606341401/ 实现
                RedisClusterConfiguration redisConfig = new RedisClusterConfiguration();
                throw new Exception("redis cluster config not support.");
            }
            default:
                throw new Exception("redis config type not support.");
        }
    }
}

说明

本文通过 JDK 中原生的 JAXB 实现 XMLjava bean 之间的互转,不依赖任何第三方包.

  • JAXBContext 类,是应用的入口,用于管理XML/Java绑定信息
  • Marshaller 接口,将Java对象序列化为XML数据
  • Unmarshaller 接口,将XML数据反序列化为Java对象

常用Annotation

  1. @XmlAccessorType(XmlAccessType.NONE) ,控制字段或属性的序列化
  • FIELD: JAXB 绑定类中的每个非静态、非瞬态字段将会自动绑定到 XML,除非由 XmlTransient 注释。
  • NONE: 所有字段或属性都不能绑定到 XML,除非使用一些 JAXB 注释专门对它们进行注释。
  • PROPERTY: JAXB 绑定类中的每个获取方法/设置方法对将会自动绑定到 XML,除非由 XmlTransient 注释。
  • PUBLIC_MEMBER:每个公共获取方法/设置方法对和每个公共字段将会自动绑定到 XML,除非由 XmlTransient 注释。
  1. @XmlRootElement
    xml 文件的根元素

  2. @XmlElement
    代表的生成xml的属性名称,一般用在属性上,或者set方法上。

  • 用法 @XmlElement(name="elementName")
  1. @XmlTransient
    放在属性上,或者set方法上,代表不进行序列化

  2. @XmlAttribute
    该属性作为xml的attribute

  • @XmlAttribute(name = "NAME")
  1. @XmlAccessorOrder
    控制JAXB 绑定类中属性和字段的排序

  2. @XmlJavaTypeAdapter
    使用定制的适配器(即扩展抽象类XmlAdapter并覆盖marshal()和unmarshal()方法),以序列化Java类为XML

  3. @XmlElementWrapper
    对于数组或集合(即包含多个元素的成员变量),生成一个包装该数组或集合的XML元素(称为包装器)

XML和Bean互转Util

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import java.io.StringReader;
import java.io.StringWriter;

/**
 * @ClassName: FormatUtil
 * @Description: XML 和 JSON 互转工具
 * @Author: wuzhiyong
 * @Time: 2023/4/4 14:39
 * @Version: v1.0
 **/
public class FormatUtil {

    public static <T> T xmlToBean(String xml, Class<T> tClass) {
        Object object = null;
        try {
            JAXBContext context = JAXBContext.newInstance(tClass);
            Unmarshaller unmarshaller = context.createUnmarshaller();
            object = unmarshaller.unmarshal(new StringReader(xml));
        } catch (JAXBException e) {
            e.printStackTrace();
        }
        return (T) object;
    }

    /**
     * 对象转xml
     *
     * @param obj
     * @return
     */
    public static String beanToXml(Object obj) {
        StringWriter writer = null;
        try {
            JAXBContext context = JAXBContext.newInstance(obj.getClass());
            Marshaller marshaller = context.createMarshaller();
            // 格式化
            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
            // 去掉生成xml的默认报文头 <?xml version="1.0" encoding="utf-8" standalone="yes"?>
            marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
            marshaller.setProperty(Marshaller.JAXB_ENCODING, "utf-8");
            writer = new StringWriter();
            marshaller.marshal(obj, writer);
        } catch (JAXBException e) {
            e.printStackTrace();
        }
        return writer != null ? writer.toString() : null;
    }
}

测试

  • School.java
@Data
@NoArgsConstructor
@XmlRootElement(name = "SCHOOL")
@XmlAccessorType(XmlAccessType.NONE)
public class School {
    @XmlElement(name = "ADDRESS")
    private String addr;

    @XmlElement(name = "NUMBER")
    private int count;

    @XmlElement(name = "TEACHERS")
    private Teachers teachers;
}
  • Teachers.java
@Data
@NoArgsConstructor
@AllArgsConstructor
@XmlAccessorType(XmlAccessType.FIELD)
public class Teachers {
    @XmlElement(name = "TEACHER")
    private List<Teacher> teachers;
}
  • Teacher.java
@Data
@NoArgsConstructor
@AllArgsConstructor
@XmlAccessorType(XmlAccessType.FIELD)
public class Teacher {
    @XmlAttribute(name = "NAME")
    private String name;

    @XmlValue
    private String desc;
}
  • 测试.java
public class Test {
    public static void main(String[] args) {
        // 测试数据
        School school = new School();
        school.setAddr("Nanjing");
        school.setCount(1000);
        List<Teacher> teachers = new ArrayList<>();
        teachers.add(new Teacher("张三", "高级教师"));
        teachers.add(new Teacher("李四", "中级教师"));
        teachers.add(new Teacher("王五", "初级教师"));
        school.setTeachers(new Teachers(teachers));

        System.out.println(beanToXml(school));
        /**
         * 输出
         * <SCHOOL>
         *     <ADDRESS>Nanjing</ADDRESS>
         *     <NUMBER>1000</NUMBER>
         *     <TEACHERS>
         *         <TEACHER NAME="张三">高级教师</TEACHER>
         *         <TEACHER NAME="李四">中级教师</TEACHER>
         *         <TEACHER NAME="王五">初级教师</TEACHER>
         *     </TEACHERS>
         * </SCHOOL>
         */

        String xml = "<SCHOOL>\n" +
                "    <ADDRESS>Nanjing</ADDRESS>\n" +
                "    <NUMBER>1000</NUMBER>\n" +
                "    <TEACHERS>\n" +
                "        <TEACHER NAME=\"张三\">高级教师</TEACHER>\n" +
                "        <TEACHER NAME=\"李四\">中级教师</TEACHER>\n" +
                "        <TEACHER NAME=\"王五\">初级教师</TEACHER>\n" +
                "    </TEACHERS>\n" +
                "</SCHOOL>";
        School school1 = xmlToBean(xml, School.class);
        System.out.println(school.getAddr());
        System.out.println(school.getCount());
        System.out.println(school.getTeachers());
        /**
         * 输出
         * Nanjing
         * 1000
         * Teachers(teachers=[Teacher(name=张三, desc=高级教师), Teacher(name=李四, desc=中级教师), Teacher(name=王五, desc=初级教师)])
         */
    }
}

nfs-client

安装NFS

# 安装并设置自动启动
yum install nfs-utils
systemctl enable rpcbind
systemctl enable nfs
# 启动
systemctl start rpcbind
systemctl start nfs
# 服务启动之后,我们在服务端配置一个共享目录
mkdir /data
chmod 755 /data
# 根据这个目录,相应配置导出目录
vi /etc/exports
# 添加如下配置
/data/     192.168.0.0/24(rw,sync,no_root_squash,no_all_squash)
# 保存设置之后,重启 NFS 服务
systemctl restart nfs
# 可以检查一下本地的共享目录
showmount -e localhost
Export list for localhost:
/data 192.168.0.0/24

helm安装nfs-client

# 添加仓库
helm repo add nfs-subdir-external-provisioner https://kubernetes-sigs.github.io/nfs-subdir-external-provisioner/
# 更新
helm repo update
# 安装
helm install my-nfs nfs-subdir-external-provisioner/nfs-subdir-external-provisioner \
    --set nfs.server=x.x.x.x \
    --set nfs.path=/data

# 设置镜像仓库,默认仓库拉不下来
--set image.repository=hub.deri.org.cn/k8s/nfs-subdir-external-provisioner
# 设置SC名字,默认nfs-client
--set storageClass.name=my-nfs
# 设置供应商名字,默认自动生成一个
--set storageClass.provisionerName=cluster.local/nfsxxx
# 完整安装命令
helm install nfs-client nfs-subdir-external-provisioner/nfs-subdir-external-provisioner --set nfs.server=192.168.3.22 --set nfs.path=/data/nfs --set image.repository=hub.deri.org.cn/k8s/nfs-subdir-external-provisioner --set storageClass.name=nfs-client --set storageClass.provisionerName=cluster.local/nfs-client

etcd

ETCD对磁盘要求较高,官方推荐SSD起步。

ENDPOINTS=https://192.168.3.28:2379,https://192.168.3.29:2379,https://192.168.3.30:2379

# 查看集群状态
/usr/local/bin/etcdctl --cacert=/etc/ssl/etcd/ssl/ca.pem --cert=/etc/ssl/etcd/ssl/member-05.pem --key=/etc/ssl/etcd/ssl/member-05-key.pem  --endpoints=$ENDPOINT endpoint status

# 查看健康状态
/usr/local/bin/etcdctl --cacert=/etc/ssl/etcd/ssl/ca.pem --cert=/etc/ssl/etcd/ssl/member-05.pem --key=/etc/ssl/etcd/ssl/member-05-key.pem  --endpoints=$ENDPOINT endpoint health

# 查看所有监控指标
curl --cacert /etc/ssl/etcd/ssl/ca.pem --cert /etc/ssl/etcd/ssl/member-05.pem --key /etc/ssl/etcd/ssl/member-05-key.pem  https://10.201.112.28:2379/metrics 
# 查看和磁盘相关的
curl --cacert /etc/ssl/etcd/ssl/ca.pem --cert /etc/ssl/etcd/ssl/member-05.pem --key /etc/ssl/etcd/ssl/member-05-key.pem  https://10.201.112.28:2379/metrics | grep disk_backend_commit_duration_seconds

说明

bookstack作为confluence的开源替代品,部署简单,使用方便.

docker部署

部署步骤

  • 需要准备一个MySQL,创建好数据库bookstack.
  • docker启动
# 注意APP_URL,这里是你访问bookstack的地址,如果你是通过域名访问则输入你的域名,如https://book.test.com
docker run -d --name bookstack  -e PUID=1000 -e PGID=1000 -e DB_HOST=192.168.3.27  -e DB_PORT=3306 -e DB_USER=root -e DB_PASS=123456 -e DB_DATABASE=bookstack -p 6875:80 -e APP_URL=http://192.168.3.27:6875 -e TZ=Asia/Shanghai  linuxserver/bookstack

接入LDAP

docker run -d --name bookstack  -e PUID=1000 -e PGID=1000 -e DB_HOST=192.168.3.27  -e DB_PORT=3306 -e DB_USER=root -e DB_PASS=123456 -e DB_DATABASE=bookstack -p 6875:80 -e APP_URL=http://192.168.3.27:6875 -e TZ=Asia/Shanghai -e AUTH_METHOD=ldap -e LDAP_SERVER=192.168.0.9:389 -e LDAP_BASE_DN="ou=xxxxx,dc=xxxx,dc=xxxx" -e LDAP_DN="cn=xxx,dc=xxx,dc=xxxx" -e LDAP_VERSION=3 -e LDAP_PASS="xxxxxxxxxxxxxxxxxx"  linuxserver/bookstack

也可以直接修改.env文件然后映射到容器中/config/www/.env. .env文件中有其它默认配置,别弄丢了.

持久化

-v /data/bookstack/app:/app -v /data/bookstack/config:/config