[collector]fix 采集任务超时监测线程处理异常 (#43)

This commit is contained in:
tomsun28
2022-03-20 21:42:37 +08:00
committed by GitHub
parent cc22196d4a
commit d169dac94d
3 changed files with 6 additions and 5 deletions

View File

@@ -32,7 +32,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
/** /**
* 指标组采集任务超时时间值 * 指标组采集任务超时时间值
*/ */
private static final long DURATION_TIME = 120_000L; private static final long DURATION_TIME = 240_000L;
/** /**
* 指标组采集任务优先级队列 * 指标组采集任务优先级队列
*/ */
@@ -94,7 +94,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
Thread.currentThread().setName("metrics-task-monitor"); Thread.currentThread().setName("metrics-task-monitor");
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
try { try {
// 检测每个指标组采集单元是否超时2分钟,超时则丢弃并返回异常 // 检测每个指标组采集单元是否超时4分钟,超时则丢弃并返回异常
long deadline = System.currentTimeMillis() - DURATION_TIME; long deadline = System.currentTimeMillis() - DURATION_TIME;
for (Map.Entry<String, MetricsTime> entry : metricsTimeoutMonitorMap.entrySet()) { for (Map.Entry<String, MetricsTime> entry : metricsTimeoutMonitorMap.entrySet()) {
MetricsTime metricsTime = entry.getValue(); MetricsTime metricsTime = entry.getValue();
@@ -165,7 +165,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
metricsSet.forEach(metricItem -> { metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect); jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout)); new MetricsTime(System.currentTimeMillis(), metrics, timeout));
}); });
} else { } else {
@@ -185,7 +185,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
metricsSet.forEach(metricItem -> { metricsSet.forEach(metricItem -> {
MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this); MetricsCollect metricsCollect = new MetricsCollect(metricItem, timeout, this);
jobRequestQueue.addJob(metricsCollect); jobRequestQueue.addJob(metricsCollect);
metricsTimeoutMonitorMap.put(job.getId() + metrics.getName(), metricsTimeoutMonitorMap.put(job.getId() + "-" + metrics.getName(),
new MetricsTime(System.currentTimeMillis(), metrics, timeout)); new MetricsTime(System.currentTimeMillis(), metrics, timeout));
}); });
} else { } else {

View File

@@ -166,7 +166,7 @@ public class Job {
return null; return null;
} }
if (!metricsSet.remove(metrics)) { if (!metricsSet.remove(metrics)) {
log.error("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.", log.warn("Job {} appId {} app {} metrics {} remove empty error in priorMetrics.",
id, monitorId, app, metrics.getName()); id, monitorId, app, metrics.getName());
} }
if (metricsSet.isEmpty()) { if (metricsSet.isEmpty()) {

View File

@@ -183,6 +183,7 @@ public class TdEngineDataStorage implements DisposableBean {
String createTableSql = String.format(CREATE_SUPER_TABLE_SQL, superTable, fieldSqlBuilder); String createTableSql = String.format(CREATE_SUPER_TABLE_SQL, superTable, fieldSqlBuilder);
try { try {
assert statement != null; assert statement != null;
log.info("[tdengine-data]: create {} use sql: {}.", superTable, createTableSql);
statement.execute(createTableSql); statement.execute(createTableSql);
statement.execute(insertDataSql); statement.execute(insertDataSql);
} catch (Exception createTableException) { } catch (Exception createTableException) {