Ver código fonte

[collector]采集器支持mysql协议指标采集

tomsun28 4 anos atrás
pai
commit
5a12c07ac4

+ 8 - 1
collector/pom.xml

@@ -31,7 +31,7 @@
         <dependency>
             <groupId>com.usthe.tancloud</groupId>
             <artifactId>common</artifactId>
-            <version>1.0-SNAPSHOT</version>
+            <version>1.0</version>
         </dependency>
         <!-- etcd -->
         <dependency>
@@ -84,6 +84,13 @@
             <artifactId>aviator</artifactId>
             <version>5.2.7</version>
         </dependency>
+        <!--collect-->
+        <!-- mysql -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.27</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 120 - 47
collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java

@@ -1,8 +1,14 @@
 package com.usthe.collector.collect.database;
 
+import com.usthe.collector.collect.AbstractCollect;
 import com.usthe.collector.common.cache.CacheIdentifier;
 import com.usthe.collector.common.cache.CommonCache;
 import com.usthe.collector.common.cache.support.CommonJdbcConnect;
+import com.usthe.collector.util.CollectorConstants;
+import com.usthe.common.entity.job.Metrics;
+import com.usthe.common.entity.job.protocol.JdbcProtocol;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
@@ -11,8 +17,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -21,10 +27,61 @@ import java.util.Optional;
  * @date 2021/9/1 21:37
  */
 @Slf4j
