From bd6fd4e7a76071b474a276fb5871aa7fc21510de Mon Sep 17 00:00:00 2001 From: bgy Date: Fri, 19 Dec 2025 17:55:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9B=B4=E6=96=B0=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E4=B8=8B=E5=8F=91=E6=97=A5=E5=BF=97=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E3=80=82=E4=BC=98=E5=8C=96=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E4=B8=8B=E5=8F=91=E6=97=A5=E5=BF=97=E4=BF=9D?= =?UTF-8?q?=E5=AD=98=E5=8A=9F=E8=83=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../influxdb/controller/DeviceDataController.java | 2 +- .../influxdb/controller/DeviceLogController.java | 10 +- .../com/topsail/influxdb/entity/DeviceLogData.java | 1 + .../influxdb/entity/DeviceLogInfluxData.java | 2 + .../com/topsail/influxdb/entity/SupplierVO.java | 59 ++++++++++ .../topsail/influxdb/mapper/DeviceInfoMapper.java | 3 + .../topsail/influxdb/mapper/DeviceInfoMapper.xml | 19 ++-- .../topsail/influxdb/rabbitmq/AmqpListener.java | 29 ++++- .../influxdb/service/DeviceDataService.java | 9 +- .../topsail/influxdb/service/DeviceLogService.java | 123 +++++++++++++++++---- 10 files changed, 222 insertions(+), 35 deletions(-) create mode 100644 src/main/java/com/topsail/influxdb/entity/SupplierVO.java diff --git a/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java b/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java index a85ac2d..2990a81 100644 --- a/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java +++ b/src/main/java/com/topsail/influxdb/controller/DeviceDataController.java @@ -41,7 +41,7 @@ public class DeviceDataController { /** * 删除所有的设备历史数据 */ - @RequestMapping(value = "/shengDiLandelete", method = RequestMethod.GET) + @RequestMapping(value = "/shengDiLanDelete", method = RequestMethod.GET) public Result> deleteDeviceData() throws ParseException, BindException { List list = new ArrayList<>(); // deviceDataService.deleteDeviceData(); diff --git a/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java b/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java index 7e6ec32..1a09502 100644 --- a/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java +++ b/src/main/java/com/topsail/influxdb/controller/DeviceLogController.java @@ -38,17 +38,17 @@ public class DeviceLogController { * @return */ @GetMapping(value = "/getShengDiLanDeviceLog") - public Result getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) throws ParseException, BindException { - JSONObject jsonObject = deviceLogService.getPageDeviceLog(pageNode, pageSize, startTime, endTime, result, imei, supplierId, companyId, operator, houseId, bindingInfo); + public Result getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String statusIssue, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) throws ParseException, BindException { + JSONObject jsonObject = deviceLogService.getPageDeviceLog(pageNode, pageSize, startTime, endTime, result, statusIssue, imei, supplierId, companyId, operator, houseId, bindingInfo); return Result.success(jsonObject); } /** * 删除所有的设备下发日志 */ - @RequestMapping(value = "/shengDiLandelete", method = RequestMethod.GET) - public Result delete() throws ParseException, BindException { -// deviceLogService.deleteDeviceLog(); + @RequestMapping(value = "/shengDiLanDelete", method = RequestMethod.GET) + public Result delete(@RequestParam (name = "id",required = false) Integer id) throws ParseException, BindException { + deviceLogService.deleteDeviceLog(id); return Result.success(new CodeMsg(0, "success")); } } diff --git a/src/main/java/com/topsail/influxdb/entity/DeviceLogData.java b/src/main/java/com/topsail/influxdb/entity/DeviceLogData.java index 546ab14..3a0d8f3 100644 --- a/src/main/java/com/topsail/influxdb/entity/DeviceLogData.java +++ b/src/main/java/com/topsail/influxdb/entity/DeviceLogData.java @@ -20,6 +20,7 @@ public class DeviceLogData { private Integer companyId; private Integer id; private Integer supplierId; + private String supplierName; private String imei; // 设备imei号 private String operation; // 操作 private String params; // 参数 diff --git a/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java b/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java index 96a3f1d..df24d64 100644 --- a/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java +++ b/src/main/java/com/topsail/influxdb/entity/DeviceLogInfluxData.java @@ -7,6 +7,8 @@ import java.time.Instant; @Measurement(name = "devicelog") public class DeviceLogInfluxData { + @Column(tag = true) + public Integer id; @Column(tag = true) public String imei; @Column(tag = true) diff --git a/src/main/java/com/topsail/influxdb/entity/SupplierVO.java b/src/main/java/com/topsail/influxdb/entity/SupplierVO.java new file mode 100644 index 0000000..5e02eb0 --- /dev/null +++ b/src/main/java/com/topsail/influxdb/entity/SupplierVO.java @@ -0,0 +1,59 @@ +package com.topsail.influxdb.entity; + +import lombok.Data; + +import java.io.Serializable; + + +/** + * + * 供应商表实体 + * + * @version + * +
+ * Author	Version		Date		Changes
+ * wg 	1.0  2020年08月26日 Created
+ *
+ * 
+ * @since 1. + */ +@Data +public class SupplierVO implements Serializable { + private static final long serialVersionUID = 5342511411663841L; + + /** + * 所属公司 + */ + private Integer companyId; + /** + *ID + */ + private Integer id; + /** + *供应商名字 + */ + private String supplierName; + /** + *供应商负责人名字 + */ + private String supplierPrincipalName; + /** + *供应商负责人电话 + */ + private String supplierPrincipalPhone; + /** + *供应商负责人邮箱 + */ + private String supplierPrincipalEmail; + /** + *供应商密钥 + */ + private String supplierSecret; + /** + *下发命令url + */ + private String issueUrl; + + +} \ No newline at end of file diff --git a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java index 458d3c8..1f93bae 100644 --- a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java +++ b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.java @@ -2,6 +2,7 @@ package com.topsail.influxdb.mapper; import com.topsail.influxdb.entity.DeviceBelongInfo; import com.topsail.influxdb.entity.DeviceLogData; +import com.topsail.influxdb.entity.SupplierVO; import com.topsail.influxdb.entity.SyncDataFlag; import org.apache.ibatis.annotations.Param; @@ -42,4 +43,6 @@ public interface DeviceInfoMapper { void updateSyncDeviceDataFlagInfo(@Param("id") Integer id, @Param("flag") Integer flag); void updateSyncDeviceLogFlagInfo(@Param("id") Integer id, @Param("flag") Integer flag); + + List getSupplierList(); } \ No newline at end of file diff --git a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml index e6ca110..a35a5b6 100644 --- a/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml +++ b/src/main/java/com/topsail/influxdb/mapper/DeviceInfoMapper.xml @@ -84,13 +84,13 @@ UPDATE shengdilan_sync_data_flag SET sync_device_data = #{flag}, - update_time = NOW() + update_time = NOW() WHERE id = #{id} UPDATE shengdilan_sync_data_flag SET sync_device_log = #{flag}, - update_time = NOW() + update_time = NOW() WHERE id = #{id} + \ No newline at end of file diff --git a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java index 6e6f088..503ab63 100644 --- a/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java +++ b/src/main/java/com/topsail/influxdb/rabbitmq/AmqpListener.java @@ -37,7 +37,7 @@ public class AmqpListener { * @throws Exception */ -// @RabbitListener(queues = "shengdilandevicedata2") + @RabbitListener(queues = "shengdilandevicedataall") public void deviceDataMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { if (StringUtils.isEmpty(message)) { channel.basicAck(deliveryTag, false); @@ -63,7 +63,7 @@ public class AmqpListener { * @throws Exception */ -// @RabbitListener(queues = "shengdilandevicelog") +// @RabbitListener(queues = "shengdilandevicelogall") public void deviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { if (StringUtils.isEmpty(message)) { channel.basicAck(deliveryTag, false); @@ -79,4 +79,29 @@ public class AmqpListener { LOG.info("saveDeviceLogToInfluxdb Error:(" + e + ")-" + message); } } + /** + * 更新设备下发命令日志 + * + * @param message + * @param deliveryTag + * @param channel + * @throws Exception + */ + +// @RabbitListener(queues = "shengdilandevicelogupdate") + public void updateDeviceLogMqListener(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception { + if (StringUtils.isEmpty(message)) { + channel.basicAck(deliveryTag, false); + return; + } + try { + DeviceLogData deviceLogData = JSON.parseObject(message, DeviceLogData.class); + deviceLogService.updateDeviceLogMqListener(deviceLogData); + LOG.info("saveDeviceLogToInfluxdb OK:" + deviceLogData.getImei()); + channel.basicAck(deliveryTag, false); + } catch (Exception e) { + channel.basicNack(deliveryTag, false, true); + LOG.info("saveDeviceLogToInfluxdb Error:(" + e + ")-" + message); + } + } } diff --git a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java index 4a7a7a0..f438666 100644 --- a/src/main/java/com/topsail/influxdb/service/DeviceDataService.java +++ b/src/main/java/com/topsail/influxdb/service/DeviceDataService.java @@ -325,8 +325,14 @@ public class DeviceDataService { //1.查询所有的设备编号 List syncDataFlags = deviceInfoMapper.querySyncDeviceDataFlagInfo(imei); //2.根据设备编号查询历史Influxdb所有的设备数据 + Boolean flag = true; if (syncDataFlags != null && syncDataFlags.size() > 0) { - for (SyncDataFlag syncDataFlag : syncDataFlags) { + for (int i = 0; i < syncDataFlags.size(); i++) { + if (!flag) { + syncDataFlags = deviceInfoMapper.querySyncDeviceDataFlagInfo(null); + } + int index = new Random().nextInt(syncDataFlags.size()); + SyncDataFlag syncDataFlag = syncDataFlags.get(index); Boolean syncDeviceData = false; List influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei().toLowerCase(Locale.ROOT)); if (influxdbDataList != null && influxdbDataList.size() > 0) { @@ -354,6 +360,7 @@ public class DeviceDataService { //4.更新同步设备数据状态 deviceInfoMapper.updateSyncDeviceDataFlagInfo(syncDataFlag.getId(), 1); } + flag = false; } } } diff --git a/src/main/java/com/topsail/influxdb/service/DeviceLogService.java b/src/main/java/com/topsail/influxdb/service/DeviceLogService.java index c14e088..6cc4b9e 100644 --- a/src/main/java/com/topsail/influxdb/service/DeviceLogService.java +++ b/src/main/java/com/topsail/influxdb/service/DeviceLogService.java @@ -9,19 +9,18 @@ import com.influxdb.client.WriteApi; import com.influxdb.client.domain.WritePrecision; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; -import com.topsail.influxdb.entity.DeviceBelongInfo; -import com.topsail.influxdb.entity.DeviceLogData; -import com.topsail.influxdb.entity.DeviceLogInfluxData; -import com.topsail.influxdb.entity.SyncDataFlag; +import com.topsail.influxdb.entity.*; import com.topsail.influxdb.mapper.DeviceInfoMapper; import com.topsail.influxdb.rabbitmq.service.AmqpService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -33,6 +32,27 @@ import java.util.*; @Service public class DeviceLogService { public static final Logger LOG = LoggerFactory.getLogger(DeviceLogService.class); + private static volatile Map supplierInfoMap = new HashMap<>(); + + // 初始化时加载圣地蓝项目信息(例如,在构造函数或@PostConstruct方法中) + @PostConstruct + public void initShendianlanProjects() { + if (supplierInfoMap.isEmpty()) { + synchronized (DeviceLogService.class) { + if (supplierInfoMap.isEmpty()) { + List supplierList = deviceInfoMapper.getSupplierList(); + if (supplierList != null && !supplierList.isEmpty()) { + //查询出属于圣地蓝项目的IMEI号 + for (SupplierVO supplierVO : supplierList) { + Integer id = supplierVO.getId(); + String name = supplierVO.getSupplierName(); + supplierInfoMap.put(id, name); + } + } + } + } + } + } // InfluxDB基础配置 private static final String LOG_BUCKET_NAME = "devicelog"; @@ -61,10 +81,14 @@ public class DeviceLogService { return; } Date createtime = deviceLogData.getCreatetime(); + if (createtime == null) { + createtime = new Date(); + } //将时间转换成Instant Instant time = createtime.toInstant(); InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); DeviceLogInfluxData deviceLogInfluxData = new DeviceLogInfluxData(); + deviceLogInfluxData.id = deviceLogData.getId(); deviceLogInfluxData.imei = deviceLogData.getImei(); //从内部移动处出来,value有没有数值都进行插入 deviceLogInfluxData.result = deviceLogData.getResult(); @@ -91,19 +115,26 @@ public class DeviceLogService { /** * 删除掉所有的Influxdb数据 */ - public void deleteDeviceLog() { + public void deleteDeviceLog(Integer deviceLogId) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); - StringBuffer query = new StringBuffer(); - query.append("from(bucket: \"devicelog\") "); - query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); - query.append("|> range(start: -36d)"); - System.out.println("查询语句==========:" + query); + // 1. 拼接公共过滤条件片段(总条数和分页查询共用) + StringBuilder predicates = new StringBuilder(); + // 2. 拼接固定过滤条件(_measurement和_field,必选) + predicates.append("_measurement=devicelog"); + if (deviceLogId != null) { + if (predicates.length() > 0) { + predicates.append(" and ").append("id = ").append(deviceLogId); + } + } + System.out.println(predicates.toString()); DeleteApi deleteApi = client.getDeleteApi(); - OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); try { - // Delete data with specific time range - deleteApi.delete(start, stop, "", LOG_BUCKET_NAME, org); +// // 删除指定时间范围内所有数据 + OffsetDateTime start = OffsetDateTime.of(calendar.get(Calendar.YEAR) - 10, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + OffsetDateTime stop = OffsetDateTime.of(calendar.get(Calendar.YEAR) + 1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + deleteApi.delete(start, stop, predicates.toString(), LOG_BUCKET_NAME, org); System.out.println("Data deleted successfully"); } catch (Exception e) { e.printStackTrace(); @@ -130,7 +161,7 @@ public class DeviceLogService { * @param houseId 房间id * @return */ - public JSONObject getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) { + public JSONObject getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String statusIssue, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) { JSONObject resultSet = new JSONObject(); InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); // 1. 拼接公共过滤条件片段(总条数和分页查询共用) @@ -162,6 +193,9 @@ public class DeviceLogService { if (result != null && !result.trim().isEmpty()) { filterFragment.append(String.format(" |> filter(fn: (r) => r[\"result\"] == \"%s\")", result.trim())); } + if (statusIssue != null && !statusIssue.trim().isEmpty()) { + filterFragment.append(String.format(" |> filter(fn: (r) => r[\"statusIssue\"] == \"%s\")", statusIssue.trim())); + } // 5. 动态拼接supplierId过滤条件(仅当supplierId非空且非空白字符串时拼接) if (supplierId != null && !supplierId.trim().isEmpty()) { filterFragment.append(String.format(" |> filter(fn: (r) => r[\"supplierId\"] == \"%s\")", supplierId.trim())); @@ -203,8 +237,8 @@ public class DeviceLogService { dataQuery.append(filterFragment); // 复用过滤条件 dataQuery.append(" |> sort(columns:[\"_time\"], desc:true)"); // 处理分页参数默认值 - int finalPageNum = pageNode != null ? pageNode:DEFAULT_PAGE_NUM; - int finalPageSize = pageSize != null ? pageSize:DEFAULT_PAGE_SIZE; + int finalPageNum = pageNode != null ? pageNode : DEFAULT_PAGE_NUM; + int finalPageSize = pageSize != null ? pageSize : DEFAULT_PAGE_SIZE; int offset = (finalPageNum - 1) * finalPageSize; // dataQuery.append(" |> limit(n: " + finalPageSize + ", offset: " + offset + ")"); dataQuery.append(" |> yield(name: \"data\")"); // 标记结果集为data @@ -229,6 +263,9 @@ public class DeviceLogService { for (FluxRecord fluxRecord : records) { String value = (String) fluxRecord.getValueByKey("_value"); DeviceLogData data = JSONObject.parseObject(value, DeviceLogData.class); + if (data.getSupplierId() != null) { + data.setSupplierName(supplierInfoMap.get(data.getSupplierId())); + } rerurnList.add(data); } } @@ -236,7 +273,9 @@ public class DeviceLogService { Collections.reverse(rerurnList); resultSet.put("count", totalCount); //对结果按照分页要求截取构造数据 - rerurnList = rerurnList.subList(offset, Math.min(offset + finalPageSize, rerurnList.size())); + if (rerurnList != null && rerurnList.size() > 0) { + rerurnList = rerurnList.subList(offset, Math.min(offset + finalPageSize, rerurnList.size())); + } resultSet.put("list", rerurnList); return resultSet; } @@ -271,12 +310,13 @@ public class DeviceLogService { String imei = syncDataFlag.getImei(); List deviceLogDataList = deviceInfoMapper.queryDeviceLogData(imei, companyId); if (deviceLogDataList != null && deviceLogDataList.size() > 0) { - DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); + DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); //3. 批量插入设备命令下发日志数据到influxdb中 for (DeviceLogData deviceLogData : deviceLogDataList) { deviceLogData.setDeviceBelongInfo(deviceBelongInfo != null ? deviceBelongInfo.getDeviceBelongInfo() : null); deviceLogData.setHouseId(deviceBelongInfo != null ? deviceBelongInfo.getHouseId() : null); try { + deviceLogData.setCompanyId(companyId); saveDeviceLogToInfluxdb(deviceLogData); flag = true; } catch (Exception e) { @@ -295,4 +335,49 @@ public class DeviceLogService { } } } + + /** + * 更新设备命令下发日志数据 + */ + public void updateDeviceLogMqListener(DeviceLogData deviceLogData) { + //步骤1:查出ID对应的日志 + InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); + // 1. 拼接公共过滤条件片段(总条数和分页查询共用) + StringBuilder filterFragment = new StringBuilder(); + filterFragment.append("from(bucket: \"devicelog\") "); + filterFragment.append("|> range(start: -360d)"); + // 2. 拼接固定过滤条件(_measurement和_field,必选) + filterFragment.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")"); + filterFragment.append(" |> filter(fn: (r) => r[\"_field\"] == \"logjson\")"); + // 3. 动态拼接imei过滤条件(仅当imei非空且非空白字符串时拼接) + filterFragment.append(String.format(" |> filter(fn: (r) => r[\"id\"] == \"%s\")", deviceLogData.getId())); + // 3. 构建分页数据查询子句(yield命名为data) + StringBuilder dataQuery = new StringBuilder(); + dataQuery.append(filterFragment); // 复用过滤条件 + dataQuery.append(" |> sort(columns:[\"_time\"], desc:true)"); + dataQuery.append(" |> yield(name: \"data\")"); // 标记结果集为data + System.out.println("查询数据语句:" + dataQuery.toString()); + //查询数据结果 + List dataResults = client.getQueryApi().query(dataQuery.toString()); + for (FluxTable fluxTable : dataResults) { + List records = fluxTable.getRecords(); + for (FluxRecord fluxRecord : records) { + String value = (String) fluxRecord.getValueByKey("_value"); + DeviceLogData data = JSONObject.parseObject(value, DeviceLogData.class); + //构造数据 + DeviceLogData newData = new DeviceLogData(); + BeanUtils.copyProperties(data, newData); + newData.setId(deviceLogData.getId()); + newData.setFeedbackValue(deviceLogData.getFeedbackValue()); + newData.setStatusIssue(deviceLogData.getStatusIssue()); + //步骤2:删除该日志 + //删除设备命令下发日志数据 + deleteDeviceLog(deviceLogData.getId()); + //步骤3:更新该日志 + //保存设备命令下发日志数据到influxdb中 + saveDeviceLogToInfluxdb(newData); + } + } + client.close(); + } }