迁移collector,合并到整个工程

This commit is contained in:
tomsun28
2021-11-10 20:07:27 +08:00
commit bcea680781
23 changed files with 4366 additions and 0 deletions

35
common/pom.xml Normal file
View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>common</artifactId>
<dependencies>
<!-- etcd -->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.5.10</version>
<scope>provided</scope>
</dependency>
<!-- 工具依赖 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,35 @@
package com.usthe.common.entity.job;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 监控配置参数属性及值
* 过程中需要将协议配置参数里面的标识符为^_^key^_^的内容替换为配置参数里的真实值
* @author tomsun28
* @date 2021/10/29 22:04
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Configmap {
/**
* 参数key,将协议配置参数里面的标识符为^^_key_^^的内容替换为配置参数里的真实值
*/
private String key;
/**
* 参数value
*/
private Object value;
/**
* number,string,secret
* 数字,非加密字符串,加密字符串
*/
private String type;
}

View File

@@ -0,0 +1,162 @@
package com.usthe.common.entity.job;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 采集任务详情
* @author tomsun28
* @date 2021/10/17 21:19
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Slf4j
public class Job {
private static final String AVAILABILITY = "availability";
/**
* 任务ID
*/
private long id;
/**
* 监控ID 应用ID
*/
private long appId;
/**
* 监控的类型 eg: linux | mysql | jvm
*/
private String app;
/**
* 任务派发开始时间戳
*/
private long timestamp;
/**
* 任务采集时间间隔(单位秒) eg: 30,60,600
*/
private long interval;
/**
* 是否是循环周期性任务 true为是,false为否
*/
private boolean isCyclic;
/**
* 指标组配置 eg: cpu memory
*/
private List<Metrics> metrics;
/**
* 监控配置参数属性及值 eg: username password timeout host
*/
private List<Configmap> configmap;
/**
* collector使用 - 任务版本,此字段不存储于etcd
*/
private transient long version;
/**
* collector使用 - 指标组任务执行优先级视图
* 0 - availability
* 1 - cpu | memory
* 2 - health
* 3 - otherMetrics
* ....
* 126 - otherMetrics
* 127 - lastPriorMetrics
*/
private transient List<Set<Metrics>> priorMetrics;
/**
* collector使用 - 构造初始化标志
*/
private transient boolean isConstruct = false;
/**
* collector使用 - 构造初始化指标组
*/
public synchronized void constructMetrics() {
if (isConstruct) {
return;
}
Map<Byte, List<Metrics>> map = metrics.stream()
.peek(metric -> {
// 判断是否配置aliasFields 没有则配置默认
if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
metric.setAliasFields(metric.getFields());
}
// 设置默认的指标组执行优先级
if (metric.getPriority() == null) {
if (AVAILABILITY.equals(metric.getName())) {
metric.setPriority((byte)0);
} else {
metric.setPriority(Byte.MAX_VALUE);
}
}
})
.collect(Collectors.groupingBy(Metrics::getPriority));
// 构造指标组任务执行顺序链表
priorMetrics = new LinkedList<>();
map.values().forEach(metric -> {
Set<Metrics> metricsSet = new HashSet<>(metric);
priorMetrics.add(metricsSet);
});
priorMetrics.sort(Comparator.comparing(e -> {
Optional<Metrics> metric = e.stream().findAny();
if (metric.isPresent()) {
return metric.get().getPriority();
} else {
return Byte.MAX_VALUE;
}
}));
}
/**
* collector使用 - 获取下一组优先级的指标组任务
* @param metrics 当前指标组
* @param first 是否是第一次获取
* @return 指标组任务
* 返回null表示job已完成,所有指标组采集结束
* 返回empty的集合表示当前级别下还有指标组采集任务未结束,无法进行下一级别的指标组任务采集
* 返回有数据集合表示:获取到下一组优先级的指标组任务
*/
public synchronized Set<Metrics> getNextCollectMetrics(Metrics metrics, boolean first) {
if (!isConstruct || priorMetrics == null || priorMetrics.isEmpty()) {
return null;
}
Set<Metrics> metricsSet = priorMetrics.get(0);
if (first) {
log.error("metrics must has one [availability] metrics at least.");
return metricsSet;
}
if (metrics == null) {
log.error("metrics can not null when not first get");
}
if (metrics != null && !metricsSet.remove(metrics)) {
log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.",
id, appId, app, metrics.getName());
}
if (metricsSet.isEmpty()) {
if (priorMetrics.size() == 1) {
return null;
}
priorMetrics.remove(0);
return priorMetrics.get(0);
} else {
return Collections.emptySet();
}
}
}

