|
|
@ -9,24 +9,22 @@ import com.influxdb.client.domain.WritePrecision; |
|
|
import com.influxdb.query.FluxRecord; |
|
|
import com.influxdb.query.FluxRecord; |
|
|
import com.influxdb.query.FluxTable; |
|
|
import com.influxdb.query.FluxTable; |
|
|
import com.topsail.influxdb.entity.DeviceBelongInfo; |
|
|
import com.topsail.influxdb.entity.DeviceBelongInfo; |
|
|
|
|
|
import com.topsail.influxdb.entity.DeviceDataInfluxData; |
|
|
import com.topsail.influxdb.entity.DeviceHistoryData; |
|
|
import com.topsail.influxdb.entity.DeviceHistoryData; |
|
|
|
|
|
import com.topsail.influxdb.entity.SyncDataFlag; |
|
|
import com.topsail.influxdb.mapper.DeviceInfoMapper; |
|
|
import com.topsail.influxdb.mapper.DeviceInfoMapper; |
|
|
import com.topsail.influxdb.model.DeviceDataInfluxData; |
|
|
|
|
|
import com.topsail.influxdb.pojo.History; |
|
|
import com.topsail.influxdb.pojo.History; |
|
|
import org.slf4j.Logger; |
|
|
import org.slf4j.Logger; |
|
|
import org.slf4j.LoggerFactory; |
|
|
import org.slf4j.LoggerFactory; |
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
import org.springframework.stereotype.Service; |
|
|
import org.springframework.stereotype.Service; |
|
|
import org.springframework.util.StringUtils; |
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
|
|
|
|
import javax.annotation.Resource; |
|
|
import java.text.ParseException; |
|
|
import java.text.ParseException; |
|
|
import java.text.SimpleDateFormat; |
|
|
import java.text.SimpleDateFormat; |
|
|
import java.time.Instant; |
|
|
import java.time.Instant; |
|
|
import java.util.ArrayList; |
|
|
|
|
|
import java.util.Collections; |
|
|
|
|
|
import java.util.List; |
|
|
|
|
|
import java.util.TimeZone; |
|
|
|
|
|
|
|
|
import java.util.*; |
|
|
|
|
|
|
|
|
@Service |
|
|
@Service |
|
|
public class DeviceDataService { |
|
|
public class DeviceDataService { |
|
|
@ -39,7 +37,15 @@ public class DeviceDataService { |
|
|
public String url; |
|
|
public String url; |
|
|
@Value("${shengdilan.influxdb.org}") |
|
|
@Value("${shengdilan.influxdb.org}") |
|
|
private String org; |
|
|
private String org; |
|
|
@Autowired |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${shengdilan.influxdb.oldtoken}") |
|
|
|
|
|
public String oldtoken; |
|
|
|
|
|
@Value("${shengdilan.influxdb.oldurl}") |
|
|
|
|
|
public String oldurl; |
|
|
|
|
|
@Value("${shengdilan.influxdb.oldorg}") |
|
|
|
|
|
private String oldorg; |
|
|
|
|
|
|
|
|
|
|
|
@Resource |
|
|
DeviceInfoMapper deviceInfoMapper; |
|
|
DeviceInfoMapper deviceInfoMapper; |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
@ -97,6 +103,49 @@ public class DeviceDataService { |
|
|
return rerurnList; |
|
|
return rerurnList; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* 获取要转存的历史influxdb数据 |
|
|
|
|
|
*/ |
|
|
|
|
|
public List<DeviceDataInfluxData> getOldInfluxdbData(String imei) { |
|
|
|
|
|
DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei); |
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
|
|
query.append("from(bucket: \"iot\") "); |
|
|
|
|
|
query.append("|> range(start: -1y)"); |
|
|
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"jsondata\" ) |> sort(columns:[\"_time\"], desc:true) ", imei)); |
|
|
|
|
|
// if (pageNo != null && pageSize != null) { |
|
|
|
|
|
// query.append(String.format(" |> limit(n: %s,offset:%s)", pageSize, pageNo - 1)); |
|
|
|
|
|
// } |
|
|
|
|
|
query.append(" |> yield(name: \"last\")"); |
|
|
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
|
|
List<FluxTable> tables = client.getQueryApi().query(query.toString()); |
|
|
|
|
|
List<DeviceDataInfluxData> rerurnList = new ArrayList<>(); |
|
|
|
|
|
for (FluxTable fluxTable : tables) { |
|
|
|
|
|
List<FluxRecord> records = fluxTable.getRecords(); |
|
|
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
|
|
String value = (String) fluxRecord.getValueByKey("_value"); |
|
|
|
|
|
History history = JSONObject.parseObject(value, History.class); |
|
|
|
|
|
if (history != null) { |
|
|
|
|
|
Date createtime = history.getSenddate(); |
|
|
|
|
|
//将时间转换成Instant |
|
|
|
|
|
Instant time = null; |
|
|
|
|
|
if (createtime != null) { |
|
|
|
|
|
time = createtime.toInstant(); |
|
|
|
|
|
} |
|
|
|
|
|
if (history.getDeviceBelongInfo() == null && deviceBelongInfo != null) { |
|
|
|
|
|
history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); |
|
|
|
|
|
history.setHouseId(deviceBelongInfo.getHouseId()); |
|
|
|
|
|
} |
|
|
|
|
|
DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); |
|
|
|
|
|
rerurnList.add(deviceDataInfluxData); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
client.close(); |
|
|
|
|
|
Collections.reverse(rerurnList); |
|
|
|
|
|
return rerurnList; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
* 存储设备数据到influxdb |
|
|
* 存储设备数据到influxdb |
|
|
* |
|
|
* |
|
|
@ -110,57 +159,69 @@ public class DeviceDataService { |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
//查询设备的所属信息 |
|
|
//查询设备的所属信息 |
|
|
DeviceBelongInfo deviceBelongInfo = getDeviceBelongInfo(history.getImei().trim()); |
|
|
|
|
|
if (deviceBelongInfo == null) { |
|
|
|
|
|
return; |
|
|
|
|
|
|
|
|
if (history != null && (history.getDeviceBelongInfo() == null || history.getHouseId() == null)) { |
|
|
|
|
|
DeviceBelongInfo deviceBelongInfo = getDeviceBelongInfo(history.getImei().trim()); |
|
|
|
|
|
if (deviceBelongInfo == null) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); |
|
|
|
|
|
history.setHouseId(deviceBelongInfo.getHouseId()); |
|
|
} |
|
|
} |
|
|
history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo()); |
|
|
|
|
|
history.setHouseId(deviceBelongInfo.getHouseId()); |
|
|
|
|
|
System.out.println("设备所属信息111111111111==========:" + deviceBelongInfo); |
|
|
|
|
|
String influxvalue = "iot,imei=" + history.getImei() + " press=" + history.getSampledata() + " " + history.getTime().getTime(); |
|
|
|
|
|
String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
|
|
String bucket = "iot"; |
|
|
|
|
|
String org = "shengdilan"; |
|
|
|
|
|
|
|
|
Date createtime = history.getSenddate(); |
|
|
|
|
|
//将时间转换成Instant |
|
|
|
|
|
Instant time = null; |
|
|
|
|
|
if (createtime != null) { |
|
|
|
|
|
time = createtime.toInstant(); |
|
|
|
|
|
} |
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); |
|
|
|
|
|
DeviceDataInfluxData deviceDataInfluxData = rebuildDeviceDataInfluxData(history, time); |
|
|
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
|
|
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, deviceDataInfluxData); |
|
|
|
|
|
} |
|
|
|
|
|
System.out.println("写入成功==========》" + history.getImei()); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
|
|
DeviceDataInfluxData mem = new DeviceDataInfluxData(); |
|
|
|
|
|
mem.imei = history.getImei(); |
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* 构建Influxdb设备数据 |
|
|
|
|
|
* |
|
|
|
|
|
* @param history |
|
|
|
|
|
*/ |
|
|
|
|
|
private DeviceDataInfluxData rebuildDeviceDataInfluxData(History history, Instant time) { |
|
|
|
|
|
DeviceDataInfluxData deviceDataInfluxData = new DeviceDataInfluxData(); |
|
|
|
|
|
deviceDataInfluxData.imei = history.getImei(); |
|
|
String value = history.getValue(); |
|
|
String value = history.getValue(); |
|
|
if (value != null && !value.equals("")) { |
|
|
if (value != null && !value.equals("")) { |
|
|
String[] values = value.split(","); |
|
|
String[] values = value.split(","); |
|
|
mem.value1 = 0; |
|
|
|
|
|
mem.value2 = 0; |
|
|
|
|
|
mem.value3 = 0; |
|
|
|
|
|
mem.value4 = 0; |
|
|
|
|
|
|
|
|
deviceDataInfluxData.value1 = 0; |
|
|
|
|
|
deviceDataInfluxData.value2 = 0; |
|
|
|
|
|
deviceDataInfluxData.value3 = 0; |
|
|
|
|
|
deviceDataInfluxData.value4 = 0; |
|
|
try { |
|
|
try { |
|
|
if (values.length > 0) { |
|
|
if (values.length > 0) { |
|
|
mem.value1 = Double.parseDouble(values[0]); |
|
|
|
|
|
|
|
|
deviceDataInfluxData.value1 = Double.parseDouble(values[0]); |
|
|
} |
|
|
} |
|
|
if (values.length > 1) { |
|
|
if (values.length > 1) { |
|
|
mem.value2 = Double.parseDouble(values[1]); |
|
|
|
|
|
|
|
|
deviceDataInfluxData.value2 = Double.parseDouble(values[1]); |
|
|
} |
|
|
} |
|
|
if (values.length > 2) { |
|
|
if (values.length > 2) { |
|
|
mem.value3 = Double.parseDouble(values[2]); |
|
|
|
|
|
|
|
|
deviceDataInfluxData.value3 = Double.parseDouble(values[2]); |
|
|
} |
|
|
} |
|
|
if (values.length > 3) { |
|
|
if (values.length > 3) { |
|
|
mem.value4 = Double.parseDouble(values[4]); |
|
|
|
|
|
|
|
|
deviceDataInfluxData.value4 = Double.parseDouble(values[4]); |
|
|
} |
|
|
} |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
LOG.info(e.getMessage()); |
|
|
LOG.info(e.getMessage()); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
//从内部移动处出来,value有没有数值都进行插入 |
|
|
//从内部移动处出来,value有没有数值都进行插入 |
|
|
mem.battery = history.getBatterylevel(); |
|
|
|
|
|
mem.sigal = history.getSingalstrength(); |
|
|
|
|
|
mem.jsondata = JSON.toJSONString(history); |
|
|
|
|
|
mem.values = history.getValue(); |
|
|
|
|
|
mem.unit = history.getUnit(); |
|
|
|
|
|
mem.alarmtype = history.getAlarmtype(); |
|
|
|
|
|
mem.time = Instant.now(); |
|
|
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
|
|
writeApi.writeMeasurement(bucket, org, WritePrecision.NS, mem); |
|
|
|
|
|
} |
|
|
|
|
|
System.out.println("写入成功==========》" + history.getImei()); |
|
|
|
|
|
|
|
|
deviceDataInfluxData.battery = history.getBatterylevel(); |
|
|
|
|
|
deviceDataInfluxData.sigal = history.getSingalstrength(); |
|
|
|
|
|
deviceDataInfluxData.jsondata = JSON.toJSONString(history); |
|
|
|
|
|
deviceDataInfluxData.values = history.getValue(); |
|
|
|
|
|
deviceDataInfluxData.unit = history.getUnit(); |
|
|
|
|
|
deviceDataInfluxData.alarmtype = history.getAlarmtype(); |
|
|
|
|
|
deviceDataInfluxData.time = time != null ? time : Instant.now(); |
|
|
|
|
|
return deviceDataInfluxData; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
/** |
|
|
/** |
|
|
@ -174,10 +235,7 @@ public class DeviceDataService { |
|
|
* 删除掉所有的Influxdb数据 |
|
|
* 删除掉所有的Influxdb数据 |
|
|
*/ |
|
|
*/ |
|
|
public void deleteDeviceData() { |
|
|
public void deleteDeviceData() { |
|
|
// String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
|
|
// String bucket = "iot"; |
|
|
|
|
|
// String org = "shengdilan"; |
|
|
|
|
|
// InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
|
|
|
|
|
// InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray()); |
|
|
// StringBuffer query = new StringBuffer(); |
|
|
// StringBuffer query = new StringBuffer(); |
|
|
// query.append("from(bucket: \"iot\") "); |
|
|
// query.append("from(bucket: \"iot\") "); |
|
|
// query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")")); |
|
|
// query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")")); |
|
|
@ -188,7 +246,7 @@ public class DeviceDataService { |
|
|
// OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
// OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
// try { |
|
|
// try { |
|
|
// // Delete data with specific time range |
|
|
// // Delete data with specific time range |
|
|
// deleteApi.delete(start, stop, "", bucket, org); |
|
|
|
|
|
|
|
|
// deleteApi.delete(start, stop, "", DEVICEDATA_BUCKET_NAME, oldorg); |
|
|
// System.out.println("Data deleted successfully"); |
|
|
// System.out.println("Data deleted successfully"); |
|
|
// } catch (Exception e) { |
|
|
// } catch (Exception e) { |
|
|
// e.printStackTrace(); |
|
|
// e.printStackTrace(); |
|
|
@ -196,4 +254,68 @@ public class DeviceDataService { |
|
|
// client.close(); |
|
|
// client.close(); |
|
|
// } |
|
|
// } |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* 转存设备历史数据(根据设备号查询设备历史数据) |
|
|
|
|
|
* |
|
|
|
|
|
* @param imei |
|
|
|
|
|
*/ |
|
|
|
|
|
public void transferDeviceData(String imei) { |
|
|
|
|
|
//1.查询所有的设备编号 |
|
|
|
|
|
List<SyncDataFlag> syncDataFlags = deviceInfoMapper.querySyncDeviceFlagInfo(); |
|
|
|
|
|
//2.根据设备编号查询历史Influxdb所有的设备数据 |
|
|
|
|
|
if (syncDataFlags != null && syncDataFlags.size() > 0) { |
|
|
|
|
|
for (SyncDataFlag syncDataFlag : syncDataFlags) { |
|
|
|
|
|
List<DeviceDataInfluxData> influxdbDataList = getOldInfluxdbData(syncDataFlag.getImei()); |
|
|
|
|
|
if (influxdbDataList != null && influxdbDataList.size() > 0) { |
|
|
|
|
|
for (DeviceDataInfluxData influxData : influxdbDataList) { |
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray()); |
|
|
|
|
|
//3.将设备数据保存到influxdb中 |
|
|
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
|
|
writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, oldorg, WritePrecision.NS, influxData); |
|
|
|
|
|
} catch (Exception e) { |
|
|
|
|
|
LOG.error("保存设备数据到influxdb失败:{}", e.getMessage()); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
* 查询设备编码对应的设备数据记录数 |
|
|
|
|
|
*/ |
|
|
|
|
|
public Long getDeviceDataCount(String imei) { |
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg); |
|
|
|
|
|
// 1. 拼接公共过滤条件片段(总条数和分页查询共用) |
|
|
|
|
|
StringBuilder filterFragment = new StringBuilder(); |
|
|
|
|
|
filterFragment.append("from(bucket: \"iot\") "); |
|
|
|
|
|
filterFragment.append("|> range(start: -400d)"); |
|
|
|
|
|
// 2. 拼接固定过滤条件(_measurement和_field,必选) |
|
|
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")"); |
|
|
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_field\"] == \"jsondata\")"); |
|
|
|
|
|
// 3. 动态拼接imei过滤条件(仅当imei非空且非空白字符串时拼接) |
|
|
|
|
|
if (imei != null && !imei.trim().isEmpty()) { |
|
|
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"imei\"] == \"%s\")", imei.trim())); |
|
|
|
|
|
} |
|
|
|
|
|
// 5. 拼接固定排序条件(按时间倒序) |
|
|
|
|
|
// 2. 构建总条数查询子句(yield命名为total) |
|
|
|
|
|
StringBuilder totalQuery = new StringBuilder(); |
|
|
|
|
|
totalQuery.append(filterFragment); // 复用过滤条件 |
|
|
|
|
|
totalQuery.append(" |> count(column: \"_value\")"); // 统计总条数 |
|
|
|
|
|
totalQuery.append(" |> yield(name: \"total\")\n"); // 标记结果集为total |
|
|
|
|
|
System.out.println("查询数量语句:" + totalQuery.toString()); |
|
|
|
|
|
//查询总条数 |
|
|
|
|
|
long totalCount = 0; |
|
|
|
|
|
List<FluxTable> totalTables = client.getQueryApi().query(totalQuery.toString()); |
|
|
|
|
|
for (FluxTable table : totalTables) { |
|
|
|
|
|
for (FluxRecord record : table.getRecords()) { |
|
|
|
|
|
Object total = record.getValue(); |
|
|
|
|
|
if (total != null) { |
|
|
|
|
|
totalCount = totalCount + ((Number) total).longValue(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return totalCount; |
|
|
|
|
|
} |
|
|
} |
|
|
} |