[monitor]优化去除Kafka etcd依赖

This commit is contained in:
tomsun28
2022-01-29 17:11:22 +08:00
parent 6f8e400cab
commit 739dcd6308
60 changed files with 3136 additions and 707 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 250 KiB

View File

@@ -4,16 +4,12 @@
### 模块
- **[manager](manager)** 提供监控管理,系统管理基础服务
> 开发中,提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。
> 提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。
- **[collector](collector)** 提供监控数据采集服务
> 开发中,使用通用协议远程采集获取对端指标数据。
> 使用通用协议远程采集获取对端指标数据。
- **[scheduler](scheduler)** 提供监控任务调度服务
> 开发完成,采集任务管理,一次性任务和周期性任务的调度分发。
> 采集任务管理,一次性任务和周期性任务的调度分发。
- **[warehouse](warehouse)** 提供监控数据仓储服务
> 开发中,采集指标结果数据管理,数据落盘,查询,计算统计。
> 采集指标结果数据管理,数据落盘,查询,计算统计。
- **[alerter](alerter)** 提供告警服务
> 开发中,告警计算触发,监控状态联动,告警配置,告警通知。
### 结构
![arch](Architecture.jpg)
> 告警计算触发,监控状态联动,告警配置,告警通知。

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -20,7 +20,14 @@
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<!-- collector -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>collector</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
<!-- spring -->
<dependency>

View File

@@ -1,7 +1,6 @@
package com.usthe.alert;
import com.usthe.alert.pojo.entity.Alert;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -17,22 +16,12 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class AlerterDataQueue {
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataQueue;
private final LinkedBlockingQueue<Alert> alertDataQueue;
public AlerterDataQueue() {
metricsDataQueue = new LinkedBlockingQueue<>();
alertDataQueue = new LinkedBlockingQueue<>();
}
public void addMetricsData(CollectRep.MetricsData metricsData) {
metricsDataQueue.offer(metricsData);
}
public CollectRep.MetricsData pollMetricsData() throws InterruptedException {
return metricsDataQueue.poll(2, TimeUnit.SECONDS);
}
public void addAlertData(Alert alert) {
alertDataQueue.offer(alert);
}

View File

@@ -4,16 +4,15 @@ import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.alert.AlerterWorkerPool;
import com.usthe.alert.AlerterDataQueue;
import com.usthe.alert.entrance.KafkaDataConsume;
import com.usthe.alert.pojo.entity.Alert;
import com.usthe.alert.pojo.entity.AlertDefine;
import com.usthe.alert.service.AlertDefineService;
import com.usthe.alert.util.AlertTemplateUtil;
import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.CommonUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
@@ -27,20 +26,21 @@ import java.util.concurrent.ConcurrentHashMap;
* @date 2021/12/9 14:19
*/
@Configuration
@AutoConfigureAfter(value = {KafkaDataConsume.class})
@Slf4j
public class CalculateAlarm {
private AlerterWorkerPool workerPool;
private AlerterDataQueue dataQueue;
private MetricsDataExporter dataExporter;
private AlertDefineService alertDefineService;
private Map<String, Alert> triggeredAlertMap;
private Map<Long, CollectRep.Code> triggeredMonitorStateAlertMap;
public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue,
AlertDefineService alertDefineService) {
AlertDefineService alertDefineService, MetricsDataExporter dataExporter) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.dataExporter = dataExporter;
this.alertDefineService = alertDefineService;
this.triggeredAlertMap = new ConcurrentHashMap<>(128);
this.triggeredMonitorStateAlertMap = new ConcurrentHashMap<>(128);
@@ -51,7 +51,7 @@ public class CalculateAlarm {
Runnable runnable = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollMetricsData();
CollectRep.MetricsData metricsData = dataExporter.pollAlertMetricsData();
if (metricsData != null) {
calculate(metricsData);
}

View File

@@ -1,80 +0,0 @@
package com.usthe.alert.entrance;
import com.usthe.alert.AlerterProperties;
import com.usthe.alert.AlerterWorkerPool;
import com.usthe.alert.AlerterDataQueue;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 从Kafka消费指标组采集数据处理
* @author tom
* @date 2021/11/24 18:03
*/
@Configuration
@AutoConfigureAfter(value = {AlerterProperties.class})
@ConditionalOnProperty(prefix = "alerter.entrance.kafka",
name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class KafkaDataConsume implements DisposableBean {
private KafkaConsumer<Long, CollectRep.MetricsData> consumer;
private AlerterWorkerPool workerPool;
private AlerterDataQueue dataQueue;
public KafkaDataConsume(AlerterProperties properties, AlerterWorkerPool workerPool,
AlerterDataQueue dataQueue) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
initConsumer(properties);
startConsumeData();
}
private void startConsumeData() {
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-kafka-data-consumer");
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<Long, CollectRep.MetricsData> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
dataQueue.addMetricsData(record.value());
});
}
};
workerPool.executeJob(runnable);
}
private void initConsumer(AlerterProperties properties) {
if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) {
log.error("init error, please config Warehouse kafka props in application.yml");
throw new IllegalArgumentException("please config Warehouse kafka props");
}
AlerterProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka();
Properties consumerProp = new Properties();
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers());
consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId());
consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class);
consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
consumer = new KafkaConsumer<>(consumerProp);
consumer.subscribe(Collections.singleton(kafkaProp.getTopic()));
}
@Override
public void destroy() throws Exception {
if (consumer != null) {
consumer.close();
}
}
}

View File

@@ -1,24 +0,0 @@
package com.usthe.alert.entrance;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
/**
* MetricsData的反序列化
* @author tom
* @date 2021/11/24 17:29
*/
@Slf4j
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {
@Override
public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) {
try {
return CollectRep.MetricsData.parseFrom(bytes);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}

View File

@@ -6,7 +6,6 @@ com.usthe.alert.AlerterWorkerPool,\
com.usthe.alert.AlerterProperties,\
com.usthe.alert.AlerterDataQueue,\
com.usthe.alert.AlerterConfiguration,\
com.usthe.alert.entrance.KafkaDataConsume,\
com.usthe.alert.calculate.CalculateAlarm,\
com.usthe.alert.controller.AlertsController,\
com.usthe.alert.controller.AlertDefinesController

View File

@@ -1,44 +0,0 @@
<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd
http://maven.apache.org/ASSEMBLY/2.0.0 ">
<!--必填,会追加到打包文件名称的末尾-->
<id>1.0</id>
<!--打包类型,可以设置多种类型,打包的时候不同的类型都会打包打出来-->
<formats>
<format>tar.gz</format>
<format>zip</format>
</formats>
<!--文件相关设置-->
<fileSets>
<!--bin文件下的所有脚本文件输出到打包后的bin目录下-->
<fileSet>
<directory>../../assembly/collector/bin</directory>
<!-- 是否进行属性替换 即使用 ${project.artifactId} -->
<filtered>true</filtered>
<outputDirectory>bin</outputDirectory>
</fileSet>
<!-- src/main/resources目录下配置文件打包到config目录下 -->
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>application.yml</include>
<include>logback-spring.xml</include>
</includes>
<!-- 是否进行属性替换 即使用 ${project.artifactId} -->
<filtered>true</filtered>
<outputDirectory>${file.separator}config</outputDirectory>
</fileSet>
<!-- 将target目录下的启动jar打包到目录下-->
<fileSet>
<directory>target</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>*executable.jar</include>
</includes>
</fileSet>
</fileSets>
</assembly>

View File

@@ -1,109 +0,0 @@
#!/bin/bash
# 项目名称
SERVER_NAME="${project.artifactId}"
# jar名称
JAR_NAME="${project.build.finalName}-executable.jar"
# 进入bin目录
cd `dirname $0`
# bin目录绝对路径
BIN_DIR=`pwd`
# 返回到上一级项目根目录路径
cd ..
# 打印项目根目录绝对路径
# `pwd` 执行系统命令并获得结果
DEPLOY_DIR=`pwd`
# 外部配置文件绝对目录,如果是目录需要/结尾,也可以直接指定文件
# 如果指定的是目录,spring则会读取目录中的所有配置文件
CONF_DIR=$DEPLOY_DIR/config
# SERVER_PORT=`sed '/server.port/!d;s/.*=//' config/application.properties | tr -d '\r'`
# 获取应用的端口号
SERVER_PORT=`sed -nr '/port: [0-9]+/ s/.*port: +([0-9]+).*/\1/p' config/application.yml`
PIDS=`ps -f | grep java | grep "$CONF_DIR" |awk '{print $2}'`
if [ "$1" = "status" ]; then
if [ -n "$PIDS" ]; then
echo "The $SERVER_NAME is running...!"
echo "PID: $PIDS"
exit 0
else
echo "The $SERVER_NAME is stopped"
exit 0
fi
fi
if [ -n "$PIDS" ]; then
echo "ERROR: The $SERVER_NAME already started!"
echo "PID: $PIDS"
exit 1
fi
if [ -n "$SERVER_PORT" ]; then
SERVER_PORT_COUNT=`netstat -tln | grep $SERVER_PORT | wc -l`
if [ $SERVER_PORT_COUNT -gt 0 ]; then
echo "ERROR: The $SERVER_NAME port $SERVER_PORT already used!"
exit 1
fi
fi
# 项目日志输出绝对路径
LOGS_DIR=$DEPLOY_DIR/logs
# 如果logs文件夹不存在,则创建文件夹
if [ ! -d $LOGS_DIR ]; then
mkdir $LOGS_DIR
fi
STDOUT_FILE=$LOGS_DIR/catalina.log
# JVM Configuration
JAVA_OPTS=" -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true "
JAVA_DEBUG_OPTS=""
if [ "$1" = "debug" ]; then
JAVA_DEBUG_OPTS=" -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n "
fi
JAVA_JMX_OPTS=""
if [ "$1" = "jmx" ]; then
JAVA_JMX_OPTS=" -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false "
fi
JAVA_MEM_OPTS=""
BITS=`java -version 2>&1 | grep -i 64-bit`
if [ -n "$BITS" ]; then
JAVA_MEM_OPTS=" -server -Xmx512m -Xms512m -Xmn256m -XX:PermSize=128m -Xss256k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "
else
JAVA_MEM_OPTS=" -server -Xms512m -Xmx512m -XX:PermSize=128m -XX:SurvivorRatio=2 -XX:+UseParallelGC "
fi
# 加载外部log4j2文件的配置
LOG_IMPL_FILE=log4j2.xml
LOGGING_CONFIG=""
if [ -f "$CONF_DIR/$LOG_IMPL_FILE" ]
then
LOGGING_CONFIG="-Dlogging.config=$CONF_DIR/$LOG_IMPL_FILE"
fi
CONFIG_FILES=" -Dlogging.path=$LOGS_DIR $LOGGING_CONFIG -Dspring.config.location=$CONF_DIR/ "
echo -e "Starting the $SERVER_NAME ..."
nohup java $JAVA_OPTS $JAVA_MEM_OPTS $JAVA_DEBUG_OPTS $JAVA_JMX_OPTS $CONFIG_FILES -jar $DEPLOY_DIR/lib/$JAR_NAME > $STDOUT_FILE 2>&1 &
COUNT=0
while [ $COUNT -lt 1 ]; do
echo -e ".\c"
sleep 1
if [ -n "$SERVER_PORT" ]; then
COUNT=`netstat -an | grep $SERVER_PORT | wc -l`
else
COUNT=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}' | wc -l`
fi
if [ $COUNT -gt 0 ]; then
break
fi
done
echo "OK!"
PIDS=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}'`
echo "PID: $PIDS"
echo "STDOUT: $STDOUT_FILE"

