diff --git a/Architecture.jpg b/Architecture.jpg deleted file mode 100644 index f7e60a4..0000000 Binary files a/Architecture.jpg and /dev/null differ diff --git a/README.md b/README.md index 5403097..8e44e38 100644 --- a/README.md +++ b/README.md @@ -4,16 +4,12 @@ ### 模块 - **[manager](manager)** 提供监控管理,系统管理基础服务 -> 开发中,提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。 +> 提供对监控的管理,监控应用配置的管理,系统用户租户后台管理等。 - **[collector](collector)** 提供监控数据采集服务 -> 开发中,使用通用协议远程采集获取对端指标数据。 +> 使用通用协议远程采集获取对端指标数据。 - **[scheduler](scheduler)** 提供监控任务调度服务 -> 开发完成,采集任务管理,一次性任务和周期性任务的调度分发。 +> 采集任务管理,一次性任务和周期性任务的调度分发。 - **[warehouse](warehouse)** 提供监控数据仓储服务 -> 开发中,采集指标结果数据管理,数据落盘,查询,计算统计。 +> 采集指标结果数据管理,数据落盘,查询,计算统计。 - **[alerter](alerter)** 提供告警服务 -> 开发中,告警计算触发,监控状态联动,告警配置,告警通知。 - -### 结构 - -![arch](Architecture.jpg) \ No newline at end of file +> 告警计算触发,监控状态联动,告警配置,告警通知。 diff --git a/alerter/pom.xml b/alerter/pom.xml index f216a80..84eee54 100644 --- a/alerter/pom.xml +++ b/alerter/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 @@ -20,7 +20,14 @@ com.usthe.tancloud common - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 + provided diff --git a/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java index 6d92427..562c17a 100644 --- a/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java +++ b/alerter/src/main/java/com/usthe/alert/AlerterDataQueue.java @@ -1,7 +1,6 @@ package com.usthe.alert; import com.usthe.alert.pojo.entity.Alert; -import com.usthe.common.entity.message.CollectRep; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -17,22 +16,12 @@ import java.util.concurrent.TimeUnit; @Slf4j public class AlerterDataQueue { - private final LinkedBlockingQueue metricsDataQueue; private final LinkedBlockingQueue alertDataQueue; public AlerterDataQueue() { - metricsDataQueue = new LinkedBlockingQueue<>(); alertDataQueue = new LinkedBlockingQueue<>(); } - public void addMetricsData(CollectRep.MetricsData metricsData) { - metricsDataQueue.offer(metricsData); - } - - public CollectRep.MetricsData pollMetricsData() throws InterruptedException { - return metricsDataQueue.poll(2, TimeUnit.SECONDS); - } - public void addAlertData(Alert alert) { alertDataQueue.offer(alert); } diff --git a/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java index 357f106..1a2cb3a 100644 --- a/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java +++ b/alerter/src/main/java/com/usthe/alert/calculate/CalculateAlarm.java @@ -4,16 +4,15 @@ import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; import com.usthe.alert.AlerterWorkerPool; import com.usthe.alert.AlerterDataQueue; -import com.usthe.alert.entrance.KafkaDataConsume; import com.usthe.alert.pojo.entity.Alert; import com.usthe.alert.pojo.entity.AlertDefine; import com.usthe.alert.service.AlertDefineService; import com.usthe.alert.util.AlertTemplateUtil; +import com.usthe.collector.dispatch.export.MetricsDataExporter; import com.usthe.common.entity.message.CollectRep; import com.usthe.common.util.CommonConstants; import com.usthe.common.util.CommonUtil; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @@ -27,20 +26,21 @@ import java.util.concurrent.ConcurrentHashMap; * @date 2021/12/9 14:19 */ @Configuration -@AutoConfigureAfter(value = {KafkaDataConsume.class}) @Slf4j public class CalculateAlarm { private AlerterWorkerPool workerPool; private AlerterDataQueue dataQueue; + private MetricsDataExporter dataExporter; private AlertDefineService alertDefineService; private Map triggeredAlertMap; private Map triggeredMonitorStateAlertMap; public CalculateAlarm (AlerterWorkerPool workerPool, AlerterDataQueue dataQueue, - AlertDefineService alertDefineService) { + AlertDefineService alertDefineService, MetricsDataExporter dataExporter) { this.workerPool = workerPool; this.dataQueue = dataQueue; + this.dataExporter = dataExporter; this.alertDefineService = alertDefineService; this.triggeredAlertMap = new ConcurrentHashMap<>(128); this.triggeredMonitorStateAlertMap = new ConcurrentHashMap<>(128); @@ -51,7 +51,7 @@ public class CalculateAlarm { Runnable runnable = () -> { while (!Thread.currentThread().isInterrupted()) { try { - CollectRep.MetricsData metricsData = dataQueue.pollMetricsData(); + CollectRep.MetricsData metricsData = dataExporter.pollAlertMetricsData(); if (metricsData != null) { calculate(metricsData); } diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java deleted file mode 100644 index ef16143..0000000 --- a/alerter/src/main/java/com/usthe/alert/entrance/KafkaDataConsume.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.usthe.alert.entrance; - -import com.usthe.alert.AlerterProperties; -import com.usthe.alert.AlerterWorkerPool; -import com.usthe.alert.AlerterDataQueue; -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * 从Kafka消费指标组采集数据处理 - * @author tom - * @date 2021/11/24 18:03 - */ -@Configuration -@AutoConfigureAfter(value = {AlerterProperties.class}) -@ConditionalOnProperty(prefix = "alerter.entrance.kafka", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class KafkaDataConsume implements DisposableBean { - - private KafkaConsumer consumer; - private AlerterWorkerPool workerPool; - private AlerterDataQueue dataQueue; - public KafkaDataConsume(AlerterProperties properties, AlerterWorkerPool workerPool, - AlerterDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initConsumer(properties); - startConsumeData(); - } - - private void startConsumeData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-kafka-data-consumer"); - while (!Thread.currentThread().isInterrupted()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - dataQueue.addMetricsData(record.value()); - }); - } - }; - workerPool.executeJob(runnable); - } - - private void initConsumer(AlerterProperties properties) { - if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { - log.error("init error, please config Warehouse kafka props in application.yml"); - throw new IllegalArgumentException("please config Warehouse kafka props"); - } - AlerterProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); - Properties consumerProp = new Properties(); - consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); - consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); - consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); - consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); - consumer = new KafkaConsumer<>(consumerProp); - consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); - } - - @Override - public void destroy() throws Exception { - if (consumer != null) { - consumer.close(); - } - } -} diff --git a/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java b/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java deleted file mode 100644 index c0ab7d6..0000000 --- a/alerter/src/main/java/com/usthe/alert/entrance/KafkaMetricsDataDeserializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.usthe.alert.entrance; - -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * MetricsData的反序列化 - * @author tom - * @date 2021/11/24 17:29 - */ -@Slf4j -public class KafkaMetricsDataDeserializer implements Deserializer { - - @Override - public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { - try { - return CollectRep.MetricsData.parseFrom(bytes); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } -} diff --git a/alerter/src/main/resources/META-INF/spring.factories b/alerter/src/main/resources/META-INF/spring.factories index 30adc51..56463db 100644 --- a/alerter/src/main/resources/META-INF/spring.factories +++ b/alerter/src/main/resources/META-INF/spring.factories @@ -6,7 +6,6 @@ com.usthe.alert.AlerterWorkerPool,\ com.usthe.alert.AlerterProperties,\ com.usthe.alert.AlerterDataQueue,\ com.usthe.alert.AlerterConfiguration,\ -com.usthe.alert.entrance.KafkaDataConsume,\ com.usthe.alert.calculate.CalculateAlarm,\ com.usthe.alert.controller.AlertsController,\ com.usthe.alert.controller.AlertDefinesController \ No newline at end of file diff --git a/assembly/collector/assembly.xml b/assembly/collector/assembly.xml deleted file mode 100644 index 5978e38..0000000 --- a/assembly/collector/assembly.xml +++ /dev/null @@ -1,44 +0,0 @@ - - - 1.0 - - - tar.gz - zip - - - - - - - ../../assembly/collector/bin - - true - bin - - - - - src/main/resources - - application.yml - logback-spring.xml - - - true - ${file.separator}config - - - - - target - / - - *executable.jar - - - - diff --git a/assembly/collector/bin/startup.sh b/assembly/collector/bin/startup.sh deleted file mode 100644 index f81450d..0000000 --- a/assembly/collector/bin/startup.sh +++ /dev/null @@ -1,109 +0,0 @@ -#!/bin/bash - -# 项目名称 -SERVER_NAME="${project.artifactId}" - -# jar名称 -JAR_NAME="${project.build.finalName}-executable.jar" - -# 进入bin目录 -cd `dirname $0` -# bin目录绝对路径 -BIN_DIR=`pwd` -# 返回到上一级项目根目录路径 -cd .. -# 打印项目根目录绝对路径 -# `pwd` 执行系统命令并获得结果 -DEPLOY_DIR=`pwd` - -# 外部配置文件绝对目录,如果是目录需要/结尾,也可以直接指定文件 -# 如果指定的是目录,spring则会读取目录中的所有配置文件 -CONF_DIR=$DEPLOY_DIR/config -# SERVER_PORT=`sed '/server.port/!d;s/.*=//' config/application.properties | tr -d '\r'` -# 获取应用的端口号 -SERVER_PORT=`sed -nr '/port: [0-9]+/ s/.*port: +([0-9]+).*/\1/p' config/application.yml` - -PIDS=`ps -f | grep java | grep "$CONF_DIR" |awk '{print $2}'` -if [ "$1" = "status" ]; then - if [ -n "$PIDS" ]; then - echo "The $SERVER_NAME is running...!" - echo "PID: $PIDS" - exit 0 - else - echo "The $SERVER_NAME is stopped" - exit 0 - fi -fi - -if [ -n "$PIDS" ]; then - echo "ERROR: The $SERVER_NAME already started!" - echo "PID: $PIDS" - exit 1 -fi - -if [ -n "$SERVER_PORT" ]; then - SERVER_PORT_COUNT=`netstat -tln | grep $SERVER_PORT | wc -l` - if [ $SERVER_PORT_COUNT -gt 0 ]; then - echo "ERROR: The $SERVER_NAME port $SERVER_PORT already used!" - exit 1 - fi -fi - -# 项目日志输出绝对路径 -LOGS_DIR=$DEPLOY_DIR/logs -# 如果logs文件夹不存在,则创建文件夹 -if [ ! -d $LOGS_DIR ]; then - mkdir $LOGS_DIR -fi -STDOUT_FILE=$LOGS_DIR/catalina.log - -# JVM Configuration -JAVA_OPTS=" -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true " -JAVA_DEBUG_OPTS="" -if [ "$1" = "debug" ]; then - JAVA_DEBUG_OPTS=" -Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n " -fi - -JAVA_JMX_OPTS="" -if [ "$1" = "jmx" ]; then - JAVA_JMX_OPTS=" -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false " -fi - -JAVA_MEM_OPTS="" -BITS=`java -version 2>&1 | grep -i 64-bit` -if [ -n "$BITS" ]; then - JAVA_MEM_OPTS=" -server -Xmx512m -Xms512m -Xmn256m -XX:PermSize=128m -Xss256k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 " -else - JAVA_MEM_OPTS=" -server -Xms512m -Xmx512m -XX:PermSize=128m -XX:SurvivorRatio=2 -XX:+UseParallelGC " -fi - -# 加载外部log4j2文件的配置 -LOG_IMPL_FILE=log4j2.xml -LOGGING_CONFIG="" -if [ -f "$CONF_DIR/$LOG_IMPL_FILE" ] -then - LOGGING_CONFIG="-Dlogging.config=$CONF_DIR/$LOG_IMPL_FILE" -fi -CONFIG_FILES=" -Dlogging.path=$LOGS_DIR $LOGGING_CONFIG -Dspring.config.location=$CONF_DIR/ " -echo -e "Starting the $SERVER_NAME ..." -nohup java $JAVA_OPTS $JAVA_MEM_OPTS $JAVA_DEBUG_OPTS $JAVA_JMX_OPTS $CONFIG_FILES -jar $DEPLOY_DIR/lib/$JAR_NAME > $STDOUT_FILE 2>&1 & - -COUNT=0 -while [ $COUNT -lt 1 ]; do - echo -e ".\c" - sleep 1 - if [ -n "$SERVER_PORT" ]; then - COUNT=`netstat -an | grep $SERVER_PORT | wc -l` - else - COUNT=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}' | wc -l` - fi - if [ $COUNT -gt 0 ]; then - break - fi -done - - -echo "OK!" -PIDS=`ps -f | grep java | grep "$DEPLOY_DIR" | awk '{print $2}'` -echo "PID: $PIDS" -echo "STDOUT: $STDOUT_FILE" \ No newline at end of file diff --git a/assembly/server/bin/shutdown.sh b/assembly/server/bin/shutdown.sh deleted file mode 100644 index 28a5e2e..0000000 --- a/assembly/server/bin/shutdown.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -# 项目名称 -APPLICATION="${project.artifactId}" - -# 项目启动jar包名称 -APPLICATION_JAR="${project.build.finalName}.jar" - -# 通过项目名称查找到PI,然后kill -9 pid -PID=$(ps -ef | grep "${APPLICATION_JAR}" | grep -v grep | awk '{ print $2 }') -if [[ -z "$PID" ]] -then - echo ${APPLICATION} is already stopped -else - echo kill ${PID} - kill -9 ${PID} - echo ${APPLICATION} stopped successfully -fi \ No newline at end of file diff --git a/collector/README.md b/collector/README.md index 5d35f75..304c58c 100644 --- a/collector/README.md +++ b/collector/README.md @@ -31,18 +31,6 @@ * Ping * 服务端口 -#### HELP - -1. ARK插件类隔离未生效 -> 注意需构建在jdk1.8环境中运行 -> 插件是否配置导入并配置正确 -> 本地DEBUG时需单独IDEA打开运行collector工程,不能将plugin和collector在同一工程打开运行 - -2. metaspace元空间内存占用多或溢出 -> 建议调整JVM参数 ```-Dsun.reflect.inflationThreshold=100000``` -> 由于使用太多反射,超过参数`inflationThreshold`默认值15阈值导致触发JVM反射优化(加快反射速度), -> 反射获取类信息由使用*JNI存取器**膨胀(Inflation)* -> 为*反射每个方法生成一个类加载器DelegatingClassLoader和Java类MethodAccessor*. -> 动态加载的字节码导致PermGen持续增长. +#### HELP diff --git a/collector/plugins/pom.xml b/collector/plugins/pom.xml deleted file mode 100644 index 14c317b..0000000 --- a/collector/plugins/pom.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - collector - com.usthe.tancloud - 1.0-SNAPSHOT - - 4.0.0 - - plugins - pom - - sample-plugin - - - - \ No newline at end of file diff --git a/collector/plugins/sample-plugin/pom.xml b/collector/plugins/sample-plugin/pom.xml deleted file mode 100644 index 6ce90c0..0000000 --- a/collector/plugins/sample-plugin/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - plugins - com.usthe.tancloud - 1.0-SNAPSHOT - - 4.0.0 - - sample-plugin - - - - - - com.alipay.sofa - sofa-ark-plugin-maven-plugin - 1.1.6 - - - default-cli - - ark-plugin - - - - - 2000 - - - - - - com.com.usthe.plugin.sample.ExportDemo - - - - - - - - - - - - \ No newline at end of file diff --git a/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java b/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java deleted file mode 100644 index b6bbe94..0000000 --- a/collector/plugins/sample-plugin/src/main/java/com/usthe/collector/plugin/SameClass.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.usthe.collector.plugin; - -/** - * @author tomsun28 - * @date 2021/10/8 15:12 - */ -public class SameClass { - - public static String hello() { - return "hello plugin"; - } -} diff --git a/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java b/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java deleted file mode 100644 index c3e163d..0000000 --- a/collector/plugins/sample-plugin/src/main/java/com/usthe/plugin/sample/ExportDemo.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.usthe.plugin.sample; - -import com.usthe.collector.plugin.SameClass; - -/** - * @author tomsun28 - * @date 2021/10/8 15:11 - */ -public class ExportDemo { - - public String hello() { - return SameClass.hello(); - } -} diff --git a/collector/pom.xml b/collector/pom.xml index e30de1e..a22b4a6 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -5,14 +5,85 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 collector - pom - - server - plugins - + + + + + org.springframework.boot + spring-boot-starter-web + provided + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-autoconfigure + + + + com.usthe.tancloud + common + 1.0-SNAPSHOT + + + + io.etcd + jetcd-core + 0.5.11 + + + + org.apache.kafka + kafka-clients + 3.0.0 + + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + commons-net + commons-net + 3.8.0 + + + + com.jayway.jsonpath + json-path + 2.6.0 + + + + com.googlecode.concurrentlinkedhashmap + concurrentlinkedhashmap-lru + 1.4.2 + + + com.google.guava + guava + 31.0.1-jre + + + com.google.code.gson + gson + 2.8.8 + + + com.googlecode.aviator + aviator + 5.2.7 + + + \ No newline at end of file diff --git a/collector/server/pom.xml b/collector/server/pom.xml deleted file mode 100644 index f22246b..0000000 --- a/collector/server/pom.xml +++ /dev/null @@ -1,146 +0,0 @@ - - - - collector - com.usthe.tancloud - 1.0-SNAPSHOT - - 4.0.0 - - server - - - - - org.springframework.boot - spring-boot-starter-webflux - - - org.springframework.boot - spring-boot-configuration-processor - true - - - - com.alipay.sofa - sofa-ark-springboot-starter - 1.1.6 - - - - com.usthe.tancloud - common - 1.0-SNAPSHOT - - - - io.etcd - jetcd-core - 0.5.11 - - - - org.apache.kafka - kafka-clients - 3.0.0 - - - - org.apache.httpcomponents - httpclient - 4.5.13 - - - - commons-net - commons-net - 3.8.0 - - - - com.jayway.jsonpath - json-path - 2.6.0 - - - - com.googlecode.concurrentlinkedhashmap - concurrentlinkedhashmap-lru - 1.4.2 - - - com.google.guava - guava - 31.0.1-jre - - - com.google.code.gson - gson - 2.8.8 - - - com.googlecode.aviator - aviator - 5.2.7 - - - - com.usthe.tancloud - sample-plugin - 1.0-SNAPSHOT - - - - - hertz-beat-collector - - - com.alipay.sofa - sofa-ark-maven-plugin - 1.1.6 - - - default-cli - - - - repackage - - - - - ./target - - - executable - - - - - - org.apache.maven.plugins - maven-assembly-plugin - 3.3.0 - - - make-zip - - package - - - single - - - - ../assembly/collector/assembly.xml - - - - - - - - - \ No newline at end of file diff --git a/collector/server/src/main/java/com/usthe/collector/Collector.java b/collector/server/src/main/java/com/usthe/collector/Collector.java deleted file mode 100644 index 275ec54..0000000 --- a/collector/server/src/main/java/com/usthe/collector/Collector.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.usthe.collector; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; - -/** - * collector start - * @author tomsun28 - * @date 2021/10/7 18:02 - */ -@SpringBootApplication(exclude = {KafkaAutoConfiguration.class}) -@Slf4j -public class Collector { - public static void main(String[] args) { - SpringApplication.run(Collector.class, args); - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConfiguration.java b/collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConfiguration.java deleted file mode 100644 index 8f8d592..0000000 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConfiguration.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.usthe.collector.dispatch; - -import com.googlecode.aviator.AviatorEvaluator; -import com.googlecode.aviator.Options; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * @author tomsun28 - * @date 2021/11/3 12:55 - */ -@Configuration -public class DispatchConfiguration { - - private static final int AVIATOR_LRU_CACHE_SIZE = 1024; - - @Bean - public void configAviatorEvaluator() { - // 配置AviatorEvaluator使用LRU缓存编译后的表达式 - AviatorEvaluator.getInstance() - .useLRUExpressionCache(AVIATOR_LRU_CACHE_SIZE); - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectJobController.java b/collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectJobController.java deleted file mode 100644 index 0ef1473..0000000 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectJobController.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.usthe.collector.dispatch.entrance.http; - -import com.usthe.collector.dispatch.timer.TimerDispatch; -import com.usthe.common.entity.job.Job; -import com.usthe.common.entity.message.CollectRep; -import com.usthe.common.util.ProtoJsonUtil; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.Mono; - -import java.util.ArrayList; -import java.util.List; - -/** - * 采集job管理提供api接口 - * @author tomsun28 - * @date 2021/11/6 13:58 - */ -@RestController -public class CollectJobController { - - @Autowired - private TimerDispatch timerDispatch; - - /** - * 执行一次性采集任务,获取采集数据响应 - * @return 采集结果 - */ - @PostMapping(path = "/job/sync", consumes = MediaType.APPLICATION_JSON_VALUE, - produces = MediaType.APPLICATION_JSON_VALUE) - public Mono> collectSyncJobData(@RequestBody Job job) { - return Mono.create(sink -> { - CollectResponseEventListener listener = new CollectResponseEventListener() { - @Override - public void response(List responseMetrics) { - if (responseMetrics == null || responseMetrics.isEmpty()) { - sink.success(); - } else { - List jsons = new ArrayList<>(responseMetrics.size()); - for (CollectRep.MetricsData metricsData : responseMetrics) { - String json = ProtoJsonUtil.toJsonStr(metricsData); - if (json != null) { - jsons.add(json); - } - } - sink.success(jsons); - } - } - }; - timerDispatch.addJob(job, listener); - }); - } - -} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java deleted file mode 100644 index ff7ba61..0000000 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaDataExporter.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.usthe.collector.dispatch.export; - -import com.usthe.collector.dispatch.DispatchProperties; -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.LongSerializer; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.util.Properties; - -/** - * kafka采集数据消息发送 - * @author tomsun28 - * @date 2021/11/3 15:22 - */ -@Configuration -@ConditionalOnProperty(prefix = "collector.dispatch.export.kafka", - name = "enabled", havingValue = "true", matchIfMissing = true) -@AutoConfigureAfter(value = {DispatchProperties.class}) -@Slf4j -public class KafkaDataExporter implements DisposableBean { - - KafkaProducer kafkaProducer; - DispatchProperties.ExportProperties.KafkaProperties kafkaProperties; - public KafkaDataExporter(DispatchProperties dispatchProperties) { - try { - kafkaProperties = dispatchProperties.getExport().getKafka(); - Properties properties = new Properties(); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getServers()); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMetricsDataSerializer.class); - kafkaProducer = new KafkaProducer<>(properties); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - - /** - * 发送消息 - * @param metricsData 指标组采集数据 - */ - public void send(CollectRep.MetricsData metricsData) { - if (kafkaProducer != null) { - kafkaProducer.send(new ProducerRecord<>(kafkaProperties.getTopic(), metricsData.getId(), metricsData)); - } else { - log.error("kafkaProducer is not enable"); - } - } - - @Override - public void destroy() throws Exception { - if (kafkaProducer != null) { - kafkaProducer.close(); - } - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaMetricsDataSerializer.java b/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaMetricsDataSerializer.java deleted file mode 100644 index d21afe7..0000000 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/export/KafkaMetricsDataSerializer.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.usthe.collector.dispatch.export; - - -import com.usthe.common.entity.message.CollectRep; -import org.apache.kafka.common.serialization.Serializer; - -/** - * MetricsData的序列化 - * @author tomsun28 - * @date 2021/11/3 16:14 - */ -public class KafkaMetricsDataSerializer implements Serializer { - - @Override - public byte[] serialize(String topicName, CollectRep.MetricsData metricsData) { - return metricsData.toByteArray(); - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/plugin/SameClass.java b/collector/server/src/main/java/com/usthe/collector/plugin/SameClass.java deleted file mode 100644 index 69fe72d..0000000 --- a/collector/server/src/main/java/com/usthe/collector/plugin/SameClass.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.usthe.collector.plugin; - -/** - * @author tomsun28 - * @date 2021/10/8 15:12 - */ -public class SameClass { - - public static String hello() { - return "hello collector"; - } -} diff --git a/collector/server/src/main/java/com/usthe/collector/plugin/TestPlugin.java b/collector/server/src/main/java/com/usthe/collector/plugin/TestPlugin.java deleted file mode 100644 index 06efe94..0000000 --- a/collector/server/src/main/java/com/usthe/collector/plugin/TestPlugin.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.usthe.collector.plugin; - -import com.usthe.plugin.sample.ExportDemo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.CommandLineRunner; -import org.springframework.stereotype.Component; - -/** - * @author tomsun28 - * @date 2021/10/8 15:31 - */ -@Component -@Slf4j -public class TestPlugin implements CommandLineRunner { - @Override - public void run(String... args) throws Exception { - log.info(SameClass.hello()); - log.info(new ExportDemo().hello()); - } -} diff --git a/collector/server/src/main/resources/application.yml b/collector/server/src/main/resources/application.yml deleted file mode 100644 index 1cf6a57..0000000 --- a/collector/server/src/main/resources/application.yml +++ /dev/null @@ -1,19 +0,0 @@ -server: - port: 1157 -spring: - application: - name: ${HOSTNAME:@collecor@}${PID} - profiles: - active: dev - jackson: - default-property-inclusion: NON_EMPTY -collector: - dispatch: - entrance: - etcd: - endpoints: http://139.198.109.64:2379 - export: - kafka: - enabled: true - servers: 139.198.109.64:9092 - topic: async-collect-data diff --git a/collector/server/src/main/resources/banner.txt b/collector/server/src/main/resources/banner.txt deleted file mode 100644 index 8e22262..0000000 --- a/collector/server/src/main/resources/banner.txt +++ /dev/null @@ -1,6 +0,0 @@ - ██████╗ ██████╗ ██╗ ██╗ ███████╗ ██████╗████████╗ ██████╗ ██████╗ -██╔════╝██╔═══██╗██║ ██║ ██╔════╝██╔════╝╚══██╔══╝██╔═══██╗██╔══██╗ -██║ ██║ ██║██║ ██║ █████╗ ██║ ██║ ██║ ██║██████╔╝ -██║ ██║ ██║██║ ██║ ██╔══╝ ██║ ██║ ██║ ██║██╔══██╗ Profile: ${spring.profiles.active} -╚██████╗╚██████╔╝███████╗███████╗███████╗╚██████╗ ██║ ╚██████╔╝██║ ██║ Name: ${spring.application.name} Port: ${server.port} Pid: ${pid} - ╚═════╝ ╚═════╝ ╚══════╝╚══════╝╚══════╝ ╚═════╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ diff --git a/collector/server/src/main/resources/logback-spring.xml b/collector/server/src/main/resources/logback-spring.xml deleted file mode 100644 index 90e6569..0000000 --- a/collector/server/src/main/resources/logback-spring.xml +++ /dev/null @@ -1,79 +0,0 @@ - - - - - - - - - 1-%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger - %msg%n - UTF-8 - - - - - - - - logs/${application_name}-%d{yyyy-MM-dd}.%i.log - - - 200MB - - - - true - - - ===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n - utf-8 - - - - - - logs/${application_name}-%d{yyyy-MM-dd}-error.%i.log - - 200MB - - - - true - - - ===%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger Line:%-3L - %msg%n - utf-8 - - - - ERROR - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/collector/server/src/main/java/com/usthe/collector/collect/AbstractCollect.java b/collector/src/main/java/com/usthe/collector/collect/AbstractCollect.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/collect/AbstractCollect.java rename to collector/src/main/java/com/usthe/collector/collect/AbstractCollect.java diff --git a/collector/server/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java b/collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java rename to collector/src/main/java/com/usthe/collector/collect/database/JdbcCommonCollect.java diff --git a/collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java rename to collector/src/main/java/com/usthe/collector/collect/http/HttpCollectImpl.java diff --git a/collector/server/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java rename to collector/src/main/java/com/usthe/collector/collect/icmp/IcmpCollectImpl.java diff --git a/collector/server/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java rename to collector/src/main/java/com/usthe/collector/collect/telnet/TelnetCollectImpl.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/CollectorProperties.java b/collector/src/main/java/com/usthe/collector/common/CollectorProperties.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/CollectorProperties.java rename to collector/src/main/java/com/usthe/collector/common/CollectorProperties.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/cache/CacheCloseable.java b/collector/src/main/java/com/usthe/collector/common/cache/CacheCloseable.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/cache/CacheCloseable.java rename to collector/src/main/java/com/usthe/collector/common/cache/CacheCloseable.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/cache/CacheDetectable.java b/collector/src/main/java/com/usthe/collector/common/cache/CacheDetectable.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/cache/CacheDetectable.java rename to collector/src/main/java/com/usthe/collector/common/cache/CacheDetectable.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/cache/CacheIdentifier.java b/collector/src/main/java/com/usthe/collector/common/cache/CacheIdentifier.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/cache/CacheIdentifier.java rename to collector/src/main/java/com/usthe/collector/common/cache/CacheIdentifier.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/cache/CommonCache.java b/collector/src/main/java/com/usthe/collector/common/cache/CommonCache.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/cache/CommonCache.java rename to collector/src/main/java/com/usthe/collector/common/cache/CommonCache.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/cache/support/CommonJdbcConnect.java b/collector/src/main/java/com/usthe/collector/common/cache/support/CommonJdbcConnect.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/cache/support/CommonJdbcConnect.java rename to collector/src/main/java/com/usthe/collector/common/cache/support/CommonJdbcConnect.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/http/CustomHttpRequestRetryHandler.java b/collector/src/main/java/com/usthe/collector/common/http/CustomHttpRequestRetryHandler.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/http/CustomHttpRequestRetryHandler.java rename to collector/src/main/java/com/usthe/collector/common/http/CustomHttpRequestRetryHandler.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/http/HttpClientPool.java b/collector/src/main/java/com/usthe/collector/common/http/HttpClientPool.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/http/HttpClientPool.java rename to collector/src/main/java/com/usthe/collector/common/http/HttpClientPool.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpec.java b/collector/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpec.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpec.java rename to collector/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpec.java diff --git a/collector/server/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpecProvider.java b/collector/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpecProvider.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpecProvider.java rename to collector/src/main/java/com/usthe/collector/common/http/IgnoreReqCookieSpecProvider.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java rename to collector/src/main/java/com/usthe/collector/dispatch/CollectDataDispatch.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java similarity index 97% rename from collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java rename to collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java index 6e2ff8f..e73a6aa 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/CommonDispatcher.java @@ -1,6 +1,6 @@ package com.usthe.collector.dispatch; -import com.usthe.collector.dispatch.export.KafkaDataExporter; +import com.usthe.collector.dispatch.export.MetricsDataExporter; import com.usthe.collector.dispatch.timer.Timeout; import com.usthe.collector.dispatch.timer.TimerDispatch; import com.usthe.collector.dispatch.timer.WheelTimerTask; @@ -44,14 +44,14 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc /** * kafka采集数据导出器 */ - private KafkaDataExporter kafkaDataExporter; + private MetricsDataExporter kafkaDataExporter; /** * 指标组任务与开始时间映射map */ private Map metricsTimeoutMonitorMap; public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch, - KafkaDataExporter kafkaDataExporter, WorkerPool workerPool) { + MetricsDataExporter kafkaDataExporter, WorkerPool workerPool) { this.kafkaDataExporter = kafkaDataExporter; this.jobRequestQueue = jobRequestQueue; this.timerDispatch = timerDispatch; diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java rename to collector/src/main/java/com/usthe/collector/dispatch/DispatchConstants.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java b/collector/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java rename to collector/src/main/java/com/usthe/collector/dispatch/DispatchProperties.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java rename to collector/src/main/java/com/usthe/collector/dispatch/MetricsCollect.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java rename to collector/src/main/java/com/usthe/collector/dispatch/MetricsCollectorQueue.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java rename to collector/src/main/java/com/usthe/collector/dispatch/MetricsTaskDispatch.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java b/collector/src/main/java/com/usthe/collector/dispatch/WorkerPool.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/WorkerPool.java rename to collector/src/main/java/com/usthe/collector/dispatch/WorkerPool.java diff --git a/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java new file mode 100644 index 0000000..22287c3 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectJobService.java @@ -0,0 +1,83 @@ +package com.usthe.collector.dispatch.entrance.internal; + +import com.usthe.collector.dispatch.timer.TimerDispatch; +import com.usthe.common.entity.job.Job; +import com.usthe.common.entity.message.CollectRep; +import com.usthe.common.util.SnowFlakeIdGenerator; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * 采集job管理提供api接口 + * @author tomsun28 + * @date 2021/11/6 13:58 + */ +@Service +@Slf4j +public class CollectJobService { + + @Autowired + private TimerDispatch timerDispatch; + + /** + * 执行一次性采集任务,获取采集数据响应 + * @param job 采集任务详情 + * @return 采集结果 + */ + public List collectSyncJobData(Job job) { + final List metricsData = new LinkedList<>(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + CollectResponseEventListener listener = new CollectResponseEventListener() { + @Override + public void response(List responseMetrics) { + if (responseMetrics != null) { + metricsData.addAll(responseMetrics); + } + countDownLatch.countDown(); + } + }; + timerDispatch.addJob(job, listener); + try { + countDownLatch.await(100, TimeUnit.SECONDS); + } catch (Exception e) { + log.info("同步任务运行100秒无响应,返回"); + } + return metricsData; + } + + /** + * 下发周期性异步采集任务 + * @param job 采集任务详情 + * @return long 任务ID + */ + public long addAsyncCollectJob(Job job) { + long jobId = SnowFlakeIdGenerator.generateId(); + job.setId(jobId); + timerDispatch.addJob(job, null); + return jobId; + } + + /** + * 更新已经下发的周期性异步采集任务 + * @param modifyJob 采集任务详情 + */ + public void updateAsyncCollectJob(Job modifyJob) { + timerDispatch.deleteJob(modifyJob.getId(), true); + timerDispatch.addJob(modifyJob, null); + } + + /** + * 取消周期性异步采集任务 + * @param jobId 任务ID + */ + public void cancelAsyncCollectJob(Long jobId) { + timerDispatch.deleteJob(jobId, true); + } + +} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectResponseEventListener.java b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java similarity index 73% rename from collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectResponseEventListener.java rename to collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java index a0813f6..c4d6db4 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/entrance/http/CollectResponseEventListener.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/entrance/internal/CollectResponseEventListener.java @@ -1,4 +1,4 @@ -package com.usthe.collector.dispatch.entrance.http; +package com.usthe.collector.dispatch.entrance.internal; import com.usthe.common.entity.message.CollectRep; @@ -16,5 +16,5 @@ public interface CollectResponseEventListener extends EventListener { * 采集任务完成结果通知 * @param responseMetrics 响应数据 */ - public default void response(List responseMetrics) {} + default void response(List responseMetrics) {} } diff --git a/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java b/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java new file mode 100644 index 0000000..82972a5 --- /dev/null +++ b/collector/src/main/java/com/usthe/collector/dispatch/export/MetricsDataExporter.java @@ -0,0 +1,56 @@ +package com.usthe.collector.dispatch.export; + +import com.usthe.common.entity.message.CollectRep; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.stereotype.Component; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 采集数据消息发送 + * @author tomsun28 + * @date 2021/11/3 15:22 + */ +@Component +@Slf4j +public class MetricsDataExporter implements DisposableBean { + + private final LinkedBlockingQueue metricsDataToAlertQueue; + private final LinkedBlockingQueue metricsDataToWarehouseInfluxQueue; + private final LinkedBlockingQueue metricsDataToWarehouseRedisQueue; + + public MetricsDataExporter() { + metricsDataToAlertQueue = new LinkedBlockingQueue<>(); + metricsDataToWarehouseInfluxQueue = new LinkedBlockingQueue<>(); + metricsDataToWarehouseRedisQueue = new LinkedBlockingQueue<>(); + } + + public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException { + return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS); + } + + public CollectRep.MetricsData pollWarehouseInfluxMetricsData() throws InterruptedException { + return metricsDataToAlertQueue.poll(2, TimeUnit.SECONDS); + } + + public CollectRep.MetricsData pollWarehouseRedisMetricsData() throws InterruptedException { + return metricsDataToWarehouseRedisQueue.poll(2, TimeUnit.SECONDS); + } + + /** + * 发送消息 + * @param metricsData 指标组采集数据 + */ + public void send(CollectRep.MetricsData metricsData) { + metricsDataToAlertQueue.offer(metricsData); + metricsDataToWarehouseInfluxQueue.offer(metricsData); + metricsDataToWarehouseRedisQueue.offer(metricsData); + } + + @Override + public void destroy() throws Exception { + metricsDataToAlertQueue.clear(); + } +} diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/HashedWheelTimer.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/Timeout.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/Timer.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/Timer.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/Timer.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/Timer.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java similarity index 93% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java index 40a022f..578e504 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatch.java @@ -1,7 +1,7 @@ package com.usthe.collector.dispatch.timer; -import com.usthe.collector.dispatch.entrance.http.CollectResponseEventListener; +import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener; import com.usthe.common.entity.job.Job; import com.usthe.common.entity.message.CollectRep; diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java similarity index 97% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java index 1486234..46a4b05 100644 --- a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java +++ b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerDispatcher.java @@ -1,6 +1,6 @@ package com.usthe.collector.dispatch.timer; -import com.usthe.collector.dispatch.entrance.http.CollectResponseEventListener; +import com.usthe.collector.dispatch.entrance.internal.CollectResponseEventListener; import com.usthe.common.entity.job.Job; import com.usthe.common.entity.message.CollectRep; import org.springframework.stereotype.Component; diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/TimerTask.java diff --git a/collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java b/collector/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java rename to collector/src/main/java/com/usthe/collector/dispatch/timer/WheelTimerTask.java diff --git a/collector/server/src/main/java/com/usthe/collector/util/CollectorConstants.java b/collector/src/main/java/com/usthe/collector/util/CollectorConstants.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/util/CollectorConstants.java rename to collector/src/main/java/com/usthe/collector/util/CollectorConstants.java diff --git a/collector/server/src/main/java/com/usthe/collector/util/JsonPathParser.java b/collector/src/main/java/com/usthe/collector/util/JsonPathParser.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/util/JsonPathParser.java rename to collector/src/main/java/com/usthe/collector/util/JsonPathParser.java diff --git a/collector/server/src/main/java/com/usthe/collector/util/SpringContextHolder.java b/collector/src/main/java/com/usthe/collector/util/SpringContextHolder.java similarity index 100% rename from collector/server/src/main/java/com/usthe/collector/util/SpringContextHolder.java rename to collector/src/main/java/com/usthe/collector/util/SpringContextHolder.java diff --git a/collector/src/main/resources/META-INF/spring.factories b/collector/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..6f385a1 --- /dev/null +++ b/collector/src/main/resources/META-INF/spring.factories @@ -0,0 +1,9 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +com.usthe.collector.dispatch.timer.TimerDispatcher,\ +com.usthe.collector.dispatch.CommonDispatcher,\ +com.usthe.collector.dispatch.DispatchProperties,\ +com.usthe.collector.dispatch.MetricsCollectorQueue,\ +com.usthe.collector.dispatch.WorkerPool,\ +com.usthe.collector.dispatch.entrance.internal.CollectJobService,\ +com.usthe.collector.dispatch.export.MetricsDataExporter,\ +com.usthe.collector.util.SpringContextHolder diff --git a/collector/server/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java b/collector/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java similarity index 100% rename from collector/server/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java rename to collector/src/test/java/com/usthe/collector/collect/telnet/TelnetCollectImplTest.java diff --git a/common/pom.xml b/common/pom.xml index 65949be..5679795 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 diff --git a/manager/pom.xml b/manager/pom.xml index d6da03c..ecca547 100644 --- a/manager/pom.xml +++ b/manager/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 manager @@ -22,25 +22,25 @@ com.usthe.tancloud common - 1.0-SNAPSHOT - - - - com.usthe.tancloud - scheduler - 1.0-SNAPSHOT + 1.0 com.usthe.tancloud warehouse - 1.0-SNAPSHOT + 1.0 com.usthe.tancloud alerter - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 @@ -123,6 +123,7 @@ sureness.yml banner.txt db/** + define/** @@ -167,7 +168,7 @@ - ../assembly/server/assembly.xml + ../script/assembly/server/assembly.xml diff --git a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java index 024d003..2a6e9ca 100644 --- a/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java +++ b/manager/src/main/java/com/usthe/manager/service/impl/MonitorServiceImpl.java @@ -1,5 +1,6 @@ package com.usthe.manager.service.impl; +import com.usthe.collector.dispatch.entrance.internal.CollectJobService; import com.usthe.common.entity.job.Configmap; import com.usthe.common.entity.job.Job; import com.usthe.common.entity.job.Metrics; @@ -20,7 +21,6 @@ import com.usthe.manager.service.AppService; import com.usthe.manager.service.MonitorService; import com.usthe.manager.support.exception.MonitorDatabaseException; import com.usthe.manager.support.exception.MonitorDetectException; -import com.usthe.scheduler.JobScheduling; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -52,7 +52,7 @@ public class MonitorServiceImpl implements MonitorService { private AppService appService; @Autowired - private JobScheduling jobScheduling; + private CollectJobService collectJobService; @Autowired private MonitorDao monitorDao; @@ -74,7 +74,7 @@ public class MonitorServiceImpl implements MonitorService { List configmaps = params.stream().map(param -> new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); - List collectRep = jobScheduling.addSyncCollectJob(appDefine); + List collectRep = collectJobService.collectSyncJobData(appDefine); // 判断探测结果 失败则抛出探测异常 if (collectRep == null || collectRep.isEmpty()) { throw new MonitorDetectException("No collector response"); @@ -101,7 +101,7 @@ public class MonitorServiceImpl implements MonitorService { }).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 下发采集任务得到jobId - long jobId = jobScheduling.addAsyncCollectJob(appDefine); + long jobId = collectJobService.addAsyncCollectJob(appDefine); // 下发成功后刷库 try { monitor.setId(monitorId); @@ -112,7 +112,7 @@ public class MonitorServiceImpl implements MonitorService { } catch (Exception e) { log.error(e.getMessage(), e); // 刷库异常取消之前的下发任务 - jobScheduling.cancelAsyncCollectJob(jobId); + collectJobService.cancelAsyncCollectJob(jobId); throw new MonitorDatabaseException(e.getMessage()); } } @@ -225,7 +225,7 @@ public class MonitorServiceImpl implements MonitorService { new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 更新采集任务 - jobScheduling.updateAsyncCollectJob(appDefine); + collectJobService.updateAsyncCollectJob(appDefine); // 下发更新成功后刷库 try { monitor.setJobId(preMonitor.getJobId()); @@ -246,7 +246,7 @@ public class MonitorServiceImpl implements MonitorService { Monitor monitor = monitorOptional.get(); monitorDao.deleteById(id); paramDao.deleteParamsByMonitorId(id); - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } @@ -258,7 +258,7 @@ public class MonitorServiceImpl implements MonitorService { monitorDao.deleteAll(monitors); paramDao.deleteParamsByMonitorIdIn(ids); for (Monitor monitor : monitors) { - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } } @@ -299,7 +299,7 @@ public class MonitorServiceImpl implements MonitorService { if (!managedMonitors.isEmpty()) { monitorDao.saveAll(managedMonitors); for (Monitor monitor : managedMonitors) { - jobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + collectJobService.cancelAsyncCollectJob(monitor.getJobId()); } } } @@ -326,7 +326,7 @@ public class MonitorServiceImpl implements MonitorService { new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 下发采集任务 - jobScheduling.addAsyncCollectJob(appDefine); + collectJobService.addAsyncCollectJob(appDefine); } } } diff --git a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java index 271234c..cde3378 100644 --- a/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java +++ b/manager/src/main/java/com/usthe/manager/support/GlobalExceptionHandler.java @@ -4,7 +4,6 @@ package com.usthe.manager.support; import com.usthe.common.entity.dto.Message; import com.usthe.manager.support.exception.MonitorDatabaseException; import com.usthe.manager.support.exception.MonitorDetectException; -import com.usthe.scheduler.ScheduleException; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataAccessException; import org.springframework.http.HttpStatus; @@ -133,23 +132,6 @@ public class GlobalExceptionHandler { return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(message); } - /** - * 处理分发调度器异常问题 - * @param exception 调度器异常问题 - * @return response - */ - @ExceptionHandler(ScheduleException.class) - @ResponseBody - ResponseEntity> handleScheduleException(ScheduleException exception) { - String errorMessage = "scheduler warning"; - if (exception != null) { - errorMessage = exception.getMessage(); - } - log.warn("[scheduler warning]-{}", errorMessage); - Message message = Message.builder().msg(errorMessage).code(MONITOR_CONFLICT_CODE).build(); - return ResponseEntity.status(HttpStatus.CONFLICT).body(message); - } - /** * handler the exception thrown for datastore error * @param exception datastore exception diff --git a/pom.xml b/pom.xml index ab68842..c4da00a 100644 --- a/pom.xml +++ b/pom.xml @@ -7,9 +7,8 @@ com.usthe.tancloud monitor pom - 1.0-SNAPSHOT + 1.0 - scheduler manager alerter common diff --git a/script/assembly/package-build.sh b/script/assembly/package-build.sh new file mode 100644 index 0000000..4bd63c6 --- /dev/null +++ b/script/assembly/package-build.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +cd ../../web-app + +ng build --prod --base-href /console/ + +cd .. + +mvn clean package diff --git a/assembly/server/assembly.xml b/script/assembly/server/assembly.xml similarity index 91% rename from assembly/server/assembly.xml rename to script/assembly/server/assembly.xml index 038731d..91d5399 100644 --- a/assembly/server/assembly.xml +++ b/script/assembly/server/assembly.xml @@ -22,8 +22,11 @@ http://maven.apache.org/ASSEMBLY/2.0.0 "> - ../assembly/server/bin + ../script/assembly/server/bin + + true bin + 0755 diff --git a/assembly/collector/bin/shutdown.sh b/script/assembly/server/bin/shutdown.sh similarity index 100% rename from assembly/collector/bin/shutdown.sh rename to script/assembly/server/bin/shutdown.sh diff --git a/assembly/server/bin/startup.sh b/script/assembly/server/bin/startup.sh similarity index 100% rename from assembly/server/bin/startup.sh rename to script/assembly/server/bin/startup.sh diff --git a/warehouse/pom.xml b/warehouse/pom.xml index eb4f04d..25bc65b 100644 --- a/warehouse/pom.xml +++ b/warehouse/pom.xml @@ -5,7 +5,7 @@ monitor com.usthe.tancloud - 1.0-SNAPSHOT + 1.0 4.0.0 @@ -16,7 +16,14 @@ com.usthe.tancloud common - 1.0-SNAPSHOT + 1.0 + + + + com.usthe.tancloud + collector + 1.0 + provided diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java deleted file mode 100644 index ad99bed..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaDataConsume.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.usthe.warehouse.entrance; - -import com.usthe.common.entity.message.CollectRep; -import com.usthe.warehouse.MetricsDataQueue; -import com.usthe.warehouse.WarehouseProperties; -import com.usthe.warehouse.WarehouseWorkerPool; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Duration; -import java.util.Collections; -import java.util.Properties; - -/** - * 从Kafka消费指标组采集数据处理 - * @author tom - * @date 2021/11/24 18:03 - */ -@Configuration -@AutoConfigureAfter(value = {WarehouseProperties.class}) -@ConditionalOnProperty(prefix = "warehouse.entrance.kafka", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class KafkaDataConsume implements DisposableBean { - - private KafkaConsumer consumer; - private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; - public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initConsumer(properties); - startConsumeData(); - } - - private void startConsumeData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-kafka-data-consumer"); - while (!Thread.currentThread().isInterrupted()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - records.forEach(record -> { - dataQueue.addMetricsDataToInflux(record.value()); - dataQueue.addMetricsDataToRedis(record.value()); - }); - } - }; - workerPool.executeJob(runnable); - } - - private void initConsumer(WarehouseProperties properties) { - if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) { - log.error("init error, please config Warehouse kafka props in application.yml"); - throw new IllegalArgumentException("please config Warehouse kafka props"); - } - WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka(); - Properties consumerProp = new Properties(); - consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers()); - consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId()); - consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class); - consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); - consumer = new KafkaConsumer<>(consumerProp); - consumer.subscribe(Collections.singleton(kafkaProp.getTopic())); - } - - @Override - public void destroy() throws Exception { - if (consumer != null) { - consumer.close(); - } - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java b/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java deleted file mode 100644 index 544265a..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/entrance/KafkaMetricsDataDeserializer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.usthe.warehouse.entrance; - -import com.usthe.common.entity.message.CollectRep; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Deserializer; - -/** - * MetricsData的反序列化 - * @author tom - * @date 2021/11/24 17:29 - */ -@Slf4j -public class KafkaMetricsDataDeserializer implements Deserializer { - - @Override - public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) { - try { - return CollectRep.MetricsData.parseFrom(bytes); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return null; - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java deleted file mode 100644 index 22061a3..0000000 --- a/warehouse/src/main/java/com/usthe/warehouse/store/InfluxdbDataStorage.java +++ /dev/null @@ -1,134 +0,0 @@ -package com.usthe.warehouse.store; - -import com.influxdb.client.InfluxDBClient; -import com.influxdb.client.InfluxDBClientFactory; -import com.influxdb.client.WriteApi; -import com.influxdb.client.WriteOptions; -import com.influxdb.client.domain.WritePrecision; -import com.influxdb.client.write.Point; -import com.usthe.common.entity.message.CollectRep; -import com.usthe.common.util.CommonConstants; -import com.usthe.warehouse.MetricsDataQueue; -import com.usthe.warehouse.WarehouseProperties; -import com.usthe.warehouse.WarehouseWorkerPool; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; - -import java.time.Instant; -import java.util.List; - -/** - * influxdb存储采集数据 - * @author tom - * @date 2021/11/24 18:23 - */ -@Configuration -@AutoConfigureAfter(value = {WarehouseProperties.class}) -@ConditionalOnProperty(prefix = "warehouse.store.influxdb", - name = "enabled", havingValue = "true", matchIfMissing = true) -@Slf4j -public class InfluxdbDataStorage implements DisposableBean { - - private InfluxDBClient influxClient; - private WriteApi writeApi; - private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; - - public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { - this.workerPool = workerPool; - this.dataQueue = dataQueue; - initInfluxDbClient(properties); - startStorageData(); - } - - private void startStorageData() { - Runnable runnable = () -> { - Thread.currentThread().setName("warehouse-influxdb-data-storage"); - while (!Thread.currentThread().isInterrupted()) { - try { - CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData(); - if (metricsData != null) { - saveData(metricsData); - } - } catch (InterruptedException e) { - log.error(e.getMessage()); - } - } - }; - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); - } - - private void initInfluxDbClient(WarehouseProperties properties) { - if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) { - log.error("init error, please config Warehouse influxdb props in application.yml"); - throw new IllegalArgumentException("please config Warehouse influxdb props"); - } - WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb(); - influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(), - influxdbProp.getOrg(), influxdbProp.getBucket()); - WriteOptions writeOptions = WriteOptions.builder() - .batchSize(1000) - .bufferLimit(1000) - .jitterInterval(1000) - .retryInterval(5000) - .build(); - writeApi = influxClient.makeWriteApi(writeOptions); - } - - - public void saveData(CollectRep.MetricsData metricsData) { - String measurement = metricsData.getApp() + "_" + metricsData.getMetrics(); - String monitorId = String.valueOf(metricsData.getId()); - Instant collectTime = Instant.ofEpochMilli(metricsData.getTime()); - - List fields = metricsData.getFieldsList(); - for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - Point point = Point.measurement(measurement) - .addTag("id", monitorId) - .addTag("instance", valueRow.getInstance()) - .time(collectTime, WritePrecision.MS); - for (int index = 0; index < fields.size(); index++) { - CollectRep.Field field = fields.get(index); - String value = valueRow.getColumns(index); - if (field.getType() == CommonConstants.TYPE_NUMBER) { - // number data - if (CommonConstants.NULL_VALUE.equals(value)) { - point.addField(field.getName(), (Number) null); - } else { - try { - double number = Double.parseDouble(value); - point.addField(field.getName(), number); - } catch (Exception e) { - log.warn(e.getMessage()); - point.addField(field.getName(), (Number) null); - } - } - } else { - // string - if (CommonConstants.NULL_VALUE.equals(value)) { - point.addField(field.getName(), (String) null); - } else { - point.addField(field.getName(), value); - } - } - } - writeApi.writePoint(point); - } - } - - - @Override - public void destroy() throws Exception { - if (writeApi != null) { - writeApi.close(); - } - if (influxClient != null) { - influxClient.close(); - } - } -} diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java index 0287697..10a3d4f 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/RedisDataStorage.java @@ -1,7 +1,7 @@ package com.usthe.warehouse.store; +import com.usthe.collector.dispatch.export.MetricsDataExporter; import com.usthe.common.entity.message.CollectRep; -import com.usthe.warehouse.MetricsDataQueue; import com.usthe.warehouse.WarehouseProperties; import com.usthe.warehouse.WarehouseWorkerPool; import io.lettuce.core.RedisClient; @@ -34,13 +34,12 @@ public class RedisDataStorage implements DisposableBean { private RedisClient redisClient; private StatefulRedisConnection connection; private WarehouseWorkerPool workerPool; - private MetricsDataQueue dataQueue; + private MetricsDataExporter dataExporter; public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool, - MetricsDataQueue dataQueue) { + MetricsDataExporter dataExporter) { this.workerPool = workerPool; - this.dataQueue = dataQueue; - + this.dataExporter = dataExporter; initRedisClient(properties); startStorageData(); } @@ -55,7 +54,7 @@ public class RedisDataStorage implements DisposableBean { Thread.currentThread().setName("warehouse-redis-data-storage"); while (!Thread.currentThread().isInterrupted()) { try { - CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData(); + CollectRep.MetricsData metricsData = dataExporter.pollWarehouseRedisMetricsData(); if (metricsData != null) { saveData(metricsData); } diff --git a/warehouse/src/main/resources/META-INF/spring.factories b/warehouse/src/main/resources/META-INF/spring.factories index fc593ec..783832d 100644 --- a/warehouse/src/main/resources/META-INF/spring.factories +++ b/warehouse/src/main/resources/META-INF/spring.factories @@ -2,7 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.usthe.warehouse.WarehouseProperties,\ com.usthe.warehouse.MetricsDataQueue,\ com.usthe.warehouse.WarehouseWorkerPool,\ -com.usthe.warehouse.entrance.KafkaDataConsume,\ -com.usthe.warehouse.store.InfluxdbDataStorage,\ com.usthe.warehouse.store.RedisDataStorage,\ com.usthe.warehouse.controller.MetricsDataController \ No newline at end of file