Przeglądaj źródła

[collector,manager]支持Linux操作系统监控类型

tomsun28 3 lat temu
rodzic
commit
25d692cf37

+ 6 - 0
collector/pom.xml

@@ -97,6 +97,12 @@
             <artifactId>postgresql</artifactId>
             <version>42.3.3</version>
         </dependency>
+        <!-- linux ssh -->
+        <dependency>
+            <groupId>org.apache.sshd</groupId>
+            <artifactId>sshd-core</artifactId>
+            <version>2.8.0</version>
+        </dependency>
     </dependencies>
 
 </project>

+ 25 - 0
collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java

@@ -0,0 +1,25 @@
+package com.usthe.collector.collect.common.ssh;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+
+/**
+ * ssh公共client
+ * @author tom
+ * @date 2022/3/11 15:58
+ */
+@Slf4j
+public class CommonSshClient {
+
+    private static SshClient sshClient;
+
+
+    static {
+        sshClient = SshClient.setUpDefaultClient();
+        sshClient.start();
+    }
+
+    public static SshClient getSshClient() {
+        return sshClient;
+    }
+}

+ 194 - 0
collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java

@@ -0,0 +1,194 @@
+package com.usthe.collector.collect.ssh;
+
+import com.usthe.collector.collect.AbstractCollect;
+import com.usthe.collector.collect.common.cache.CacheIdentifier;
+import com.usthe.collector.collect.common.cache.CommonCache;
+import com.usthe.collector.collect.common.ssh.CommonSshClient;
+import com.usthe.collector.util.CollectorConstants;
+import com.usthe.common.entity.job.Metrics;
+import com.usthe.common.entity.job.protocol.SshProtocol;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ssh协议采集实现
+ * @author tom
+ * @date 2022/03/11 15:10
+ */
+@Slf4j
+public class SshCollectImpl extends AbstractCollect {
+
+    private static final String PARSE_TYPE_ONE_ROW = "oneRow";
+    private static final String PARSE_TYPE_MULTI_ROW = "multiRow";
+
+    private SshCollectImpl(){}
+
+    public static SshCollectImpl getInstance() {
+        return SshCollectImpl.Singleton.INSTANCE;
+    }
+
+
+    @Override
+    public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+        long startTime = System.currentTimeMillis();
+        // 校验参数
+        try {
+            validateParams(metrics);
+        } catch (Exception e) {
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg(e.getMessage());
+            return;
+        }
+        SshProtocol sshProtocol = metrics.getSsh();
+        // 超时时间默认300毫秒
+        int timeout = 300;
+        try {
+            timeout = Integer.parseInt(sshProtocol.getTimeout());
+        } catch (Exception e) {
+            log.warn(e.getMessage());
+        }
+        try {
+            ClientSession clientSession = getConnectSession(sshProtocol);
+            ClientChannel channel = clientSession.createExecChannel(sshProtocol.getScript());
+            ByteArrayOutputStream response = new ByteArrayOutputStream();
+            channel.setOut(response);
+            if (!channel.open().verify(Integer.parseInt(sshProtocol.getTimeout())).isOpened()) {
+                throw new Exception("open failed");
+            }
+            List<ClientChannelEvent> list = new ArrayList<>();
+            list.add(ClientChannelEvent.CLOSED);
+            channel.waitFor(list, Integer.parseInt(sshProtocol.getTimeout()));
+            Long responseTime  = System.currentTimeMillis() - startTime;
+            channel.close();
+            String result = response.toString();
+            if (!StringUtils.hasText(result)) {
+                builder.setCode(CollectRep.Code.FAIL);
+                builder.setMsg("采集数据失败");
+            }
+            switch (sshProtocol.getParseType()) {
+                case PARSE_TYPE_ONE_ROW:
+                    parseResponseDataByOne(result, metrics.getAliasFields(), builder, responseTime);
+                    break;
+                default: parseResponseDataByMulti(result, metrics.getAliasFields(), builder, responseTime);
+                break;
+            }
+        } catch (ConnectException connectException) {
+            log.debug(connectException.getMessage());
+            builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+            builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙");
+        } catch (IOException ioException) {
+            log.debug(ioException.getMessage());
+            builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+            builder.setMsg("对端连接失败 " + ioException.getMessage());
+        } catch (Exception exception) {
+            log.debug(exception.getMessage());
+            builder.setCode(CollectRep.Code.FAIL);
+            builder.setMsg(exception.getMessage());
+        }
+    }
+
+    private void parseResponseDataByOne(String result, List<String> aliasFields, CollectRep.MetricsData.Builder builder, Long responseTime) {
+        String[] lines = result.split("\n");
+        if (lines.length + 1 < aliasFields.size()) {
+            log.error("ssh response data not enough: {}",  result);
+        }
+        CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+        for (String value : lines) {
+            valueRowBuilder.addColumns(value);
+        }
+        builder.addValues(valueRowBuilder.build());
+    }
+
+    private void parseResponseDataByMulti(String result, List<String> aliasFields,
+                                          CollectRep.MetricsData.Builder builder, Long responseTime) {
+        String[] lines = result.split("\n");
+        if (lines.length <= 1) {
+            log.error("ssh response data only has header: {}",  result);
+        }
+        String[] fields = lines[0].split(" ");
+        Map<String, Integer> fieldMapping = new HashMap<>(fields.length);
+        for (int i = 0; i < fields.length; i++) {
+            fieldMapping.put(fields[i].trim().toLowerCase(), i);
+        }
+        for (int i = 1; i < lines.length; i++) {
+            String[] values = lines[i].split(" ");
+            CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+            for (String alias : aliasFields) {
+                if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
+                    valueRowBuilder.addColumns(responseTime.toString());
+                } else {
+                    Integer index = fieldMapping.get(alias.toLowerCase());
+                    if (index != null && index < values.length) {
+                        valueRowBuilder.addColumns(values[index]);
+                    } else {
+                        valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+                    }
+                }
+            }
+            builder.addValues(valueRowBuilder.build());
+        }
+    }
+
+    private ClientSession getConnectSession(SshProtocol sshProtocol) throws IOException {
+        CacheIdentifier identifier = CacheIdentifier.builder()
+                .ip(sshProtocol.getHost()).port(sshProtocol.getPort())
+                .username(sshProtocol.getUsername()).password(sshProtocol.getPassword())
+                .build();
+        Optional<Object> cacheOption = CommonCache.getInstance().getCache(identifier, true);
+        ClientSession clientSession = null;
+        if (cacheOption.isPresent()) {
+            clientSession = (ClientSession) cacheOption.get();
+            try {
+                if (clientSession.isClosed() || clientSession.isClosing()) {
+                    clientSession = null;
+                    CommonCache.getInstance().removeCache(identifier);
+                }
+            } catch (Exception e) {
+                log.warn(e.getMessage());
+                clientSession = null;
+                CommonCache.getInstance().removeCache(identifier);
+            }
+        }
+        if (clientSession != null) {
+            return clientSession;
+        }
+        SshClient sshClient = CommonSshClient.getSshClient();
+        clientSession = sshClient.connect(sshProtocol.getUsername(), sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
+                .verify(Long.parseLong(sshProtocol.getTimeout()), TimeUnit.MILLISECONDS).getSession();
+        if (StringUtils.hasText(sshProtocol.getPassword())) {
+            clientSession.addPasswordIdentity(sshProtocol.getPassword());
+        }
+        // 进行认证
+        if (!clientSession.auth().verify(Long.parseLong(sshProtocol.getTimeout()), TimeUnit.MILLISECONDS).isSuccess()) {
+            throw new IllegalArgumentException("认证失败");
+        }
+        CommonCache.getInstance().addCache(identifier, clientSession, 10000L);
+        return clientSession;
+    }
+
+    private void validateParams(Metrics metrics) throws Exception {
+        if (metrics == null || metrics.getSsh() == null) {
+            throw new Exception("Ssh collect must has ssh params");
+        }
+    }
+
+    private static class Singleton {
+        private static final SshCollectImpl INSTANCE = new SshCollectImpl();
+    }
+}

