|
@@ -7,7 +7,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
|
|
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
import org.apache.kafka.common.serialization.LongSerializer;
|
|
import org.apache.kafka.common.serialization.LongSerializer;
|
|
|
-import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
|
|
|
|
+import org.springframework.beans.factory.DisposableBean;
|
|
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
|
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.context.annotation.Configuration;
|
|
@@ -24,7 +24,7 @@ import java.util.Properties;
|
|
|
name = "enabled", havingValue = "true", matchIfMissing = true)
|
|
name = "enabled", havingValue = "true", matchIfMissing = true)
|
|
|
@AutoConfigureAfter(value = {DispatchProperties.class})
|
|
@AutoConfigureAfter(value = {DispatchProperties.class})
|
|
|
@Slf4j
|
|
@Slf4j
|
|
|
-public class KafkaDataExporter {
|
|
|
|
|
|
|
+public class KafkaDataExporter implements DisposableBean {
|
|
|
|
|
|
|
|
KafkaProducer<Long, CollectRep.MetricsData> kafkaProducer;
|
|
KafkaProducer<Long, CollectRep.MetricsData> kafkaProducer;
|
|
|
DispatchProperties.ExportProperties.KafkaProperties kafkaProperties;
|
|
DispatchProperties.ExportProperties.KafkaProperties kafkaProperties;
|
|
@@ -52,4 +52,11 @@ public class KafkaDataExporter {
|
|
|
log.error("kafkaProducer is not enable");
|
|
log.error("kafkaProducer is not enable");
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void destroy() throws Exception {
|
|
|
|
|
+ if (kafkaProducer != null) {
|
|
|
|
|
+ kafkaProducer.close();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|