View File

@@ -1,18 +0,0 @@
#!/bin/bash
# 项目名称
APPLICATION="${project.artifactId}"
# 项目启动jar包名称
APPLICATION_JAR="${project.build.finalName}.jar"
# 通过项目名称查找到PI然后kill -9 pid
PID=$(ps -ef | grep "${APPLICATION_JAR}" | grep -v grep | awk '{ print $2 }')
if [[ -z "$PID" ]]
then
echo ${APPLICATION} is already stopped
else
echo kill ${PID}
kill -9 ${PID}
echo ${APPLICATION} stopped successfully
fi

View File

@@ -33,16 +33,4 @@
#### HELP
1. ARK插件类隔离未生效
> 注意需构建在jdk1.8环境中运行
> 插件是否配置导入并配置正确
> 本地DEBUG时需单独IDEA打开运行collector工程不能将plugin和collector在同一工程打开运行
2. metaspace元空间内存占用多或溢出
> 建议调整JVM参数 ```-Dsun.reflect.inflationThreshold=100000```
> 由于使用太多反射,超过参数`inflationThreshold`默认值15阈值导致触发JVM反射优化(加快反射速度),
> 反射获取类信息由使用*JNI存取器**膨胀(Inflation)*
> 为*反射每个方法生成一个类加载器DelegatingClassLoader和Java类MethodAccessor*.
> 动态加载的字节码导致PermGen持续增长.

View File

@@ -1,19 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>collector</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>plugins</artifactId>
<packaging>pom</packaging>
<modules>
<module>sample-plugin</module>
</modules>
</project>

View File

@@ -1,48 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>plugins</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sample-plugin</artifactId>
<build>
<plugins>
<plugin>
<!--link https://www.sofastack.tech/projects/sofa-boot/sofa-ark-ark-plugin/ -->
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-ark-plugin-maven-plugin</artifactId>
<version>1.1.6</version>
<executions>
<execution>
<id>default-cli</id>
<goals>
<goal>ark-plugin</goal>
</goals>
<configuration>
<!-- 配置优先级数字越小优先级越高优先启动优先导出类默认1000 -->
<priority>2000</priority>
<!-- 配置导出类、资源 -->
<exported>
<!-- 配置类级别导出类 -->
<classes>
<class>com.com.usthe.plugin.sample.ExportDemo</class>
</classes>
</exported>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,12 +0,0 @@
package com.usthe.collector.plugin;
/**
* @author tomsun28
* @date 2021/10/8 15:12
*/
public class SameClass {
public static String hello() {
return "hello plugin";
}
}

View File

@@ -1,14 +0,0 @@
package com.usthe.plugin.sample;
import com.usthe.collector.plugin.SameClass;
/**
* @author tomsun28
* @date 2021/10/8 15:11
*/
public class ExportDemo {
public String hello() {
return SameClass.hello();
}
}

View File

@@ -5,14 +5,85 @@
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector</artifactId>
<packaging>pom</packaging>
<modules>
<module>server</module>
<module>plugins</module>
</modules>
<dependencies>
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<!-- common -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- etcd -->
<dependency>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<version>0.5.11</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<!-- http -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!--network-->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.8.0</version>
</dependency>
<!--json path parser-->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.6.0</version>
</dependency>
<!-- lru hashmap -->
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
<dependency>
<groupId>com.googlecode.aviator</groupId>
<artifactId>aviator</artifactId>
<version>5.2.7</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,23 @@
package com.usthe.collector.collect;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
/**
* 具体的指标组采集实现抽象类
* @author tomsun28
* @date 2021/11/4 9:35
*/
public abstract class AbstractCollect {
/**
* 真正的采集实现接口
* @param builder response builder
* @param appId 应用监控ID
* @param app 应用类型
* @param metrics 指标组配置
* return response builder
*/
public abstract void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics);
}

View File

@@ -0,0 +1,343 @@
package com.usthe.collector.collect.http;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.common.http.HttpClientPool;
import com.usthe.collector.dispatch.DispatchConstants;
import com.usthe.collector.util.CollectorConstants;
import com.usthe.collector.util.JsonPathParser;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.HttpProtocol;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.IpDomainUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.springframework.http.HttpMethod;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
/**
* http https 采集实现类
* @author tomsun28
* @date 2021/11/4 15:37
*/
@Slf4j
public class HttpCollectImpl extends AbstractCollect {
private HttpCollectImpl() {}
public static HttpCollectImpl getInstance() {
return Singleton.INSTANCE;
}
@Override
public void collect(CollectRep.MetricsData.Builder builder,
long appId, String app, Metrics metrics) {
long startTime = System.currentTimeMillis();
// 简单校验必有参数
if (metrics == null || metrics.getHttp() == null) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Http/Https collect must has http params");
return;
}
HttpContext httpContext = createHttpContext(metrics.getHttp());
HttpUriRequest request = createHttpRequest(metrics.getHttp());
try {
CloseableHttpResponse response = HttpClientPool.getHttpClient()
.execute(request, httpContext);
int statusCode = response.getStatusLine().getStatusCode();
log.debug("http response status: {}", statusCode);
if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_BAD_REQUEST) {
// 1XX 3XX 4XX 5XX 状态码 失败
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("StatusCode " + statusCode);
return;
} else {
// 2xx 状态码 成功
String resp = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
// 根据不同的解析方式解析
if (resp == null || "".equals(resp)) {
log.info("http response entity is empty, status: {}.", statusCode);
builder.setCode(CollectRep.Code.SUCCESS);
builder.setMsg("statusCode: " + statusCode + ",entity empty.");
return;
}
Long responseTime = System.currentTimeMillis() - startTime;
String parseType = metrics.getHttp().getParseType();
try {
if (DispatchConstants.PARSE_DEFAULT.equals(parseType)) {
parseResponseByDefault(resp, metrics.getAliasFields(), builder, responseTime);
} else if (DispatchConstants.PARSE_JSON_PATH.equals(parseType)) {
parseResponseByJsonPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder, responseTime);
} else if (DispatchConstants.PARSE_PROMETHEUS.equals(parseType)) {
parseResponseByPrometheus(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else if (DispatchConstants.PARSE_XML_PATH.equals(parseType)) {
parseResponseByXmlPath(resp, metrics.getAliasFields(), metrics.getHttp(), builder);
} else if (DispatchConstants.PARSE_WEBSITE.equals(parseType)){
parseResponseByWebsite(resp, metrics.getAliasFields(), builder, responseTime);
} else {
parseResponseByDefault(resp, metrics.getAliasFields(), builder, responseTime);
}
} catch (Exception e) {
log.info("parse error: {}.", e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("parse response data error:" + e.getMessage());
return;
}
}
} catch (ClientProtocolException e1) {
log.error(e1.getCause().getMessage(), e1);
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg(e1.getCause().getMessage());
return;
} catch (UnknownHostException e2) {
// 对端不可达
log.info(e2.getMessage());
builder.setCode(CollectRep.Code.UN_REACHABLE);
builder.setMsg("unknown host");
return;
} catch (InterruptedIOException | ConnectException | SSLException e3) {
// 对端连接失败
log.info(e3.getMessage());
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg(e3.getMessage());
return;
} catch (IOException e4) {
// 其它IO异常
log.info(e4.getMessage());
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e4.getMessage());
return;
} catch (Exception e) {
// 其它异常
log.error(e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg(e.getMessage());
return;
} finally {
if (request != null) {
request.abort();
}
}
}
private void parseResponseByWebsite(String resp, List<String> aliasFields,
CollectRep.MetricsData.Builder builder, Long responseTime) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
// todo resp 网站关键字监测
for (String alias : aliasFields) {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
valueRowBuilder.addColumns(responseTime.toString());
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
builder.addValues(valueRowBuilder.build());
}
private void parseResponseByXmlPath(String resp, List<String> aliasFields, HttpProtocol http,
CollectRep.MetricsData.Builder builder) {
}
private void parseResponseByJsonPath(String resp, List<String> aliasFields, HttpProtocol http,
CollectRep.MetricsData.Builder builder, Long responseTime) {
List<Map<String, Object>> results = JsonPathParser.parseContentWithJsonPath(resp,http. getParseScript());
for (Map<String, Object> stringMap : results) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String alias : aliasFields) {
Object value = stringMap.get(alias);
if (value != null) {
valueRowBuilder.addColumns(String.valueOf(value));
} else {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
valueRowBuilder.addColumns(responseTime.toString());
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
}
builder.addValues(valueRowBuilder.build());
}
}
private void parseResponseByPrometheus(String resp, List<String> aliasFields, HttpProtocol http,
CollectRep.MetricsData.Builder builder) {
}
private void parseResponseByDefault(String resp, List<String> aliasFields,
CollectRep.MetricsData.Builder builder, Long responseTime) {
JsonElement element = JsonParser.parseString(resp);
if (element.isJsonArray()) {
JsonArray array = element.getAsJsonArray();
for (JsonElement jsonElement : array) {
if (jsonElement.isJsonObject()) {
JsonObject object = jsonElement.getAsJsonObject();
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String alias : aliasFields) {
JsonElement valueElement = object.get(alias);
if (valueElement != null) {
String value = valueElement.getAsString();
valueRowBuilder.addColumns(value);
} else {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
valueRowBuilder.addColumns(responseTime.toString());
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
}
builder.addValues(valueRowBuilder.build());
}
}
} else if (element.isJsonObject()) {
JsonObject object = element.getAsJsonObject();
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String alias : aliasFields) {
JsonElement valueElement = object.get(alias);
if (valueElement != null) {
String value = valueElement.getAsString();
valueRowBuilder.addColumns(value);
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
builder.addValues(valueRowBuilder.build());
}
}
/**
* 创建httpContext
* @param httpProtocol http protocol
* @return context
*/
private HttpContext createHttpContext(HttpProtocol httpProtocol) {
HttpProtocol.Authorization auth = httpProtocol.getAuthorization();
if (auth != null && !DispatchConstants.BEARER_TOKEN.equals(auth.getType())) {
HttpClientContext clientContext = new HttpClientContext();
if (DispatchConstants.BASIC_AUTH.equals(auth.getType()) && auth.getBasicAuthUsername() != null
&& auth.getBasicAuthPassword() != null) {
CredentialsProvider provider = new BasicCredentialsProvider();
UsernamePasswordCredentials credentials
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
provider.setCredentials(AuthScope.ANY, credentials);
clientContext.setCredentialsProvider(provider);
} else if (DispatchConstants.DIGEST_AUTH.equals(auth.getType()) && auth.getDigestAuthUsername() != null
&& auth.getDigestAuthPassword() != null) {
CredentialsProvider provider = new BasicCredentialsProvider();
UsernamePasswordCredentials credentials
= new UsernamePasswordCredentials(auth.getBasicAuthUsername(), auth.getBasicAuthPassword());
provider.setCredentials(AuthScope.ANY, credentials);
clientContext.setCredentialsProvider(provider);
} else {
clientContext = null;
}
return clientContext;
}
return null;
}
/**
* 根据http配置参数构造请求头
* @param httpProtocol http参数配置
* @return 请求体
*/
private HttpUriRequest createHttpRequest(HttpProtocol httpProtocol) {
RequestBuilder requestBuilder;
// method
String httpMethod = httpProtocol.getMethod().toUpperCase();
if (HttpMethod.GET.matches(httpMethod)) {
requestBuilder = RequestBuilder.get();
} else if (HttpMethod.POST.matches(httpMethod)) {
requestBuilder = RequestBuilder.post();
} else if (HttpMethod.PUT.matches(httpMethod)) {
requestBuilder = RequestBuilder.put();
} else if (HttpMethod.DELETE.matches(httpMethod)) {
requestBuilder = RequestBuilder.delete();
} else if (HttpMethod.PATCH.matches(httpMethod)) {
requestBuilder = RequestBuilder.patch();
} else {
// not support the method
log.error("not support the http method: {}.", httpProtocol.getMethod());
return null;
}
// params
Map<String, String> params = httpProtocol.getParams();
if (params != null && !params.isEmpty()) {
for (Map.Entry<String, String> param : params.entrySet()) {
requestBuilder.addParameter(param.getKey(), param.getValue());
}
}
// headers
Map<String, String> headers = httpProtocol.getHeaders();
if (headers != null && !headers.isEmpty()) {
for (Map.Entry<String, String> header : headers.entrySet()) {
requestBuilder.addHeader(header.getKey(), header.getValue());
}
}
// keep-alive
requestBuilder.addHeader(HttpHeaders.CONNECTION, "keep-alive");
// add accept
if (DispatchConstants.PARSE_DEFAULT.equals(httpProtocol.getParseType())
|| DispatchConstants.PARSE_JSON_PATH.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, "application/json");
} else if (DispatchConstants.PARSE_XML_PATH.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, "text/xml,application/xml");
} else if (DispatchConstants.PARSE_PROMETHEUS.equals(httpProtocol.getParseType())) {
requestBuilder.addHeader(HttpHeaders.ACCEPT, DispatchConstants.PARSE_PROMETHEUS_ACCEPT);
requestBuilder.addHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
} else {
requestBuilder.addHeader(HttpHeaders.ACCEPT, "*/*");
}
// 判断是否使用Bearer Token认证
if (httpProtocol.getAuthorization() != null
&& DispatchConstants.BEARER_TOKEN.equals(httpProtocol.getAuthorization().getType())) {
// 若使用 将token放入到header里面
String value = DispatchConstants.BEARER + " " + httpProtocol.getAuthorization().getBearerTokenToken();
requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
}
// todo 处理请求内容 body 暂不支持body
// uri
if (IpDomainUtil.isHasSchema(httpProtocol.getHost())) {
requestBuilder.setUri(httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
} else {
boolean ssl = Boolean.parseBoolean(httpProtocol.getSsl());
if (ssl) {
requestBuilder.setUri("https://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
} else {
requestBuilder.setUri("http://" + httpProtocol.getHost() + ":" + httpProtocol.getPort() + httpProtocol.getUrl());
}
}
return requestBuilder.build();
}
private static class Singleton {
private static final HttpCollectImpl INSTANCE = new HttpCollectImpl();
}
}

