Browse Source

优化同步历史设备数据功能

master
bgy 1 week ago
parent
commit
66c1b843dd
4 changed files with 46 additions and 13 deletions
  1. +12
    -3
      pom.xml
  2. +2
    -2
      src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java
  3. +32
    -7
      src/main/java/com/topsail/influxdb/service/DeviceDataService.java
  4. +0
    -1
      src/main/resources/application.properties

+ 12
- 3
pom.xml View File

@ -40,6 +40,16 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.6</version> <version>1.18.6</version>
</dependency> </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.13</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId> <artifactId>spring-boot-starter-amqp</artifactId>
@ -90,7 +100,6 @@
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId> <artifactId>spring-test</artifactId>
<version>4.3.6.RELEASE</version>
</dependency> </dependency>
<dependency> <dependency>
@ -112,11 +121,11 @@
<artifactId>mybatis-generator-maven-plugin</artifactId> <artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.5</version> <version>1.3.5</version>
<dependencies> <dependencies>
<!-- jdbc 依赖 -->
<!-- 使用与主依赖相同的 MySQL 驱动版本 -->
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
<version>8.0.13</version> <!-- 统一使用主依赖的版本 -->
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.itfsw</groupId> <groupId>com.itfsw</groupId>


+ 2
- 2
src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java View File

@ -63,7 +63,7 @@ public class AmqpListener {
* @throws Exception * @throws Exception
*/ */
// @RabbitListener(queues = "shengdilandevicelogall")
@RabbitListener(queues = "shengdilandevicelogall")
public void deviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { public void deviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception {
if (StringUtils.isEmpty(message)) { if (StringUtils.isEmpty(message)) {
channel.basicAck(deliveryTag, false); channel.basicAck(deliveryTag, false);
@ -88,7 +88,7 @@ public class AmqpListener {
* @throws Exception * @throws Exception
*/ */
// @RabbitListener(queues = "shengdilandevicelogupdate")
@RabbitListener(queues = "shengdilandevicelogupdate")
public void updateDeviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { public void updateDeviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception {
if (StringUtils.isEmpty(message)) { if (StringUtils.isEmpty(message)) {
channel.basicAck(deliveryTag, false); channel.basicAck(deliveryTag, false);


+ 32
- 7
src/main/java/com/topsail/influxdb/service/DeviceDataService.java View File

@ -20,6 +20,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
@ -52,7 +54,24 @@ public class DeviceDataService {
DeviceInfoMapper deviceInfoMapper; DeviceInfoMapper deviceInfoMapper;
@Autowired @Autowired
AmqpService amqpService; AmqpService amqpService;
private InfluxDBClient influxDBClient;
private InfluxDBClient oldInfluxDBClient;
@PostConstruct
public void init() {
this.influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org);
this.oldInfluxDBClient = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg);
}
@PreDestroy
public void destroy() {
if (influxDBClient != null) {
influxDBClient.close();
}
if (oldInfluxDBClient != null) {
oldInfluxDBClient.close();
}
}
/** /**
* 根据设备号查询设备历史数据 * 根据设备号查询设备历史数据
* *
@ -165,7 +184,7 @@ public class DeviceDataService {
*/ */
public List<DeviceDataInfluxData> getOldInfluxdbData(String imei) { public List<DeviceDataInfluxData> getOldInfluxdbData(String imei) {
DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei);
InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg);
// InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg);
StringBuffer query = new StringBuffer(); StringBuffer query = new StringBuffer();
query.append("from(bucket: \"iot\") "); query.append("from(bucket: \"iot\") ");
query.append("|> range(start: -1y)"); query.append("|> range(start: -1y)");
@ -175,7 +194,10 @@ public class DeviceDataService {
// } // }
query.append(" |> yield(name: \"last\")"); query.append(" |> yield(name: \"last\")");
System.out.println("查询语句==========:" + query); System.out.println("查询语句==========:" + query);
List<FluxTable> tables = client.getQueryApi().query(query.toString());
if(oldInfluxDBClient==null){
oldInfluxDBClient = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg);
}
List<FluxTable> tables = oldInfluxDBClient.getQueryApi().query(query.toString());
List<DeviceDataInfluxData> rerurnList = new ArrayList<>(); List<DeviceDataInfluxData> rerurnList = new ArrayList<>();
for (FluxTable fluxTable : tables) { for (FluxTable fluxTable : tables) {
List<FluxRecord> records = fluxTable.getRecords(); List<FluxRecord> records = fluxTable.getRecords();
@ -198,7 +220,7 @@ public class DeviceDataService {
} }
} }
} }
client.close();
// client.close();
Collections.reverse(rerurnList); Collections.reverse(rerurnList);
return rerurnList; return rerurnList;
} }
@ -230,9 +252,9 @@ public class DeviceDataService {
if (createtime != null) { if (createtime != null) {
time = createtime.toInstant(); time = createtime.toInstant();
} }
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray());
// 复用已创建的客户端实例
DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time);
try (WriteApi writeApi = client.getWriteApi()) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData); writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -337,9 +359,12 @@ public class DeviceDataService {
List<DeviceDataInfluxData> influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei().toLowerCase(Locale.ROOT)); List<DeviceDataInfluxData> influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei().toLowerCase(Locale.ROOT));
if (influxdbDataList != null && influxdbDataList.size() > 0) { if (influxdbDataList != null && influxdbDataList.size() > 0) {
for (DeviceDataInfluxData influxData : influxdbDataList) { for (DeviceDataInfluxData influxData : influxdbDataList) {
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray());
// InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray());
//3.将设备数据保存到influxdb中 //3.将设备数据保存到influxdb中
try (WriteApi writeApi = client.getWriteApi()) {
if(influxDBClient== null){
influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray());
}
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, influxData); writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, influxData);
syncDeviceData = true; syncDeviceData = true;
} catch (Exception e) { } catch (Exception e) {


+ 0
- 1
src/main/resources/application.properties View File

@ -1,4 +1,3 @@
spring.profiles.active=prod
server.port=8898 server.port=8898
compute.schedule=0 compute.schedule=0
offline.gap=120 offline.gap=120


Loading…
Cancel
Save