Przeglądaj źródła

[warehouse]支持TdEngine数据库存储时序数据

tomsun28 4 lat temu
rodzic
commit
71f5edf5fc

+ 4 - 4
collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java

@@ -18,12 +18,12 @@ import java.util.concurrent.TimeUnit;
 public class MetricsDataExporter implements DisposableBean {
 
     private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToAlertQueue;
-    private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToWarehouseInfluxQueue;
+    private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToPersistentStorageQueue;
     private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToWarehouseRedisQueue;
 
     public MetricsDataExporter() {
         metricsDataToAlertQueue = new LinkedBlockingQueue<>();
-        metricsDataToWarehouseInfluxQueue = new LinkedBlockingQueue<>();
+        metricsDataToPersistentStorageQueue = new LinkedBlockingQueue<>();
         metricsDataToWarehouseRedisQueue = new LinkedBlockingQueue<>();
     }
 
@@ -31,7 +31,7 @@ public class MetricsDataExporter implements DisposableBean {
         return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
     }
 
-    public CollectRep.MetricsData pollWarehouseInfluxMetricsData() throws InterruptedException {
+    public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
         return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
     }
 
@@ -45,7 +45,7 @@ public class MetricsDataExporter implements DisposableBean {
      */
     public void send(CollectRep.MetricsData metricsData) {
         metricsDataToAlertQueue.offer(metricsData);
-        metricsDataToWarehouseInfluxQueue.offer(metricsData);
+        metricsDataToPersistentStorageQueue.offer(metricsData);
         metricsDataToWarehouseRedisQueue.offer(metricsData);
     }
 

+ 9 - 4
warehouse/pom.xml

@@ -33,6 +33,11 @@
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-autoconfigure</artifactId>
         </dependency>
         <dependency>
@@ -40,11 +45,11 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
-        <!-- influxdb -->
+        <!-- tdEngine -->
         <dependency>
-            <groupId>com.influxdb</groupId>
-            <artifactId>influxdb-client-java</artifactId>
-            <version>3.4.0</version>
+            <groupId>com.taosdata.jdbc</groupId>
+            <artifactId>taos-jdbcdriver</artifactId>
+            <version>2.0.36</version>
         </dependency>
         <!-- kafka -->
         <dependency>

+ 79 - 4
warehouse/src/main/java/com/usthe/warehouse/WarehouseProperties.java

@@ -47,13 +47,13 @@ public class WarehouseProperties {
         /**
          * kafka配置信息
          */
-        private EntranceProperties.KafkaProperties kafka;
+        private KafkaProperties kafka;
 
-        public EntranceProperties.KafkaProperties getKafka() {
+        public KafkaProperties getKafka() {
             return kafka;
         }
 
-        public void setKafka(EntranceProperties.KafkaProperties kafka) {
+        public void setKafka(KafkaProperties kafka) {
             this.kafka = kafka;
         }
 
@@ -124,6 +124,10 @@ public class WarehouseProperties {
          * redis配置信息
          */
         private RedisProperties redis;
+        /**
+         * TdEngine配置信息
+         */
+        private TdEngineProperties tdEngine;
 
         public InfluxdbProperties getInfluxdb() {
             return influxdb;
@@ -141,11 +145,19 @@ public class WarehouseProperties {
             this.redis = redis;
         }
 
+        public TdEngineProperties getTdEngine() {
+            return tdEngine;
+        }
+
+        public void setTdEngine(TdEngineProperties tdEngine) {
+            this.tdEngine = tdEngine;
+        }
+
         public static class InfluxdbProperties {
             /**
              * influxdb数据存储是否启动
              */
-            private boolean enabled = true;
+            private boolean enabled = false;
             /**
              * influxdb的连接服务器url
              */
@@ -204,6 +216,69 @@ public class WarehouseProperties {
             }
         }
 
+        public static class TdEngineProperties {
+            /**
+             * Whether TdEngine data storage is enabled
+             */
+            private boolean enabled = true;
+            /**
+             * TdEngine server jdbc connection url
+             */
+            private String url = "jdbc:TAOS-RS://localhost:6041/demo";
+            /**
+             * jdbc driver class name
+             */
+            private String driverClassName = "com.taosdata.jdbc.rs.RestfulDriver";
+            /**
+             * database username
+             */
+            private String username;
+            /**
+             * database password
+             */
+            private String password;
+
+            public boolean isEnabled() {
+                return enabled;
+            }
+
+            public void setEnabled(boolean enabled) {
+                this.enabled = enabled;
+            }
+
+            public String getUrl() {
+                return url;
+            }
+
+            public void setUrl(String url) {
+                this.url = url;
+            }
+
+            public String getDriverClassName() {
+                return driverClassName;
+            }
+
+            public void setDriverClassName(String driverClassName) {
+                this.driverClassName = driverClassName;
+            }
+
+            public String getUsername() {
+                return username;
+            }
+
+            public void setUsername(String username) {
+                this.username = username;
+            }
+
+            public String getPassword() {
+                return password;
+            }
+
+            public void setPassword(String password) {
+                this.password = password;
+            }
+        }
+
         public static class RedisProperties {
             /**
              * redis数据存储是否启动

+ 330 - 0
warehouse/src/main/java/com/usthe/warehouse/store/TdEngineDataStorage.java

@@ -0,0 +1,330 @@
package com.usthe.warehouse.store;

import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.common.entity.dto.Value;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.warehouse.WarehouseProperties;
import com.usthe.warehouse.WarehouseWorkerPool;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * TdEngine time-series storage for collected metrics data.
 * Consumes metrics from the exporter's persistent-storage queue and writes them
 * into per-monitor tables created under an auto-created super table.
 * (Class javadoc previously said "influxdb" — copy-paste from the influxdb storage class.)
 *
 * @author tom
 * @date 2021/11/24 18:23
 */
@Configuration
@AutoConfigureAfter(value = {WarehouseProperties.class})
@ConditionalOnProperty(prefix = "warehouse.store.td-engine",
        name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class TdEngineDataStorage implements DisposableBean {

    private final HikariDataSource hikariDataSource;
    private final WarehouseWorkerPool workerPool;
    private final MetricsDataExporter dataExporter;

    // NOTE(review): table/field names and values are interpolated into SQL text because
    // TDengine's "INSERT ... USING ... TAGS" auto-create form is built dynamically per metric
    // schema. Field names come from collector metric definitions (presumably trusted config),
    // and instance values are quoted below — TODO confirm collector guarantees no embedded quotes.
    private static final String INSERT_TABLE_DATA_SQL = "INSERT INTO %s USING %s TAGS (%s) VALUES %s";
    private static final String CREATE_SUPER_TABLE_SQL = "CREATE STABLE IF NOT EXISTS %s %s TAGS (monitor BIGINT)";
    private static final String NO_SUPER_TABLE_ERROR = "Table does not exist";
    private static final String QUERY_HISTORY_WITH_INSTANCE_SQL
            = "SELECT ts, instance, %s FROM %s WHERE instance = %s AND ts >= now - %s order by ts desc";
    private static final String QUERY_HISTORY_SQL
            = "SELECT ts, instance, %s FROM %s WHERE ts >= now - %s order by ts desc";
    private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL
            = "SELECT first(%s), avg(%s), min(%s), max(%s) FROM %s WHERE instance = %s AND ts >= now - %s interval(4h)";
    private static final String QUERY_INSTANCE_SQL
            = "SELECT DISTINCT instance FROM %s WHERE ts >= now - 1w";

    /**
     * Build the TdEngine storage: create the Hikari pool from configured properties and
     * start the background consumer workers.
     *
     * @param workerPool   shared warehouse worker pool that runs the storage loop
     * @param properties   warehouse configuration; store.tdEngine section is required
     * @param dataExporter queue source of collected metrics data
     * @throws IllegalArgumentException when the tdEngine properties are missing
     */
    public TdEngineDataStorage(WarehouseWorkerPool workerPool, WarehouseProperties properties,
                               MetricsDataExporter dataExporter) {
        this.workerPool = workerPool;
        this.dataExporter = dataExporter;
        if (properties == null || properties.getStore() == null || properties.getStore().getTdEngine() == null) {
            // message previously said "influxdb" — copy-paste; it refers to the tdEngine section
            log.error("init error, please config Warehouse tdEngine props in application.yml");
            throw new IllegalArgumentException("please config Warehouse tdEngine props");
        }
        this.hikariDataSource = buildTdEngineDataSource(properties.getStore().getTdEngine());
        startStorageData();
    }

    /**
     * Create the Hikari connection pool for the configured TdEngine instance.
     */
    private HikariDataSource buildTdEngineDataSource(
            WarehouseProperties.StoreProperties.TdEngineProperties tdEngineProperties) {
        HikariConfig config = new HikariConfig();
        // jdbc properties
        config.setJdbcUrl(tdEngineProperties.getUrl());
        config.setUsername(tdEngineProperties.getUsername());
        config.setPassword(tdEngineProperties.getPassword());
        config.setDriverClassName(tdEngineProperties.getDriverClassName());
        // minimum number of idle connections
        config.setMinimumIdle(10);
        // maximum number of connections in the pool
        config.setMaximumPoolSize(10);
        // maximum wait milliseconds to get a connection from the pool
        config.setConnectionTimeout(30000);
        // 0 = unlimited lifetime for each connection
        config.setMaxLifetime(0);
        // 0 = never recycle idle connections
        config.setIdleTimeout(0);
        // validation query (TDengine-specific)
        config.setConnectionTestQuery("select server_status()");
        return new HikariDataSource(config);
    }

    /**
     * Start two worker jobs that drain the persistent-storage queue and save each batch.
     */
    private void startStorageData() {
        Runnable runnable = () -> {
            Thread.currentThread().setName("warehouse-tdEngine-data-storage");
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    CollectRep.MetricsData metricsData = dataExporter.pollPersistentStorageMetricsData();
                    if (metricsData != null) {
                        saveData(metricsData);
                    }
                } catch (InterruptedException e) {
                    // BUG FIX: poll() clears the interrupt flag; without restoring it the loop
                    // condition never becomes false and the worker spins forever after interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("tdEngine storage worker interrupted, exiting: {}", e.getMessage());
                }
            }
        };
        workerPool.executeJob(runnable);
        workerPool.executeJob(runnable);
    }

    /**
     * Persist one collected metrics batch. The per-monitor table is auto-created from the
     * super table via "INSERT ... USING ... TAGS"; when the super table itself does not
     * exist yet it is created from the batch's field schema and the insert is retried.
     *
     * @param metricsData collected metrics batch; ignored when null or empty
     */
    public void saveData(CollectRep.MetricsData metricsData) {
        if (metricsData == null || metricsData.getValuesList().isEmpty() || metricsData.getFieldsList().isEmpty()) {
            return;
        }
        String monitorId = String.valueOf(metricsData.getId());
        String superTable = metricsData.getApp() + "_" + metricsData.getMetrics() + "_super";
        String table = metricsData.getApp() + "_" + metricsData.getMetrics() + "_" + monitorId;
        List<CollectRep.Field> fields = metricsData.getFieldsList();
        String insertDataSql = String.format(INSERT_TABLE_DATA_SQL, table, superTable, monitorId,
                buildRowValues(metricsData, fields));
        // was log.info — per-row SQL is far too chatty for INFO
        log.debug(insertDataSql);
        // try-with-resources: the original closed the connection inside the try AND in finally,
        // never closed the statement, and reused the statement after its connection was closed
        try (Connection connection = hikariDataSource.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(insertDataSql);
        } catch (Exception e) {
            if (e.getMessage() != null && e.getMessage().contains(NO_SUPER_TABLE_ERROR)) {
                // super table missing: create it from this batch's schema, then retry the insert
                createSuperTableThenInsert(superTable, fields, insertDataSql);
            } else {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Build the "(ts, 'instance', v1, v2, ...) (...)" VALUES clause for one metrics batch.
     * Numbers that fail to parse and explicit null markers are stored as NULL.
     */
    private String buildRowValues(CollectRep.MetricsData metricsData, List<CollectRep.Field> fields) {
        StringBuilder sqlBuffer = new StringBuilder();
        for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
            StringBuilder rowBuffer = new StringBuilder("(");
            rowBuffer.append(metricsData.getTime()).append(", ");
            rowBuffer.append("'").append(valueRow.getInstance()).append("', ");
            for (int index = 0; index < fields.size(); index++) {
                CollectRep.Field field = fields.get(index);
                String value = valueRow.getColumns(index);
                if (field.getType() == CommonConstants.TYPE_NUMBER) {
                    // numeric column: unparsable values degrade to NULL instead of failing the row
                    if (CommonConstants.NULL_VALUE.equals(value)) {
                        rowBuffer.append("NULL");
                    } else {
                        try {
                            rowBuffer.append(Double.parseDouble(value));
                        } catch (Exception e) {
                            log.warn(e.getMessage());
                            rowBuffer.append("NULL");
                        }
                    }
                } else {
                    // string column
                    if (CommonConstants.NULL_VALUE.equals(value)) {
                        rowBuffer.append("NULL");
                    } else {
                        rowBuffer.append("'").append(value).append("'");
                    }
                }
                if (index != fields.size() - 1) {
                    rowBuffer.append(", ");
                }
            }
            rowBuffer.append(")");
            sqlBuffer.append(" ").append(rowBuffer);
        }
        return sqlBuffer.toString();
    }

    /**
     * Create the super table from the batch's field schema and retry the pending insert.
     * Uses a fresh connection: the original reused a statement whose connection might have
     * been closed, guarded only by an {@code assert}.
     */
    private void createSuperTableThenInsert(String superTable, List<CollectRep.Field> fields, String insertDataSql) {
        StringBuilder fieldSqlBuilder = new StringBuilder("(");
        fieldSqlBuilder.append("ts TIMESTAMP, ");
        fieldSqlBuilder.append("instance NCHAR(100), ");
        for (int index = 0; index < fields.size(); index++) {
            CollectRep.Field field = fields.get(index);
            String fieldName = field.getName();
            if (field.getType() == CommonConstants.TYPE_NUMBER) {
                fieldSqlBuilder.append(fieldName).append(" ").append("DOUBLE");
            } else {
                fieldSqlBuilder.append(fieldName).append(" ").append("NCHAR(100)");
            }
            if (index != fields.size() - 1) {
                fieldSqlBuilder.append(", ");
            }
        }
        fieldSqlBuilder.append(")");
        String createTableSql = String.format(CREATE_SUPER_TABLE_SQL, superTable, fieldSqlBuilder);
        try (Connection connection = hikariDataSource.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(createTableSql);
            statement.execute(insertDataSql);
        } catch (Exception createTableException) {
            log.error(createTableException.getMessage(), createTableException);
        }
    }

    @Override
    public void destroy() throws Exception {
        if (hikariDataSource != null) {
            hikariDataSource.close();
        }
    }

    /**
     * Query metric history from TdEngine, grouped by instance.
     *
     * @param monitorId monitor id
     * @param app       monitor type (app name)
     * @param metrics   metric set name
     * @param metric    metric (column) name
     * @param instance  instance filter; null queries all instances
     *                  — NOTE(review): interpolated as-is into "instance = %s", so the caller
     *                  presumably supplies a quoted value; TODO confirm
     * @param history   history range expression (e.g. "6h"), appended to "now - "
     * @return map of instance -&gt; time-ordered values (empty on error)
     */
    public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, String instance, String history) {
        String table = app + "_" + metrics + "_" + monitorId;
        String selectSql = instance == null ? String.format(QUERY_HISTORY_SQL, metric, table, history)
                : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, table, instance, history);
        log.debug(selectSql);
        Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
        try (Connection connection = hikariDataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(selectSql)) {
            while (resultSet.next()) {
                Timestamp ts = resultSet.getTimestamp(1);
                String instanceValue = resultSet.getString(2);
                if (instanceValue == null || "".equals(instanceValue)) {
                    // empty instance column is surfaced under the "NULL" key
                    instanceValue = "NULL";
                }
                double value = resultSet.getDouble(3);
                instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>())
                        .add(new Value(String.valueOf(value), ts.getTime()));
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return instanceValuesMap;
    }

    /**
     * Query 4-hour-interval aggregates (first/avg/min/max) of a metric per instance.
     * When no instance is given, every instance seen in the last week is queried.
     *
     * @param monitorId monitor id
     * @param app       monitor type (app name)
     * @param metrics   metric set name
     * @param metric    metric (column) name
     * @param instance  instance filter; null aggregates every recent instance
     * @param history   history range expression appended to "now - "
     * @return map of instance -&gt; aggregated values (empty lists on error)
     */
    public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
                                                    String metric, String instance, String history) {
        String table = app + "_" + metrics + "_" + monitorId;
        List<String> instances = new LinkedList<>();
        if (instance != null) {
            instances.add(instance);
        }
        if (instances.isEmpty()) {
            // no instance specified: discover all instances present in the last week
            instances.addAll(queryRecentInstances(table));
        }
        Map<String, List<Value>> instanceValuesMap = new HashMap<>(instances.size());
        for (String instanceValue : instances) {
            String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL,
                            metric, metric, metric, metric, table, instanceValue, history);
            log.debug(selectSql);
            if ("''".equals(instanceValue)) {
                // empty instance was encoded as '' for the SQL filter; report it under "NULL"
                instanceValue = "NULL";
            }
            List<Value> values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>());
            try (Connection connection = hikariDataSource.getConnection();
                 Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(selectSql)) {
                while (resultSet.next()) {
                    Timestamp ts = resultSet.getTimestamp(1);
                    double origin = resultSet.getDouble(2);
                    double avg = resultSet.getDouble(3);
                    double min = resultSet.getDouble(4);
                    double max = resultSet.getDouble(5);
                    values.add(Value.builder()
                            .origin(String.valueOf(origin)).mean(String.valueOf(avg))
                            .min(String.valueOf(min)).max(String.valueOf(max))
                            .time(ts.getTime())
                            .build());
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        return instanceValuesMap;
    }

    /**
     * List distinct instance values seen in the table over the last week;
     * an empty instance is encoded as the literal '' for later SQL interpolation.
     */
    private List<String> queryRecentInstances(String table) {
        List<String> instances = new LinkedList<>();
        String queryInstanceSql = String.format(QUERY_INSTANCE_SQL, table);
        try (Connection connection = hikariDataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery(queryInstanceSql)) {
            while (resultSet.next()) {
                String instanceValue = resultSet.getString(1);
                if (instanceValue == null || "".equals(instanceValue)) {
                    instances.add("''");
                } else {
                    instances.add(instanceValue);
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return instances;
    }
}

+ 1 - 0
warehouse/src/main/resources/META-INF/spring.factories

@@ -3,4 +3,5 @@ com.usthe.warehouse.WarehouseProperties,\
 com.usthe.warehouse.MetricsDataQueue,\
 com.usthe.warehouse.WarehouseWorkerPool,\
 com.usthe.warehouse.store.RedisDataStorage,\
+com.usthe.warehouse.store.TdEngineDataStorage,\
 com.usthe.warehouse.controller.MetricsDataController