View File

@@ -0,0 +1,86 @@
package com.usthe.collector.collect.icmp;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.util.CollectorConstants;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.IcmpProtocol;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
* icmp协议采集实现 - ping
* @author tom
* @date 2021/12/4 12:32
*/
@Slf4j
public class IcmpCollectImpl extends AbstractCollect {
private IcmpCollectImpl(){}
public static IcmpCollectImpl getInstance() {
return IcmpCollectImpl.Singleton.INSTANCE;
}
@Override
public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
long startTime = System.currentTimeMillis();
// 简单校验必有参数
if (metrics == null || metrics.getIcmp() == null) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("ICMP collect must has icmp params");
return;
}
IcmpProtocol icmp = metrics.getIcmp();
// 超时时间默认300毫秒
int timeout = 300;
try {
timeout = Integer.parseInt(icmp.getTimeout());
} catch (Exception e) {
log.warn(e.getMessage());
}
try {
// todo 需要配置java虚拟机root权限从而使用ICMP否则是判断telnet对端7号端口是否开通
// https://stackoverflow.com/questions/11506321/how-to-ping-an-ip-address
boolean status = InetAddress.getByName(icmp.getHost()).isReachable(timeout);
long responseTime = System.currentTimeMillis() - startTime;
if (status) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String alias : metrics.getAliasFields()) {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
valueRowBuilder.addColumns(Long.toString(responseTime));
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
builder.addValues(valueRowBuilder.build());
} else {
builder.setCode(CollectRep.Code.UN_REACHABLE);
builder.setMsg("对端不可达Timeout " + timeout + "ms");
return;
}
} catch (UnknownHostException unknownHostException) {
builder.setCode(CollectRep.Code.UN_REACHABLE);
builder.setMsg("UnknownHost " + unknownHostException.getMessage());
return;
} catch (IOException ioException) {
builder.setCode(CollectRep.Code.UN_REACHABLE);
builder.setMsg("IOException " + ioException.getMessage());
return;
} catch (Exception e) {
log.error(e.getMessage(), e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("IllegalArgument " + e.getMessage());
}
}
private static class Singleton {
private static final IcmpCollectImpl INSTANCE = new IcmpCollectImpl();
}
}

View File

@@ -0,0 +1,93 @@
package com.usthe.collector.collect.telnet;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.util.CollectorConstants;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.job.protocol.TelnetProtocol;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.telnet.TelnetClient;
import java.io.IOException;
import java.net.ConnectException;
/**
* icmp协议采集实现 - ping
* @author tom
* @date 2021/12/4 12:32
*/
@Slf4j
public class TelnetCollectImpl extends AbstractCollect {
private TelnetCollectImpl(){}
public static TelnetCollectImpl getInstance() {
return TelnetCollectImpl.Singleton.INSTANCE;
}
@Override
public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) {
long startTime = System.currentTimeMillis();
// 简单校验必有参数
if (metrics == null || metrics.getTelnet() == null) {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Telnet collect must has telnet params");
return;
}
TelnetProtocol telnet = metrics.getTelnet();
// 超时时间默认300毫秒
int timeout = 300;
try {
timeout = Integer.parseInt(telnet.getTimeout());
} catch (Exception e) {
log.warn(e.getMessage());
}
TelnetClient telnetClient = null;
try {
//指明Telnet终端类型否则会返回来的数据中文会乱码
telnetClient = new TelnetClient("vt200");
telnetClient.setConnectTimeout(timeout);
telnetClient.connect(telnet.getHost(),Integer.parseInt(telnet.getPort()));
long responseTime = System.currentTimeMillis() - startTime;
if (telnetClient.isConnected()) {
CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
for (String alias : metrics.getAliasFields()) {
if (CollectorConstants.RESPONSE_TIME.equalsIgnoreCase(alias)) {
valueRowBuilder.addColumns(Long.toString(responseTime));
} else {
valueRowBuilder.addColumns(CommonConstants.NULL_VALUE);
}
}
builder.addValues(valueRowBuilder.build());
} else {
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("对端连接失败Timeout " + timeout + "ms");
return;
}
telnetClient.disconnect();
} catch (ConnectException connectException) {
log.debug(connectException.getMessage());
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("对端拒绝连接:服务未启动端口监听或防火墙");
} catch (IOException ioException) {
log.debug(ioException.getMessage());
builder.setCode(CollectRep.Code.UN_CONNECTABLE);
builder.setMsg("对端连接失败 " + ioException.getMessage());
} finally {
if (telnetClient != null) {
try {
telnetClient.disconnect();
} catch (Exception e) {
log.warn(e.getMessage());
}
}
}
}
private static class Singleton {
private static final TelnetCollectImpl INSTANCE = new TelnetCollectImpl();
}
}

View File

@@ -0,0 +1,15 @@
package com.usthe.collector.common;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* java common 的配置属性
* @author tomsun28
* @date 2021/10/16 14:23
*/
@Component
@ConfigurationProperties(prefix = "collector.common")
public class CollectorProperties {
}

View File

@@ -0,0 +1,23 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
/**
* 采集数据调度器接口
* @author tomsun28
* @date 2021/11/2 11:20
*/
public interface CollectDataDispatch {
/**
* 处理分发采集结果数据
* @param timeout 时间轮timeout
* @param metrics 下面的指标组采集任务
* @param metricsData 采集结果数据
*/
void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData);
}

View File

