|
|
|
@ -9,19 +9,18 @@ import com.influxdb.client.WriteApi; |
|
|
|
import com.influxdb.client.domain.WritePrecision; |
|
|
|
import com.influxdb.query.FluxRecord; |
|
|
|
import com.influxdb.query.FluxTable; |
|
|
|
import com.topsail.influxdb.entity.DeviceBelongInfo; |
|
|
|
import com.topsail.influxdb.entity.DeviceLogData; |
|
|
|
import com.topsail.influxdb.entity.DeviceLogInfluxData; |
|
|
|
import com.topsail.influxdb.entity.SyncDataFlag; |
|
|
|
import com.topsail.influxdb.entity.*; |
|
|
|
import com.topsail.influxdb.mapper.DeviceInfoMapper; |
|
|
|
import com.topsail.influxdb.rabbitmq.service.AmqpService; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.BeanUtils; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.text.ParseException; |
|
|
|
import java.text.SimpleDateFormat; |
|
|
|
@ -33,6 +32,27 @@ import java.util.*; |
|
|
|
@Service |
|
|
|
public class DeviceLogService { |
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(DeviceLogService.class); |
|
|
|
private static volatile Map<Integer, String> supplierInfoMap = new HashMap<>(); |
|
|
|
|
|
|
|
// 初始化时加载圣地蓝项目信息(例如,在构造函数或@PostConstruct方法中) |
|
|
|
@PostConstruct |
|
|
|
public void initShendianlanProjects() { |
|
|
|
if (supplierInfoMap.isEmpty()) { |
|
|
|
synchronized (DeviceLogService.class) { |
|
|
|
if (supplierInfoMap.isEmpty()) { |
|
|
|
List<SupplierVO> supplierList = deviceInfoMapper.getSupplierList(); |
|
|
|
if (supplierList != null && !supplierList.isEmpty()) { |
|
|
|
//查询出属于圣地蓝项目的IMEI号 |
|
|
|
for (SupplierVO supplierVO : supplierList) { |
|
|
|
Integer id = supplierVO.getId(); |
|
|
|
String name = supplierVO.getSupplierName(); |
|
|
|
supplierInfoMap.put(id, name); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// InfluxDB基础配置 |
|
|
|
private static final String LOG_BUCKET_NAME = "devicelog"; |
|
|
|
@ -61,10 +81,14 @@ public class DeviceLogService { |
|
|
|
return; |
|
|
|
} |
|
|
|
Date createtime = deviceLogData.getCreatetime(); |
|
|
|
if (createtime == null) { |
|
|
|
createtime = new Date(); |
|
|
|
} |
|
|
|
//将时间转换成Instant |
|
|
|
Instant time = createtime.toInstant(); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); |
|
|
|
DeviceLogInfluxData deviceLogInfluxData = new DeviceLogInfluxData(); |
|
|
|
deviceLogInfluxData.id = deviceLogData.getId(); |
|
|
|
deviceLogInfluxData.imei = deviceLogData.getImei(); |
|
|
|
//从内部移动处出来,value有没有数值都进行插入 |
|
|
|
deviceLogInfluxData.result = deviceLogData.getResult(); |
|
|
|
@ -91,19 +115,26 @@ public class DeviceLogService { |
|
|
|
/** |
|
|
|
* 删除掉所有的Influxdb数据 |
|
|
|
*/ |
|
|
|
public void deleteDeviceLog() { |
|
|
|
public void deleteDeviceLog(Integer deviceLogId) { |
|
|
|
Calendar calendar = Calendar.getInstance(); |
|
|
|
calendar.setTime(new Date()); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"devicelog\") "); |
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); |
|
|
|
query.append("|> range(start: -36d)"); |
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
// 1. 拼接公共过滤条件片段(总条数和分页查询共用) |
|
|
|
StringBuilder predicates = new StringBuilder(); |
|
|
|
// 2. 拼接固定过滤条件(_measurement和_field,必选) |
|
|
|
predicates.append("_measurement=devicelog"); |
|
|
|
if (deviceLogId != null) { |
|
|
|
if (predicates.length() > 0) { |
|
|
|
predicates.append(" and ").append("id = ").append(deviceLogId); |
|
|
|
} |
|
|
|
} |
|
|
|
System.out.println(predicates.toString()); |
|
|
|
DeleteApi deleteApi = client.getDeleteApi(); |
|
|
|
OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
try { |
|
|
|
// Delete data with specific time range |
|
|
|
deleteApi.delete(start, stop, "", LOG_BUCKET_NAME, org); |
|
|
|
// // 删除指定时间范围内所有数据 |
|
|
|
OffsetDateTime start = OffsetDateTime.of(calendar.get(Calendar.YEAR) - 10, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
OffsetDateTime stop = OffsetDateTime.of(calendar.get(Calendar.YEAR) + 1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
deleteApi.delete(start, stop, predicates.toString(), LOG_BUCKET_NAME, org); |
|
|
|
System.out.println("Data deleted successfully"); |
|
|
|
} catch (Exception e) { |
|
|
|
e.printStackTrace(); |
|
|
|
@ -130,7 +161,7 @@ public class DeviceLogService { |
|
|
|
* @param houseId 房间id |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
public JSONObject getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) { |
|
|
|
public JSONObject getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String statusIssue, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) { |
|
|
|
JSONObject resultSet = new JSONObject(); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
// 1. 拼接公共过滤条件片段(总条数和分页查询共用) |
|
|
|
@ -162,6 +193,9 @@ public class DeviceLogService { |
|
|
|
if (result != null && !result.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"result\"] == \"%s\")", result.trim())); |
|
|
|
} |
|
|
|
if (statusIssue != null && !statusIssue.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"statusIssue\"] == \"%s\")", statusIssue.trim())); |
|
|
|
} |
|
|
|
// 5. 动态拼接supplierId过滤条件(仅当supplierId非空且非空白字符串时拼接) |
|
|
|
if (supplierId != null && !supplierId.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"supplierId\"] == \"%s\")", supplierId.trim())); |
|
|
|
@ -203,8 +237,8 @@ public class DeviceLogService { |
|
|
|
dataQuery.append(filterFragment); // 复用过滤条件 |
|
|
|
dataQuery.append(" |> sort(columns:[\"_time\"], desc:true)"); |
|
|
|
// 处理分页参数默认值 |
|
|
|
int finalPageNum = pageNode != null ? pageNode:DEFAULT_PAGE_NUM; |
|
|
|
int finalPageSize = pageSize != null ? pageSize:DEFAULT_PAGE_SIZE; |
|
|
|
int finalPageNum = pageNode != null ? pageNode : DEFAULT_PAGE_NUM; |
|
|
|
int finalPageSize = pageSize != null ? pageSize : DEFAULT_PAGE_SIZE; |
|
|
|
int offset = (finalPageNum - 1) * finalPageSize; |
|
|
|
// dataQuery.append(" |> limit(n: " + finalPageSize + ", offset: " + offset + ")"); |
|
|
|
dataQuery.append(" |> yield(name: \"data\")"); // 标记结果集为data |
|
|
|
@ -229,6 +263,9 @@ public class DeviceLogService { |
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
String value = (String) fluxRecord.getValueByKey("_value"); |
|
|
|
DeviceLogData data = JSONObject.parseObject(value, DeviceLogData.class); |
|
|
|
if (data.getSupplierId() != null) { |
|
|
|
data.setSupplierName(supplierInfoMap.get(data.getSupplierId())); |
|
|
|
} |
|
|
|
rerurnList.add(data); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -236,7 +273,9 @@ public class DeviceLogService { |
|
|
|
Collections.reverse(rerurnList); |
|
|
|
resultSet.put("count", totalCount); |
|
|
|
//对结果按照分页要求截取构造数据 |
|
|
|
rerurnList = rerurnList.subList(offset, Math.min(offset + finalPageSize, rerurnList.size())); |
|
|
|
if (rerurnList != null && rerurnList.size() > 0) { |
|
|
|
rerurnList = rerurnList.subList(offset, Math.min(offset + finalPageSize, rerurnList.size())); |
|
|
|
} |
|
|
|
resultSet.put("list", rerurnList); |
|
|
|
return resultSet; |
|
|
|
} |
|
|
|
@ -271,12 +310,13 @@ public class DeviceLogService { |
|
|
|
String imei = syncDataFlag.getImei(); |
|
|
|
List<DeviceLogData> deviceLogDataList = deviceInfoMapper.queryDeviceLogData(imei, companyId); |
|
|
|
if (deviceLogDataList != null && deviceLogDataList.size() > 0) { |
|
|
|
DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); |
|
|
|
DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); |
|
|
|
//3. 批量插入设备命令下发日志数据到influxdb中 |
|
|
|
for (DeviceLogData deviceLogData : deviceLogDataList) { |
|
|
|
deviceLogData.setDeviceBelongInfo(deviceBelongInfo != null ? deviceBelongInfo.getDeviceBelongInfo() : null); |
|
|
|
deviceLogData.setHouseId(deviceBelongInfo != null ? deviceBelongInfo.getHouseId() : null); |
|
|
|
try { |
|
|
|
deviceLogData.setCompanyId(companyId); |
|
|
|
saveDeviceLogToInfluxdb(deviceLogData); |
|
|
|
flag = true; |
|
|
|
} catch (Exception e) { |
|
|
|
@ -295,4 +335,49 @@ public class DeviceLogService { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 更新设备命令下发日志数据 |
|
|
|
*/ |
|
|
|
public void updateDeviceLogMqListener(DeviceLogData deviceLogData) { |
|
|
|
//步骤1:查出ID对应的日志 |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
// 1. 拼接公共过滤条件片段(总条数和分页查询共用) |
|
|
|
StringBuilder filterFragment = new StringBuilder(); |
|
|
|
filterFragment.append("from(bucket: \"devicelog\") "); |
|
|
|
filterFragment.append("|> range(start: -360d)"); |
|
|
|
// 2. 拼接固定过滤条件(_measurement和_field,必选) |
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")"); |
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_field\"] == \"logjson\")"); |
|
|
|
// 3. 动态拼接imei过滤条件(仅当imei非空且非空白字符串时拼接) |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"id\"] == \"%s\")", deviceLogData.getId())); |
|
|
|
// 3. 构建分页数据查询子句(yield命名为data) |
|
|
|
StringBuilder dataQuery = new StringBuilder(); |
|
|
|
dataQuery.append(filterFragment); // 复用过滤条件 |
|
|
|
dataQuery.append(" |> sort(columns:[\"_time\"], desc:true)"); |
|
|
|
dataQuery.append(" |> yield(name: \"data\")"); // 标记结果集为data |
|
|
|
System.out.println("查询数据语句:" + dataQuery.toString()); |
|
|
|
//查询数据结果 |
|
|
|
List<FluxTable> dataResults = client.getQueryApi().query(dataQuery.toString()); |
|
|
|
for (FluxTable fluxTable : dataResults) { |
|
|
|
List<FluxRecord> records = fluxTable.getRecords(); |
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
String value = (String) fluxRecord.getValueByKey("_value"); |
|
|
|
DeviceLogData data = JSONObject.parseObject(value, DeviceLogData.class); |
|
|
|
//构造数据 |
|
|
|
DeviceLogData newData = new DeviceLogData(); |
|
|
|
BeanUtils.copyProperties(data, newData); |
|
|
|
newData.setId(deviceLogData.getId()); |
|
|
|
newData.setFeedbackValue(deviceLogData.getFeedbackValue()); |
|
|
|
newData.setStatusIssue(deviceLogData.getStatusIssue()); |
|
|
|
//步骤2:删除该日志 |
|
|
|
//删除设备命令下发日志数据 |
|
|
|
deleteDeviceLog(deviceLogData.getId()); |
|
|
|
//步骤3:更新该日志 |
|
|
|
//保存设备命令下发日志数据到influxdb中 |
|
|
|
saveDeviceLogToInfluxdb(newData); |
|
|
|
} |
|
|
|
} |
|
|
|
client.close(); |
|
|
|
} |
|
|
|
} |