You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 

342 lines
16 KiB

package com.topsail.influxdb.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.influxdb.client.DeleteApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.topsail.influxdb.entity.DeviceBelongInfo;
import com.topsail.influxdb.entity.DeviceDataInfluxData;
import com.topsail.influxdb.entity.DeviceHistoryData;
import com.topsail.influxdb.entity.SyncDataFlag;
import com.topsail.influxdb.mapper.DeviceInfoMapper;
import com.topsail.influxdb.pojo.History;
import com.topsail.influxdb.rabbitmq.service.AmqpService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;
/**
 * Reads, writes, migrates and purges device data points stored in the InfluxDB bucket {@code iot}.
 *
 * <p>Two InfluxDB connections are configured: the primary one
 * ({@code shengdilan.influxdb.url/token/org}) and a second one
 * ({@code shengdilan.influxdb.oldurl/oldtoken/oldorg}). Every client opened by this class is
 * closed before the method that opened it returns.
 */
@Service
public class DeviceDataService {
    public static final Logger LOG = LoggerFactory.getLogger(DeviceDataService.class);
    // Basic InfluxDB configuration
    private static final String DEVICEDATA_BUCKET_NAME = "iot";
    /** Pattern of the local date-time strings accepted from, and returned to, callers. */
    private static final String LOCAL_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
    @Value("${shengdilan.influxdb.token}")
    public String token;
    @Value("${shengdilan.influxdb.url}")
    public String url;
    @Value("${shengdilan.influxdb.org}")
    private String org;
    @Value("${shengdilan.influxdb.oldtoken}")
    public String oldtoken;
    @Value("${shengdilan.influxdb.oldurl}")
    public String oldurl;
    @Value("${shengdilan.influxdb.oldorg}")
    private String oldorg;
    @Resource
    DeviceInfoMapper deviceInfoMapper;
    @Autowired
    AmqpService amqpService;