@@ -0,0 +1,202 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.collector.dispatch.timer.TimerDispatch;
import com.usthe.collector.dispatch.timer.WheelTimerTask;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 指标组采集任务与响应数据调度器
* @author tomsun28
* @date 2021/11/2 11:24
*/
@Component
@Slf4j
public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatch {
/**
* 指标组采集任务超时时间值
*/
private static final long DURATION_TIME = 120_000L;
/**
* 指标组采集任务优先级队列
*/
private MetricsCollectorQueue jobRequestQueue;
/**
* 时间轮任务调度器
*/
private TimerDispatch timerDispatch;
/**
* kafka采集数据导出器
*/
private MetricsDataExporter kafkaDataExporter;
/**
* 指标组任务与开始时间映射map
*/
private Map<String, MetricsTime> metricsTimeoutMonitorMap;
public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch,
MetricsDataExporter kafkaDataExporter, WorkerPool workerPool) {
this.kafkaDataExporter = kafkaDataExporter;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS,
new SynchronousQueue<>(), r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
});
// 从任务队列拉取指标组采集任务放入线程池执行
poolExecutor.execute(() -> {
Thread.currentThread().setName("metrics-task-dispatcher");
while (!Thread.currentThread().isInterrupted()) {
MetricsCollect metricsCollect = null;
try {
metricsCollect = jobRequestQueue.getJob();
if (metricsCollect != null) {
workerPool.executeJob(metricsCollect);
}
} catch (RejectedExecutionException rejected) {
log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " +
"sleep and put in queue again.");
try {
Thread.sleep(1000);
if (metricsCollect != null) {
// 在队列里的优先级增大
metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1));
jobRequestQueue.addJob(metricsCollect);
}
} catch (InterruptedException interruptedException){}
} catch (Exception e) {
log.error("[Dispatcher]-{}.", e.getMessage(), e);
}
}
});
// 监控指标组采集任务执行时间
metricsTimeoutMonitorMap = new ConcurrentHashMap<>(128);
poolExecutor.execute(() -> {
Thread.currentThread().setName("metrics-task-monitor");
while (!Thread.currentThread().isInterrupted()) {
try {
// 检测每个指标组采集单元是否超时2分钟,超时则丢弃并返回异常
long deadline = System.currentTimeMillis() - DURATION_TIME;
for (Map.Entry<String, MetricsTime> entry : metricsTimeoutMonitorMap.entrySet()) {
MetricsTime metricsTime = entry.getValue();
if (metricsTime.getStartTime() < deadline) {
// 指标组采集超时
WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task();
CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder()
.setId(timerJob.getJob().getMonitorId())
.setApp(timerJob.getJob().getApp())
.setMetrics(metricsTime.getMetrics().getName())
.setTime(System.currentTimeMillis())
.setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build();
dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData);
metricsTimeoutMonitorMap.remove(entry.getKey());
}
}
Thread.sleep(20000);
} catch (Exception e){
log.error("[Monitor]-{}.", e.getMessage(), e);
}
}
});
}
@Override
public void dispatchMetricsTask(Timeout timeout) {
// 将单个应用的采集任务根据其下的指标组拆分为对应的指标组采集任务 AbstractCollect
// 将每个指标组放入线程池进行调度
WheelTimerTask timerTask = (WheelTimerTask) timeout.task();
Job job = timerTask.getJob();
job.constructPriorMetrics();
Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
metricsSet.forEach(metrics -> {
MetricsCollect metricsCollect = new MetricsCollect(metrics, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
}
@Override
public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
metricsTimeoutMonitorMap.remove(job.getId() + "-" + metrics.getName());
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
kafkaDataExporter.send(metricsData);
if (metricsSet == null) {
// 此Job所有指标组采集执行完成
// 周期性任务再次将任务push到时间轮
// 先判断此次任务执行时间与任务采集间隔时间
if (timeout.isCancelled()) {
return;
}
long spendTime = System.currentTimeMillis() - job.getDispatchTime();
long interval = job.getInterval() - spendTime / 1000;
interval = interval <= 0 ? 0 : interval;
// 重置构造执行指标组视图
job.constructPriorMetrics();
timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
// 需等待其它同级别指标组执行完成后进入下一级别执行
}
} else {
// 若是临时性一次任务,需等待所有指标组的采集数据统一包装返回
// 将当前指标组数据插入job里统一组装
job.addCollectMetricsData(metricsData);
if (metricsSet == null) {
// 此Job所有指标组采集执行完成
// 将所有指标组数据组合一起通知结果监听器
timerDispatch.responseSyncJobData(job.getId(), job.getResponseDataTemp());
} else if (!metricsSet.isEmpty()) {
// 当前级别指标组执行完成,开始执行下一级别的指标组
metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout));
});
} else {
// 当前执行级别的指标组列表未全执行完成,
// 需等待其它同级别指标组执行完成后进入下一级别执行
}
}
}
@Data
@AllArgsConstructor
private static class MetricsTime {
private long startTime;
private Metrics metrics;
private Timeout timeout;
}
}

View File

@@ -0,0 +1,68 @@
package com.usthe.collector.dispatch;
/**
* dispatch 常量
* @author tomsun28
* @date 2021/11/3 16:50
*/
public interface DispatchConstants {
// 协议类型相关 - start //
/**
* 协议 http
*/
String PROTOCOL_HTTP = "http";
/**
* 协议 icmp
*/
String PROTOCOL_ICMP = "icmp";
/**
* 协议 telnet
*/
String PROTOCOL_TELNET = "telnet";
/**
* 协议 jdbc
*/
String PROTOCOL_JDBC = "jdbc";
// 协议类型相关 - end //
// http协议相关 - start 需尽可能先复用 HttpHeaders //
/**
* 认证方式 Bearer Token
*/
String BEARER_TOKEN = "Bearer Token";
/**
* Bearer Token 的认证参数字符
*/
String BEARER = "Bearer";
/**
* 认证方式 Basic Auth
*/
String BASIC_AUTH = "Basic Auth";
/**
* 认证方式 Digest Auth
*/
String DIGEST_AUTH = "Digest Auth";
/**
* 解析方式 默认规则
*/
String PARSE_DEFAULT = "default";
/**
* 解析方式 自定义json path
*/
String PARSE_JSON_PATH = "jsonPath";
/**
* 解析方式 自定义xml path
*/
String PARSE_XML_PATH = "xmlPath";
/**
* 解析方式 网站可用性监控规则 提供responseTime指标
*/
String PARSE_WEBSITE = "website";
/**
* 解析方式 prometheus规则
*/
String PARSE_PROMETHEUS = "prometheus";
String PARSE_PROMETHEUS_ACCEPT = "application/openmetrics-text; version=0.0.1,text/plain;version=0.0.4;q=0.5,*/*;q=0.1";
// http协议相关 - end //
}

View File

