
Method 1

This method can retrieve some table information, but it is not very detailed.

PreparedStatement pst = connection.prepareStatement("select * from t_gateway_logs where 1=2");
ResultSet rs = pst.executeQuery();
ResultSetMetaData rsd = rs.getMetaData();
for (int i = 0; i < rsd.getColumnCount(); i++) {
    // ResultSetMetaData columns are 1-based, hence i + 1
    System.out.print("Java type: " + rsd.getColumnClassName(i + 1));
    System.out.print("  DB type: " + rsd.getColumnTypeName(i + 1));
    System.out.print("  Column name: " + rsd.getColumnName(i + 1));
    System.out.print("  Display size: " + rsd.getColumnDisplaySize(i + 1));
    System.out.print("  Nullable: " + rsd.isNullable(i + 1));
    System.out.print("  Auto-increment: " + rsd.isAutoIncrement(i + 1));
    System.out.println();
}

Method 2

This method returns far more detailed information.

List all tables in the database

ResultSet tableRet = connection.getMetaData().getTables(null, "%","%",new String[]{"TABLE"});
while(tableRet.next()) {
    System.out.println(tableRet.getString("TABLE_NAME"));
}

Query a table's primary key information

ResultSet pkInfo = connection.getMetaData().getPrimaryKeys(null, "%", "test_table");
while (pkInfo.next()) {
    System.out.println("TABLE_CAT:" + pkInfo.getString("TABLE_CAT") + " ");
    System.out.println("TABLE_NAME:" + pkInfo.getString("TABLE_NAME") + " ");
    System.out.println("COLUMN_NAME:" + pkInfo.getString("COLUMN_NAME") + " ");
    System.out.println("PK_NAME:" + pkInfo.getString("PK_NAME") + " ");
    System.out.println("TABLE_SCHEM:" + pkInfo.getString("TABLE_SCHEM") + " ");
    System.out.println("KEY_SEQ:" + pkInfo.getString("KEY_SEQ") + " ");
}

Query a table's detailed column information

ResultSet colRet = connection.getMetaData().getColumns(null, "%", "test_table", "%");
while (colRet.next()) {
    System.out.println(colRet.getString("TABLE_CAT"));
    System.out.println(colRet.getString("TABLE_SCHEM"));
    System.out.println(colRet.getString("TABLE_NAME"));
    System.out.println(colRet.getString("COLUMN_NAME"));
    System.out.println(colRet.getString("DATA_TYPE"));
    System.out.println(colRet.getString("TYPE_NAME"));
    System.out.println(colRet.getInt("COLUMN_SIZE"));
    System.out.println(colRet.getInt("BUFFER_LENGTH"));
    System.out.println(colRet.getInt("DECIMAL_DIGITS"));
    System.out.println(colRet.getInt("NUM_PREC_RADIX"));
    System.out.println(colRet.getInt("NULLABLE"));
    System.out.println(colRet.getString("REMARKS"));
    System.out.println(colRet.getString("COLUMN_DEF"));
    System.out.println(colRet.getInt("SQL_DATA_TYPE"));
    System.out.println(colRet.getInt("SQL_DATETIME_SUB"));
    System.out.println(colRet.getInt("CHAR_OCTET_LENGTH"));
    System.out.println(colRet.getInt("ORDINAL_POSITION"));
    // the following metadata columns are defined as Strings by DatabaseMetaData.getColumns
    System.out.println(colRet.getString("IS_NULLABLE"));
    System.out.println(colRet.getString("SCOPE_CATALOG"));
    System.out.println(colRet.getString("SCOPE_SCHEMA"));
    System.out.println(colRet.getString("SCOPE_TABLE"));
    System.out.println(colRet.getInt("SOURCE_DATA_TYPE"));
    System.out.println(colRet.getString("IS_AUTOINCREMENT"));
    System.out.println(colRet.getString("IS_GENERATEDCOLUMN"));
}
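
The DATA_TYPE column holds an integer code from java.sql.Types. If a readable name is wanted, java.sql.JDBCType can translate it; a minimal sketch (valueOf throws IllegalArgumentException for codes JDBC does not define):

import java.sql.JDBCType;

int dataType = colRet.getInt("DATA_TYPE");
// Map the java.sql.Types code to its enum constant, e.g. 12 -> VARCHAR
System.out.println(JDBCType.valueOf(dataType).getName());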

Problem

Maven's default packaging plugin is maven-jar-plugin, and by default the JAR it produces contains only the compiled .class files and the project's resource files. This causes two problems:

  • Running the jar fails with "no main manifest attribute";
  • The dependency jars are not packaged in.

Solution

There are several Maven packaging plugins:

  • maven-shade-plugin
  • maven-assembly-plugin
  • maven-jar-plugin

Here we use maven-shade-plugin; the detailed configuration is as follows:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- specify the main class here -->
                                <mainClass>com.deri.App</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
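
After repackaging, the shaded jar in target/ contains the manifest and all dependencies and can be run directly; a quick check (the jar name is a placeholder for your own artifact):

mvn clean package
java -jar target/your-artifact.jar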

Problem

When logging with logback, the service prints logback's own status messages at startup, like the output below, showing which configuration files it checked and which settings it found:

18:00:22,729 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
18:00:22,729 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
18:00:22,729 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/D:/DeriGateway/DBCompare/target/classes/logback.xml]
18:00:22,770 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
18:00:22,775 |-INFO in ch.qos.logback.core.joran.action.TimestampAction - Using current interpretation time, i.e. now, as time reference.
18:00:22,788 |-INFO in ch.qos.logback.core.joran.action.TimestampAction - Adding property to the context with key="DATETIME" and value="2020-04-01 18:00:22" to the LOCAL scope
18:00:22,788 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
18:00:22,793 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
18:00:22,796 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
18:00:22,813 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.rolling.RollingFileAppender]
18:00:22,816 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [FILEOUT]
18:00:22,817 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
18:00:22,828 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy@984849465 - No compression will be used
18:00:22,829 |-INFO in c.q.l.core.rolling.TimeBasedRollingPolicy@984849465 - Will use the pattern ./logs//log.%d.%i.log for the active file
18:00:22,830 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@2eee9593 - The date pattern is 'yyyy-MM-dd' from file name pattern './logs//log.%d.%i.log'.
18:00:22,830 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@2eee9593 - Roll-over at midnight.
18:00:22,832 |-INFO in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@2eee9593 - Setting initial period to Wed Apr 01 18:00:22 CST 2020
18:00:22,833 |-WARN in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@2eee9593 - SizeAndTimeBasedFNATP is deprecated. Use SizeAndTimeBasedRollingPolicy instead
18:00:22,833 |-WARN in ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP@2eee9593 - For more information see http://logback.qos.ch/manual/appenders.html#SizeAndTimeBasedRollingPolicy
18:00:22,834 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FILEOUT] - Active log file name: ./logs//log.2020-04-01.0.log
18:00:22,834 |-INFO in ch.qos.logback.core.rolling.RollingFileAppender[FILEOUT] - File property is set to [null]
18:00:22,835 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
18:00:22,835 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
18:00:22,835 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [FILEOUT] to Logger[ROOT]
18:00:22,835 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
18:00:22,836 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@7907ec20 - Registering current configuration as safe fallback point

Disable logback's own status output

Only one extra line of configuration is needed: set the status listener to a no-op listener.

<configuration>
    <statusListener class="ch.qos.logback.core.status.NopStatusListener" />
</configuration>

Complete logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <statusListener class="ch.qos.logback.core.status.NopStatusListener" />
    <property name="ROOT" value="./logs/" />
    <property name="FILESIZE" value="100MB" />
    <property name="MAXHISTORY" value="30" />
    <timestamp key="DATETIME" datePattern="yyyy-MM-dd HH:mm:ss" />
    <!-- console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>[%-5level] %d{${DATETIME}} [%thread] %logger{36} - %m%n</pattern>
        </encoder>
    </appender>
    <!-- output to file, rolled by date and file size -->
    <appender name="FILEOUT" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <encoder>
            <pattern>[%-5level] %d{${DATETIME}} [%thread] %logger{36} - %m%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${ROOT}/log.%d.%i.log</fileNamePattern>
            <maxHistory>${MAXHISTORY}</maxHistory>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>${FILESIZE}</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
        </rollingPolicy>
    </appender>
    <!-- root logger -->
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILEOUT" />
    </root>
</configuration>

Static approach

Hard-code the values directly in bootstrap.yml or application.yml:

spring:
  # MySQL datasource configuration
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://192.168.41.128:3306/gw?useUnicode=true&characterEncoding=utf8&useSSL=false

The drawback is that whenever a parameter changes, the configuration has to be edited and the application repackaged. This is especially awkward with Docker deployments, where the environment changes and updating the configuration becomes a hassle.

Solution

In a yml file, values can be read from system environment variables with the ${ENVIRONMENT_VARIABLE} syntax; the configuration above can be rewritten as:

spring:
  # MySQL datasource configuration
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: ${USER_NAME:root}
    password: ${PASSWORD:123456}
    url: jdbc:mysql://${IP:192.168.41.128:3306}/gw?useUnicode=true&characterEncoding=utf8&useSSL=false

With this configuration, the values of the system environment variables (USER_NAME, PASSWORD, IP) take precedence; the defaults are used only when the variables are not set.

Passing environment variables in a Docker deployment

  • Environment variables can be passed with -e;
  • With docker-compose, environment variables can be set through environment; see the sketch below.
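
For example, a minimal sketch (the image name and values are placeholders):

docker run -e USER_NAME=root -e PASSWORD=123456 -e IP=192.168.41.128:3306 your-image

# docker-compose.yml
services:
  app:
    image: your-image
    environment:
      - USER_NAME=root
      - PASSWORD=123456
      - IP=192.168.41.128:3306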

Overview

This chapter builds a simulated data source that continuously writes mock data into Kafka, and then uses the Flink SQL Client to query the data in Kafka.

The mock data consists of JSON strings serialized from Java objects, each containing a book's id, type, price, and a timestamp.
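
A single message produced by the source looks roughly like this (field order may vary; fastjson formats the ts field with the pattern declared on it):

{"id":1,"price":80,"ts":"2020-03-20T09:48:00Z","type":"cs"}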

BookPojo.java

package com.deri.pojo.util;

import com.alibaba.fastjson.annotation.JSONField;

import java.sql.Timestamp;
import java.util.Date;

/**
 * @ClassName: BookPojo
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 9:48
 * @Version: v1.0
 **/
public class BookPojo {
    private int id;
    private String type;
    private int price;
    //Currently, the JSON format only allows timestamp data in RFC-3339 form, i.e. "2019-07-09 02:02:00.040Z".
    //Note: this must be java.util.Date, not java.sql.Date.
    @JSONField(format = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public BookPojo() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public Date getTs() {
        return ts;
    }

    public void setTs(Date ts) {
        this.ts = ts;
    }
}

Books.java, which generates the mock books

package com.deri.pojo.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: Books
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 10:52
 * @Version: v1.0
 **/
public class Books {

    public static BookPojo getBook(){
        List<BookPojo> bookPojos = new ArrayList<>();
        {
            BookPojo book1 = new BookPojo();
            book1.setId(1);
            book1.setType("cs");
            book1.setPrice(80);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(2);
            book1.setType("math");
            book1.setPrice(70);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(3);
            book1.setType("ph");
            book1.setPrice(60);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(4);
            book1.setType("cs");
            book1.setPrice(50);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(5);
            book1.setType("math");
            book1.setPrice(40);
            bookPojos.add(book1);
        }

        {
            BookPojo book1 = new BookPojo();
            book1.setId(6);
            book1.setType("ph");
            book1.setPrice(30);
            bookPojos.add(book1);
        }
        return bookPojos.get(new Random().nextInt(6));
    }
}

MyPojoSource.java, a SourceFunction with parallelism 1 that emits the mock books as JSON strings

package com.deri.pojo.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: MyNoParalleSource
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:15
 * @Version: v1.0
 **/
//A source with parallelism of 1
public class MyPojoSource implements SourceFunction<String> {

    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            BookPojo book = Books.getBook();
            book.setTs(new Date());
            ctx.collectWithTimestamp(JSON.toJSONString(book),book.getTs().getTime());
            Thread.sleep(new Random().nextInt(4000));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

Create the Flink job; this job simply writes the mock data into Kafka.

package com.deri.pojo;

import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName: KafkaProducer
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:16
 * @Version: v1.0
 **/
public class KafkaPojoProducer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyPojoSource()).setParallelism(1)/* set parallelism */;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("pojo4", new SimpleStringSchema(), properties);

        //write the event timestamp (event time) to Kafka
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("POJO Kafka Source");
    }

}
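
The job assumes the Flink streaming API, the universal Kafka connector, and fastjson are on the classpath. With Maven that is roughly the following (artifact names follow the Scala 2.11 builds; the version properties are placeholders to be matched to your Flink installation):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
</dependency>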

Configure sql-client-defaults.yaml

The configuration file is located in <Flink root directory>/conf.

Configure the table

tables: 
  - name: book
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo4
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts TIMESTAMP>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

Start the SQL Client

From the Flink root directory, run:

./bin/sql-client.sh embedded

If there are no errors, you will see the little squirrel (Flink's logo) and land at the following prompt:

...
Flink SQL> 
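
Before running queries, you can check that the book table defined in sql-client-defaults.yaml was registered:

SHOW TABLES;
DESCRIBE book;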

Some test SQL statements

  1. The simplest query
SELECT * FROM book;
  2. Books with a price greater than 50
SELECT * FROM book GROUP BY id,price,type,tss HAVING price > 50;
  3. Total sales per book every five minutes
SELECT 
id,
type,
TUMBLE_END(tss, INTERVAL '5' MINUTE) AS window_end,
SUM(price) AS sumprice
FROM book
GROUP BY  
id,
type,
TUMBLE(tss, INTERVAL '5' MINUTE);
  4. Create a view to hold the result of a SELECT
CREATE VIEW book_view AS
SELECT * FROM book GROUP BY id,price,type,tss HAVING price > 60;
  5. Total number sold, grouped by id
SELECT id,type,count(price) AS cnt FROM book GROUP BY id,type;
  6. Total number sold, grouped by type
SELECT type,count(*) AS cnt FROM book GROUP BY type;
  7. Total sales per book id over the past 5 minutes, refreshed every minute
SELECT 
id,
type,
HOP_START (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE) AS window_start,
SUM(price) AS sumprice
FROM book
GROUP BY  
id,
type,
HOP (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE);
  8. Add a virtual column holding the system processing time
SELECT id,price,tss, PROCTIME() AS ts FROM book;

Tumbling Windows (TUMBLE)

A tumbling window (TUMBLE) assigns each element to a window of a specified size. Tumbling windows usually have a fixed size and do not overlap. For example, with a 5-minute tumbling window, an unbounded stream is split by time into windows such as [0:00, 0:05), [0:05, 0:10), [0:10, 0:15), and so on.

(Figure: tumbling window)

Syntax

The TUMBLE function is used in the GROUP BY clause to define a tumbling window.

TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit

Example

SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username

For details, see the Alibaba Cloud documentation.

Sliding Windows (HOP)

A hop window (HOP) is also known as a sliding window. Unlike tumbling windows, sliding windows can overlap.

A sliding window has two parameters: slide and size. slide is the step of each slide, and size is the size of the window.

  • If slide < size, windows overlap and each element is assigned to multiple windows.
  • If slide = size, it behaves the same as a tumbling window (TUMBLE).
  • If slide > size, you get jumping windows: windows do not overlap and there are gaps between them.

(Figure: sliding window)

In most cases elements fall into several windows, i.e. the windows overlap. This makes sliding windows well suited to computing moving averages. For example, to compute the average over the last 5 minutes of data, updated every 10 seconds, set slide to 10 seconds and size to 5 minutes.
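
As a sketch against the book table defined earlier (tss is its rowtime attribute), a 5-minute moving average of price refreshed every 10 seconds could look like this:

SELECT
  type,
  HOP_END(tss, INTERVAL '10' SECOND, INTERVAL '5' MINUTE) AS window_end,
  AVG(price) AS avg_price
FROM book
GROUP BY
  type,
  HOP(tss, INTERVAL '10' SECOND, INTERVAL '5' MINUTE);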

Sliding window function syntax

The HOP function is used in the GROUP BY clause to define a sliding window.

HOP(<time-attr>, <slide-interval>,<size-interval>)
<slide-interval>: INTERVAL 'string' timeUnit
<size-interval>: INTERVAL 'string' timeUnit  

Example

SELECT
    HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    username,
    COUNT (click_url)
FROM
    user_clicks
GROUP BY
    HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    username  

For details, see the Alibaba Cloud documentation.

Session Windows (SESSION)

A session window (SESSION) groups elements by sessions of activity. Unlike tumbling and sliding windows, session windows do not overlap and have no fixed size. Instead, a session window closes when it stops receiving elements for a fixed period of time, i.e. when the session goes idle.

A session window is configured with a gap interval, which defines the length of the inactive period. For example, a stream of mouse-click events may contain long idle stretches interspersed with bursts of clicks; if data arrives after the specified gap, a new window is started.

An example of session windows is shown in the figure below. Because the data for each key is distributed differently, each key forms its own windows.
(Figure: session window example)

Session window function syntax

The SESSION function is used in the GROUP BY clause to define a session window.

SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit

Example

SELECT
SESSION_START(ts, INTERVAL '30' SECOND),
SESSION_END(ts, INTERVAL '30' SECOND),
username,
COUNT(click_url)
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username

For details, see the Alibaba Cloud documentation.