diff --git a/collector/server/pom.xml b/collector/server/pom.xml index 4d000a9..61dc023 100644 --- a/collector/server/pom.xml +++ b/collector/server/pom.xml @@ -38,7 +38,7 @@ io.etcd jetcd-core - 0.5.10 + 0.5.11 diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index dcc94c7..ec1cf4d 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -140,7 +140,9 @@ public class MetricsCollect implements Runnable, Comparable { * @param collectData 采集数据 */ private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) { - collectData.addAllFields(metrics.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toSet())); + collectData.addAllFields(metrics.getFields().stream().map(item -> + CollectRep.Field.newBuilder().setName(item.getField()).setType(item.getType()).build() + ).collect(Collectors.toSet())); List aliasRowList = collectData.getValuesList(); if (aliasRowList == null || aliasRowList.isEmpty()) { return; @@ -165,8 +167,7 @@ public class MetricsCollect implements Runnable, Comparable { Map aliasFieldValueMap = new HashMap<>(16); Map fieldValueMap = new HashMap<>(16); CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder(); - for (int index = 0; index < aliasRowList.size(); index++) { - CollectRep.ValueRow aliasRow = aliasRowList.get(index); + for (CollectRep.ValueRow aliasRow : aliasRowList) { for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) { String aliasFieldValue = aliasRow.getColumns(aliasIndex); if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) { @@ -174,8 +175,7 @@ public class MetricsCollect implements Runnable, Comparable { } } StringBuilder instanceBuilder = new StringBuilder(); - for (int realIndex = 0; realIndex < fields.size(); realIndex++) { - Metrics.Field field = fields.get(realIndex); + for (Metrics.Field field : fields) { String realField = field.getField(); Expression expression = fieldExpressionMap.get(realField); String value = null; diff --git a/common/README.md b/common/README.md new file mode 100644 index 0000000..66f2643 --- /dev/null +++ b/common/README.md @@ -0,0 +1,5 @@ +## 公共包 + +- 指标字段格式类型 +> 用 type 表示,0-number数字, 1-string字符串 +- \ No newline at end of file diff --git a/common/pom.xml b/common/pom.xml index 9a1b193..65949be 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -29,7 +29,7 @@ io.etcd jetcd-core - 0.5.10 + 0.5.11 provided @@ -47,6 +47,12 @@ com.google.protobuf protobuf-java-util 3.19.1 + + + com.google.guava + guava + + \ No newline at end of file diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java index 073d77c..22061a3 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java @@ -1,6 +1,5 @@ package com.usthe.warehouse.store; -import com.google.protobuf.ProtocolStringList; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApi; @@ -8,6 +7,7 @@ import com.influxdb.client.WriteOptions; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; import com.usthe.warehouse.MetricsDataQueue; import com.usthe.warehouse.WarehouseProperties; import com.usthe.warehouse.WarehouseWorkerPool; @@ -18,6 +18,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import java.time.Instant; +import java.util.List; /** * influxdb存储采集数据 @@ -81,17 +82,40 @@ public class InfluxdbDataStorage implements DisposableBean { public void saveData(CollectRep.MetricsData metricsData) { - String measurement = metricsData.getApp() + "--" + metricsData.getMetrics(); + String measurement = metricsData.getApp() + "_" + metricsData.getMetrics(); String monitorId = String.valueOf(metricsData.getId()); Instant collectTime = Instant.ofEpochMilli(metricsData.getTime()); - ProtocolStringList fields = metricsData.getFieldsList(); + + List fields = metricsData.getFieldsList(); for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { Point point = Point.measurement(measurement) .addTag("id", monitorId) .addTag("instance", valueRow.getInstance()) .time(collectTime, WritePrecision.MS); for (int index = 0; index < fields.size(); index++) { - point.addField(fields.get(index), valueRow.getColumns(index)); + CollectRep.Field field = fields.get(index); + String value = valueRow.getColumns(index); + if (field.getType() == CommonConstants.TYPE_NUMBER) { + // number data + if (CommonConstants.NULL_VALUE.equals(value)) { + point.addField(field.getName(), (Number) null); + } else { + try { + double number = Double.parseDouble(value); + point.addField(field.getName(), number); + } catch (Exception e) { + log.warn(e.getMessage()); + point.addField(field.getName(), (Number) null); + } + } + } else { + // string + if (CommonConstants.NULL_VALUE.equals(value)) { + point.addField(field.getName(), (String) null); + } else { + point.addField(field.getName(), value); + } + } } writeApi.writePoint(point); }