@@ -0,0 +1,225 @@
package com.usthe.collector.dispatch;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 调度分发任务配置属性
* @author tomsun28
* @date 2021/10/16 14:54
*/
@Component
@ConfigurationProperties(prefix = "collector.dispatch")
public class DispatchProperties {
/**
* 调度入口配置属性
*/
private EntranceProperties entrance;
/**
* 调度数据出口配置属性
*/
private ExportProperties export;
public EntranceProperties getEntrance() {
return entrance;
}
public void setEntrance(EntranceProperties entrance) {
this.entrance = entrance;
}
public ExportProperties getExport() {
return export;
}
public void setExport(ExportProperties export) {
this.export = export;
}
/**
* 调度入口配置属性
* 入口可以时etcd信息,http请求,消息中间件消息请求
*/
public static class EntranceProperties {
/**
* etcd配置信息
*/
private EtcdProperties etcd;
public EtcdProperties getEtcd() {
return etcd;
}
public void setEtcd(EtcdProperties etcd) {
this.etcd = etcd;
}
public static class EtcdProperties {
/**
* etcd调度是否启动
*/
private boolean enabled = true;
/**
* etcd的连接端点url
*/
private String[] endpoints = new String[] {"http://127.0.0.1:2379"};
/**
* etcd连接用户名
*/
private String username;
/**
* etcd连接密码
*/
private String password;
/**
* etcd租约的有效时间 单位秒
*/
private long ttl = 200;
/**
* 采集器注册目录
*/
private String collectorDir = "/usthe/dispatch/collector/";
/**
* 任务调度分发目录
*/
private String assignDir = "/usthe/dispatch/assign/";
/**
* 任务详细目录
*/
private String jobDir = "/usthe/dispatch/job/";
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String[] getEndpoints() {
return endpoints;
}
public void setEndpoints(String[] endpoints) {
this.endpoints = endpoints;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public long getTtl() {
return ttl;
}
public void setTtl(long ttl) {
this.ttl = ttl;
}
public String getCollectorDir() {
return collectorDir;
}
public void setCollectorDir(String collectorDir) {
this.collectorDir = collectorDir;
}
public String getAssignDir() {
return assignDir;
}
public void setAssignDir(String assignDir) {
this.assignDir = assignDir;
}
public String getJobDir() {
return jobDir;
}
public void setJobDir(String jobDir) {
this.jobDir = jobDir;
}
}
}
/**
* 调度数据出口配置属性
*/
public static class ExportProperties {
/**
* kafka配置信息
*/
private KafkaProperties kafka;
public KafkaProperties getKafka() {
return kafka;
}
public void setKafka(KafkaProperties kafka) {
this.kafka = kafka;
}
public static class KafkaProperties {
/**
* kafka数据出口是否启动
*/
private boolean enabled = true;
/**
* kafka的连接服务器url
*/
private String servers = "http://127.0.0.1:2379";
/**
* 发送数据的topic名称
*/
private String topic;
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getServers() {
return servers;
}
public void setServers(String servers) {
this.servers = servers;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}
}
}

View File

@@ -0,0 +1,265 @@
package com.usthe.collector.dispatch;
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.usthe.collector.collect.AbstractCollect;
import com.usthe.collector.collect.http.HttpCollectImpl;
import com.usthe.collector.collect.icmp.IcmpCollectImpl;
import com.usthe.collector.collect.telnet.TelnetCollectImpl;
import com.usthe.collector.dispatch.timer.Timeout;
import com.usthe.collector.dispatch.timer.WheelTimerTask;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.common.util.CommonUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 指标组采集
* @author tomsun28
* @date 2021/10/10 15:35
*/
@Slf4j
@Data
public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
/**
* 监控ID
*/
protected long monitorId;
/**
* 监控类型名称
*/
protected String app;
/**
* 指标组配置
*/
protected Metrics metrics;
/**
* 时间轮timeout
*/
protected Timeout timeout;
/**
* 任务和数据调度
*/
protected CollectDataDispatch collectDataDispatch;
/**
* 任务执行优先级
*/
protected byte runPriority;
/**
* 是周期性采集还是一次性采集 true-周期性 false-一次性
*/
protected boolean isCyclic;
/**
* 指标组采集任务新建时间
*/
protected long newTime;
/**
* 指标组采集任务开始执行时间
*/
protected long startTime;
public MetricsCollect(Metrics metrics, Timeout timeout, CollectDataDispatch collectDataDispatch) {
this.newTime = System.currentTimeMillis();
this.timeout = timeout;
this.metrics = metrics;
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
this.monitorId = job.getMonitorId();
this.app = job.getApp();
this.collectDataDispatch = collectDataDispatch;
this.isCyclic = job.isCyclic();
// 临时一次性任务执行优先级高
if (isCyclic) {
runPriority = (byte) -1;
} else {
runPriority = (byte) 1;
}
}
@Override
public void run() {
this.startTime = System.currentTimeMillis();
setNewThreadName(monitorId, app, startTime, metrics);
CollectRep.MetricsData.Builder response = CollectRep.MetricsData.newBuilder();
response.setApp(app);
response.setId(monitorId);
response.setMetrics(metrics.getName());
// 根据指标组采集协议,应用类型等来调度到真正的应用指标组采集实现类
AbstractCollect abstractCollect = null;
switch (metrics.getProtocol()) {
case DispatchConstants.PROTOCOL_HTTP:
abstractCollect = HttpCollectImpl.getInstance();
break;
case DispatchConstants.PROTOCOL_ICMP:
abstractCollect = IcmpCollectImpl.getInstance();
break;
case DispatchConstants.PROTOCOL_TELNET:
abstractCollect = TelnetCollectImpl.getInstance();
break;
// todo
default: break;
}
if (abstractCollect == null) {
log.error("[Dispatcher] - not support this: app: {}, metrics: {}, protocol: {}.",
app, metrics.getName(), metrics.getProtocol());
response.setCode(CollectRep.Code.FAIL);
response.setMsg("not support " + app + ", "
+ metrics.getName() + ", " + metrics.getProtocol());
return;
} else {
try {
abstractCollect.collect(response, monitorId, app, metrics);
} catch (Exception e) {
String msg = e.getMessage();
if (msg == null && e.getCause() != null) {
msg = e.getCause().getMessage();
}
log.error("[Metrics Collect]: {}.", msg, e);
response.setCode(CollectRep.Code.FAIL);
if (msg != null) {
response.setMsg(e.getMessage());
}
}
}
// 别名属性表达式替换计算
if (fastFailed()) {
return;
}
calculateFields(metrics, response);
CollectRep.MetricsData metricsData = validateResponse(response);
collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
}
/**
* 根据 calculates 和 aliasFields 配置计算出真正的指标(fields)值
* 计算instance实例值
* @param metrics 指标组配置
* @param collectData 采集数据
*/
private void calculateFields(Metrics metrics, CollectRep.MetricsData.Builder collectData) {
collectData.setPriority(metrics.getPriority());
List<CollectRep.Field> fieldList = new LinkedList<>();
for (Metrics.Field field : metrics.getFields()) {
fieldList.add(CollectRep.Field.newBuilder().setName(field.getField()).setType(field.getType()).build());
}
collectData.addAllFields(fieldList);
List<CollectRep.ValueRow> aliasRowList = collectData.getValuesList();
if (aliasRowList == null || aliasRowList.isEmpty()) {
return;
}
collectData.clearValues();
// 先预处理 calculates
if (metrics.getCalculates() == null) {
metrics.setCalculates(Collections.emptyList());
}
Map<String, Expression> fieldExpressionMap = metrics.getCalculates()
.stream()
.map(cal -> {
int splitIndex = cal.indexOf("=");
String field = cal.substring(0, splitIndex);
String expressionStr = cal.substring(splitIndex + 1);
Expression expression = AviatorEvaluator.compile(expressionStr, true);
return new Object[]{field, expression}; })
.collect(Collectors.toMap(arr -> (String)arr[0], arr -> (Expression) arr[1]));
List<Metrics.Field> fields = metrics.getFields();
List<String> aliasFields = metrics.getAliasFields();
Map<String, String> aliasFieldValueMap = new HashMap<>(16);
Map<String, Object> fieldValueMap = new HashMap<>(16);
CollectRep.ValueRow.Builder realValueRowBuilder = CollectRep.ValueRow.newBuilder();
for (CollectRep.ValueRow aliasRow : aliasRowList) {
for (int aliasIndex = 0; aliasIndex < aliasFields.size(); aliasIndex++) {
String aliasFieldValue = aliasRow.getColumns(aliasIndex);
if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) {
aliasFieldValueMap.put(aliasFields.get(aliasIndex), aliasFieldValue);
}
}
StringBuilder instanceBuilder = new StringBuilder();
for (Metrics.Field field : fields) {
String realField = field.getField();
Expression expression = fieldExpressionMap.get(realField);
String value = null;
if (expression != null) {
// 存在计算表达式 则计算值
if (CommonConstants.TYPE_NUMBER == field.getType()) {
for (String variable : expression.getVariableNames()) {
Double doubleValue = CommonUtil.parseDoubleStr(aliasFieldValueMap.get(variable));
if (doubleValue != null) {
fieldValueMap.put(variable, doubleValue);
}
}
} else {
for (String variable : expression.getVariableNames()) {
String strValue = aliasFieldValueMap.get(variable);
if (strValue != null && !"".equals(strValue)) {
fieldValueMap.put(variable, strValue);
}
}
}
try {
Object objValue = expression.execute(fieldValueMap);
if (objValue != null) {
value = String.valueOf(objValue);
}
} catch (Exception e) {
log.warn(e.getMessage());
}
} else {
// 不存在 则映射别名值
value = aliasFieldValueMap.get(realField);
}
if (value == null) {
value = CommonConstants.NULL_VALUE;
}
realValueRowBuilder.addColumns(value);
fieldValueMap.clear();
if (field.isInstance() && !CommonConstants.NULL_VALUE.equals(value)) {
instanceBuilder.append(value);
}
}
aliasFieldValueMap.clear();
// 设置实例instance
realValueRowBuilder.setInstance(instanceBuilder.toString());
collectData.addValues(realValueRowBuilder.build());
}
}
private boolean fastFailed() {
return this.timeout == null || this.timeout.isCancelled();
}
private CollectRep.MetricsData validateResponse(CollectRep.MetricsData.Builder builder) {
long endTime = System.currentTimeMillis();
builder.setTime(endTime);
log.debug("[Collect]: newTime: {}, startTime: {}, spendTime: {}.", newTime, startTime, endTime - startTime);
if (builder.getCode() != CollectRep.Code.SUCCESS) {
log.info("[Collect Fail] Reason: {}", builder.getMsg());
} else {
log.info("[Collect Success].");
}
return builder.build();
}
private void setNewThreadName(long monitorId, String app, long startTime, Metrics metrics) {
String builder = monitorId + "-" + app + "-" + metrics.getName() +
"-" + String.valueOf(startTime).substring(9);
Thread.currentThread().setName(builder);
}
@Override
public int compareTo(MetricsCollect collect) {
return runPriority - collect.runPriority;
}
}

View File

@@ -0,0 +1,32 @@
package com.usthe.collector.dispatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 待运行的job队列
* @author tomsun28
* @date 2021/10/10 20:20
*/
@Component
@Slf4j
public class MetricsCollectorQueue {
private final PriorityBlockingQueue<MetricsCollect> jobQueue;
public MetricsCollectorQueue() {
jobQueue = new PriorityBlockingQueue<>(2000);
}
public void addJob(MetricsCollect job) {
jobQueue.offer(job);
}
public MetricsCollect getJob() throws InterruptedException {
return jobQueue.poll(2, TimeUnit.SECONDS);
}
}

View File

@@ -0,0 +1,17 @@
package com.usthe.collector.dispatch;
import com.usthe.collector.dispatch.timer.Timeout;
/**
* 指标组采集任务调度器接口
* @author tomsun28
* @date 2021/11/2 11:19
*/
public interface MetricsTaskDispatch {
/**
* 调度
* @param timeout timeout
*/
void dispatchMetricsTask(Timeout timeout);
}

View File

@@ -0,0 +1,62 @@
package com.usthe.collector.dispatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 采集任务工作线程池
* @author tomsun28
* @date 2021/10/15 0:01
*/
@Component
@Slf4j
public class WorkerPool implements DisposableBean {
private ThreadPoolExecutor workerExecutor;
public WorkerPool() {
initWorkExecutor();
}
private void initWorkExecutor() {
// 线程工厂
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("workerExecutor has uncaughtException.");
log.error(throwable.getMessage(), throwable); })
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
800,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
/**
* 运行采集任务线程
* @param runnable 任务
* @throws RejectedExecutionException when 线程池满
*/
public void executeJob(Runnable runnable) throws RejectedExecutionException {
workerExecutor.execute(runnable);
}
@Override
public void destroy() throws Exception {
if (workerExecutor != null) {
workerExecutor.shutdownNow();
}
}
}

View File

@@ -0,0 +1,83 @@
package com.usthe.collector.dispatch.entrance.internal;
import com.usthe.collector.dispatch.timer.TimerDispatch;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.SnowFlakeIdGenerator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 采集job管理提供api接口
* @author tomsun28
* @date 2021/11/6 13:58
*/
@Service
@Slf4j
public class CollectJobService {
@Autowired
private TimerDispatch timerDispatch;
/**
* 执行一次性采集任务,获取采集数据响应
* @param job 采集任务详情
* @return 采集结果
*/
public List<CollectRep.MetricsData> collectSyncJobData(Job job) {
final List<CollectRep.MetricsData> metricsData = new LinkedList<>();
final CountDownLatch countDownLatch = new CountDownLatch(1);
CollectResponseEventListener listener = new CollectResponseEventListener() {
@Override
public void response(List<CollectRep.MetricsData> responseMetrics) {
if (responseMetrics != null) {
metricsData.addAll(responseMetrics);
}
countDownLatch.countDown();
}
};
timerDispatch.addJob(job, listener);
try {
countDownLatch.await(100, TimeUnit.SECONDS);
} catch (Exception e) {
log.info("同步任务运行100秒无响应返回");
}
return metricsData;
}
/**
* 下发周期性异步采集任务
* @param job 采集任务详情
* @return long 任务ID
*/
public long addAsyncCollectJob(Job job) {
long jobId = SnowFlakeIdGenerator.generateId();
job.setId(jobId);
timerDispatch.addJob(job, null);
return jobId;
}
/**
* 更新已经下发的周期性异步采集任务
* @param modifyJob 采集任务详情
*/
public void updateAsyncCollectJob(Job modifyJob) {
timerDispatch.deleteJob(modifyJob.getId(), true);
timerDispatch.addJob(modifyJob, null);
}
/**
* 取消周期性异步采集任务
* @param jobId 任务ID
*/
public void cancelAsyncCollectJob(Long jobId) {
timerDispatch.deleteJob(jobId, true);
}
}

