diff --git a/collector/pom.xml b/collector/pom.xml
index 7ffd674..70deccc 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -97,6 +97,12 @@
postgresql
42.3.3
+
+
+ org.apache.sshd
+ sshd-core
+ 2.8.0
+
\ No newline at end of file
diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java
index 078ec3d..712c01a 100644
--- a/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java
+++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/CommonCache.java
@@ -21,9 +21,9 @@ import java.util.concurrent.TimeUnit;
public class CommonCache {
/**
- * 默认缓存时间 30minute
+ * 默认缓存时间 10minute
*/
- private static final long DEFAULT_CACHE_TIMEOUT = 30 * 60 * 1000L;
+ private static final long DEFAULT_CACHE_TIMEOUT = 10 * 60 * 1000L;
/**
* 默认最大缓存数量
@@ -155,6 +155,15 @@ public class CommonCache {
});
}
+ /**
+ * 新增或更新cache
+ * @param key 存储对象key
+ * @param value 存储对象
+ */
+ public void addCache(Object key, Object value) {
+ addCache(key, value, DEFAULT_CACHE_TIMEOUT);
+ }
+
/**
* 根据缓存key获取缓存对象
* @param key key
diff --git a/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java
new file mode 100644
index 0000000..509bbbc
--- /dev/null
+++ b/collector/src/main/java/com/usthe/collector/collect/common/ssh/CommonSshClient.java
@@ -0,0 +1,25 @@
+package com.usthe.collector.collect.common.ssh;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+
+/**
+ * ssh公共client
+ * @author tom
+ * @date 2022/3/11 15:58
+ */
+@Slf4j
+public class CommonSshClient {
+
+ private static SshClient sshClient;
+
+
+ static {
+ sshClient = SshClient.setUpDefaultClient();
+ sshClient.start();
+ }
+
+ public static SshClient getSshClient() {
+ return sshClient;
+ }
+}
diff --git a/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java
new file mode 100644
index 0000000..44c43fd
--- /dev/null
+++ b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java
@@ -0,0 +1,202 @@
+package com.usthe.collector.collect.ssh;
+
+import com.usthe.collector.collect.AbstractCollect;
+import com.usthe.collector.collect.common.cache.CacheIdentifier;
+import com.usthe.collector.collect.common.cache.CommonCache;
+import com.usthe.collector.collect.common.ssh.CommonSshClient;
+import com.usthe.collector.util.CollectorConstants;
+import com.usthe.common.entity.job.Metrics;
+import com.usthe.common.entity.job.protocol.SshProtocol;
+import com.usthe.common.entity.message.CollectRep;
+import com.usthe.common.util.CommonConstants;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.channel.ClientChannelEvent;
+import org.apache.sshd.client.session.ClientSession;
+import org.springframework.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ssh协议采集实现
+ * @author tom
+ * @date 2022/03/11 15:10
+ */
+@Slf4j
+public class SshCollectImpl extends AbstractCollect {
+
+ private static final String PARSE_TYPE_ONE_ROW = "oneRow";
+ private static final String PARSE_TYPE_MULTI_ROW = "multiRow";
+
+ private SshCollectImpl(){}
+
+ public static SshCollectImpl getInstance() {
+ return SshCollectImpl.Singleton.INSTANCE;
+ }
+
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
+ long startTime = System.currentTimeMillis();
+ // 校验参数
+ try {
+ validateParams(metrics);
+ } catch (Exception e) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg(e.getMessage());
+ return;
+ }
+ SshProtocol sshProtocol = metrics.getSsh();
+ // 超时时间默认300毫秒
+ int timeout = 3000;
+ try {
+ timeout = Integer.parseInt(sshProtocol.getTimeout());
+ } catch (Exception e) {
+ log.warn(e.getMessage());
+ }
+ try {
+ ClientSession clientSession = getConnectSession(sshProtocol, timeout);
+ ClientChannel channel = clientSession.createExecChannel(sshProtocol.getScript());
+ ByteArrayOutputStream response = new ByteArrayOutputStream();
+ channel.setOut(response);
+ if (!channel.open().verify(timeout).isOpened()) {
+ throw new Exception("open failed");
+ }
+ List list = new ArrayList<>();
+ list.add(ClientChannelEvent.CLOSED);
+ channel.waitFor(list, timeout);
+ Long responseTime = System.currentTimeMillis() - startTime;
+ channel.close();
+ String result = response.toString();
+ if (!StringUtils.hasText(result)) {
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg("采集数据失败");
+ }
+ switch (sshProtocol.getParseType()) {
+ case PARSE_TYPE_ONE_ROW:
+ parseResponseDataByOne(result, metrics.getAliasFields(), builder, responseTime);
+ break;
+ default: parseResponseDataByMulti(result, metrics.getAliasFields(), builder, responseTime);
+ break;
+ }
+ } catch (ConnectException connectException) {
+ log.debug(connectException.getMessage());
+ builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+ builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙");
+ } catch (IOException ioException) {
+ log.debug(ioException.getMessage());
+ builder.setCode(CollectRep.Code.UN_CONNECTABLE);
+ builder.setMsg("对端连接失败 " + ioException.getMessage());
+ } catch (Exception exception) {
+ log.debug(exception.getMessage());
+ builder.setCode(CollectRep.Code.FAIL);
+ builder.setMsg(exception.getMessage());
+ }
+ }
+
+ private void parseResponseDataByOne(String result, List aliasFields, CollectRep.MetricsData.Builder builder, Long responseTime) {
+ String[] lines = result.split("\n");
+ if (lines.length + 1 < aliasFields.size()) {
+ log.error("ssh response data not enough: {}", result);
+ }
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ int aliasIndex = 0;
+ int lineIndex = 0;
+ while (aliasIndex < aliasFields.size()) {
+ if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(aliasFields.get(aliasIndex))) {
+ valueRowBuilder.addColumns(responseTime.toString());
+ } else {
+ valueRowBuilder.addColumns(lines[lineIndex].trim());
+ lineIndex++;
+ }
+ aliasIndex++;
+ }
+ builder.addValues(valueRowBuilder.build());
+ }
+
+ private void parseResponseDataByMulti(String result, List aliasFields,
+ CollectRep.MetricsData.Builder builder, Long responseTime) {
+ String[] lines = result.split("\n");
+ if (lines.length <= 1) {
+ log.error("ssh response data only has header: {}", result);
+ }
+ String[] fields = lines[0].split(" ");
+ Map fieldMapping = new HashMap<>(fields.length);
+ for (int i = 0; i < fields.length; i++) {
+ fieldMapping.put(fields[i].trim().toLowerCase(), i);
+ }
+ for (int i = 1; i < lines.length; i++) {
+ String[] values = lines[i].split(" ");
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String alias : aliasFields) {
+ if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
+ valueRowBuilder.addColumns(responseTime.toString());
+ } else {
+ Integer index = fieldMapping.get(alias.toLowerCase());
+ if (index != null && index < values.length) {
+ valueRowBuilder.addColumns(values[index]);
+ } else {
+ valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
+ }
+ }
+ }
+ builder.addValues(valueRowBuilder.build());
+ }
+ }
+
+ private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout) throws IOException {
+ CacheIdentifier identifier = CacheIdentifier.builder()
+ .ip(sshProtocol.getHost()).port(sshProtocol.getPort())
+ .username(sshProtocol.getUsername()).password(sshProtocol.getPassword())
+ .build();
+ Optional