+ 1 - 1
collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java

@@ -13,7 +13,7 @@ import java.io.IOException;
 import java.net.ConnectException;
 
 /**
- * icmp协议采集实现 - ping
+ * telnet协议采集实现
  * @author tom
  * @date 2021/12/4 12:32
  */

+ 4 - 0
collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java

@@ -24,6 +24,10 @@ public interface DispatchConstants {
      * 协议 jdbc
      */
     String PROTOCOL_JDBC = "jdbc";
+    /**
+     * 协议 ssh
+     */
+    String PROTOCOL_SSH = "ssh";
     // 协议类型相关 - end //
 
     // http协议相关 - start 需尽可能先复用 HttpHeaders //

+ 4 - 0
collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java

@@ -6,6 +6,7 @@ import com.usthe.collector.collect.AbstractCollect;
 import com.usthe.collector.collect.database.JdbcCommonCollect;
 import com.usthe.collector.collect.http.HttpCollectImpl;
 import com.usthe.collector.collect.icmp.IcmpCollectImpl;
+import com.usthe.collector.collect.ssh.SshCollectImpl;
 import com.usthe.collector.collect.telnet.TelnetCollectImpl;
 import com.usthe.collector.dispatch.timer.Timeout;
 import com.usthe.collector.dispatch.timer.WheelTimerTask;
@@ -111,6 +112,9 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
             case DispatchConstants.PROTOCOL_JDBC:
                 abstractCollect = JdbcCommonCollect.getInstance();
                 break;
+            case DispatchConstants.PROTOCOL_SSH:
+                abstractCollect = SshCollectImpl.getInstance();
+                break;
                 // todo
             default: break;
         }