View File

@@ -0,0 +1,70 @@
package com.usthe.common.entity.job;
import com.usthe.common.entity.job.protocol.HttpProtocol;
import com.usthe.common.entity.job.protocol.IcmpProtocol;
import com.usthe.common.entity.job.protocol.JdbcProtocol;
import com.usthe.common.entity.job.protocol.TcpUdpProtocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 监控采集的指标集合详情 eg: cpu | memory | health
* @author tomsun28
* @date 2021/10/17 21:24
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Metrics {
/**
* 公共属性-名称 eg: cpu | memory | health
*/
private String name;
/**
* 公共属性-采集监控协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk
*/
private String protocol;
/**
* 范围(0-127)指标组调度优先级,数值越小优先级越高
* 优先级高的调度采集完成后才会调度下一优先级的指标组采集任务
* 可用性指标组(availability)默认优先级为0,其它普通指标组范围为1-127,即需要等availability采集成功后才会调度后面的指标组任务
*/
private Byte priority;
/**
* 公共属性-采集监控的最终结果属性集合 eg: speed | times | size
*/
private List<String> fields;
/**
* 公共属性-采集监控的前置查询属性集合 eg: size1 | size2 | speedSize
*/
private List<String> aliasFields;
/**
* 公共属性-表达式计算,将前置查询属性(preFields)与最终属性(fields)映射,计算出最终属性(fields)值
* eg: size = size1 + size2, speed = speedSize
* https://www.yuque.com/boyan-avfmj/aviatorscript/ban32m
*/
private List<String> calculates;
/**
* 使用http协议的监控配置信息
*/
private HttpProtocol http;
/**
* 使用icmp协议进行ping的监控配置信息
*/
private IcmpProtocol icmp;
/**
* 使用socket实现的tcp或ucp进行服务端口探测配置信息
*/
private TcpUdpProtocol tcpUdp;
/**
* 使用公共的jdbc规范实现的数据库配置信息
*/
private JdbcProtocol jdbc;
}

View File

@@ -0,0 +1,93 @@
package com.usthe.common.entity.job.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
* http 协议配置
* @author tomsun28
* @date 2021/10/31 12:41
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class HttpProtocol {
/**
* 对端主机ip或域名
*/
private String host;
/**
* http/https 请求访问的url链接
*/
private String url;
/**
* http是否使用链路加密ssl/tls,即是http还是https
*/
private boolean ssl = false;
/**
* http请求方法: get, post, put, delete, patch
*/
private String method;
/**
* http请求携带头 eg: Content-Type = application/json
*/
private Map<String, String> headers;
/**
* http请求携带查询参数 eg: localhost:80/api?paramKey=value
*/
private Map<String, String> params;
/**
* 认证信息
*/
private Authorization authorization;
/**
* 响应数据解析方式
* default - 自有的数据解析规则
* json_path 自定义jsonPath脚本 https://www.jsonpath.cn/
* xml_path 自定义xmlPath脚本
* prometheus Prometheus数据规则
*/
private String parseType;
/**
* 数据解析脚本 当解析方式为 jsonPath or xmlPath时存在
*/
private String parseScript;
/**
* 认证信息
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Authorization {
/**
* 认证类型Bearer Token, Basic Auth, Digest Auth
*/
private String type;
/**
* Bearer Token's token
*/
private String bearerTokenToken;
/**
* Basic Auth's username
*/
private String basicAuthUsername;
/**
* Basic Auth's password
*/
private String basicAuthPassword;
/**
* Digest Auth's username
*/
private String digestAuthUsername;
/**
* Digest Auth's password
*/
private String digestAuthPassword;
}
}

