Explorar o código

[collector] jsonPath解析http响应数据和calculates表达式计算实现

tomsun28 %!s(int64=4) %!d(string=hai) anos
pai
achega
8214eba482

+ 6 - 0
collector/server/pom.xml

@@ -52,6 +52,12 @@
             <artifactId>httpclient</artifactId>
             <version>4.5.13</version>
         </dependency>
+        <!--json path parser-->
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>2.6.0</version>
+        </dependency>
         <!-- lru hashmap -->
         <dependency>
             <groupId>com.googlecode.concurrentlinkedhashmap</groupId>

+ 72 - 12
collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java

@@ -1,11 +1,21 @@
 package com.usthe.collector.collect.http;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.spi.cache.CacheProvider;
+import com.jayway.jsonpath.spi.cache.LRUCache;
 import com.usthe.collector.collect.AbstractCollect;
 import com.usthe.collector.common.http.HttpClientPool;
 import com.usthe.collector.dispatch.DispatchConstants;
+import com.usthe.collector.util.JsonPathParser;
 import com.usthe.common.entity.job.Metrics;
 import com.usthe.common.entity.job.protocol.HttpProtocol;
 import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
 import com.usthe.common.util.IpDomainUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpHeaders;
@@ -82,20 +92,20 @@ public class HttpCollectImpl extends AbstractCollect {
                 String parseType = metrics.getHttp().getParseType();
                 try {
                     if (DispatchConstants.PARSE_DEFAULT.equals(parseType)) {
-                        parseResponseByDefault(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
-                    } else if (DispatchConstants.PARSE_PROMETHEUS.equals(parseType)) {
-                        parseResponseByPrometheus(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
+                        parseResponseByDefault(resp, metrics.getAliasFields(), builder);
                     } else if (DispatchConstants.PARSE_JSON_PATH.equals(parseType)) {
                         parseResponseByJsonPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
+                    } else if (DispatchConstants.PARSE_PROMETHEUS.equals(parseType)) {
+                        parseResponseByPrometheus(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
                     } else if (DispatchConstants.PARSE_XML_PATH.equals(parseType)) {
                         parseResponseByXmlPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
                     } else {
-                        parseResponseByDefault(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
+                        parseResponseByDefault(resp, metrics.getAliasFields(), builder);
                     }
                 } catch (Exception e) {
                     log.info("parse error: {}.", e.getMessage(), e);
                     builder.setCode(CollectRep.Code.FAIL);
-                    builder.setMsg("parse response data error.");
+                    builder.setMsg("parse response data error:" + e.getMessage());
                     return;
                 }
             }
@@ -135,21 +145,71 @@ public class HttpCollectImpl extends AbstractCollect {
         }
     }
 
-    private void parseResponseByXmlPath(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
+    private void parseResponseByXmlPath(String resp, List<String> aliasFields, HttpProtocol http,
+                                        CollectRep.MetricsData.Builder builder) {
 
     }
 
-    private void parseResponseByJsonPath(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
-
+    private void parseResponseByJsonPath(String resp, List<String> aliasFields, HttpProtocol http,
+                                         CollectRep.MetricsData.Builder builder) {
+        List<Map<String, Object>> results = JsonPathParser.parseContentWithJsonPath(resp,http. getParseScript());
+        for (Map<String, Object> stringMap : results) {
+            CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+            for (String alias : aliasFields) {
+                Object value = stringMap.get(alias);
+                if (value != null) {
+                    valueRowBuilder.addColumns(String.valueOf(value));
+                } else {
+                    valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+                }
+            }
+            builder.addValues(valueRowBuilder.build());
+        }
     }
 
-    private void parseResponseByPrometheus(String resp, List<String> preFields, HttpProtocol http, CollectRep.MetricsData.Builder builder) {
+    private void parseResponseByPrometheus(String resp, List<String> aliasFields, HttpProtocol http,
+                                           CollectRep.MetricsData.Builder builder) {
 
     }
 
-    private void parseResponseByDefault(String resp, List<String> preFields, HttpProtocol http,
-                                        CollectRep.MetricsData.Builder builder) {
-
+    private void parseResponseByDefault(String resp, List<String> aliasFields, CollectRep.MetricsData.Builder builder) {
+        Gson gson = new Gson();
+        JsonElement element = gson.toJsonTree(resp);
+        if (element.isJsonArray()) {
+            JsonArray array = element.getAsJsonArray();
+            for (JsonElement jsonElement : array) {
+                if (jsonElement.isJsonObject()) {
+                    JsonObject object = jsonElement.getAsJsonObject();
+                    CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+                    for (String alias : aliasFields) {
+                        JsonElement valueElement = object.get(alias);
+                        if (valueElement != null) {
+                            String value = valueElement.getAsString();
+                            valueRowBuilder.addColumns(value);
+                        } else {
+                            valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+                        }
+                    }
+                    builder.addValues(valueRowBuilder.build());
+                }
+            }
+        } else if (element.isJsonObject()) {
+            JsonObject object = element.getAsJsonObject();
+            CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+            StringBuilder instance = new StringBuilder();
+            for (String alias : aliasFields) {
+                JsonElement valueElement = object.get(alias);
+                if (valueElement != null) {
+                    String value = valueElement.getAsString();
+                    valueRowBuilder.addColumns(value);
+                    instance.append(value);
+                } else {
+                    valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+                }
+            }
+            valueRowBuilder.setInstance(instance.toString());
+            builder.addValues(valueRowBuilder.build());
+        }
     }
 
     /**

+ 3 - 1
collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConfiguration.java

@@ -1,6 +1,7 @@
 package com.usthe.collector.dispatch;
 
 import com.googlecode.aviator.AviatorEvaluator;
+import com.googlecode.aviator.Options;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -16,6 +17,7 @@ public class DispatchConfiguration {
     @Bean
     public void configAviatorEvaluator() {
         // 配置AviatorEvaluator使用LRU缓存编译后的表达式
-        AviatorEvaluator.getInstance().useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE);
+        AviatorEvaluator.getInstance()
+                .useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE);
     }
 }

+ 2 - 2
collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java

@@ -51,11 +51,11 @@ public interface DispatchConstants {
     /**
      * 解析方式 自定义json path
      */
-    String PARSE_JSON_PATH = "json_path";
+    String PARSE_JSON_PATH = "jsonPath";
     /**
      * 解析方式 自定义xml path
      */
-    String PARSE_XML_PATH = "xml_path";
+    String PARSE_XML_PATH = "xmlPath";
     /**
      * 解析方式 prometheus规则
      */

+ 51 - 9
collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java

@@ -10,6 +10,8 @@ import com.usthe.collector.dispatch.timer.WheelTimerTask;
 import com.usthe.common.entity.job.Job;
 import com.usthe.common.entity.job.Metrics;
 import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
+import com.usthe.common.util.CommonUtil;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -139,6 +141,7 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
     private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) {
         collectData.addAllFields(metrics.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toSet()));
         // 若不存在需要计算的表达式,则 别名指标aliasFields 的数据就是真正指标 fields的数据
+        // 即直接使用 valueList 即可
         if (metrics.getCalculates() == null || metrics.getCalculates().isEmpty()) {
             return;
         }
@@ -146,6 +149,7 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
         if (aliasRowList == null || aliasRowList.isEmpty()) {
             return;
         }
+        collectData.clearValues();
         // 先预处理 calculates
         Map<String, Expression> fieldExpressionMap = metrics.getCalculates()
                 .stream()
@@ -159,27 +163,65 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
 
         List<Metrics.Field> fields = metrics.getFields();
         List<String> aliasFields = metrics.getAliasFields();
-        Map<String, Object> aliasFieldValueMap = new HashMap<>(16);
+        Map<String, String> aliasFieldValueMap = new HashMap<>(16);
+        Map<String, Object> fieldValueMap = new HashMap<>(16);
+        CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder();
         for (int index = 0; index < aliasRowList.size(); index++) {
             CollectRep.ValueRow aliasRow = aliasRowList.get(index);
             for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) {
-                aliasFieldValueMap.put(aliasFields.get(aliasIndex), aliasRow.getColumns(aliasIndex));
+                String aliasFieldValue = aliasRow.getColumns(aliasIndex);
+                if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) {
+                    aliasFieldValueMap.put(aliasFields.get(aliasIndex), aliasFieldValue);
+                }
             }
-            ProtocolStringList columnList = aliasRow.getColumnsList();
-            columnList.clear();
+            StringBuilder instanceBuilder = new StringBuilder();
             for (int realIndex = 0; realIndex < fields.size(); realIndex++) {
-                String realField = fields.get(realIndex).getField();
+                Metrics.Field field = fields.get(realIndex);
+                String realField = field.getField();
                 Expression expression = fieldExpressionMap.get(realField);
-                String value = "";
+                String value = null;
                 if (expression != null) {
                     // 存在计算表达式 则计算值
-                    value = (String) expression.execute(aliasFieldValueMap);
+                    if ("number".equals(field.getType())) {
+                        for (String variable : expression.getVariableNames()) {
+                            Double doubleValue = CommonUtil.parseDoubleStr(aliasFieldValueMap.get(variable));
+                            if (doubleValue != null) {
+                                fieldValueMap.put(variable, doubleValue);
+                            }
+                        }
+                    } else {
+                        for (String variable : expression.getVariableNames()) {
+                            String strValue = aliasFieldValueMap.get(variable);
+                            if (strValue != null && !"".equals(strValue)) {
+                                fieldValueMap.put(variable, strValue);
+                            }
+                        }
+                    }
+                    try {
+                        Object objValue = expression.execute(fieldValueMap);
+                        if (objValue != null) {
+                            value = String.valueOf(objValue);
+                        }
+                    } catch (Exception e) {
+                        log.warn(e.getMessage());
+                    }
                 } else {
                     // 不存在 则映射别名值
-                    value = (String) aliasFieldValueMap.get(realField);
+                    value = aliasFieldValueMap.get(realField);
+                }
+                if (value == null) {
+                    value = CommonConstants.NULL_VALUE;
+                }
+                realValueRowBuilder.addColumns(value);
+                fieldValueMap.clear();
+                if (field.isInstance()) {
+                    instanceBuilder.append(value);
                 }
-                columnList.add(value);
             }
+            aliasFieldValueMap.clear();
+            // 设置实例instance
+            realValueRowBuilder.setInstance(instanceBuilder.toString());
+            collectData.addValues(realValueRowBuilder.build());
         }
     }
 

+ 44 - 0
collector/server/src/main/java/com/usthe/collector/util/JsonPathParser.java

@@ -0,0 +1,44 @@
+package com.usthe.collector.util;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.spi.cache.CacheProvider;
+import com.jayway.jsonpath.spi.cache.LRUCache;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * json path parser
+ * @author tomsun28
+ * @date 2021/11/20 10:16
+ */
+public class JsonPathParser {
+
+    private static final ParseContext PARSER;
+
+    static {
+        Configuration conf = Configuration.defaultConfiguration()
+                .addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL)
+                .addOptions(Option.ALWAYS_RETURN_LIST);
+        CacheProvider.setCache(new LRUCache(128));
+        PARSER = JsonPath.using(conf);
+    }
+
+    /**
+     * 使用jsonPath来解析json内容
+     * @param content json内容
+     * @param jsonPath jsonPath脚本
+     * @return 解析后的内容
+     */
+    public static List<Map<String, Object>> parseContentWithJsonPath(String content, String jsonPath) {
+        if (content == null || jsonPath == null || "".equals(content) || "".equals(jsonPath)) {
+            return Collections.emptyList();
+        }
+        return PARSER.parse(content).read(jsonPath);
+    }
+
+}

+ 4 - 0
common/src/main/java/com/usthe/common/entity/job/Metrics.java

@@ -99,6 +99,10 @@ public class Metrics {
          */
         private String type;
         /**
+         * 此字段是否为实例主键
+         */
+        private boolean instance = false;
+        /**
          * 指标单位
          */
         private String unit;

+ 5 - 0
common/src/main/java/com/usthe/common/util/CommonConstants.java

@@ -57,4 +57,9 @@ public interface CommonConstants {
      */
     byte SUSPENDING = 0x04;
 
+
+    /**
+     * null空值占位符
+     */
+    String NULL_VALUE = "&nbsp;";
 }

+ 30 - 0
common/src/main/java/com/usthe/common/util/CommonUtil.java

@@ -0,0 +1,30 @@
+package com.usthe.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 公共工具类
+ * @author tomsun28
+ * @date 2021/11/20 17:29
+ */
+@Slf4j
+public class CommonUtil {
+
+    /**
+     * 将字符串str转换为double数字类型
+     * @param str string
+     * @return double 数字
+     */
+    public static Double parseDoubleStr(String str) {
+        if (str == null || "".equals(str)) {
+            return null;
+        }
+        try {
+            return Double.parseDouble(str);
+        } catch (Exception e) {
+            log.debug(e.getMessage(), e);
+            return null;
+        }
+    }
+
+}

+ 3 - 0
common/src/main/java/com/usthe/common/util/IpDomainUtil.java

@@ -45,6 +45,9 @@ public class IpDomainUtil {
      * @return 存在true
      */
     public static boolean isHasSchema(String domainIp) {
+        if (domainIp == null || "".equals(domainIp)) {
+            return false;
+        }
         return DOMAIN_SCHEMA.matcher(domainIp).matches();
     }
 

+ 14 - 8
manager/src/main/resources/define/app/A-example.yml

@@ -20,17 +20,21 @@ metrics:
     priority: 0
     # 指标组中的具体监控指标
     fields:
-      # 指标信息 包括 field名称, type字段类型:number数字,string字符串, unit:指标单位
+      # 指标信息 包括 field名称, type字段类型:number数字,string字符串,instance是否为实例主键   unit:指标单位
+      - field: hostname
+        type: instance
+        instance: true
       - field: usage
         type: number
         unit: '%'
       - field: cores
         type: number
-      - field: waitime
+      - field: waitTime
         type: number
         unit: s
 # (非必须)监控指标别名,与上面的指标名映射。用于采集接口数据字段不直接是最终指标名称,需要此别名做映射转换
     aliasFields:
+      - hostname
       - core1
       - core2
       - usage
@@ -39,6 +43,7 @@ metrics:
 # (非必须)指标计算表达式,与上面的别名一起作用,计算出最终需要的指标值
 # eg: cores=core1+core2, usage=usage, waitTime=allTime-runningTime
     calculates:
+      - hostname=hostname
       - cores=core1+core2
       - usage=usage
       - waitTime=allTime-runningTime
@@ -51,7 +56,7 @@ metrics:
       # 端口
       port: ^_^port^_^
       # url请求接口路径
-      url: /cpu
+      url: /metrics/cpu
       # 请求方式 GET POST PUT DELETE PATCH
       method: GET
       # 是否启用ssl/tls,即是http还是https,默认false
@@ -69,13 +74,15 @@ metrics:
         type: Basic Auth
         basicAuthUsername: ^_^username^_^
         basicAuthPassword: ^_^password^_^
-      # 响应数据解析方式: default-系统规则,json_path-jsonPath脚本,xml_path-xmlPath脚本,prometheus-Prometheus数据规则
+      # 响应数据解析方式: default-系统规则,jsonPath-jsonPath脚本,xmlPath-xmlPath脚本,prometheus-Prometheus数据规则
       parseType: jsonPath
-      parseScript: '$.cpu[:1].*'
+      parseScript: '$'
 
   - name: memory
     priority: 1
     fields:
+      - field: hostname
+        type: string
       - field: total
         type: number
         unit: kb
@@ -88,7 +95,7 @@ metrics:
     http:
       host: ^_^host^_^
       port: ^_^port^_^
-      url: /memory
+      url: /metrics/memory
       method: GET
       headers:
         apiVersion: v1
@@ -99,5 +106,4 @@ metrics:
         type: Basic Auth
         basicAuthUsername: ^_^username^_^
         basicAuthPassword: ^_^password^_^
-      parseType: jsonPath
-      parseScript: '$.memory[:1].*'
+      parseType: default

+ 1 - 1
manager/src/main/resources/define/param/A-example.yml

@@ -14,7 +14,7 @@ param:
     name: 端口
     type: number
     # 当type为number时,用range表示范围
-    range: '[0,255]'
+    range: '[0,65535]'
     required: true
   - field: username
     name: 用户名