+ 5 - 0
common/src/main/java/com/usthe/common/entity/job/Metrics.java

@@ -3,6 +3,7 @@ package com.usthe.common.entity.job;
 import com.usthe.common.entity.job.protocol.HttpProtocol;
 import com.usthe.common.entity.job.protocol.IcmpProtocol;
 import com.usthe.common.entity.job.protocol.JdbcProtocol;
+import com.usthe.common.entity.job.protocol.SshProtocol;
 import com.usthe.common.entity.job.protocol.TcpUdpProtocol;
 import com.usthe.common.entity.job.protocol.TelnetProtocol;
 import lombok.AllArgsConstructor;
@@ -73,6 +74,10 @@ public class Metrics {
      * 使用公共的jdbc规范实现的数据库配置信息
      */
     private JdbcProtocol jdbc;
+    /**
+     * 使用公共的ssh协议的监控配置信息
+     */
+    private SshProtocol ssh;
 
     @Override
     public boolean equals(Object o) {

+ 58 - 0
common/src/main/java/com/usthe/common/entity/job/protocol/SshProtocol.java

@@ -0,0 +1,58 @@
+package com.usthe.common.entity.job.protocol;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * ssh 协议参数配置
+ * @author tom
+ * @date 2022/3/11 15:20
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SshProtocol {
+
+    /**
+     * 对端主机ip或域名
+     */
+    private String host;
+
+    /**
+     * 对端主机端口
+     */
+    private String port;
+
+    /**
+     * 超时时间
+     */
+    private String timeout = "3000";
+
+    /**
+     * 用户名
+     */
+    private String username;
+
+    /**
+     * 密码(可选)
+     */
+    private String password;
+
+    /**
+     * 公钥(可选)
+     */
+    private String publicKey;
+
+    /**
+     * SSH执行脚本
+     */
+    private String script;
+
+    /**
+     * 响应数据解析方式:oneRow, multiRow
+     */
+    private String parseType;
+}

+ 76 - 0
manager/src/main/resources/define/app/linux.yml

@@ -0,0 +1,76 @@
+# 此监控类型所属类别:service-应用服务监控 db-数据库监控 custom-自定义监控 os-操作系统监控
+category: os
+# 监控应用类型(与文件名保持一致) eg: linux windows tomcat mysql aws...
+app: linux
+name:
+  zh-CN: Linux操作系统
+  en-US: OS Linux
+# 参数映射map. type是参数类型: 0-number数字, 1-string明文字符串, 2-secret加密字符串
+# 强制固定必须参数 - host
+configmap:
+  - key: host
+    type: 1
+  - key: port
+    type: 0
+  - key: username
+    type: 1
+  - key: password
+    type: 2
+# 指标组列表
+metrics:
+# 第一个监控指标组 basic
+# 注意:内置监控指标有 (responseTime - 响应时间)
+  - name: basic
+    # 指标组调度优先级(0-127)越小优先级越高,优先级低的指标组会等优先级高的指标组采集完成后才会被调度,相同优先级的指标组会并行调度采集
+    # 优先级为0的指标组为可用性指标组,即它会被首先调度,采集成功才会继续调度其它指标组,采集失败则中断调度
+    priority: 0
+    # 指标组中的具体监控指标
+    fields:
+      # 指标信息 包括 field名称   type字段类型:0-number数字,1-string字符串   instance是否为实例主键   unit:指标单位
+      - field: hostname
+        type: 1
+        instance: true
+      - field: version
+        type: 1
+      - field: uptime
+        type: 1
+# 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk
+    protocol: ssh
+# 当protocol为http协议时具体的采集配置
+    ssh:
+      # 主机host: ipv4 ipv6 域名
+      host: ^_^host^_^
+      # 端口
+      port: ^_^port^_^
+      username: ^_^username^_^
+      password: ^_^password^_^
+      script: (uname -r ; hostname ; uptime | awk -F "," '{print $1}' | sed  "s/ //g") | sed ":a;N;s/\n/^/g;ta" | awk -F '^' 'BEGIN{print "version hostname uptime"} {print $1, $2, $3}'
+      # 响应数据解析方式:oneRow, multiRow
+      parseType: multiRow
+
+  - name: cpu
+    priority: 1
+    fields:
+      # 指标信息 包括 field名称   type字段类型:0-number数字,1-string字符串   instance是否为实例主键   unit:指标单位
+      - field: info
+        type: 1
+      - field: cores
+        type: 0
+      - field: interrupt
+        type: 0
+      - field: load
+        type: 1
+      - field: context_switch
+        type: 0
+    # 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk
+    protocol: ssh
+    # 当protocol为http协议时具体的采集配置
+    ssh:
+      # 主机host: ipv4 ipv6 域名
+      host: ^_^host^_^
+      # 端口
+      port: ^_^port^_^
+      username: ^_^username^_^
+      password: ^_^password^_^
+      script: "LANG=C lscpu | awk -F: '/Model name/ {print $2}';awk '/processor/{core++} END{print core}' /proc/cpuinfo;uptime | sed 's/,/ /g' | awk '{for(i=NF-2;i<=NF;i++)print $i }' | xargs;vmstat 1 1 | awk 'NR==3{print $11}';vmstat 1 1 | awk 'NR==3{print $12}'"
+      parseType: oneRow

+ 22 - 0
manager/src/main/resources/define/param/linux.yml

@@ -0,0 +1,22 @@
+app: linux
+param:
+  - field: host
+    name: 主机Host
+    type: host
+    required: true
+  - field: port
+    name: 端口
+    type: number
+    range: '[0,65535]'
+    required: true
+    defaultValue: 22
+    placeholder: '请输入端口'
+  - field: username
+    name: 用户名
+    type: text
+    limit: 20
+    required: true
+  - field: password
+    name: 密码
+    type: password
+    required: true