View File

@@ -0,0 +1,23 @@
package com.usthe.common.entity.job.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* icmp(即ping)协议配置
* @author tomsun28
* @date 2021/10/31 16:41
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class IcmpProtocol {
/**
* 对端主机ip或域名
*/
private String host;
}

View File

@@ -0,0 +1,38 @@
package com.usthe.common.entity.job.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 公共的jdbc规范实现的数据库配置信息
* @author tomsun28
* @date 2021/10/31 17:33
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class JdbcProtocol {
/**
* 对端主机ip或域名
*/
private String host;
/**
* 端口号
*/
private Integer port;
/**
* 数据库用户名(可选)
*/
private String username;
/**
* 数据库密码(可选)
*/
private String password;
/**
* 数据库链接url eg: jdbc:mysql://localhost:3306
*/
private String url;
}

View File

@@ -0,0 +1,30 @@
package com.usthe.common.entity.job.protocol;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 使用socket实现的tcp或ucp进行服务端口可用性探测
* @author tomsun28
* @date 2021/10/31 17:27
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TcpUdpProtocol {
/**
* 具体协议类型 tcp, udp
*/
private String protocol;
/**
* 对端主机ip或域名
*/
private String host;
/**
* 端口号
*/
private Integer port;
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,41 @@
package com.usthe.common.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.etcd.jetcd.ByteSequence;
import javax.annotation.concurrent.ThreadSafe;
import java.nio.charset.StandardCharsets;
/**
* gson 工具类
* @author tomsun28
* @date 2021/10/16 20:49
*/
@ThreadSafe
public class GsonUtil {
private static Gson gson;
static {
gson = new GsonBuilder().enableComplexMapKeySerialization()
.serializeNulls()
.create();
}
public static String toJson(Object source) {
return gson.toJson(source);
}
public static <T> T fromJson(String jsonStr, Class<T> clazz) {
return gson.fromJson(jsonStr, clazz);
}
public static <T> T fromJson(ByteSequence byteSequence, Class<T> clazz) {
if (byteSequence == null || byteSequence.isEmpty()) {
return null;
}
return gson.fromJson(byteSequence.toString(StandardCharsets.UTF_8), clazz);
}
}

View File

@@ -0,0 +1,46 @@
syntax = "proto3";
package com.usthe.common.entity.message;
message MetricsData
{
// 监控的ID
uint64 id = 1;
// 监控的类型 eg: linux | mysql | jvm
string app = 2;
// 监控采集的指标集合 eg: cpu | memory | health
string metrics = 3;
// 采集时间
uint64 time = 4;
// 采集响应码
Code code = 5;
// 采集响应信息
string msg = 6;
// 采集指标名
repeated string fields = 7;
// 采集指标值集合(fields作为字段名称与ValueRow映射)
repeated ValueRow values = 8;
}
message ValueRow
{
// 主键实例,唯一标识这行数据
string instance = 1;
// 采集指标值
repeated string columns = 2;
}
enum Code
{
// 采集成功
SUCCESS = 0;
// 采集器不可用
UN_AVAILABLE = 1;
// 对端不可达(网络层icmp)
UN_REACHABLE = 2;
// 对端连接失败(传输层tcp,udp)
UN_CONNECTABLE = 3;
// 数据采集失败(应用层http,ssh,snmp)
FAIL = 4;
// 采集超时
TIMEOUT = 5;
}