View File

@@ -0,0 +1,20 @@
package com.usthe.collector.dispatch.entrance.internal;
import com.usthe.common.entity.message.CollectRep;
import java.util.EventListener;
import java.util.List;
/**
* 一次性采集任务响应结果监听器
* @author tomsun28
* @date 2021/11/16 10:09
*/
public interface CollectResponseEventListener extends EventListener {
/**
* 采集任务完成结果通知
* @param responseMetrics 响应数据
*/
default void response(List<CollectRep.MetricsData> responseMetrics) {}
}

View File

@@ -0,0 +1,56 @@
package com.usthe.collector.dispatch.export;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 采集数据消息发送
* @author tomsun28
* @date 2021/11/3 15:22
*/
@Component
@Slf4j
public class MetricsDataExporter implements DisposableBean {
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToAlertQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToWarehouseInfluxQueue;
private final LinkedBlockingQueue<CollectRep.MetricsData> metricsDataToWarehouseRedisQueue;
public MetricsDataExporter() {
metricsDataToAlertQueue = new LinkedBlockingQueue<>();
metricsDataToWarehouseInfluxQueue = new LinkedBlockingQueue<>();
metricsDataToWarehouseRedisQueue = new LinkedBlockingQueue<>();
}
public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
}
public CollectRep.MetricsData pollWarehouseInfluxMetricsData() throws InterruptedException {
return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS);
}
public CollectRep.MetricsData pollWarehouseRedisMetricsData() throws InterruptedException {
return metricsDataToWarehouseRedisQueue.poll(2, TimeUnit.SECONDS);
}
/**
* 发送消息
* @param metricsData 指标组采集数据
*/
public void send(CollectRep.MetricsData metricsData) {
metricsDataToAlertQueue.offer(metricsData);
metricsDataToWarehouseInfluxQueue.offer(metricsData);
metricsDataToWarehouseRedisQueue.offer(metricsData);
}
@Override
public void destroy() throws Exception {
metricsDataToAlertQueue.clear();
}
}

View File

@@ -0,0 +1,809 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
* <h3>Tick Duration</h3>
* <p>
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
* them.
* <p>
* You can increase or decrease the accuracy of the execution timing by
* specifying smaller or larger tick duration in the constructor. In most
* network applications, I/O timeout does not need to be accurate. Therefore,
* the default tick duration is 100 milliseconds and you will not need to try
* different configurations in most cases.
*
* <h3>Ticks per Wheel (Wheel Size)</h3>
* <p>
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
* function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
* if you are going to schedule a lot of timeouts.
*
* <h3>Do not create many instances.</h3>
* <p>
* {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
* started. Therefore, you should make sure to create only one instance and
* share it across your application. One of the common mistakes, that makes
* your application unresponsive, is to create a new instance for every connection.
*
* <h3>Implementation Details</h3>
* <p>
* {@link HashedWheelTimer} is based on
* <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
* Tony Lauck's paper,
* <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'</a>. More comprehensive slides are located
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
* @author from netty | dubbo
*/
@SuppressWarnings("PMD")
public class HashedWheelTimer implements Timer {
private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final Worker worker = new Worker();
private final Thread workerThread;
private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;
/**
* 0 - init, 1 - started, 2 - shut down
*/
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
private volatile long startTime;
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}), default tick duration, and
* default number of ticks per wheel.
*/
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}) and default number of ticks
* per wheel.
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
/**
* Creates a new timer with the default thread factory
* ({@link Executors#defaultThreadFactory()}).
*
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
/**
* Creates a new timer with the default tick duration and default number of
* ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @throws NullPointerException if {@code threadFactory} is {@code null}
*/
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
/**
* Creates a new timer with the default number of ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if {@code tickDuration} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed if
* this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
// we have not yet shutdown then we want to make sure we decrement the active instance count.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
}
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = ticksPerWheel - 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
return normalizedTicksPerWheel + 1;
}
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
}
return worker.unprocessedTimeouts();
}
@Override
public boolean isStop() {
return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
/**
* Returns the number of pending timeouts of this {@link Timer}.
*/
public long pendingTimeouts() {
return pendingTimeouts.get();
}
private static void reportTooManyInstances() {
logger.error("You are creating too many HashedWheelTimer instances. " +
"HashedWheelTimer is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
}
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// Ensure we don't schedule for past.
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
*
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
private static final class HashedWheelTimeout implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
/**
* RemainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
* HashedWheelTimeout will be added to the correct HashedWheelBucket.
*/
long remainingRounds;
/**
* This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
* As only the workerThread will act on it there is no need for synchronization / volatile.
*/
HashedWheelTimeout next;
HashedWheelTimeout prev;
/**
* The bucket to which the timeout was added
*/
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public Timer timer() {
return timer;
}
@Override
public TimerTask task() {
return task;
}
@Override
public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}
@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
StringBuilder buf = new StringBuilder(192)
.append("HashedWheelTimer")
.append('(')
.append("deadline: ");
if (remaining > 0) {
buf.append(remaining)
.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
.append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(", task: ")
.append(task())
.append(')')
.toString();
}
}
/**
* Bucket that stores HashedWheelTimeouts. These are stored in a linked-list like datastructure to allow easy
* removal of HashedWheelTimeouts in the middle. Also the HashedWheelTimeout act as nodes themself and so no
* extra object creation is needed.
*/
private static final class HashedWheelBucket {
/**
* Used for the linked-list datastructure
*/
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds--;
}
timeout = next;
}
}
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
void clearTimeouts(Set<Timeout> set) {
for (; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}
private static final boolean IS_OS_WINDOWS = System.getProperty("os.name", "").toLowerCase(Locale.US).contains("win");
private boolean isWindows() {
return IS_OS_WINDOWS;
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
/**
* A handle associated with a {@link TimerTask} that is returned by a
* {@link Timer}.
* @author from netty | dubbo
*/
@SuppressWarnings("PMD")
public interface Timeout {
/**
* Returns the {@link Timer} that created this handle.
*/
Timer timer();
/**
* Returns the {@link TimerTask} which is associated with this handle.
*/
TimerTask task();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been expired.
*/
boolean isExpired();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been cancelled.
*/
boolean isCancelled();
/**
* Attempts to cancel the {@link TimerTask} associated with this handle.
* If the task has been executed or cancelled already, it will return with
* no side effect.
*
* @return True if the cancellation completed successfully, otherwise false
*/
boolean cancel();
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Schedules {@link TimerTask}s for one-time future execution in a background
* thread.
* @author from netty | dubbo
*/
public interface Timer {
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
*
* @param task the {@link TimerTask
* @param delay the delay
* @param unit the unit of time
* @return a handle which is associated with the specified task
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
* @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
* can cause instability in the system.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* Releases all resources acquired by this {@link Timer} and cancels all
* tasks which were scheduled but not executed yet.
*
* @return the handles associated with the tasks which were canceled by
* this method
*/
Set<Timeout> stop();
/**
* the timer is stop
*
* @return true for stop
*/
boolean isStop();
}

View File

@@ -0,0 +1,46 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.message.CollectRep;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 时间轮调度接口
* @author tomsun28
* @date 2021/10/17 22:14
*/
public interface TimerDispatch {
/**
* 增加新的job
* @param addJob job
* @param eventListener 一次性同步任务监听器异步任务不需要listener
*/
void addJob(Job addJob, CollectResponseEventListener eventListener);
/**
* 调度循环周期性job
* @param timerTask timerTask
* @param interval 开始调度的间隔时间
* @param timeUnit 时间单位
*/
void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit);
/**
* 删除存在的job
* @param jobId jobId
* @param isCyclic 是否是周期性任务,true是, false为临时性任务
*/
void deleteJob(long jobId, boolean isCyclic);
/**
* 一次性同步采集任务采集结果通知监听器
* @param jobId jobId
* @param metricsDataTemps 采集结果数据
*/
void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps);
}

View File

@@ -0,0 +1,95 @@
package com.usthe.collector.dispatch.timer;
import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.message.CollectRep;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author tomsun28
* @date 2021/10/17 23:06
*/
@Component
public class TimerDispatcher implements TimerDispatch {
/**
* 时间轮调度
*/
private Timer wheelTimer;
/**
* 已存在的周期性调度任务
*/
private Map<Long, Timeout> currentCyclicTaskMap;
/**
* 已存在的临时性调度任务
*/
private Map<Long, Timeout> currentTempTaskMap;
/**
* 一次性任务响应监听器持有
* jobId - listener
*/
private Map<Long, CollectResponseEventListener> eventListeners;
public TimerDispatcher() {
this.wheelTimer = new HashedWheelTimer(r -> {
Thread ret = new Thread(r, "wheelTimer");
ret.setDaemon(true);
return ret;
}, 10, TimeUnit.SECONDS, 512);
this.currentCyclicTaskMap = new ConcurrentHashMap<>(1024);
this.currentTempTaskMap = new ConcurrentHashMap<>(64);
eventListeners = new ConcurrentHashMap<>(64);
}
@Override
public void addJob(Job addJob, CollectResponseEventListener eventListener) {
WheelTimerTask timerJob = new WheelTimerTask(addJob);
if (addJob.isCyclic()) {
Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
currentCyclicTaskMap.put(addJob.getId(), timeout);
} else {
Timeout timeout = wheelTimer.newTimeout(timerJob, 0, TimeUnit.SECONDS);
currentTempTaskMap.put(addJob.getId(), timeout);
eventListeners.put(addJob.getId(), eventListener);
}
}
@Override
public void cyclicJob(WheelTimerTask timerTask, long interval, TimeUnit timeUnit) {
Long jobId = timerTask.getJob().getId();
// 判断此周期性job是否已经被取消
if (currentCyclicTaskMap.containsKey(jobId)) {
Timeout timeout = wheelTimer.newTimeout(timerTask, interval, TimeUnit.SECONDS);
currentCyclicTaskMap.put(timerTask.getJob().getId(), timeout);
}
}
@Override
public void deleteJob(long jobId, boolean isCyclic) {
if (isCyclic) {
Timeout timeout = currentCyclicTaskMap.remove(jobId);
if (timeout != null) {
timeout.cancel();
}
} else {
Timeout timeout = currentTempTaskMap.remove(jobId);
if (timeout != null) {
timeout.cancel();
}
}
}
@Override
public void responseSyncJobData(long jobId, List<CollectRep.MetricsData> metricsDataTemps) {
currentTempTaskMap.remove(jobId);
CollectResponseEventListener eventListener = eventListeners.remove(jobId);
if (eventListener != null) {
eventListener.response(metricsDataTemps);
}
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.usthe.collector.dispatch.timer;
import java.util.concurrent.TimeUnit;
/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}.
* @author from netty | dubbo
*/
public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout a handle which is associated with this task
* @throws Exception when error happen
*/
void run(Timeout timeout) throws Exception;
}

View File

@@ -0,0 +1,125 @@
package com.usthe.collector.dispatch.timer;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.usthe.collector.dispatch.MetricsTaskDispatch;
import com.usthe.collector.util.SpringContextHolder;
import com.usthe.common.entity.job.Configmap;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* TimerTask实现
* @author tomsun28
* @date 2021/11/1 17:18
*/
public class WheelTimerTask implements TimerTask {
private final Job job;
private final MetricsTaskDispatch metricsTaskDispatch;
private static final Gson GSON = new Gson();
public WheelTimerTask(Job job) {
this.metricsTaskDispatch = SpringContextHolder.getBean(MetricsTaskDispatch.class);
this.job = job;
// 初始化job 将监控实际参数值对采集字段进行替换
initJobMetrics(job);
}
/**
* 初始化job填充信息
* @param job job
*/
private void initJobMetrics(Job job) {
// 将监控实际参数值对采集字段进行替换
List<Configmap> config = job.getConfigmap();
Map<String, Configmap> configmap = config.stream().collect(Collectors.toMap(Configmap::getKey, item -> item));
List<Metrics> metrics = job.getMetrics();
List<Metrics> metricsTmp = new ArrayList<>(metrics.size());
for (Metrics metric : metrics) {
JsonElement jsonElement = GSON.toJsonTree(metric);
jsonElement = replaceSpecialValue(jsonElement, configmap);
metric = GSON.fromJson(jsonElement, Metrics.class);
metricsTmp.add(metric);
}
job.setMetrics(metricsTmp);
}
/**
* json参数替换
* @param jsonElement json
* @param configmap 参数map
* @return json
*/
private JsonElement replaceSpecialValue(JsonElement jsonElement, Map<String, Configmap> configmap) {
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
Iterator<Map.Entry<String, JsonElement>> iterator = jsonObject.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, JsonElement> entry = iterator.next();
JsonElement element = entry.getValue();
if (element.isJsonPrimitive()) {
// 判断是否含有特殊字符 替换
String value = element.getAsString();
if (value.startsWith("^_^") && value.endsWith("^_^")) {
value = value.replaceAll("\\^_\\^", "");
Configmap param = configmap.get(value);
if (param != null) {
value = (String) param.getValue();
jsonObject.addProperty(entry.getKey(), value);
} else {
iterator.remove();
}
}
} else {
jsonObject.add(entry.getKey(), replaceSpecialValue(entry.getValue(), configmap));
}
}
} else if (jsonElement.isJsonArray()) {
JsonArray jsonArray = jsonElement.getAsJsonArray();
Iterator<JsonElement> iterator = jsonArray.iterator();
int index = 0;
while (iterator.hasNext()) {
JsonElement element = iterator.next();
if (element.isJsonPrimitive()) {
// 判断是否含有特殊字符 替换
String value = element.getAsString();
if (value.startsWith("^_^") && value.endsWith("^_^")) {
value = value.replaceAll("\\^_\\^", "");
Configmap param = configmap.get(value);
if (param != null) {
value = (String) param.getValue();
jsonArray.set(index, new JsonPrimitive(value));
} else {
iterator.remove();
}
}
} else {
jsonArray.set(index, replaceSpecialValue(element, configmap));
}
index++;
}
}
return jsonElement;
}
@Override
public void run(Timeout timeout) throws Exception {
job.setDispatchTime(System.currentTimeMillis());
metricsTaskDispatch.dispatchMetricsTask(timeout);
}
public Job getJob() {
return job;
}
}

