From b085d288f912ae90b4f74d3f5d406cd867a6057a Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Thu, 25 Nov 2021 22:09:46 +0800 Subject: [PATCH] =?UTF-8?q?[warehouse]=20=E6=94=AF=E6=8C=81influx=E5=AD=98?= =?UTF-8?q?=E5=82=A8number=E7=B1=BB=E5=9E=8B=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collector/server/pom.xml | 2 +- .../collector/dispatch/MetricsCollect.java | 10 +++--- common/README.md | 5 +++ common/pom.xml | 8 ++++- .../warehouse/store/InfluxdbDataStorage.java | 32 ++++++++++++++++--- 5 files changed, 46 insertions(+), 11 deletions(-) create mode 100644 common/README.md diff --git a/collector/server/pom.xml b/collector/server/pom.xml index 4d000a9..61dc023 100644 --- a/collector/server/pom.xml +++ b/collector/server/pom.xml @@ -38,7 +38,7 @@ io.etcd jetcd-core - 0.5.10 + 0.5.11 diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java index dcc94c7..ec1cf4d 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java +++ b/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java @@ -140,7 +140,9 @@ public class MetricsCollect implements Runnable, Comparable { * @param collectData 采集数据 */ private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) { - collectData.addAllFields(metrics.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toSet())); + collectData.addAllFields(metrics.getFields().stream().map(item -> + CollectRep.Field.newBuilder().setName(item.getField()).setType(item.getType()).build() + ).collect(Collectors.toSet())); List aliasRowList = collectData.getValuesList(); if (aliasRowList == null || aliasRowList.isEmpty()) { return; @@ -165,8 +167,7 @@ public class MetricsCollect implements Runnable, Comparable { Map aliasFieldValueMap = new HashMap<>(16); Map fieldValueMap = new HashMap<>(16); CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder(); - for (int index = 0; index < aliasRowList.size(); index++) { - CollectRep.ValueRow aliasRow = aliasRowList.get(index); + for (CollectRep.ValueRow aliasRow : aliasRowList) { for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) { String aliasFieldValue = aliasRow.getColumns(aliasIndex); if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) { @@ -174,8 +175,7 @@ public class MetricsCollect implements Runnable, Comparable { } } StringBuilder instanceBuilder = new StringBuilder(); - for (int realIndex = 0; realIndex < fields.size(); realIndex++) { - Metrics.Field field = fields.get(realIndex); + for (Metrics.Field field : fields) { String realField = field.getField(); Expression expression = fieldExpressionMap.get(realField); String value = null; diff --git a/common/README.md b/common/README.md new file mode 100644 index 0000000..66f2643 --- /dev/null +++ b/common/README.md @@ -0,0 +1,5 @@ +## 公共包 + +- 指标字段格式类型 +> 用 type 表示,0-number数字, 1-string字符串 +- \ No newline at end of file diff --git a/common/pom.xml b/common/pom.xml index 9a1b193..65949be 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -29,7 +29,7 @@ io.etcd jetcd-core - 0.5.10 + 0.5.11 provided @@ -47,6 +47,12 @@ com.google.protobuf protobuf-java-util 3.19.1 + + + com.google.guava + guava + + \ No newline at end of file diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java index 073d77c..22061a3 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java @@ -1,6 +1,5 @@ package com.usthe.warehouse.store; -import com.google.protobuf.ProtocolStringList; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApi; @@ -8,6 +7,7 @@ import com.influxdb.client.WriteOptions; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.CommonConstants; import com.usthe.warehouse.MetricsDataQueue; import com.usthe.warehouse.WarehouseProperties; import com.usthe.warehouse.WarehouseWorkerPool; @@ -18,6 +18,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; import java.time.Instant; +import java.util.List; /** * influxdb存储采集数据 @@ -81,17 +82,40 @@ public class InfluxdbDataStorage implements DisposableBean { public void saveData(CollectRep.MetricsData metricsData) { - String measurement = metricsData.getApp() + "--" + metricsData.getMetrics(); + String measurement = metricsData.getApp() + "_" + metricsData.getMetrics(); String monitorId = String.valueOf(metricsData.getId()); Instant collectTime = Instant.ofEpochMilli(metricsData.getTime()); - ProtocolStringList fields = metricsData.getFieldsList(); + + List fields = metricsData.getFieldsList(); for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { Point point = Point.measurement(measurement) .addTag("id", monitorId) .addTag("instance", valueRow.getInstance()) .time(collectTime, WritePrecision.MS); for (int index = 0; index < fields.size(); index++) { - point.addField(fields.get(index), valueRow.getColumns(index)); + CollectRep.Field field = fields.get(index); + String value = valueRow.getColumns(index); + if (field.getType() == CommonConstants.TYPE_NUMBER) { + // number data + if (CommonConstants.NULL_VALUE.equals(value)) { + point.addField(field.getName(), (Number) null); + } else { + try { + double number = Double.parseDouble(value); + point.addField(field.getName(), number); + } catch (Exception e) { + log.warn(e.getMessage()); + point.addField(field.getName(), (Number) null); + } + } + } else { + // string + if (CommonConstants.NULL_VALUE.equals(value)) { + point.addField(field.getName(), (String) null); + } else { + point.addField(field.getName(), value); + } + } } writeApi.writePoint(point); }