Explorar o código

[warehouse] 支持influx存储number类型数据

tomsun28 %!s(int64=4) %!d(string=hai) anos
pai
achega
b085d288f9

+ 1 - 1
collector/server/pom.xml

@@ -38,7 +38,7 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>0.5.10</version>
+            <version>0.5.11</version>
         </dependency>
         <!-- kafka -->
         <dependency>

+ 5 - 5
collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java

@@ -140,7 +140,9 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
      * @param collectData 采集数据
      */
     private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) {
-        collectData.addAllFields(metrics.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toSet()));
+        collectData.addAllFields(metrics.getFields().stream().map(item ->
+            CollectRep.Field.newBuilder().setName(item.getField()).setType(item.getType()).build()
+        ).collect(Collectors.toSet()));
         List<CollectRep.ValueRow> aliasRowList = collectData.getValuesList();
         if (aliasRowList == null || aliasRowList.isEmpty()) {
             return;
@@ -165,8 +167,7 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
         Map<String, String> aliasFieldValueMap = new HashMap<>(16);
         Map<String, Object> fieldValueMap = new HashMap<>(16);
         CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder();
-        for (int index = 0; index < aliasRowList.size(); index++) {
-            CollectRep.ValueRow aliasRow = aliasRowList.get(index);
+        for (CollectRep.ValueRow aliasRow : aliasRowList) {
             for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) {
                 String aliasFieldValue = aliasRow.getColumns(aliasIndex);
                 if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) {
@@ -174,8 +175,7 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
                 }
             }
             StringBuilder instanceBuilder = new StringBuilder();
-            for (int realIndex = 0; realIndex < fields.size(); realIndex++) {
-                Metrics.Field field = fields.get(realIndex);
+            for (Metrics.Field field : fields) {
                 String realField = field.getField();
                 Expression expression = fieldExpressionMap.get(realField);
                 String value = null;

+ 5 - 0
common/README.md

@@ -0,0 +1,5 @@
+## 公共包   
+
+- 指标字段格式类型 
+> 用 type 表示,0-number数字, 1-string字符串   
+-  

+ 7 - 1
common/pom.xml

@@ -29,7 +29,7 @@
         <dependency>
             <groupId>io.etcd</groupId>
             <artifactId>jetcd-core</artifactId>
-            <version>0.5.10</version>
+            <version>0.5.11</version>
             <scope>provided</scope>
         </dependency>
         <!-- 工具依赖  -->
@@ -47,6 +47,12 @@
             <groupId>com.google.protobuf</groupId>
             <artifactId>protobuf-java-util</artifactId>
             <version>3.19.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 </project>

+ 28 - 4
warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java

@@ -1,6 +1,5 @@
 package com.usthe.warehouse.store;
 
-import com.google.protobuf.ProtocolStringList;
 import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.InfluxDBClientFactory;
 import com.influxdb.client.WriteApi;
@@ -8,6 +7,7 @@ import com.influxdb.client.WriteOptions;
 import com.influxdb.client.domain.WritePrecision;
 import com.influxdb.client.write.Point;
 import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
 import com.usthe.warehouse.MetricsDataQueue;
 import com.usthe.warehouse.WarehouseProperties;
 import com.usthe.warehouse.WarehouseWorkerPool;
@@ -18,6 +18,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Instant;
+import java.util.List;
 
 /**
  * influxdb存储采集数据
@@ -81,17 +82,40 @@ public class InfluxdbDataStorage implements DisposableBean {
 
 
     public void saveData(CollectRep.MetricsData metricsData) {
-        String measurement = metricsData.getApp() + "--" + metricsData.getMetrics();
+        String measurement = metricsData.getApp() + "_" + metricsData.getMetrics();
         String monitorId = String.valueOf(metricsData.getId());
         Instant collectTime = Instant.ofEpochMilli(metricsData.getTime());
-        ProtocolStringList fields = metricsData.getFieldsList();
+
+        List<CollectRep.Field> fields = metricsData.getFieldsList();
         for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
             Point point = Point.measurement(measurement)
                     .addTag("id", monitorId)
                     .addTag("instance", valueRow.getInstance())
                     .time(collectTime, WritePrecision.MS);
             for (int index = 0; index < fields.size(); index++) {
-                point.addField(fields.get(index), valueRow.getColumns(index));
+                CollectRep.Field field = fields.get(index);
+                String value = valueRow.getColumns(index);
+                if (field.getType() == CommonConstants.TYPE_NUMBER) {
+                    // number data
+                    if (CommonConstants.NULL_VALUE.equals(value)) {
+                        point.addField(field.getName(), (Number) null);
+                    } else {
+                        try {
+                            double number = Double.parseDouble(value);
+                            point.addField(field.getName(), number);
+                        } catch (Exception e) {
+                            log.warn(e.getMessage());
+                            point.addField(field.getName(), (Number) null);
+                        }
+                    }
+                } else {
+                    // string
+                    if (CommonConstants.NULL_VALUE.equals(value)) {
+                        point.addField(field.getName(), (String) null);
+                    } else {
+                        point.addField(field.getName(), value);
+                    }
+                }
             }
             writeApi.writePoint(point);
         }