View File

@@ -0,0 +1,11 @@
package com.usthe.collector.util;
/**
* collector 常量
* @author tom
* @date 2021/12/3 12:15
*/
public interface CollectorConstants {
String RESPONSE_TIME = "responseTime";
}

View File

@@ -0,0 +1,44 @@
package com.usthe.collector.util;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.spi.cache.CacheProvider;
import com.jayway.jsonpath.spi.cache.LRUCache;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* json path parser
* @author tomsun28
* @date 2021/11/20 10:16
*/
public class JsonPathParser {
private static final ParseContext PARSER;
static {
Configuration conf = Configuration.defaultConfiguration()
.addOptions(Option.DEFAULT_PATH_LEAF_TO_NULL)
.addOptions(Option.ALWAYS_RETURN_LIST);
CacheProvider.setCache(new LRUCache(128));
PARSER = JsonPath.using(conf);
}
/**
* 使用jsonPath来解析json内容
* @param content json内容
* @param jsonPath jsonPath脚本
* @return 解析后的内容
*/
public static List<Map<String, Object>> parseContentWithJsonPath(String content, String jsonPath) {
if (content == null || jsonPath == null || "".equals(content) || "".equals(jsonPath)) {
return Collections.emptyList();
}
return PARSER.parse(content).read(jsonPath);
}
}

View File

@@ -0,0 +1,48 @@
package com.usthe.collector.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean
* @author tomsun28
* @date 21:07 2018/4/18
*/
@Component
public class SpringContextHolder implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
set(applicationContext);
}
private static void set(ApplicationContext applicationContext) {
SpringContextHolder.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
assertApplicationContext();
return applicationContext;
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String beanName) {
assertApplicationContext();
return (T) applicationContext.getBean(beanName);
}
public static <T> T getBean(Class<T> tClass) {
assertApplicationContext();
return (T) applicationContext.getBean(tClass);
}
private static void assertApplicationContext() {
if (null == SpringContextHolder.applicationContext) {
throw new RuntimeException("applicationContext为空,请检查是否注入springContextHolder");
}
}
}

View File

@@ -0,0 +1,9 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.usthe.collector.dispatch.timer.TimerDispatcher,\
com.usthe.collector.dispatch.CommonDispatcher,\
com.usthe.collector.dispatch.DispatchProperties,\
com.usthe.collector.dispatch.MetricsCollectorQueue,\
com.usthe.collector.dispatch.WorkerPool,\
com.usthe.collector.dispatch.entrance.internal.CollectJobService,\
com.usthe.collector.dispatch.export.MetricsDataExporter,\
com.usthe.collector.util.SpringContextHolder

View File

