From ecaf533660cc12036f9138112e02757cf4e68dca Mon Sep 17 00:00:00 2001 From: bgy Date: Tue, 9 Dec 2025 14:45:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8E=86=E5=8F=B2=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E8=BD=AC=E5=AD=98=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../influxdb/controller/DeviceDataController.java | 3 +- .../influxdb/controller/DeviceLogController.java | 3 +- .../influxdb/controller/HistoryDataController.java | 55 ++++++ .../{model => entity}/DeviceDataInfluxData.java | 2 +- .../{model => entity}/DeviceLogInfluxData.java | 2 +- .../com/topsail/influxdb/entity/SyncDataFlag.java | 23 +++ .../topsail/influxdb/mapper/DeviceInfoMapper.java | 19 ++ .../topsail/influxdb/mapper/DeviceInfoMapper.xml | 25 ++- .../topsail/influxdb/rabbitmq/AmqpListener.java | 5 +- .../influxdb/service/DeviceDataService.java | 210 ++++++++++++++++----- .../topsail/influxdb/service/DeviceLogService.java | 79 +++++--- src/main/resources/application.properties | 18 +- 12 files changed, 360 insertions(+), 84 deletions(-) create mode 100644 src/main/java/com/topsail/influxdb/controller/HistoryDataController.java rename src/main/java/com/topsail/influxdb/{model => entity}/DeviceDataInfluxData.java (94%) rename src/main/java/com/topsail/influxdb/{model => entity}/DeviceLogInfluxData.java (96%) create mode 100644 src/main/java/com/topsail/influxdb/entity/SyncDataFlag.java diff --git a/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java b/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java index 0f501ad..d1482a3 100644 --- a/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java +++ b/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java @@ -38,8 +38,7 @@ public class DeviceDataController { } /** - * 查询单个设备历史数据(根据设备号查询设备历史数据) - * devices/history/{deviceid} + * 删除所有的设备历史数据 */ @RequestMapping(value = "/delete", method = RequestMethod.GET) public Result> deleteDeviceData() throws ParseException, BindException { diff --git a/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java b/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java index a7e2ef4..60ff3b8 100644 --- a/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java +++ b/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java @@ -44,8 +44,7 @@ public class DeviceLogController { } /** - * 查询单个设备历史数据(根据设备号查询设备历史数据) - * devices/history/{deviceid} + * 删除所有的设备下发日志 */ @RequestMapping(value = "/delete", method = RequestMethod.GET) public Result delete() throws ParseException, BindException { diff --git a/src/main/java/com/topsail/influxdb/controller/HistoryDataController.java b/src/main/java/com/topsail/influxdb/controller/HistoryDataController.java new file mode 100644 index 0000000..0694cf2 --- /dev/null +++ b/src/main/java/com/topsail/influxdb/controller/HistoryDataController.java @@ -0,0 +1,55 @@ +package com.topsail.influxdb.controller; + +import com.topsail.influxdb.entity.DeviceHistoryData; +import com.topsail.influxdb.result.Result; +import com.topsail.influxdb.service.DeviceDataService; +import com.topsail.influxdb.service.DeviceLogService; +import org.apache.ibatis.annotations.Param; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.BindException; +import org.springframework.web.bind.annotation.*; + +import java.text.ParseException; +import java.util.List; + +@RestController +@ResponseBody +@CrossOrigin +@RequestMapping(value = "/historyData") +public class HistoryDataController { + private Logger logger = LoggerFactory.getLogger(DeviceDataController.class); + + @Autowired + private DeviceDataService deviceDataService; + @Autowired + private DeviceLogService deviceLogService; + + /** + * 转存设备历史数据(根据设备号查询设备历史数据) + */ + @GetMapping(value = "/transferDeviceData") + public Result> transferDeviceData(@RequestParam(name = "imei") String imei) throws ParseException, BindException { + deviceDataService.transferDeviceData(imei); + return Result.success(null); + } + + /** + * 转存设备命令下发日志数据 + */ + @RequestMapping(value = "/transferDeviceLogData", method = RequestMethod.GET) + public Result> transferDeviceLogData(@RequestParam(name = "imei", required = false) String imei) throws ParseException, BindException { + deviceLogService.transferDeviceLogData(); + return Result.success(null); + } + /** + * 查询设备的历史数据条数 + */ + @RequestMapping(value = "/getHistoryDataCount", method = RequestMethod.GET) + public Result getHistoryDataCount(@RequestParam(name = "imei") String imei) throws ParseException, BindException { + Long deviceDataCount = deviceDataService.getDeviceDataCount(imei); + return Result.success(deviceDataCount); + } + +} diff --git a/src/main/java/com/topsail/influxdb/model/DeviceDataInfluxData.java b/src/main/java/com/topsail/influxdb/entity/DeviceDataInfluxData.java similarity index 94% rename from src/main/java/com/topsail/influxdb/model/DeviceDataInfluxData.java rename to src/main/java/com/topsail/influxdb/entity/DeviceDataInfluxData.java index 159a9b7..77af8b2 100644 --- a/src/main/java/com/topsail/influxdb/model/DeviceDataInfluxData.java +++ b/src/main/java/com/topsail/influxdb/entity/DeviceDataInfluxData.java @@ -1,4 +1,4 @@ -package com.topsail.influxdb.model; +package com.topsail.influxdb.entity; import com.influxdb.annotations.Column; import com.influxdb.annotations.Measurement; diff --git a/src/main/java/com/topsail/influxdb/model/DeviceLogInfluxData.java b/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java similarity index 96% rename from src/main/java/com/topsail/influxdb/model/DeviceLogInfluxData.java rename to src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java index 96193b3..96a3f1d 100644 --- a/src/main/java/com/topsail/influxdb/model/DeviceLogInfluxData.java +++ b/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java @@ -1,4 +1,4 @@ -package com.topsail.influxdb.model; +package com.topsail.influxdb.entity; import com.influxdb.annotations.Column; import com.influxdb.annotations.Measurement; diff --git a/src/main/java/com/topsail/influxdb/entity/SyncDataFlag.java b/src/main/java/com/topsail/influxdb/entity/SyncDataFlag.java new file mode 100644 index 0000000..b4abbad --- /dev/null +++ b/src/main/java/com/topsail/influxdb/entity/SyncDataFlag.java @@ -0,0 +1,23 @@ +package com.topsail.influxdb.entity; + + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 同步数据标记 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class SyncDataFlag { + private int id; + private String imei; + private int sync_device_data; + private int sync_device_log; +} diff --git a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java index 600e39f..e78c58e 100644 --- a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java +++ b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java @@ -1,8 +1,12 @@ package com.topsail.influxdb.mapper; import com.topsail.influxdb.entity.DeviceBelongInfo; +import com.topsail.influxdb.entity.DeviceLogData; +import com.topsail.influxdb.entity.SyncDataFlag; import org.apache.ibatis.annotations.Param; +import java.util.List; + public interface DeviceInfoMapper { /** * 查询设备的所属信息 @@ -11,4 +15,19 @@ public interface DeviceInfoMapper { * @return */ DeviceBelongInfo queryDeviceBelongInfo(@Param("imei") String imei); + + /** + * 查询圣地蓝设备同步标志信息 + * + * @return + */ + + List querySyncDeviceFlagInfo(); + + /** + * 根据设备编号查询设备命令下发日志数据 + * @param imei + * @return + */ + List queryDeviceLogData(@Param("imei") String imei, @Param("companyId") Integer companyId); } \ No newline at end of file diff --git a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml index f061dbd..383db80 100644 --- a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml +++ b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml @@ -84,7 +84,9 @@ + + \ No newline at end of file diff --git a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java index 948d9c7..9c0b9b2 100644 --- a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java +++ b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java @@ -9,7 +9,6 @@ import com.topsail.influxdb.service.DeviceDataService; import com.topsail.influxdb.service.DeviceLogService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; @@ -27,6 +26,7 @@ public class AmqpListener { DeviceLogService deviceLogService; @Autowired AmqpService amqpService; + /** * 监听设备历史数据 * @@ -53,6 +53,7 @@ public class AmqpListener { LOG.info("saveDeviceDataToInfluxdb Error:(" + e + ")-" + message); } } + /** * 监听设备下发命令日志 * @@ -62,7 +63,7 @@ public class AmqpListener { * @throws Exception */ - @RabbitListener(queues = "shengdilandevicelog") +// @RabbitListener(queues = "shengdilandevicelog") public void deviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { if (StringUtils.isEmpty(message)) { channel.basicAck(deliveryTag, false); diff --git a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java index 0fa48fb..5279445 100644 --- a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java +++ b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java @@ -9,24 +9,22 @@ import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import com.topsail.influxdb.entity.DeviceBelongInfo; +import com.topsail.influxdb.entity.DeviceDataInfluxData; import com.topsail.influxdb.entity.DeviceHistoryData; +import com.topsail.influxdb.entity.SyncDataFlag; import com.topsail.influxdb.mapper.DeviceInfoMapper; -import com.topsail.influxdb.model.DeviceDataInfluxData; import com.topsail.influxdb.pojo.History; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.TimeZone; +import java.util.*; @Service public class DeviceDataService { @@ -39,7 +37,15 @@ public class DeviceDataService { public String url; @Value("${shengdilan.influxdb.org}") private String org; - @Autowired + + @Value("${shengdilan.influxdb.oldtoken}") + public String oldtoken; + @Value("${shengdilan.influxdb.oldurl}") + public String oldurl; + @Value("${shengdilan.influxdb.oldorg}") + private String oldorg; + + @Resource DeviceInfoMapper deviceInfoMapper; /** @@ -97,6 +103,49 @@ public class DeviceDataService { return rerurnList; } + /** + * 获取要转存的历史influxdb数据 + */ + public List getOldInfluxdbData(String imei) { + DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); + InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); + StringBuffer query = new StringBuffer(); + query.append("from(bucket: \"iot\") "); + query.append("|> range(start: -1y)"); + query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"jsondata\" ) |> sort(columns:[\"_time\"], desc:true) ", imei)); +// if (pageNo != null && pageSize != null) { +// query.append(String.format(" |> limit(n: %s,offset:%s)", pageSize, pageNo - 1)); +// } + query.append(" |> yield(name: \"last\")"); + System.out.println("查询语句==========:" + query); + List tables = client.getQueryApi().query(query.toString()); + List rerurnList = new ArrayList<>(); + for (FluxTable fluxTable : tables) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + String value = (String) fluxRecord.getValueByKey("_value"); + History history = JSONObject.parseObject(value, History.class); + if (history != null) { + Date createtime = history.getSenddate(); + //将时间转换成Instant + Instant time = null; + if (createtime != null) { + time = createtime.toInstant(); + } + if (history.getDeviceBelongInfo() == null && deviceBelongInfo != null) { + history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); + history.setHouseId(deviceBelongInfo.getHouseId()); + } + DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); + rerurnList.add(deviceDataInfluxData); + } + } + } + client.close(); + Collections.reverse(rerurnList); + return rerurnList; + } + /** * 存储设备数据到influxdb * @@ -110,57 +159,69 @@ public class DeviceDataService { return; } //查询设备的所属信息 - DeviceBelongInfo deviceBelongInfo = getDeviceBelongInfo(history.getImei().trim()); - if (deviceBelongInfo == null) { - return; + if (history != null && (history.getDeviceBelongInfo() == null || history.getHouseId() == null)) { + DeviceBelongInfo deviceBelongInfo = getDeviceBelongInfo(history.getImei().trim()); + if (deviceBelongInfo == null) { + return; + } + history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); + history.setHouseId(deviceBelongInfo.getHouseId()); } - history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); - history.setHouseId(deviceBelongInfo.getHouseId()); - System.out.println("设备所属信息111111111111==========:" + deviceBelongInfo); - String influxvalue = "iot,imei=" + history.getImei() + " press=" + history.getSampledata() + " " + history.getTime().getTime(); - String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; - String bucket = "iot"; - String org = "shengdilan"; + Date createtime = history.getSenddate(); + //将时间转换成Instant + Instant time = null; + if (createtime != null) { + time = createtime.toInstant(); + } + InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); + DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); + try (WriteApi writeApi = client.getWriteApi()) { + writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData); + } + System.out.println("写入成功==========》" + history.getImei()); + } - InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); - DeviceDataInfluxData mem = new DeviceDataInfluxData(); - mem.imei = history.getImei(); + /** + * 构建Influxdb设备数据 + * + * @param history + */ + private DeviceDataInfluxData rebuildDeviceDataInfluxData(History history, Instant time) { + DeviceDataInfluxData deviceDataInfluxData = new DeviceDataInfluxData(); + deviceDataInfluxData.imei = history.getImei(); String value = history.getValue(); if (value != null && !value.equals("")) { String[] values = value.split(","); - mem.value1 = 0; - mem.value2 = 0; - mem.value3 = 0; - mem.value4 = 0; + deviceDataInfluxData.value1 = 0; + deviceDataInfluxData.value2 = 0; + deviceDataInfluxData.value3 = 0; + deviceDataInfluxData.value4 = 0; try { if (values.length > 0) { - mem.value1 = Double.parseDouble(values[0]); + deviceDataInfluxData.value1 = Double.parseDouble(values[0]); } if (values.length > 1) { - mem.value2 = Double.parseDouble(values[1]); + deviceDataInfluxData.value2 = Double.parseDouble(values[1]); } if (values.length > 2) { - mem.value3 = Double.parseDouble(values[2]); + deviceDataInfluxData.value3 = Double.parseDouble(values[2]); } if (values.length > 3) { - mem.value4 = Double.parseDouble(values[4]); + deviceDataInfluxData.value4 = Double.parseDouble(values[4]); } } catch (Exception e) { LOG.info(e.getMessage()); } } //从内部移动处出来,value有没有数值都进行插入 - mem.battery = history.getBatterylevel(); - mem.sigal = history.getSingalstrength(); - mem.jsondata = JSON.toJSONString(history); - mem.values = history.getValue(); - mem.unit = history.getUnit(); - mem.alarmtype = history.getAlarmtype(); - mem.time = Instant.now(); - try (WriteApi writeApi = client.getWriteApi()) { - writeApi.writeMeasurement(bucket, org, WritePrecision.NS, mem); - } - System.out.println("写入成功==========》" + history.getImei()); + deviceDataInfluxData.battery = history.getBatterylevel(); + deviceDataInfluxData.sigal = history.getSingalstrength(); + deviceDataInfluxData.jsondata = JSON.toJSONString(history); + deviceDataInfluxData.values = history.getValue(); + deviceDataInfluxData.unit = history.getUnit(); + deviceDataInfluxData.alarmtype = history.getAlarmtype(); + deviceDataInfluxData.time = time != null ? time : Instant.now(); + return deviceDataInfluxData; } /** @@ -174,10 +235,7 @@ public class DeviceDataService { * 删除掉所有的Influxdb数据 */ public void deleteDeviceData() { -// String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; -// String bucket = "iot"; -// String org = "shengdilan"; -// InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); +// InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray()); // StringBuffer query = new StringBuffer(); // query.append("from(bucket: \"iot\") "); // query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")")); @@ -188,7 +246,7 @@ public class DeviceDataService { // OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); // try { // // Delete data with specific time range -// deleteApi.delete(start, stop, "", bucket, org); +// deleteApi.delete(start, stop, "", DEVICEDATA_BUCKET_NAME, oldorg); // System.out.println("Data deleted successfully"); // } catch (Exception e) { // e.printStackTrace(); @@ -196,4 +254,68 @@ public class DeviceDataService { // client.close(); // } } + + /** + * 转存设备历史数据(根据设备号查询设备历史数据) + * + * @param imei + */ + public void transferDeviceData(String imei) { + //1.查询所有的设备编号 + List syncDataFlags = deviceInfoMapper.querySyncDeviceFlagInfo(); + //2.根据设备编号查询历史Influxdb所有的设备数据 + if (syncDataFlags != null && syncDataFlags.size() > 0) { + for (SyncDataFlag syncDataFlag : syncDataFlags) { + List influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei()); + if (influxdbDataList != null && influxdbDataList.size() > 0) { + for (DeviceDataInfluxData influxData : influxdbDataList) { + InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray()); + //3.将设备数据保存到influxdb中 + try (WriteApi writeApi = client.getWriteApi()) { + writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, oldorg, WritePrecision.NS, influxData); + } catch (Exception e) { + LOG.error("保存设备数据到influxdb失败:{}", e.getMessage()); + } + } + } + } + } + } + + /** + * 查询设备编码对应的设备数据记录数 + */ + public Long getDeviceDataCount(String imei) { + InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); + // 1. 拼接公共过滤条件片段(总条数和分页查询共用) + StringBuilder filterFragment = new StringBuilder(); + filterFragment.append("from(bucket: \"iot\") "); + filterFragment.append("|> range(start: -400d)"); + // 2. 拼接固定过滤条件(_measurement和_field,必选) + filterFragment.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")"); + filterFragment.append(" |> filter(fn: (r) => r[\"_field\"] == \"jsondata\")"); + // 3. 动态拼接imei过滤条件(仅当imei非空且非空白字符串时拼接) + if (imei != null && !imei.trim().isEmpty()) { + filterFragment.append(String.format(" |> filter(fn: (r) => r[\"imei\"] == \"%s\")", imei.trim())); + } + // 5. 拼接固定排序条件(按时间倒序) + // 2. 构建总条数查询子句(yield命名为total) + StringBuilder totalQuery = new StringBuilder(); + totalQuery.append(filterFragment); // 复用过滤条件 + totalQuery.append(" |> count(column: \"_value\")"); // 统计总条数 + totalQuery.append(" |> yield(name: \"total\")\n"); // 标记结果集为total + System.out.println("查询数量语句:" + totalQuery.toString()); + //查询总条数 + long totalCount = 0; + List totalTables = client.getQueryApi().query(totalQuery.toString()); + for (FluxTable table : totalTables) { + for (FluxRecord record : table.getRecords()) { + Object total = record.getValue(); + if (total != null) { + totalCount = totalCount + ((Number) total).longValue(); + } + } + } + return totalCount; + } } diff --git a/src/main/java/com/topsail/influxdb/service/DeviceLogService.java b/src/main/java/com/topsail/influxdb/service/DeviceLogService.java index 5dc9034..b48d914 100644 --- a/src/main/java/com/topsail/influxdb/service/DeviceLogService.java +++ b/src/main/java/com/topsail/influxdb/service/DeviceLogService.java @@ -9,24 +9,24 @@ import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; +import com.topsail.influxdb.entity.DeviceBelongInfo; import com.topsail.influxdb.entity.DeviceLogData; -import com.topsail.influxdb.model.DeviceLogInfluxData; +import com.topsail.influxdb.entity.SyncDataFlag; +import com.topsail.influxdb.mapper.DeviceInfoMapper; +import com.topsail.influxdb.entity.DeviceLogInfluxData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.TimeZone; -import java.util.regex.Pattern; +import java.util.*; @Service public class DeviceLogService { @@ -44,6 +44,9 @@ public class DeviceLogService { private static final int DEFAULT_PAGE_NUM = 1; private static final int DEFAULT_PAGE_SIZE = 10; + @Resource + DeviceInfoMapper deviceInfoMapper; + /** * 存储设备数据到influxdb * @@ -53,6 +56,9 @@ public class DeviceLogService { if (deviceLogData == null || (StringUtils.isEmpty(deviceLogData.getImei()))) { return; } + Date createtime = deviceLogData.getCreatetime(); + //将时间转换成Instant + Instant time = createtime.toInstant(); InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); DeviceLogInfluxData deviceLogInfluxData = new DeviceLogInfluxData(); deviceLogInfluxData.imei = deviceLogData.getImei(); @@ -68,7 +74,7 @@ public class DeviceLogService { deviceLogInfluxData.feedbackValue = deviceLogData.getFeedbackValue(); deviceLogInfluxData.deviceBelongInfo = deviceLogData.getDeviceBelongInfo(); deviceLogInfluxData.houseId = deviceLogData.getHouseId(); - deviceLogInfluxData.time = Instant.now(); + deviceLogInfluxData.time = time != null ? time : Instant.now(); try (WriteApi writeApi = client.getWriteApi()) { writeApi.writeMeasurement(LOG_BUCKET_NAME, org, WritePrecision.NS, deviceLogInfluxData); } @@ -79,24 +85,24 @@ public class DeviceLogService { * 删除掉所有的Influxdb数据 */ public void deleteDeviceLog() { - InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); - StringBuffer query = new StringBuffer(); - query.append("from(bucket: \"devicelog\") "); - query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); - query.append("|> range(start: -36d)"); - System.out.println("查询语句==========:" + query); - DeleteApi deleteApi = client.getDeleteApi(); - OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - try { - // Delete data with specific time range - deleteApi.delete(start, stop, "", LOG_BUCKET_NAME, org); - System.out.println("Data deleted successfully"); - } catch (Exception e) { - e.printStackTrace(); - } finally { - client.close(); - } +// InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); +// StringBuffer query = new StringBuffer(); +// query.append("from(bucket: \"devicelog\") "); +// query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); +// query.append("|> range(start: -36d)"); +// System.out.println("查询语句==========:" + query); +// DeleteApi deleteApi = client.getDeleteApi(); +// OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); +// OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); +// try { +// // Delete data with specific time range +// deleteApi.delete(start, stop, "", LOG_BUCKET_NAME, org); +// System.out.println("Data deleted successfully"); +// } catch (Exception e) { +// e.printStackTrace(); +// } finally { +// client.close(); +// } } /** @@ -239,4 +245,27 @@ public class DeviceLogService { } return escaped; } + + /** + * 转存设备命令下发日志数据 + */ + public void transferDeviceLogData() { + //1. 查询所有的设备编号 + List syncDataFlags = deviceInfoMapper.querySyncDeviceFlagInfo(); + //2. 根据设备编号查询设备命令下发日志数据 + for (SyncDataFlag syncDataFlag : syncDataFlags) { + String imei = syncDataFlag.getImei(); + List deviceLogDataList = deviceInfoMapper.queryDeviceLogData(imei, 10); + if (deviceLogDataList != null && deviceLogDataList.size() > 0) { + DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); + //3. 批量插入设备命令下发日志数据到influxdb中 + for (DeviceLogData deviceLogData : deviceLogDataList) { + deviceLogData.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); + deviceLogData.setHouseId(deviceBelongInfo.getHouseId()); + saveDeviceLogToInfluxdb(deviceLogData); + } + } + } + + } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index fd10b51..dd212a8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -6,17 +6,23 @@ spring.rabbitmq.password=topsail spring.rabbitmq.virtualHost=/ spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=10 - spring.datasource.url=jdbc:mysql://47.98.32.177:3306/iot?useSSL=false&useUnicode=true&characterEncoding=utf-8 spring.datasource.username=yunmei spring.datasource.password=yunmei1234 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver - compute.schedule=0 offline.gap=120 - spring.profiles.active=rds - -shengdilan.influxdb.token=_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug== +shengdilan.influxdb.token=PHeyRfQX30aLDrY3csu44DgUlF-jgJETi7bro5FMFoUpuADq1PSMWRzCKjNVvtGLCytiyRLEEmbAs_4x5vdR0w== shengdilan.influxdb.url=http://172.16.1.124:8086 -shengdilan.influxdb.org=shengdilan \ No newline at end of file +shengdilan.influxdb.org=shengdilan +/ ** +* 旧influxdb配置参数 +*/ +#shengdilan.influxdb.oldtoken=C2sfXsMC475aTtin7HbRkUXa9tEZTUU0S928ZdPzFktcFW8gZD_zY8-hKhgPxkLLodVS4YcsL3RcwgsJWYlURw== +#shengdilan.influxdb.oldurl=http://192.168.139.128:8086 +#shengdilan.influxdb.oldorg=shengdilan + +shengdilan.influxdb.oldtoken=k7dtXN5boSf9gbnUC75ekSJqILCMIcJ_nxxHLgxAYKqEtFSqqIfnELc2xHxuimdRYZgtbReQFs7qq_XWh04Z2w== +shengdilan.influxdb.oldurl=http://182.92.218.150:8086 +shengdilan.influxdb.oldorg=topsail