|
|
|
@ -2,98 +2,47 @@ package com.topsail.influxdb.service; |
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.influxdb.client.DeleteApi; |
|
|
|
import com.influxdb.client.InfluxDBClient; |
|
|
|
import com.influxdb.client.InfluxDBClientFactory; |
|
|
|
import com.influxdb.client.WriteApi; |
|
|
|
import com.influxdb.client.domain.WritePrecision; |
|
|
|
import com.influxdb.query.FluxRecord; |
|
|
|
import com.influxdb.query.FluxTable; |
|
|
|
import com.topsail.influxdb.entity.DeviceBelongInfo; |
|
|
|
import com.topsail.influxdb.entity.DeviceHistoryData; |
|
|
|
import com.topsail.influxdb.entity.DeviceLogData; |
|
|
|
import com.topsail.influxdb.mapper.DeviceInfoMapper; |
|
|
|
import com.topsail.influxdb.model.DeviceDataInfluxData; |
|
|
|
import com.topsail.influxdb.model.DeviceLogInfluxData; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
import java.text.ParseException; |
|
|
|
import java.text.SimpleDateFormat; |
|
|
|
import java.time.Instant; |
|
|
|
import java.time.OffsetDateTime; |
|
|
|
import java.time.ZoneOffset; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.List; |
|
|
|
import java.util.TimeZone; |
|
|
|
import java.util.regex.Pattern; |
|
|
|
|
|
|
|
@Service |
|
|
|
public class DeviceLogService { |
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(DeviceLogService.class); |
|
|
|
@Autowired |
|
|
|
DeviceInfoMapper deviceInfoMapper; |
|
|
|
|
|
|
|
/** |
|
|
|
* 根据设备号查询设备历史数据 |
|
|
|
* |
|
|
|
* @param uid |
|
|
|
* @param pageNo |
|
|
|
* @param pageSize |
|
|
|
* @param startTime |
|
|
|
* @param endTime |
|
|
|
* @param imei |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
public List<DeviceHistoryData> getDeviceHistoryData(String uid, Integer pageNo, Integer pageSize, String startTime, String endTime, String imei) { |
|
|
|
String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
String bucket = "iot"; |
|
|
|
String org = "shengdilan"; |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"iot\") "); |
|
|
|
if (startTime != null && endTime != null) { |
|
|
|
SimpleDateFormat oldFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
|
|
SimpleDateFormat newFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
|
|
|
newFormat.setTimeZone(TimeZone.getTimeZone("UTC"));//时区转换 |
|
|
|
String start = null; |
|
|
|
try { |
|
|
|
start = newFormat.format(oldFormat.parse(startTime)); |
|
|
|
String stop = newFormat.format(oldFormat.parse(endTime)); |
|
|
|
query.append(String.format(" |> range(start:%s, stop:%s)", start, stop)); |
|
|
|
} catch (ParseException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
query.append("|> range(start: -36h)"); |
|
|
|
} |
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"history\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"jsondata\" ) |> sort(columns:[\"_time\"], desc:true) ", imei)); |
|
|
|
if (pageNo != null && pageSize != null) { |
|
|
|
query.append(String.format(" |> limit(n: %s,offset:%s)", pageSize, pageNo - 1)); |
|
|
|
} |
|
|
|
query.append(" |> yield(name: \"last\")"); |
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
List<FluxTable> tables = client.getQueryApi().query(query.toString(), org); |
|
|
|
List<DeviceHistoryData> rerurnList = new ArrayList<>(); |
|
|
|
for (FluxTable fluxTable : tables) { |
|
|
|
List<FluxRecord> records = fluxTable.getRecords(); |
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
String value = (String) fluxRecord.getValueByKey("_value"); |
|
|
|
DeviceHistoryData data = JSONObject.parseObject(value, DeviceHistoryData.class); |
|
|
|
Integer singalstrength = (Integer) JSONObject.parseObject(value).get("singalstrength"); |
|
|
|
data.setSignalStrength(singalstrength); |
|
|
|
data.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format((Double.parseDouble(data.getTime())))); |
|
|
|
rerurnList.add(data); |
|
|
|
} |
|
|
|
} |
|
|
|
client.close(); |
|
|
|
Collections.reverse(rerurnList); |
|
|
|
return rerurnList; |
|
|
|
} |
|
|
|
//@Autowired |
|
|
|
//@Qualifier("inInfluxdbMetricsRepository") |
|
|
|
//private MetricsRepository<HistoryWithBLOBs> metricStore; |
|
|
|
|
|
|
|
//InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://47.97.117.253:9999", "uhr8-DJ10oDa1ZaQALDY5tmWhJ1r0sRYWkc00wUDM6o5MpKP4yB_dclU7ZDbaAfApOY0tAud1YEujsZWh9YohA==".toCharArray()); |
|
|
|
// InfluxDB基础配置 |
|
|
|
private static final String LOG_BUCKET_NAME = "devicelog"; |
|
|
|
@Value("${shengdilan.influxdb.token}") |
|
|
|
public String token; |
|
|
|
@Value("${shengdilan.influxdb.url}") |
|
|
|
public String url; |
|
|
|
@Value("${shengdilan.influxdb.org}") |
|
|
|
private String org; |
|
|
|
// 默认分页参数 |
|
|
|
private static final int DEFAULT_PAGE_NUM = 1; |
|
|
|
private static final int DEFAULT_PAGE_SIZE = 10; |
|
|
|
|
|
|
|
/** |
|
|
|
* 存储设备数据到influxdb |
|
|
|
@ -104,124 +53,50 @@ public class DeviceLogService { |
|
|
|
if (deviceLogData == null || (StringUtils.isEmpty(deviceLogData.getImei()))) { |
|
|
|
return; |
|
|
|
} |
|
|
|
String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
String bucket = "devicelog"; |
|
|
|
String org = "shengdilan"; |
|
|
|
|
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
DeviceDataInfluxData mem = new DeviceDataInfluxData(); |
|
|
|
mem.imei = history.getImei(); |
|
|
|
String value = history.getValue(); |
|
|
|
if (value != null && !value.equals("")) { |
|
|
|
String[] values = value.split(","); |
|
|
|
mem.value1 = 0; |
|
|
|
mem.value2 = 0; |
|
|
|
mem.value3 = 0; |
|
|
|
mem.value4 = 0; |
|
|
|
try { |
|
|
|
if (values.length > 0) { |
|
|
|
mem.value1 = Double.parseDouble(values[0]); |
|
|
|
} |
|
|
|
if (values.length > 1) { |
|
|
|
mem.value2 = Double.parseDouble(values[1]); |
|
|
|
} |
|
|
|
if (values.length > 2) { |
|
|
|
mem.value3 = Double.parseDouble(values[2]); |
|
|
|
} |
|
|
|
if (values.length > 3) { |
|
|
|
mem.value4 = Double.parseDouble(values[4]); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
LOG.info(e.getMessage()); |
|
|
|
} |
|
|
|
/* mem.battery = history.getBatterylevel(); |
|
|
|
mem.sigal = history.getSingalstrength(); |
|
|
|
mem.jsondata=JSON.toJSONString(history); |
|
|
|
mem.values=history.getValue(); |
|
|
|
mem.unit=history.getUnit(); |
|
|
|
mem.alarmtype=history.getAlarmtype(); |
|
|
|
//mem.sampleData = history.getSampledata(); |
|
|
|
//mem.time = Instant.now(); |
|
|
|
mem.time = history.getSenddate().toInstant(); |
|
|
|
System.out.println("写入中==========》"); |
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
writeApi.writeMeasurement(bucket, org, WritePrecision.NS, mem); |
|
|
|
System.out.println("写入成功==========》"); |
|
|
|
}*/ |
|
|
|
} |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); |
|
|
|
DeviceLogInfluxData deviceLogInfluxData = new DeviceLogInfluxData(); |
|
|
|
deviceLogInfluxData.imei = deviceLogData.getImei(); |
|
|
|
//从内部移动处出来,value有没有数值都进行插入 |
|
|
|
mem.battery = history.getBatterylevel(); |
|
|
|
mem.sigal = history.getSingalstrength(); |
|
|
|
mem.jsondata = JSON.toJSONString(history); |
|
|
|
mem.values = history.getValue(); |
|
|
|
mem.unit = history.getUnit(); |
|
|
|
mem.alarmtype = history.getAlarmtype(); |
|
|
|
//mem.sampleData = history.getSampledata(); |
|
|
|
mem.time = Instant.now(); |
|
|
|
// mem.houseId = deviceBelongInfo != null ? deviceBelongInfo.getHouseId() : null; |
|
|
|
// mem.deviceBelongInfo = deviceBelongInfo != null ? deviceBelongInfo.getDeviceBelongInfo() : null; |
|
|
|
//mem.time = history.getSenddate().toInstant(); |
|
|
|
deviceLogInfluxData.result = deviceLogData.getResult(); |
|
|
|
deviceLogInfluxData.statusIssue = deviceLogData.getStatusIssue(); |
|
|
|
deviceLogInfluxData.supplierId = deviceLogData.getSupplierId(); |
|
|
|
deviceLogInfluxData.companyId = deviceLogData.getCompanyId(); |
|
|
|
deviceLogInfluxData.operator = deviceLogData.getOperator(); |
|
|
|
deviceLogInfluxData.operation = deviceLogData.getOperation(); |
|
|
|
deviceLogInfluxData.logjson = JSON.toJSONString(deviceLogData); |
|
|
|
deviceLogInfluxData.targetValue = deviceLogData.getTargetValue(); |
|
|
|
deviceLogInfluxData.feedbackValue = deviceLogData.getFeedbackValue(); |
|
|
|
deviceLogInfluxData.deviceBelongInfo = deviceLogData.getDeviceBelongInfo(); |
|
|
|
deviceLogInfluxData.houseId = deviceLogData.getHouseId(); |
|
|
|
deviceLogInfluxData.time = Instant.now(); |
|
|
|
try (WriteApi writeApi = client.getWriteApi()) { |
|
|
|
writeApi.writeMeasurement(bucket, org, WritePrecision.NS, mem); |
|
|
|
writeApi.writeMeasurement(LOG_BUCKET_NAME, org, WritePrecision.NS, deviceLogInfluxData); |
|
|
|
} |
|
|
|
System.out.println("写入成功==========》" + history.getImei()); |
|
|
|
//amqpService.SendMessage("telegraf",influxvalue); |
|
|
|
// InfluxDBClient influxDBClient = InfluxDBClientFactory.create("http://47.97.117.253:9999", token); |
|
|
|
// Point point = Point |
|
|
|
// .measurement("topsail4") |
|
|
|
// .addTag("imei", history.getImei()) |
|
|
|
// .addField("batterylevel", history.getBatterylevel()) |
|
|
|
// .time(Instant.now().toEpochMilli(), WritePrecision.MS); |
|
|
|
// WriteApiBlocking writeApi = influxDBClient.getWriteApiBlocking(); |
|
|
|
// Gson gson=new Gson(); |
|
|
|
// history.setTime(null); |
|
|
|
// Temperature temperature =gson.fromJson(gson.toJson(history),Temperature.class); |
|
|
|
// temperature.time=Instant.now(); |
|
|
|
// writeApi.writeMeasurement("iot", "topsail", WritePrecision.NS, temperature); |
|
|
|
// influxDBClient.close(); |
|
|
|
} |
|
|
|
|
|
|
|
// public void AddDeviceLog(String imei, String type, String params) { |
|
|
|
// DeviceLog deviceLog = new DeviceLog(); |
|
|
|
// deviceLog.setImei(imei); |
|
|
|
// deviceLog.setOperation(type); |
|
|
|
// deviceLog.setParams(params); |
|
|
|
// deviceLog.setUpdatetime(new Date()); |
|
|
|
// deviceLogMapper.insert(deviceLog); |
|
|
|
// } |
|
|
|
|
|
|
|
/** |
|
|
|
* 查询设备的所属信息 |
|
|
|
*/ |
|
|
|
public DeviceBelongInfo getDeviceBelongInfo(String imei) { |
|
|
|
return deviceInfoMapper.queryDeviceBelongInfo(imei); |
|
|
|
System.out.println("下发日志写入成功==========》" + deviceLogData.getImei()); |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 删除掉所有的Influxdb数据 |
|
|
|
*/ |
|
|
|
public void deleteDeviceLog() { |
|
|
|
// String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
// String bucket = "devicelog"; |
|
|
|
// String org = "shengdilan"; |
|
|
|
// InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
// StringBuffer query = new StringBuffer(); |
|
|
|
// query.append("from(bucket: \"devicelog\") "); |
|
|
|
// query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); |
|
|
|
// query.append("|> range(start: -36d)"); |
|
|
|
// System.out.println("查询语句==========:" + query); |
|
|
|
// DeleteApi deleteApi = client.getDeleteApi(); |
|
|
|
// OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
// OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
// try { |
|
|
|
// // Delete data with specific time range |
|
|
|
// deleteApi.delete(start, stop, "", bucket, org); |
|
|
|
// System.out.println("Data deleted successfully"); |
|
|
|
// } catch (Exception e) { |
|
|
|
// e.printStackTrace(); |
|
|
|
// } finally { |
|
|
|
// client.close(); |
|
|
|
// } |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"devicelog\") "); |
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")")); |
|
|
|
query.append("|> range(start: -36d)"); |
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
DeleteApi deleteApi = client.getDeleteApi(); |
|
|
|
OffsetDateTime start = OffsetDateTime.of(2020, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
OffsetDateTime stop = OffsetDateTime.of(2026, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); |
|
|
|
try { |
|
|
|
// Delete data with specific time range |
|
|
|
deleteApi.delete(start, stop, "", LOG_BUCKET_NAME, org); |
|
|
|
System.out.println("Data deleted successfully"); |
|
|
|
} catch (Exception e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} finally { |
|
|
|
client.close(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
@ -239,13 +114,12 @@ public class DeviceLogService { |
|
|
|
* @param houseId 房间id |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
public List<DeviceLogData> getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String imei, String supplierId, String companyId, String operator, Integer houseId) { |
|
|
|
String token = "_HeUSb3p4M0iesYrbegw3KokEhAPFzqzGEB3DNC6f6f46DPlaiU29PXFbhtkhFuVVearVSLfcSAK8ubu3eYyug=="; |
|
|
|
String bucket = "devicelog"; |
|
|
|
String org = "shengdilan"; |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create("http://172.16.1.124:8086", token.toCharArray()); |
|
|
|
StringBuffer query = new StringBuffer(); |
|
|
|
query.append("from(bucket: \"devicelog\") "); |
|
|
|
public JSONObject getPageDeviceLog(Integer pageNode, Integer pageSize, String startTime, String endTime, String result, String imei, String supplierId, String companyId, String operator, Integer houseId, String bindingInfo) { |
|
|
|
JSONObject resultSet = new JSONObject(); |
|
|
|
InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org); |
|
|
|
// 1. 拼接公共过滤条件片段(总条数和分页查询共用) |
|
|
|
StringBuilder filterFragment = new StringBuilder(); |
|
|
|
filterFragment.append("from(bucket: \"devicelog\") "); |
|
|
|
if (startTime != null && endTime != null) { |
|
|
|
SimpleDateFormat oldFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
|
|
SimpleDateFormat newFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); |
|
|
|
@ -254,22 +128,87 @@ public class DeviceLogService { |
|
|
|
try { |
|
|
|
start = newFormat.format(oldFormat.parse(startTime)); |
|
|
|
String stop = newFormat.format(oldFormat.parse(endTime)); |
|
|
|
query.append(String.format(" |> range(start:%s, stop:%s)", start, stop)); |
|
|
|
filterFragment.append(String.format(" |> range(start:%s, stop:%s)", start, stop)); |
|
|
|
} catch (ParseException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
query.append("|> range(start: -36h)"); |
|
|
|
filterFragment.append("|> range(start: -36h)"); |
|
|
|
} |
|
|
|
query.append(String.format(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\") |> filter(fn: (r) => r[\"imei\"] == \"%s\") |> filter(fn: (r) => r[\"_field\"] == \"logjson\" ) |> sort(columns:[\"_time\"], desc:true) ", imei)); |
|
|
|
if (pageNode != null && pageSize != null) { |
|
|
|
query.append(String.format(" |> limit(n: %s,offset:%s)", pageSize, pageNode - 1)); |
|
|
|
// 2. 拼接固定过滤条件(_measurement和_field,必选) |
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_measurement\"] == \"devicelog\")"); |
|
|
|
filterFragment.append(" |> filter(fn: (r) => r[\"_field\"] == \"logjson\")"); |
|
|
|
// 3. 动态拼接imei过滤条件(仅当imei非空且非空白字符串时拼接) |
|
|
|
if (imei != null && !imei.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"imei\"] == \"%s\")", imei.trim())); |
|
|
|
} |
|
|
|
query.append(" |> yield(name: \"last\")"); |
|
|
|
System.out.println("查询语句==========:" + query); |
|
|
|
List<FluxTable> tables = client.getQueryApi().query(query.toString(), org); |
|
|
|
// 4. 动态拼接result过滤条件(仅当result非空且非空白字符串时拼接) |
|
|
|
if (result != null && !result.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"result\"] == \"%s\")", result.trim())); |
|
|
|
} |
|
|
|
// 5. 动态拼接supplierId过滤条件(仅当supplierId非空且非空白字符串时拼接) |
|
|
|
if (supplierId != null && !supplierId.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"supplierId\"] == \"%s\")", supplierId.trim())); |
|
|
|
} |
|
|
|
// 6. 动态拼接companyId过滤条件(仅当companyId非空且非空白字符串时拼接) |
|
|
|
if (companyId != null && !companyId.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"companyId\"] == \"%s\")", companyId.trim())); |
|
|
|
} |
|
|
|
// 7. 动态拼接operator过滤条件(仅当operator非空且非空白字符串时拼接) |
|
|
|
if (operator != null && !operator.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"operator\"] == \"%s\")", operator.trim())); |
|
|
|
} |
|
|
|
// 8. 动态拼接houseId过滤条件(仅当houseId非空且非空白字符串时拼接) |
|
|
|
if (houseId != null) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"houseId\"] == \"%s\")", houseId)); |
|
|
|
} |
|
|
|
// ========== 核心修复:Java 8不支持""",改用+拼接多行字符串 ========== |
|
|
|
// 1. 拼接模糊过滤条件(Flux逻辑不变,仅改字符串写法) |
|
|
|
// 3. 核心修复:belonginfo模糊查询(两步修复) |
|
|
|
if (bindingInfo != null && !bindingInfo.trim().isEmpty()) { |
|
|
|
filterFragment.append(String.format(" |> filter(fn: (r) => r[\"deviceBelongInfo\"] =~ /.*" + bindingInfo.trim() + ".*/)")); |
|
|
|
} |
|
|
|
// if (bindingInfo != null && !bindingInfo.trim().isEmpty()) { |
|
|
|
// // 核心:containsStr实现模糊匹配,先判断字段非空避免报错 |
|
|
|
// filterFragment.append(String.format( |
|
|
|
// " |> filter(fn: (r) => contains(value: r[\"deviceBelongInfo\"], set: [ \"%s\"]))", |
|
|
|
// bindingInfo.trim() |
|
|
|
// )); |
|
|
|
// } |
|
|
|
// 5. 拼接固定排序条件(按时间倒序) |
|
|
|
// 2. 构建总条数查询子句(yield命名为total) |
|
|
|
StringBuilder totalQuery = new StringBuilder(); |
|
|
|
totalQuery.append(filterFragment); // 复用过滤条件 |
|
|
|
totalQuery.append(" |> count(column: \"_value\")"); // 统计总条数 |
|
|
|
totalQuery.append(" |> yield(name: \"total\")\n"); // 标记结果集为total |
|
|
|
|
|
|
|
// 3. 构建分页数据查询子句(yield命名为data) |
|
|
|
StringBuilder dataQuery = new StringBuilder(); |
|
|
|
dataQuery.append(filterFragment); // 复用过滤条件 |
|
|
|
dataQuery.append(" |> sort(columns:[\"_time\"], desc:true)"); |
|
|
|
// 处理分页参数默认值 |
|
|
|
int finalPageNum = (pageNode == null || pageNode <= 0) ? DEFAULT_PAGE_NUM : pageNode; |
|
|
|
int finalPageSize = (pageSize == null || pageSize <= 0) ? DEFAULT_PAGE_SIZE : pageSize; |
|
|
|
int offset = (finalPageNum - 1) * finalPageSize; |
|
|
|
dataQuery.append(String.format(" |> limit(n: %d, offset: %d)", finalPageSize, offset)); |
|
|
|
dataQuery.append(" |> yield(name: \"data\")"); // 标记结果集为data |
|
|
|
System.out.println("查询数量语句:" + totalQuery.toString()); |
|
|
|
System.out.println("查询数据语句:" + dataQuery.toString()); |
|
|
|
//查询总条数 |
|
|
|
long totalCount = 0; |
|
|
|
List<FluxTable> totalTables = client.getQueryApi().query(totalQuery.toString()); |
|
|
|
for (FluxTable table : totalTables) { |
|
|
|
for (FluxRecord record : table.getRecords()) { |
|
|
|
Object total = record.getValue(); |
|
|
|
if (total != null) { |
|
|
|
totalCount = totalCount + ((Number) total).longValue(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
//查询数据结果 |
|
|
|
List<FluxTable> dataResults = client.getQueryApi().query(dataQuery.toString()); |
|
|
|
List<DeviceLogData> rerurnList = new ArrayList<>(); |
|
|
|
for (FluxTable fluxTable : tables) { |
|
|
|
for (FluxTable fluxTable : dataResults) { |
|
|
|
List<FluxRecord> records = fluxTable.getRecords(); |
|
|
|
for (FluxRecord fluxRecord : records) { |
|
|
|
String value = (String) fluxRecord.getValueByKey("_value"); |
|
|
|
@ -279,7 +218,25 @@ public class DeviceLogService { |
|
|
|
} |
|
|
|
client.close(); |
|
|
|
Collections.reverse(rerurnList); |
|
|
|
return rerurnList; |
|
|
|
resultSet.put("count", totalCount); |
|
|
|
resultSet.put("list", rerurnList); |
|
|
|
return resultSet; |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* Flux正则转义(仅转义RE2引擎的特殊字符,避免\Q\E) |
|
|
|
* Flux正则特殊字符:. * + ? | ( ) [ ] { } ^ $ \ |
|
|
|
*/ |
|
|
|
private static String escapeFluxRegex(String keyword) { |
|
|
|
if (keyword == null || keyword.isEmpty()) { |
|
|
|
return ""; |
|
|
|
} |
|
|
|
// 转义Flux正则的特殊字符(替换为\+字符) |
|
|
|
String[] specialChars = {"\\", ".", "*", "+", "?", "|", "(", ")", "[", "]", "{", "}", "^", "$"}; |
|
|
|
String escaped = keyword; |
|
|
|
for (String ch : specialChars) { |
|
|
|
escaped = escaped.replace(ch, "\\" + ch); |
|
|
|
} |
|
|
|
return escaped; |
|
|
|
} |
|
|
|
} |