@@ -0,0 +1,37 @@
package com.usthe.collector.collect.telnet;
import org.apache.commons.net.telnet.TelnetClient;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.*;
/**
* @author tom
* @date 2021/12/4 19:39
*/
class TelnetCollectImplTest {
@Test
void telnet() {
TelnetClient telnetClient = null;
try {
telnetClient = new TelnetClient("vt200");
telnetClient.setConnectTimeout(5000);
TelnetClient finalTelnetClient = telnetClient;
assertDoesNotThrow(() -> finalTelnetClient.connect("baidu.com",80));
telnetClient.disconnect();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (telnetClient != null) {
try {
telnetClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>manager</artifactId>
@@ -22,25 +22,25 @@
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- scheduler -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>scheduler</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<!-- data warehouse -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>warehouse</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<!-- alerter -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>alerter</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<!-- collector -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>collector</artifactId>
<version>1.0</version>
</dependency>
<!-- spring -->
<dependency>
@@ -123,6 +123,7 @@
<include>sureness.yml</include>
<include>banner.txt</include>
<include>db/**</include>
<include>define/**</include>
</includes>
</resource>
</resources>
@@ -167,7 +168,7 @@
</goals>
<configuration>
<descriptors>
<descriptor>../assembly/server/assembly.xml</descriptor>
<descriptor>../script/assembly/server/assembly.xml</descriptor>
</descriptors>
</configuration>
</execution>

View File

@@ -1,5 +1,6 @@
package com.usthe.manager.service.impl;
import com.usthe.collector.dispatch.entrance.internal.CollectJobService;
import com.usthe.common.entity.job.Configmap;
import com.usthe.common.entity.job.Job;
import com.usthe.common.entity.job.Metrics;
@@ -20,7 +21,6 @@ import com.usthe.manager.service.AppService;
import com.usthe.manager.service.MonitorService;
import com.usthe.manager.support.exception.MonitorDatabaseException;
import com.usthe.manager.support.exception.MonitorDetectException;
import com.usthe.scheduler.JobScheduling;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
@@ -52,7 +52,7 @@ public class MonitorServiceImpl implements MonitorService {
private AppService appService;
@Autowired
private JobScheduling jobScheduling;
private CollectJobService collectJobService;
@Autowired
private MonitorDao monitorDao;
@@ -74,7 +74,7 @@ public class MonitorServiceImpl implements MonitorService {
List<Configmap> configmaps = params.stream().map(param ->
new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
List<CollectRep.MetricsData> collectRep = jobScheduling.addSyncCollectJob(appDefine);
List<CollectRep.MetricsData> collectRep = collectJobService.collectSyncJobData(appDefine);
// 判断探测结果 失败则抛出探测异常
if (collectRep == null || collectRep.isEmpty()) {
throw new MonitorDetectException("No collector response");
@@ -101,7 +101,7 @@ public class MonitorServiceImpl implements MonitorService {
}).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// 下发采集任务得到jobId
long jobId = jobScheduling.addAsyncCollectJob(appDefine);
long jobId = collectJobService.addAsyncCollectJob(appDefine);
// 下发成功后刷库
try {
monitor.setId(monitorId);
@@ -112,7 +112,7 @@ public class MonitorServiceImpl implements MonitorService {
} catch (Exception e) {
log.error(e.getMessage(), e);
// 刷库异常取消之前的下发任务
jobScheduling.cancelAsyncCollectJob(jobId);
collectJobService.cancelAsyncCollectJob(jobId);
throw new MonitorDatabaseException(e.getMessage());
}
}
@@ -225,7 +225,7 @@ public class MonitorServiceImpl implements MonitorService {
new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// 更新采集任务
jobScheduling.updateAsyncCollectJob(appDefine);
collectJobService.updateAsyncCollectJob(appDefine);
// 下发更新成功后刷库
try {
monitor.setJobId(preMonitor.getJobId());
@@ -246,7 +246,7 @@ public class MonitorServiceImpl implements MonitorService {
Monitor monitor = monitorOptional.get();
monitorDao.deleteById(id);
paramDao.deleteParamsByMonitorId(id);
jobScheduling.cancelAsyncCollectJob(monitor.getJobId());
collectJobService.cancelAsyncCollectJob(monitor.getJobId());
}
}
@@ -258,7 +258,7 @@ public class MonitorServiceImpl implements MonitorService {
monitorDao.deleteAll(monitors);
paramDao.deleteParamsByMonitorIdIn(ids);
for (Monitor monitor : monitors) {
jobScheduling.cancelAsyncCollectJob(monitor.getJobId());
collectJobService.cancelAsyncCollectJob(monitor.getJobId());
}
}
}
@@ -299,7 +299,7 @@ public class MonitorServiceImpl implements MonitorService {
if (!managedMonitors.isEmpty()) {
monitorDao.saveAll(managedMonitors);
for (Monitor monitor : managedMonitors) {
jobScheduling.cancelAsyncCollectJob(monitor.getJobId());
collectJobService.cancelAsyncCollectJob(monitor.getJobId());
}
}
}
@@ -326,7 +326,7 @@ public class MonitorServiceImpl implements MonitorService {
new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// 下发采集任务
jobScheduling.addAsyncCollectJob(appDefine);
collectJobService.addAsyncCollectJob(appDefine);
}
}
}

View File

@@ -4,7 +4,6 @@ package com.usthe.manager.support;
import com.usthe.common.entity.dto.Message;
import com.usthe.manager.support.exception.MonitorDatabaseException;
import com.usthe.manager.support.exception.MonitorDetectException;
import com.usthe.scheduler.ScheduleException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DataAccessException;
import org.springframework.http.HttpStatus;
@@ -133,23 +132,6 @@ public class GlobalExceptionHandler {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message);
}
/**
* 处理分发调度器异常问题
* @param exception 调度器异常问题
* @return response
*/
@ExceptionHandler(ScheduleException.class)
@ResponseBody
ResponseEntity<Message<Void>> handleScheduleException(ScheduleException exception) {
String errorMessage = "scheduler warning";
if (exception != null) {
errorMessage = exception.getMessage();
}
log.warn("[scheduler warning]-{}", errorMessage);
Message<Void> message = Message.<Void>builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build();
return ResponseEntity.status(HttpStatus.CONFLICT).body(message);
}
/**
* handler the exception thrown for datastore error
* @param exception datastore exception

View File

@@ -7,9 +7,8 @@
<groupId>com.usthe.tancloud</groupId>
<artifactId>monitor</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
<modules>
<module>scheduler</module>
<module>manager</module>
<module>alerter</module>
<module>common</module>

View File

@@ -0,0 +1,9 @@
#!/bin/bash
cd ../../web-app
ng build --prod --base-href /console/
cd ..
mvn clean package

View File

@@ -22,8 +22,11 @@ http://maven.apache.org/ASSEMBLY/2.0.0 ">
<fileSets>
<!--bin文件下的所有脚本文件输出到打包后的bin目录下-->
<fileSet>
<directory>../assembly/server/bin</directory>
<directory>../script/assembly/server/bin</directory>
<!-- 是否进行属性替换 即使用 ${project.artifactId} -->
<filtered>true</filtered>
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
<!-- src/main/resources目录下配置文件打包到config目录下 -->

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>monitor</artifactId>
<groupId>com.usthe.tancloud</groupId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -16,7 +16,14 @@
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0</version>
</dependency>
<!-- collector -->
<dependency>
<groupId>com.usthe.tancloud</groupId>
<artifactId>collector</artifactId>
<version>1.0</version>
<scope>provided</scope>
</dependency>
<!-- spring -->
<dependency>

View File

@@ -1,81 +0,0 @@
package com.usthe.warehouse.entrance;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.warehouse.MetricsDataQueue;
import com.usthe.warehouse.WarehouseProperties;
import com.usthe.warehouse.WarehouseWorkerPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 从Kafka消费指标组采集数据处理
* @author tom
* @date 2021/11/24 18:03
*/
@Configuration
@AutoConfigureAfter(value = {WarehouseProperties.class})
@ConditionalOnProperty(prefix = "warehouse.entrance.kafka",
name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class KafkaDataConsume implements DisposableBean {
private KafkaConsumer<Long, CollectRep.MetricsData> consumer;
private WarehouseWorkerPool workerPool;
private MetricsDataQueue dataQueue;
public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool,
MetricsDataQueue dataQueue) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
initConsumer(properties);
startConsumeData();
}
private void startConsumeData() {
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-kafka-data-consumer");
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<Long, CollectRep.MetricsData> records = consumer.poll(Duration.ofMillis(100));
records.forEach(record -> {
dataQueue.addMetricsDataToInflux(record.value());
dataQueue.addMetricsDataToRedis(record.value());
});
}
};
workerPool.executeJob(runnable);
}
private void initConsumer(WarehouseProperties properties) {
if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) {
log.error("init error, please config Warehouse kafka props in application.yml");
throw new IllegalArgumentException("please config Warehouse kafka props");
}
WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka();
Properties consumerProp = new Properties();
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers());
consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId());
consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class);
consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
consumer = new KafkaConsumer<>(consumerProp);
consumer.subscribe(Collections.singleton(kafkaProp.getTopic()));
}
@Override
public void destroy() throws Exception {
if (consumer != null) {
consumer.close();
}
}
}

View File

@@ -1,24 +0,0 @@
package com.usthe.warehouse.entrance;
import com.usthe.common.entity.message.CollectRep;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
/**
* MetricsData的反序列化
* @author tom
* @date 2021/11/24 17:29
*/
@Slf4j
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {
@Override
public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) {
try {
return CollectRep.MetricsData.parseFrom(bytes);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}

View File

@@ -1,134 +0,0 @@
package com.usthe.warehouse.store;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.common.util.CommonConstants;
import com.usthe.warehouse.MetricsDataQueue;
import com.usthe.warehouse.WarehouseProperties;
import com.usthe.warehouse.WarehouseWorkerPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import java.time.Instant;
import java.util.List;
/**
* influxdb存储采集数据
* @author tom
* @date 2021/11/24 18:23
*/
@Configuration
@AutoConfigureAfter(value = {WarehouseProperties.class})
@ConditionalOnProperty(prefix = "warehouse.store.influxdb",
name = "enabled", havingValue = "true", matchIfMissing = true)
@Slf4j
public class InfluxdbDataStorage implements DisposableBean {
private InfluxDBClient influxClient;
private WriteApi writeApi;
private WarehouseWorkerPool workerPool;
private MetricsDataQueue dataQueue;
public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
MetricsDataQueue dataQueue) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
initInfluxDbClient(properties);
startStorageData();
}
private void startStorageData() {
Runnable runnable = () -> {
Thread.currentThread().setName("warehouse-influxdb-data-storage");
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData();
if (metricsData != null) {
saveData(metricsData);
}
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
};
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
}
private void initInfluxDbClient(WarehouseProperties properties) {
if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) {
log.error("init error, please config Warehouse influxdb props in application.yml");
throw new IllegalArgumentException("please config Warehouse influxdb props");
}
WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb();
influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(),
influxdbProp.getOrg(), influxdbProp.getBucket());
WriteOptions writeOptions = WriteOptions.builder()
.batchSize(1000)
.bufferLimit(1000)
.jitterInterval(1000)
.retryInterval(5000)
.build();
writeApi = influxClient.makeWriteApi(writeOptions);
}
public void saveData(CollectRep.MetricsData metricsData) {
String measurement = metricsData.getApp() + "_" + metricsData.getMetrics();
String monitorId = String.valueOf(metricsData.getId());
Instant collectTime = Instant.ofEpochMilli(metricsData.getTime());
List<CollectRep.Field> fields = metricsData.getFieldsList();
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Point point = Point.measurement(measurement)
.addTag("id", monitorId)
.addTag("instance", valueRow.getInstance())
.time(collectTime, WritePrecision.MS);
for (int index = 0; index < fields.size(); index++) {
CollectRep.Field field = fields.get(index);
String value = valueRow.getColumns(index);
if (field.getType() == CommonConstants.TYPE_NUMBER) {
// number data
if (CommonConstants.NULL_VALUE.equals(value)) {
point.addField(field.getName(), (Number) null);
} else {
try {
double number = Double.parseDouble(value);
point.addField(field.getName(), number);
} catch (Exception e) {
log.warn(e.getMessage());
point.addField(field.getName(), (Number) null);
}
}
} else {
// string
if (CommonConstants.NULL_VALUE.equals(value)) {
point.addField(field.getName(), (String) null);
} else {
point.addField(field.getName(), value);
}
}
}
writeApi.writePoint(point);
}
}
@Override
public void destroy() throws Exception {
if (writeApi != null) {
writeApi.close();
}
if (influxClient != null) {
influxClient.close();
}
}
}

View File

@@ -1,7 +1,7 @@
package com.usthe.warehouse.store;
import com.usthe.collector.dispatch.export.MetricsDataExporter;
import com.usthe.common.entity.message.CollectRep;
import com.usthe.warehouse.MetricsDataQueue;
import com.usthe.warehouse.WarehouseProperties;
import com.usthe.warehouse.WarehouseWorkerPool;
import io.lettuce.core.RedisClient;
@@ -34,13 +34,12 @@ public class RedisDataStorage implements DisposableBean {
private RedisClient redisClient;
private StatefulRedisConnection<String, CollectRep.MetricsData> connection;
private WarehouseWorkerPool workerPool;
private MetricsDataQueue dataQueue;
private MetricsDataExporter dataExporter;
public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
MetricsDataQueue dataQueue) {
MetricsDataExporter dataExporter) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.dataExporter = dataExporter;
initRedisClient(properties);
startStorageData();
}
@@ -55,7 +54,7 @@ public class RedisDataStorage implements DisposableBean {
Thread.currentThread().setName("warehouse-redis-data-storage");
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData();
CollectRep.MetricsData metricsData = dataExporter.pollWarehouseRedisMetricsData();
if (metricsData != null) {
saveData(metricsData);
}

View File

@@ -2,7 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.usthe.warehouse.WarehouseProperties,\
com.usthe.warehouse.MetricsDataQueue,\
com.usthe.warehouse.WarehouseWorkerPool,\
com.usthe.warehouse.entrance.KafkaDataConsume,\
com.usthe.warehouse.store.InfluxdbDataStorage,\
com.usthe.warehouse.store.RedisDataStorage,\
com.usthe.warehouse.controller.MetricsDataController