[monitor]优化去除Kafka etcd依赖
This commit is contained in:
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>monitor</artifactId>
|
||||
<groupId>com.usthe.tancloud</groupId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<version>1.0</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
@@ -16,7 +16,14 @@
|
||||
<dependency>
|
||||
<groupId>com.usthe.tancloud</groupId>
|
||||
<artifactId>common</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
<version>1.0</version>
|
||||
</dependency>
|
||||
<!-- collector -->
|
||||
<dependency>
|
||||
<groupId>com.usthe.tancloud</groupId>
|
||||
<artifactId>collector</artifactId>
|
||||
<version>1.0</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<!-- spring -->
|
||||
<dependency>
|
||||
|
||||
@@ -1,81 +0,0 @@
|
||||
package com.usthe.warehouse.entrance;
|
||||
|
||||
import com.usthe.common.entity.message.CollectRep;
|
||||
import com.usthe.warehouse.MetricsDataQueue;
|
||||
import com.usthe.warehouse.WarehouseProperties;
|
||||
import com.usthe.warehouse.WarehouseWorkerPool;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 从Kafka消费指标组采集数据处理
|
||||
* @author tom
|
||||
* @date 2021/11/24 18:03
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(value = {WarehouseProperties.class})
|
||||
@ConditionalOnProperty(prefix = "warehouse.entrance.kafka",
|
||||
name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
@Slf4j
|
||||
public class KafkaDataConsume implements DisposableBean {
|
||||
|
||||
private KafkaConsumer<Long, CollectRep.MetricsData> consumer;
|
||||
private WarehouseWorkerPool workerPool;
|
||||
private MetricsDataQueue dataQueue;
|
||||
public KafkaDataConsume(WarehouseProperties properties, WarehouseWorkerPool workerPool,
|
||||
MetricsDataQueue dataQueue) {
|
||||
this.workerPool = workerPool;
|
||||
this.dataQueue = dataQueue;
|
||||
initConsumer(properties);
|
||||
startConsumeData();
|
||||
}
|
||||
|
||||
private void startConsumeData() {
|
||||
Runnable runnable = () -> {
|
||||
Thread.currentThread().setName("warehouse-kafka-data-consumer");
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
ConsumerRecords<Long, CollectRep.MetricsData> records = consumer.poll(Duration.ofMillis(100));
|
||||
records.forEach(record -> {
|
||||
dataQueue.addMetricsDataToInflux(record.value());
|
||||
dataQueue.addMetricsDataToRedis(record.value());
|
||||
});
|
||||
}
|
||||
};
|
||||
workerPool.executeJob(runnable);
|
||||
}
|
||||
|
||||
private void initConsumer(WarehouseProperties properties) {
|
||||
if (properties == null || properties.getEntrance() == null || properties.getEntrance().getKafka() == null) {
|
||||
log.error("init error, please config Warehouse kafka props in application.yml");
|
||||
throw new IllegalArgumentException("please config Warehouse kafka props");
|
||||
}
|
||||
WarehouseProperties.EntranceProperties.KafkaProperties kafkaProp = properties.getEntrance().getKafka();
|
||||
Properties consumerProp = new Properties();
|
||||
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProp.getServers());
|
||||
consumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProp.getGroupId());
|
||||
consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
|
||||
consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMetricsDataDeserializer.class);
|
||||
consumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
|
||||
consumerProp.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||
consumer = new KafkaConsumer<>(consumerProp);
|
||||
consumer.subscribe(Collections.singleton(kafkaProp.getTopic()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (consumer != null) {
|
||||
consumer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
package com.usthe.warehouse.entrance;
|
||||
|
||||
import com.usthe.common.entity.message.CollectRep;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
/**
|
||||
* MetricsData的反序列化
|
||||
* @author tom
|
||||
* @date 2021/11/24 17:29
|
||||
*/
|
||||
@Slf4j
|
||||
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {
|
||||
|
||||
@Override
|
||||
public CollectRep.MetricsData deserialize(String topicName, byte[] bytes) {
|
||||
try {
|
||||
return CollectRep.MetricsData.parseFrom(bytes);
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,134 +0,0 @@
|
||||
package com.usthe.warehouse.store;
|
||||
|
||||
import com.influxdb.client.InfluxDBClient;
|
||||
import com.influxdb.client.InfluxDBClientFactory;
|
||||
import com.influxdb.client.WriteApi;
|
||||
import com.influxdb.client.WriteOptions;
|
||||
import com.influxdb.client.domain.WritePrecision;
|
||||
import com.influxdb.client.write.Point;
|
||||
import com.usthe.common.entity.message.CollectRep;
|
||||
import com.usthe.common.util.CommonConstants;
|
||||
import com.usthe.warehouse.MetricsDataQueue;
|
||||
import com.usthe.warehouse.WarehouseProperties;
|
||||
import com.usthe.warehouse.WarehouseWorkerPool;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* influxdb存储采集数据
|
||||
* @author tom
|
||||
* @date 2021/11/24 18:23
|
||||
*/
|
||||
@Configuration
|
||||
@AutoConfigureAfter(value = {WarehouseProperties.class})
|
||||
@ConditionalOnProperty(prefix = "warehouse.store.influxdb",
|
||||
name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
@Slf4j
|
||||
public class InfluxdbDataStorage implements DisposableBean {
|
||||
|
||||
private InfluxDBClient influxClient;
|
||||
private WriteApi writeApi;
|
||||
private WarehouseWorkerPool workerPool;
|
||||
private MetricsDataQueue dataQueue;
|
||||
|
||||
public InfluxdbDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
|
||||
MetricsDataQueue dataQueue) {
|
||||
this.workerPool = workerPool;
|
||||
this.dataQueue = dataQueue;
|
||||
initInfluxDbClient(properties);
|
||||
startStorageData();
|
||||
}
|
||||
|
||||
private void startStorageData() {
|
||||
Runnable runnable = () -> {
|
||||
Thread.currentThread().setName("warehouse-influxdb-data-storage");
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
CollectRep.MetricsData metricsData = dataQueue.pollInfluxMetricsData();
|
||||
if (metricsData != null) {
|
||||
saveData(metricsData);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
};
|
||||
workerPool.executeJob(runnable);
|
||||
workerPool.executeJob(runnable);
|
||||
}
|
||||
|
||||
private void initInfluxDbClient(WarehouseProperties properties) {
|
||||
if (properties == null || properties.getStore() == null || properties.getStore().getInfluxdb() == null) {
|
||||
log.error("init error, please config Warehouse influxdb props in application.yml");
|
||||
throw new IllegalArgumentException("please config Warehouse influxdb props");
|
||||
}
|
||||
WarehouseProperties.StoreProperties.InfluxdbProperties influxdbProp = properties.getStore().getInfluxdb();
|
||||
influxClient = InfluxDBClientFactory.create(influxdbProp.getServers(), influxdbProp.getToken().toCharArray(),
|
||||
influxdbProp.getOrg(), influxdbProp.getBucket());
|
||||
WriteOptions writeOptions = WriteOptions.builder()
|
||||
.batchSize(1000)
|
||||
.bufferLimit(1000)
|
||||
.jitterInterval(1000)
|
||||
.retryInterval(5000)
|
||||
.build();
|
||||
writeApi = influxClient.makeWriteApi(writeOptions);
|
||||
}
|
||||
|
||||
|
||||
public void saveData(CollectRep.MetricsData metricsData) {
|
||||
String measurement = metricsData.getApp() + "_" + metricsData.getMetrics();
|
||||
String monitorId = String.valueOf(metricsData.getId());
|
||||
Instant collectTime = Instant.ofEpochMilli(metricsData.getTime());
|
||||
|
||||
List<CollectRep.Field> fields = metricsData.getFieldsList();
|
||||
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
|
||||
Point point = Point.measurement(measurement)
|
||||
.addTag("id", monitorId)
|
||||
.addTag("instance", valueRow.getInstance())
|
||||
.time(collectTime, WritePrecision.MS);
|
||||
for (int index = 0; index < fields.size(); index++) {
|
||||
CollectRep.Field field = fields.get(index);
|
||||
String value = valueRow.getColumns(index);
|
||||
if (field.getType() == CommonConstants.TYPE_NUMBER) {
|
||||
// number data
|
||||
if (CommonConstants.NULL_VALUE.equals(value)) {
|
||||
point.addField(field.getName(), (Number) null);
|
||||
} else {
|
||||
try {
|
||||
double number = Double.parseDouble(value);
|
||||
point.addField(field.getName(), number);
|
||||
} catch (Exception e) {
|
||||
log.warn(e.getMessage());
|
||||
point.addField(field.getName(), (Number) null);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// string
|
||||
if (CommonConstants.NULL_VALUE.equals(value)) {
|
||||
point.addField(field.getName(), (String) null);
|
||||
} else {
|
||||
point.addField(field.getName(), value);
|
||||
}
|
||||
}
|
||||
}
|
||||
writeApi.writePoint(point);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (writeApi != null) {
|
||||
writeApi.close();
|
||||
}
|
||||
if (influxClient != null) {
|
||||
influxClient.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.usthe.warehouse.store;
|
||||
|
||||
import com.usthe.collector.dispatch.export.MetricsDataExporter;
|
||||
import com.usthe.common.entity.message.CollectRep;
|
||||
import com.usthe.warehouse.MetricsDataQueue;
|
||||
import com.usthe.warehouse.WarehouseProperties;
|
||||
import com.usthe.warehouse.WarehouseWorkerPool;
|
||||
import io.lettuce.core.RedisClient;
|
||||
@@ -34,13 +34,12 @@ public class RedisDataStorage implements DisposableBean {
|
||||
private RedisClient redisClient;
|
||||
private StatefulRedisConnection<String, CollectRep.MetricsData> connection;
|
||||
private WarehouseWorkerPool workerPool;
|
||||
private MetricsDataQueue dataQueue;
|
||||
private MetricsDataExporter dataExporter;
|
||||
|
||||
public RedisDataStorage (WarehouseProperties properties, WarehouseWorkerPool workerPool,
|
||||
MetricsDataQueue dataQueue) {
|
||||
MetricsDataExporter dataExporter) {
|
||||
this.workerPool = workerPool;
|
||||
this.dataQueue = dataQueue;
|
||||
|
||||
this.dataExporter = dataExporter;
|
||||
initRedisClient(properties);
|
||||
startStorageData();
|
||||
}
|
||||
@@ -55,7 +54,7 @@ public class RedisDataStorage implements DisposableBean {
|
||||
Thread.currentThread().setName("warehouse-redis-data-storage");
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
CollectRep.MetricsData metricsData = dataQueue.pollRedisMetricsData();
|
||||
CollectRep.MetricsData metricsData = dataExporter.pollWarehouseRedisMetricsData();
|
||||
if (metricsData != null) {
|
||||
saveData(metricsData);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.usthe.warehouse.WarehouseProperties,\
|
||||
com.usthe.warehouse.MetricsDataQueue,\
|
||||
com.usthe.warehouse.WarehouseWorkerPool,\
|
||||
com.usthe.warehouse.entrance.KafkaDataConsume,\
|
||||
com.usthe.warehouse.store.InfluxdbDataStorage,\
|
||||
com.usthe.warehouse.store.RedisDataStorage,\
|
||||
com.usthe.warehouse.controller.MetricsDataController
|
||||
Reference in New Issue
Block a user