-public class JdbcCommonCollect {
+public class JdbcCommonCollect extends AbstractCollect {
 
+    private static final String QUERY_TYPE_ONE_ROW = "oneRow";
+    private static final String QUERY_TYPE_MULTI_ROW = "multiRow";
+    private static final String QUERY_TYPE_COLUMNS = "columns";
 
-    private Statement getConnection(String username, String password, String url) {
+    private JdbcCommonCollect(){}
+
+    public static JdbcCommonCollect getInstance() {
+        return Singleton.INSTANCE;
+    }
+
+    @Override
+    public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+        long startTime = System.currentTimeMillis();
+        // 简单校验必有参数
+        if (metrics == null || metrics.getJdbc() == null) {
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg("DATABASE collect must has jdbc params");
+            return;
+        }
+        JdbcProtocol jdbcProtocol = metrics.getJdbc();
+        String databaseUrl = constructDatabaseUrl(jdbcProtocol);
+        try {
+            Statement statement = getConnection(jdbcProtocol.getUsername(),
+                    jdbcProtocol.getPassword(), databaseUrl);
+            switch (jdbcProtocol.getQueryType()) {
+                case QUERY_TYPE_ONE_ROW:
+                    queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+                    break;
+                case QUERY_TYPE_MULTI_ROW:
+                    queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+                    break;
+                case QUERY_TYPE_COLUMNS:
+                    queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
+                    break;
+                default:
+                    builder.setCode(CollectRep.Code.FAIL);
+                    builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
+                    break;
+            }
+        } catch (SQLException sqlException) {
+            log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(),
+                    sqlException.getErrorCode(), sqlException);
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg("Query Error: " + sqlException.getMessage() + " Code: " + sqlException.getErrorCode());
+        } catch (Exception e) {
+            log.error("Jdbc error: {}.", e.getMessage(), e);
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg("Query Error: " + e.getMessage());
+        }
+    }
+
+
+    private Statement getConnection(String username, String password, String url) throws Exception {
         CacheIdentifier identifier = CacheIdentifier.builder()
                 .ip(url)
                 .username(username).password(password).build();
@@ -56,20 +113,14 @@ public class JdbcCommonCollect {
             return statement;
         }
         // 复用失败则新建连接
-        try {
-            Connection connection = DriverManager.getConnection(url, username, password);
-            statement = connection.createStatement();
-            // 设置查询超时时间10秒
-            statement.setQueryTimeout(10);
-            // 设置查询最大行数1000行
-            statement.setMaxRows(1000);
-            CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
-            CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
-        } catch (SQLException sqlException) {
-            log.error("Jdbc sql error: {}, code: {}.", sqlException.getMessage(), sqlException.getErrorCode(), sqlException);
-        } catch (Exception e) {
-            log.error("Jdbc error: {}.", e.getMessage(), e);
-        }
+        Connection connection = DriverManager.getConnection(url, username, password);
+        statement = connection.createStatement();
+        // 设置查询超时时间10秒
+        statement.setQueryTimeout(10);
+        // 设置查询最大行数1000行
+        statement.setMaxRows(1000);
+        CommonJdbcConnect jdbcConnect = new CommonJdbcConnect(connection);
+        CommonCache.getInstance().addCache(identifier, jdbcConnect, 10000L);
         return statement;
     }
 
@@ -83,30 +134,28 @@ public class JdbcCommonCollect {
      * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
      * @throws Exception when error happen
      */
-    private List<List<String>> queryOneRow(Statement statement, String sql, List<String> columns) throws Exception {
-        long startTime = System.currentTimeMillis();
+    private void queryOneRow(Statement statement, String sql, List<String> columns,
+                                           CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
         statement.setMaxRows(1);
         ResultSet resultSet = statement.executeQuery(sql);
-        List<List<String>> rowList = new LinkedList<>();
         try {
             if (resultSet.next()) {
-                LinkedList<String> rows = new LinkedList<>();
+                CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
                 for (String column : columns) {
-                    if ("responseTime".equals(column)) {
+                    if (CollectorConstants.RESPONSE_TIME.equals(column)) {
                         long time = System.currentTimeMillis() - startTime;
-                        rows.add(String.valueOf(time));
+                        valueRowBuilder.addColumns(String.valueOf(time));
                     } else {
                         String value = resultSet.getString(column);
-                        value = value == null ? "" : value;
-                        rows.add(value);
+                        value = value == null ? CommonConstants.NULL_VALUE : value;
+                        valueRowBuilder.addColumns(value);
                     }
                 }
-                rowList.add(rows);
+                builder.addValues(valueRowBuilder.build());
             }
         } finally {
             resultSet.close();
         }
-        return rowList;
     }
 
     /**
@@ -120,10 +169,9 @@ public class JdbcCommonCollect {
      * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
      * @throws Exception when error happen
      */
-    private List<List<String>> queryOneRowByMatchTwoColumns(Statement statement, String sql, List<String> columns) throws Exception {
-        long startTime = System.currentTimeMillis();
+    private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List<String> columns,
+                                              CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
         ResultSet resultSet = statement.executeQuery(sql);
-        List<List<String>> rowList = new LinkedList<>();
         try {
             HashMap<String, String> values = new HashMap<>(columns.size());
             while (resultSet.next()) {
@@ -131,22 +179,21 @@ public class JdbcCommonCollect {
                     values.put(resultSet.getString(1).toLowerCase(), resultSet.getString(2));
                 }
             }
-            LinkedList<String> rows = new LinkedList<>();
+            CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
             for (String column : columns) {
-                if ("responseTime".equals(column)) {
+                if (CollectorConstants.RESPONSE_TIME.equals(column)) {
                     long time = System.currentTimeMillis() - startTime;
-                    rows.add(String.valueOf(time));
+                    valueRowBuilder.addColumns(String.valueOf(time));
                 } else {
                     String value = values.get(column.toLowerCase());
-                    value = value == null ? "" : value;
-                    rows.add(value);
+                    value = value == null ? CommonConstants.NULL_VALUE : value;
+                    valueRowBuilder.addColumns(value);
                 }
             }
-            rowList.add(rows);
+            builder.addValues(valueRowBuilder.build());
         } finally {
             resultSet.close();
         }
-        return rowList;
     }
 
     /**
@@ -159,30 +206,56 @@ public class JdbcCommonCollect {
      * @param columns 查询的列头(一般是数据库表字段,也可能包含特殊字段,eg: responseTime)
      * @throws Exception when error happen
      */
-    private List<List<String>> queryMultiRow(Statement statement, String sql, List<String> columns) throws Exception {
-        long startTime = System.currentTimeMillis();
+    private void queryMultiRow(Statement statement, String sql, List<String> columns,
+                               CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
         ResultSet resultSet = statement.executeQuery(sql);
-        List<List<String>> rowList = new LinkedList<>();
         try {
             while (resultSet.next()) {
-                LinkedList<String> rows = new LinkedList<>();
+                CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
                 for (String column : columns) {
-                    if ("responseTime".equals(column)) {
+                    if (CollectorConstants.RESPONSE_TIME.equals(column)) {
                         long time = System.currentTimeMillis() - startTime;
-                        rows.add(String.valueOf(time));
+                        valueRowBuilder.addColumns(String.valueOf(time));
                     } else {
                         String value = resultSet.getString(column);
-                        value = value == null ? "" : value;
-                        rows.add(value);
+                        value = value == null ? CommonConstants.NULL_VALUE : value;
+                        valueRowBuilder.addColumns(value);
                     }
                 }
-                rowList.add(rows);
+                builder.addValues(valueRowBuilder.build());
             }
         } finally {
             resultSet.close();
         }
-        return rowList;
     }
 
+    /**
+     * 根据jdbc入参构造数据库URL
+     * @param jdbcProtocol jdbc
+     * @return URL
+     */
+    private String constructDatabaseUrl(JdbcProtocol jdbcProtocol) {
+        if (Objects.nonNull(jdbcProtocol.getUrl())
+                && !Objects.equals("", jdbcProtocol.getUrl())
+                && jdbcProtocol.getUrl().startsWith("jdbc")) {
+            // 入参数URL有效 则优先级最高返回
+            return jdbcProtocol.getUrl();
+        }
+        String url;
+        switch (jdbcProtocol.getPlatform()) {
+            case "mysql":
+                url = "jdbc:mysql://" + jdbcProtocol.getHost() + ":" + jdbcProtocol.getPort()
+                        + "/" + (jdbcProtocol.getDatabase() == null ? "" : jdbcProtocol.getDatabase())
+                        + "?useUnicode=true&characterEncoding=utf-8&useSSL=false";
+                break;
+            default:
+                throw new IllegalArgumentException("Not support database platform: " + jdbcProtocol.getPlatform());
 
+        }
+        return url;
+    }
+
+    private static class Singleton {
+        private static final JdbcCommonCollect INSTANCE = new JdbcCommonCollect();
+    }
 }

+ 4 - 0
collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java

@@ -3,6 +3,7 @@ package com.usthe.collector.dispatch;
 import com.googlecode.aviator.AviatorEvaluator;
 import com.googlecode.aviator.Expression;
 import com.usthe.collector.collect.AbstractCollect;
+import com.usthe.collector.collect.database.JdbcCommonCollect;
 import com.usthe.collector.collect.http.HttpCollectImpl;
 import com.usthe.collector.collect.icmp.IcmpCollectImpl;
 import com.usthe.collector.collect.telnet.TelnetCollectImpl;
@@ -107,6 +108,9 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
             case DispatchConstants.PROTOCOL_TELNET:
                 abstractCollect = TelnetCollectImpl.getInstance();
                 break;
+            case DispatchConstants.PROTOCOL_JDBC:
+                abstractCollect = JdbcCommonCollect.getInstance();
+                break;
                 // todo
             default: break;
         }

+ 1 - 1
collector/src/main/java/com/usthe/collector/util/JsonPathParser.java

@@ -32,7 +32,7 @@ public class JsonPathParser {
      * 使用jsonPath来解析json内容
      * @param content json内容
      * @param jsonPath jsonPath脚本
-     * @return 解析后的内容
+     * @return 解析后的内容 [{'name': 'tom', 'speed': '433'},{'name': 'lili', 'speed': '543'}]
      */
     public static List<Map<String, Object>> parseContentWithJsonPath(String content, String jsonPath) {
         if (content == null || jsonPath == null || "".equals(content) || "".equals(jsonPath)) {

+ 31 - 0
collector/src/main/java/com/usthe/collector/util/PrometheusTextParser.java

@@ -0,0 +1,31 @@
+package com.usthe.collector.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * prometheus-format-text parser
+ * @author tom
+ * @date 2022/1/9 14:12
+ */
+public class PrometheusTextParser {
+
+    /**
+     * 解析prometheusText
+     * @param content 待解析文本内容
+     * @return eg:[{'name': 'tom', 'speed': '433'},{'name': 'lili', 'speed': '543'},{'name': 'sam', 'speed': '643'}]
+     */
+    public static Map<String, List<Map<String, Object>>> parsePrometheusText(String content) {
+        String[] lines = content.split("\n");
+        Map<String, List<Map<String, Object>>> parseResult = new HashMap<>(8);
+        for (String lineTmp : lines) {
+            String line = lineTmp.trim();
+            if (line.length() == 0 || line.startsWith("#")) {
+                continue;
+            }
+            
+        }
+        return null;
+    }
+}

+ 1 - 1
common/src/main/java/com/usthe/common/entity/dto/Field.java

@@ -29,6 +29,6 @@ public class Field {
     private String unit;
 
     @ApiModelProperty(value = "是否是实例字段", position = 3)
-    private boolean instance;
+    private Boolean instance;
 
 }

+ 5 - 0
common/src/main/java/com/usthe/common/entity/dto/Message.java

@@ -49,6 +49,11 @@ public class Message<T> {
         this.msg = msg;
     }
 
+    public Message(byte code, String msg) {
+        this.code = code;
+        this.msg = msg;
+    }
+
     public Message(T data) {
         this.data = data;
     }

+ 39 - 0
common/src/main/java/com/usthe/common/entity/dto/MetricsHistoryData.java

@@ -0,0 +1,39 @@
+package com.usthe.common.entity.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 历史单指标数据
+ * @author tom
+ * @date 2022/1/21 09:58
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel(description = "历史单指标数据")
+public class MetricsHistoryData {
+
+    @ApiModelProperty(value = "监控ID", position = 0)
+    private Long id;
+
+    @ApiModelProperty(value = "监控类型", position = 1)
+    private String app;
+
+    @ApiModelProperty(value = "监控指标组", position = 2)
+    private String metric;
+
+    @ApiModelProperty(value = "监控指标", position = 4)
+    private Field field;
+
+    @ApiModelProperty(value = "监控指标历史值 instance<==>values", position = 5)
+    private Map<String, List<Value>> values;
+}

+ 11 - 3
common/src/main/java/com/usthe/common/entity/dto/Value.java

@@ -23,18 +23,26 @@ public class Value {
         this.origin = origin;
     }
 
+    public Value(String origin, long time) {
+        this.origin = origin;
+        this.time = time;
+    }
+
     @ApiModelProperty(value = "原始值", position = 0)
     private String origin;
 
     @ApiModelProperty(value = "平均值", position = 1)
     private String mean;
 
-    @ApiModelProperty(value = "中位数值", position = 0)
+    @ApiModelProperty(value = "中位数值,暂不支持", position = 2)
     private String median;
 
-    @ApiModelProperty(value = "最小值", position = 0)
+    @ApiModelProperty(value = "最小值", position = 3)
     private String min;
 
-    @ApiModelProperty(value = "最大值", position = 0)
+    @ApiModelProperty(value = "最大值", position = 4)
     private String max;
+
+    @ApiModelProperty(value = "数据采集时间,此字段查历史数据时有效", position = 5)
+    private Long time;
 }

+ 18 - 2
common/src/main/java/com/usthe/common/entity/job/protocol/JdbcProtocol.java

@@ -22,7 +22,7 @@ public class JdbcProtocol {
     /**
      * 端口号
      */
-    private Integer port;
+    private String port;
     /**
      * 数据库用户名(可选)
      */
@@ -32,7 +32,23 @@ public class JdbcProtocol {
      */
     private String password;
     /**
-     * 数据库链接url eg: jdbc:mysql://localhost:3306
+     * 数据库
+     */
+    private String database;
+    /**
+     * 数据库类型 mysql oracle ...
+     */
+    private String platform;
+    /**
+     * SQL查询方式: oneRow, multiRow, columns
+     */
+    private String queryType;
+    /**
+     * sql
+     */
+    private String sql;
+    /**
+     * 数据库链接url eg: jdbc:mysql://localhost:3306/usthe
      */
     private String url;
 }