diff --git a/pom.xml b/pom.xml index 69b1f68..8903da4 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,16 @@ lombok 1.18.6 + + ch.qos.logback + logback-classic + 1.2.13 + + + ch.qos.logback + logback-core + 1.2.13 + org.springframework.boot spring-boot-starter-amqp @@ -90,7 +100,6 @@ org.springframework spring-test - 4.3.6.RELEASE @@ -112,11 +121,11 @@ mybatis-generator-maven-plugin 1.3.5 - + mysql mysql-connector-java - 5.1.40 + 8.0.13 com.itfsw diff --git a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java index 503ab63..2c1b656 100644 --- a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java +++ b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java @@ -63,7 +63,7 @@ public class AmqpListener { * @throws Exception */ -// @RabbitListener(queues = "shengdilandevicelogall") + @RabbitListener(queues = "shengdilandevicelogall") public void deviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { if (StringUtils.isEmpty(message)) { channel.basicAck(deliveryTag, false); @@ -88,7 +88,7 @@ public class AmqpListener { * @throws Exception */ -// @RabbitListener(queues = "shengdilandevicelogupdate") + @RabbitListener(queues = "shengdilandevicelogupdate") public void updateDeviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { if (StringUtils.isEmpty(message)) { channel.basicAck(deliveryTag, false); diff --git a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java index f438666..c5e23cd 100644 --- a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java +++ b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java @@ -20,6 +20,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -52,7 +54,24 @@ public class DeviceDataService { DeviceInfoMapper deviceInfoMapper; @Autowired AmqpService amqpService; + private InfluxDBClient influxDBClient; + private InfluxDBClient oldInfluxDBClient; + @PostConstruct + public void init() { + this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org); + this.oldInfluxDBClient = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); + } + + @PreDestroy + public void destroy() { + if (influxDBClient != null) { + influxDBClient.close(); + } + if (oldInfluxDBClient != null) { + oldInfluxDBClient.close(); + } + } /** * 根据设备号查询设备历史数据 * @@ -165,7 +184,7 @@ public class DeviceDataService { */ public List getOldInfluxdbData(String imei) { DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); - InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); +// InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); StringBuffer query = new StringBuffer(); query.append("from(bucket: \"iot\") "); query.append("|> range(start: -1y)"); @@ -175,7 +194,10 @@ public class DeviceDataService { // } query.append(" |> yield(name: \"last\")"); System.out.println("查询语句==========:" + query); - List tables = client.getQueryApi().query(query.toString()); + if(oldInfluxDBClient==null){ + oldInfluxDBClient = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); + } + List tables = oldInfluxDBClient.getQueryApi().query(query.toString()); List rerurnList = new ArrayList<>(); for (FluxTable fluxTable : tables) { List records = fluxTable.getRecords(); @@ -198,7 +220,7 @@ public class DeviceDataService { } } } - client.close(); +// client.close(); Collections.reverse(rerurnList); return rerurnList; } @@ -230,9 +252,9 @@ public class DeviceDataService { if (createtime != null) { time = createtime.toInstant(); } - InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); + // 复用已创建的客户端实例 DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); - try (WriteApi writeApi = client.getWriteApi()) { + try (WriteApi writeApi = influxDBClient.getWriteApi()) { writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData); } catch (Exception e) { e.printStackTrace(); @@ -337,9 +359,12 @@ public class DeviceDataService { List influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei().toLowerCase(Locale.ROOT)); if (influxdbDataList != null && influxdbDataList.size() > 0) { for (DeviceDataInfluxData influxData : influxdbDataList) { - InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); +// InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); //3.将设备数据保存到influxdb中 - try (WriteApi writeApi = client.getWriteApi()) { + if(influxDBClient== null){ + influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray()); + } + try (WriteApi writeApi = influxDBClient.getWriteApi()) { writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, influxData); syncDeviceData = true; } catch (Exception e) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0a49669..7ef22a9 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,4 +1,3 @@ -spring.profiles.active=prod server.port=8898 compute.schedule=0 offline.gap=120