    /**
     * Queries the history records of one device from the primary InfluxDB connection.
     *
     * <p>The Flux query sorts by {@code _time} descending before paging, and the collected
     * list is reversed before it is returned.
     *
     * @param uid       not used by this method
     * @param pageNo    1-based page number; paging is applied only when both {@code pageNo} and
     *                  {@code pageSize} are non-null. Values below 1 are treated as page 1
     * @param pageSize  number of records per page
     * @param startTime inclusive range start as {@code yyyy-MM-dd HH:mm:ss} in the JVM default
     *                  time zone; the range is applied only when both bounds are non-null,
     *                  otherwise the last 30 days are queried
     * @param endTime   range end, same format as {@code startTime}
     * @param imei      device identifier matched against the {@code imei} column
     * @return the matching records; never {@code null}
     * @throws IllegalArgumentException if both bounds are given and either cannot be parsed
     */
    public List<DeviceHistoryData> getDeviceHistoryData(String uid, Integer pageNo, Integer pageSize, String startTime, String endTime, String imei) {
        StringBuilder query = new StringBuilder();
        query.append("from(bucket: \"iot\") ");
        if (startTime != null && endTime != null) {
            // Fail fast on malformed input: a query without range() is rejected by InfluxDB anyway.
            query.append(String.format(" |> range(start:%s, stop:%s)", toFluxUtcTime(startTime), toFluxUtcTime(endTime)));
        } else {
            query.append("|> range(start: -30d)");
        }
        query.append(historyJsonFilter(imei));
        if (pageNo != null && pageSize != null) {
            // offset counts rows, not pages; computed as long to avoid int overflow.
            long offset = Math.max(pageNo - 1L, 0L) * pageSize;
            query.append(String.format(" |> limit(n: %d,offset:%d)", pageSize, offset));
        }
        query.append(" |> yield(name: \"last\")");
        LOG.info("查询语句==========:{}", query);

        List<DeviceHistoryData> result = new ArrayList<>();
        // A locally created SimpleDateFormat is not shared between threads, so this is safe.
        SimpleDateFormat displayFormat = new SimpleDateFormat(LOCAL_TIME_PATTERN);
        try (InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org)) {
            List<FluxTable> tables = client.getQueryApi().query(query.toString());
            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    Object raw = record.getValueByKey("_value");
                    if (!(raw instanceof String)) {
                        continue;
                    }
                    String json = (String) raw;
                    DeviceHistoryData data = JSONObject.parseObject(json, DeviceHistoryData.class);
                    if (data == null) {
                        continue;
                    }
                    // The stored JSON key is spelled "singalstrength", so it is copied by hand.
                    // getInteger converts other numeric representations instead of failing a cast.
                    data.setSignalStrength(JSONObject.parseObject(json).getInteger("singalstrength"));
                    String epochMillis = data.getTime();
                    if (epochMillis != null && !epochMillis.isEmpty()) {
                        // The stored time is parsed as a double and truncated to whole milliseconds.
                        data.setTime(displayFormat.format(new Date((long) Double.parseDouble(epochMillis))));
                    }
                    result.add(data);
                }
            }
        }
        Collections.reverse(result);
        return result;
    }

    /**
     * Loads up to one year of history records of a device and converts them into
     * {@link DeviceDataInfluxData} points ready to be written again.
     *
     * <p>Records that carry no ownership information are completed with the ownership data
     * looked up through {@link DeviceInfoMapper#queryDeviceBelongInfo}.
     *
     * <p>NOTE(review): despite the method name, the data is read through the primary connection
     * ({@code url}/{@code token}/{@code org}), while {@link #transferDeviceData(String)} writes
     * to the {@code old*} connection - confirm that this direction is intended.
     *
     * @param imei device identifier matched against the {@code imei} column
     * @return the rebuilt points; never {@code null}
     */
    public List<DeviceDataInfluxData> getOldInfluxdbData(String imei) {
        DeviceBelongInfo deviceBelongInfo = deviceInfoMapper.queryDeviceBelongInfo(imei);
        StringBuilder query = new StringBuilder();
        query.append("from(bucket: \"iot\") ");
        query.append("|> range(start: -1y)");
        query.append(historyJsonFilter(imei));
        query.append(" |> yield(name: \"last\")");
        LOG.info("查询语句==========:{}", query);

        List<DeviceDataInfluxData> result = new ArrayList<>();
        try (InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org)) {
            List<FluxTable> tables = client.getQueryApi().query(query.toString());
            for (FluxTable table : tables) {
                for (FluxRecord record : table.getRecords()) {
                    Object raw = record.getValueByKey("_value");
                    if (!(raw instanceof String)) {
                        continue;
                    }
                    History history = JSONObject.parseObject((String) raw, History.class);
                    if (history == null) {
                        continue;
                    }
                    Date sendDate = history.getSenddate();
                    // A missing send date becomes null here; the rebuild step then uses "now".
                    Instant time = sendDate != null ? sendDate.toInstant() : null;
                    if (history.getDeviceBelongInfo() == null && deviceBelongInfo != null) {
                        history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo());
                        history.setHouseId(deviceBelongInfo.getHouseId());
                    }
                    result.add(rebuildDeviceDataInfluxData(history, time));
                }
            }
        }
        Collections.reverse(result);
        return result;
    }

    /**
     * Stores one device record in the primary InfluxDB.
     *
     * <p>The call is a no-op when {@code history} is {@code null}, has no send date, has an
     * empty IMEI, or when ownership information is missing and cannot be looked up.
     *
     * @param history the record to store; may be {@code null}
     */
    public void saveDeviceDataToInfluxdb(History history) {
        // The null check must come first: every later guard dereferences history.
        if (history == null || history.getSenddate() == null || StringUtils.isEmpty(history.getImei())) {
            return;
        }
        // Look up the ownership information of the device when the record does not carry it.
        if (history.getDeviceBelongInfo() == null || history.getHouseId() == null) {
            DeviceBelongInfo deviceBelongInfo = getDeviceBelongInfo(history.getImei().trim());
            if (deviceBelongInfo == null) {
                return;
            }
            history.setDeviceBelongInfo(deviceBelongInfo.getDeviceBelongInfo());
            history.setHouseId(deviceBelongInfo.getHouseId());
        }
        DeviceDataInfluxData point = rebuildDeviceDataInfluxData(history, history.getSenddate().toInstant());
        // Resources close in reverse order: the write API first, then the client.
        try (InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray());
             WriteApi writeApi = client.getWriteApi()) {
            writeApi.writeMeasurement(DEVICEDATA_BUCKET_NAME, org, WritePrecision.NS, point);
        }
        LOG.info("设备数据写入成功==========》{}", history.getImei());
    }

    /**
     * Builds the InfluxDB point for a history record.
     *
     * <p>{@code history.getValue()} is split on commas and the first four entries are parsed
     * into {@code value1..value4}. Parsing stops at the first non-numeric entry; that entry and
     * all later ones are left at 0.
     *
     * @param history source record; must not be {@code null}
     * @param time    timestamp of the point; the current instant is used when {@code null}
     * @return the populated point
     */
    private DeviceDataInfluxData rebuildDeviceDataInfluxData(History history, Instant time) {
        DeviceDataInfluxData point = new DeviceDataInfluxData();
        point.imei = history.getImei();
        String value = history.getValue();
        if (value != null && !value.isEmpty()) {
            String[] values = value.split(",");
            point.value1 = 0;
            point.value2 = 0;
            point.value3 = 0;
            point.value4 = 0;
            try {
                if (values.length > 0) {
                    point.value1 = Double.parseDouble(values[0]);
                }
                if (values.length > 1) {
                    point.value2 = Double.parseDouble(values[1]);
                }
                if (values.length > 2) {
                    point.value3 = Double.parseDouble(values[2]);
                }
                if (values.length > 3) {
                    // The fourth reading lives at index 3, not 4.
                    point.value4 = Double.parseDouble(values[3]);
                }
            } catch (NumberFormatException e) {
                LOG.warn("Ignoring non-numeric reading in value '{}' of device {}", value, history.getImei(), e);
            }
        }
        // These fields are written whether or not the record carries numeric readings.
        point.battery = history.getBatterylevel();
        point.sigal = history.getSingalstrength();
        point.jsondata = JSON.toJSONString(history);
        point.values = history.getValue();
        point.unit = history.getUnit();
        point.alarmtype = history.getAlarmtype();
        point.time = time != null ? time : Instant.now();
        return point;
    }

    /**
     * Looks up the ownership information of a device.
     *
     * @param imei device identifier
     * @return whatever {@link DeviceInfoMapper#queryDeviceBelongInfo} returns for the IMEI
     */
    public DeviceBelongInfo getDeviceBelongInfo(String imei) {
        return deviceInfoMapper.queryDeviceBelongInfo(imei);
    }

    /**
     * Deletes the device data of bucket {@code iot} in the primary InfluxDB.
     *
     * <p>The delete uses an empty predicate, so every point in the bucket whose timestamp lies
     * inside the window is removed. The window starts at 2020-01-01T00:00Z and ends at
     * 2026-01-01T00:00Z or one day after the current time, whichever is later, so the method
     * keeps covering newly written data. Failures are logged and not rethrown.
     */
    public void deleteDeviceData() {
        OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        OffsetDateTime fixedStop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
        OffsetDateTime tomorrow = OffsetDateTime.now(ZoneOffset.UTC).plusDays(1);
        // Never shrink the previously fixed window, only extend it as time passes.
        OffsetDateTime stop = tomorrow.isAfter(fixedStop) ? tomorrow : fixedStop;
        try (InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray())) {
            DeleteApi deleteApi = client.getDeleteApi();
            // Delete data with specific time range
            deleteApi.delete(start, stop, "", DEVICEDATA_BUCKET_NAME, org);
            LOG.info("Data deleted successfully");
        } catch (Exception e) {
            LOG.error("Failed to delete device data from bucket {}", DEVICEDATA_BUCKET_NAME, e);
        }
    }

    /**
     * Copies the history data of the devices selected by
     * {@link DeviceInfoMapper#querySyncDeviceFlagInfo} to the InfluxDB reachable through the
     * {@code old*} connection settings.
     *
     * <p>Each point is written synchronously. A point that cannot be written is re-published to
     * the {@code shengdilandevicedataback} queue. A device is marked as synchronised (flag 1)
     * only when it had at least one point and every point was written successfully.
     *
     * @param imei argument forwarded to {@code querySyncDeviceFlagInfo}
     */
    public void transferDeviceData(String imei) {
        // 1. Load the devices to synchronise.
        List<SyncDataFlag> syncDataFlags = deviceInfoMapper.querySyncDeviceFlagInfo(imei);
        if (syncDataFlags == null || syncDataFlags.isEmpty()) {
            return;
        }
        // One client serves the whole run instead of one (never closed) client per point.
        try (InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray())) {
            // The blocking API throws on failure; the asynchronous WriteApi reports errors
            // through events only, which would make the catch block below unreachable.
            com.influxdb.client.WriteApiBlocking writer = client.getWriteApiBlocking();
            for (SyncDataFlag syncDataFlag : syncDataFlags) {
                // 2. Load all points of the device.
                List<DeviceDataInfluxData> points = getOldInfluxdbData(syncDataFlag.getImei());
                if (points == null || points.isEmpty()) {
                    continue;
                }
                boolean allWritten = true;
                for (DeviceDataInfluxData point : points) {
                    // 3. Write the point to the target InfluxDB.
                    try {
                        writer.writeMeasurement(DEVICEDATA_BUCKET_NAME, oldorg, WritePrecision.NS, point);
                    } catch (Exception e) {
                        allWritten = false;
                        LOG.error("保存设备数据到influxdb失败:{}", e.getMessage(), e);
                        republishFailedPoint(point);
                    }
                }
                if (allWritten) {
                    // 4. Mark the device as synchronised.
                    deviceInfoMapper.updateSyncDeviceDataFlagInfo(syncDataFlag.getId(), 1);
                }
            }
        }
    }

    /**
     * Counts the {@code jsondata} records of the last 400 days in the InfluxDB reachable through
     * the {@code old*} connection settings.
     *
     * @param imei device identifier; when {@code null} or blank the records of all devices are
     *             counted
     * @return the sum of the counts of all result tables
     */
    public Long getDeviceDataCount(String imei) {
        StringBuilder totalQuery = new StringBuilder();
        totalQuery.append("from(bucket: \"iot\") ");
        totalQuery.append("|> range(start: -400d)");
        // Fixed filters on measurement and field.
        totalQuery.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\")");
        totalQuery.append(" |> filter(fn: (r) => r[\"_field\"] == \"jsondata\")");
        // Optional filter on the device, only when an IMEI was supplied.
        if (imei != null && !imei.trim().isEmpty()) {
            totalQuery.append(String.format(" |> filter(fn: (r) => r[\"imei\"] == \"%s\")", escapeFluxString(imei.trim())));
        }
        totalQuery.append(" |> count(column: \"_value\")");
        totalQuery.append(" |> yield(name: \"total\")\n");
        LOG.info("查询数量语句:{}", totalQuery);

        long totalCount = 0;
        try (InfluxDBClient client = InfluxDBClientFactory.create(oldurl, oldtoken.toCharArray(), oldorg)) {
            List<FluxTable> totalTables = client.getQueryApi().query(totalQuery.toString());
            // count() yields one row per result table, so the rows are summed.
            for (FluxTable table : totalTables) {
                for (FluxRecord record : table.getRecords()) {
                    Object total = record.getValue();
                    if (total instanceof Number) {
                        totalCount += ((Number) total).longValue();
                    }
                }
            }
        }
        return totalCount;
    }

    /** Re-publishes the JSON payload of a point that could not be written, if it names a device. */
    private void republishFailedPoint(DeviceDataInfluxData influxData) {
        if (influxData == null || influxData.jsondata == null || influxData.jsondata.equals("")) {
            return;
        }
        History history = JSON.parseObject(influxData.jsondata, History.class);
        if (history != null && history.getImei() != null && !history.getImei().equals("")) {
            amqpService.SendMessage("shengdilandevicedataback", JSON.toJSONString(history));
        }
    }

    /** Builds the shared Flux filter for one device's jsondata records, sorted by time descending. */
    private static String historyJsonFilter(String imei) {
        return String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"jsondata\" ) |> sort(columns:[\"_time\"], desc:true) ", escapeFluxString(imei));
    }

    /** Escapes backslashes, double quotes and "${" so the text cannot break out of a Flux string literal. */
    private static String escapeFluxString(String raw) {
        return String.valueOf(raw)
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("${", "\\${");
    }

    /** Converts a local "yyyy-MM-dd HH:mm:ss" time into the UTC timestamp form used in the Flux range. */
    private static String toFluxUtcTime(String localTime) {
        SimpleDateFormat localFormat = new SimpleDateFormat(LOCAL_TIME_PATTERN);
        SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return utcFormat.format(localFormat.parse(localTime));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Time must match '" + LOCAL_TIME_PATTERN + "': " + localTime, e);
        }
    }
}