|
|
|
@ -9,10 +9,7 @@ import com.influxdb.client.WriteApi; |
|
|
|
import com.influxdb.client.domain.WritePrecision; |
|
|
|
import com.influxdb.query.FluxRecord; |
|
|
|
import com.influxdb.query.FluxTable; |
|
|
|
import com.topsail.influxdb.entity.DeviceBelongInfo; |
|
|
|
import com.topsail.influxdb.entity.DeviceDataInfluxData; |
|
|
|
import com.topsail.influxdb.entity.DeviceHistoryData; |
|
|
|
import com.topsail.influxdb.entity.SyncDataFlag; |
|
|
|
import com.topsail.influxdb.entity.*; |
|
|
|
import com.topsail.influxdb.mapper.DeviceInfoMapper; |
|
|
|
import com.topsail.influxdb.pojo.History; |
|
|
|
import com.topsail.influxdb.rabbitmq.service.AmqpService; |
|
|
|
@ -34,6 +31,7 @@ import java.util.*; |
|
|
|
@Service |
|
|
|
public class DeviceDataService { |
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(DeviceDataService.class); |
|
|
|
public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
|
|
// InfluxDB基础配置 |
|
|
|
private static final String DEVICEDATA_BUCKET_NAME = "iot"; |
|
|
|
@Value("${shengdilan.influxdb.token}") |
|
|
|
@ -66,7 +64,7 @@ public class DeviceDataService { |
|
|
|
* @param imei |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
public List<DeviceHistoryData> getDeviceHistoryData(String uid, Integer pageNo, Integer pageSize, String startTime, String endTime, String imei) { |
|
|
|
public List<DeviceHistoryVo> getDeviceHistoryData(String uid, Integer pageNo, Integer pageSize, String startTime, String endTime, String imei) { |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"iot\") "); |
|
|
|
@ -83,7 +81,7 @@ public class DeviceDataService { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
query.append("|> range(start: -30d)"); |
|
|
|
query.append("|> range(start: -1d)"); |
|
|
|
} |
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"jsondata\" ) |> sort(columns:[\"_time\"], desc:true) ", imei)); |
|
|
|
if (pageNo != null && pageSize != null) { |
|
|
|
@ -92,7 +90,8 @@ public class DeviceDataService { |
|
|
|
query.append(" |> yield(name: \"last\")"); |
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
List<FluxTable> tables = client.getQueryApi().query(query.toString()); |
|
|
|
List<DeviceHistoryData> rerurnList = new ArrayList<>(); |
|
|
|
// List<DeviceHistoryData> returnList = new ArrayList<>(); |
|
|
|
List<DeviceHistoryVo> resultSet = new ArrayList<>(); |
|
|
|
for (FluxTable fluxTable : tables) { |
|
|
|
List<FluxRecord> records = fluxTable.getRecords(); |
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
@ -100,13 +99,65 @@ public class DeviceDataService { |
|
|
|
DeviceHistoryData data = JSONObject.parseObject(value, DeviceHistoryData.class); |
|
|
|
Integer singalstrength = (Integer) JSONObject.parseObject(value).get("singalstrength"); |
|
|
|
data.setSignalStrength(singalstrength); |
|
|
|
data.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format((Double.parseDouble(data.getTime())))); |
|
|
|
rerurnList.add(data); |
|
|
|
data.setTime(dateFormat.format((Long.parseLong(data.getTime())))); |
|
|
|
DeviceHistoryVo dataVo = DeviceHistoryVo.builder() |
|
|
|
.deviceType(String.valueOf(data.getDeviceType())) |
|
|
|
.imei(data.getImei()) |
|
|
|
.batteryLevel(data.getBatteryLevel()) |
|
|
|
.singalStrength(data.getSignalStrength()) |
|
|
|
.sampleData(data.getSampleData()) |
|
|
|
.passNum(data.getPassNum()) |
|
|
|
.alarmType(data.getAlarmType()) |
|
|
|
.unit(data.getUnit()) |
|
|
|
.sendTime(data.getTime()) |
|
|
|
.dataBody(data.getDataBody()) |
|
|
|
.value(data.getValue()).build(); |
|
|
|
dataVo = analysisSampleData(dataVo, data.getValue(), data.getUnit()); |
|
|
|
resultSet.add(dataVo); |
|
|
|
// returnList.add(data); |
|
|
|
} |
|
|
|
} |
|
|
|
client.close(); |
|
|
|
Collections.reverse(rerurnList); |
|
|
|
return rerurnList; |
|
|
|
Collections.reverse(resultSet); |
|
|
|
return resultSet; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 解析数据 |
|
|
|
* |
|
|
|
* @param dataVo |
|
|
|
* @param sampleData |
|
|
|
* @param unit |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
private DeviceHistoryVo analysisSampleData(DeviceHistoryVo dataVo, String sampleData, String unit) { |
|
|
|
if (sampleData != null && !"".equals(sampleData) && unit != null && !"".equals(unit)) { |
|
|
|
//如果字符串中包含中括号则去掉中括号 |
|
|
|
if (sampleData.contains("[")) { |
|
|
|
sampleData = sampleData.substring(sampleData.indexOf("[") + 1, sampleData.indexOf("]")); |
|
|
|
} |
|
|
|
String[] data = sampleData.split(","); |
|
|
|
if (unit.contains("[")) { |
|
|
|
unit = unit.substring(unit.indexOf("[") + 1, unit.indexOf("]")); |
|
|
|
} |
|
|
|
String[] unitString = unit.split(","); |
|
|
|
for (int i = 0; i < unitString.length; i++) { |
|
|
|
if (unitString[i].contains("℃") && data.length > i) { |
|
|
|
//温度 |
|
|
|
dataVo.setTemperData(data[i]); |
|
|
|
} else if (unitString[i].contains("O/F") && data.length > i) { |
|
|
|
//开度 |
|
|
|
dataVo.setSampleData(data[i]); |
|
|
|
} else if (unitString[i].contains("%RH") && data.length > i) { |
|
|
|
//湿度 |
|
|
|
dataVo.setHumidityData(data[i]); |
|
|
|
} else if (unitString[i].contains("MPa") && data.length > i) { |
|
|
|
//压力 |
|
|
|
dataVo.setPreessureData(data[i]); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return dataVo; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
@ -114,7 +165,7 @@ public class DeviceDataService { |
|
|
|
*/ |
|
|
|
public List<DeviceDataInfluxData> getOldInfluxdbData(String imei) { |
|
|
|
DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"iot\") "); |
|
|
|
query.append("|> range(start: -1y)"); |
|
|
|
@ -183,8 +234,12 @@ public class DeviceDataService { |
|
|
|
DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); |
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData); |
|
|
|
} catch (Exception e) { |
|
|
|
e.printStackTrace(); |
|
|
|
LOG.error("设备数据写入influxdb失败:{}", history.getImei()); |
|
|
|
amqpService.SendMessage("shengdilandevicedataback", JSON.toJSONString(history)); |
|
|
|
} |
|
|
|
System.out.println("设备数据写入成功==========》" + history.getImei()); |
|
|
|
LOG.info("设备数据写入influxdb成功:{}", history.getImei()); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
@ -268,18 +323,18 @@ public class DeviceDataService { |
|
|
|
*/ |
|
|
|
public void transferDeviceData(String imei) { |
|
|
|
//1.查询所有的设备编号 |
|
|
|
List<SyncDataFlag> syncDataFlags = deviceInfoMapper.querySyncDeviceFlagInfo(imei); |
|
|
|
List<SyncDataFlag> syncDataFlags = deviceInfoMapper.querySyncDeviceDataFlagInfo(imei); |
|
|
|
//2.根据设备编号查询历史Influxdb所有的设备数据 |
|
|
|
if (syncDataFlags != null && syncDataFlags.size() > 0) { |
|
|
|
for (SyncDataFlag syncDataFlag : syncDataFlags) { |
|
|
|
Boolean syncDeviceData = false; |
|
|
|
List<DeviceDataInfluxData> influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei()); |
|
|
|
List<DeviceDataInfluxData> influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei().toLowerCase(Locale.ROOT)); |
|
|
|
if (influxdbDataList != null && influxdbDataList.size() > 0) { |
|
|
|
for (DeviceDataInfluxData influxData : influxdbDataList) { |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray()); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); |
|
|
|
//3.将设备数据保存到influxdb中 |
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, oldorg, WritePrecision.NS, influxData); |
|
|
|
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, influxData); |
|
|
|
syncDeviceData = true; |
|
|
|
} catch (Exception e) { |
|
|
|
LOG.error("保存设备数据到influxdb失败:{}", e.getMessage()); |
|
|
|
|