|
|
@@ -14,6 +14,7 @@ import com.middle.platform.manage.biz.domain.req.IotUrlPara;
|
|
|
import com.middle.platform.manage.biz.entity.IotProduct;
|
|
|
import com.middle.platform.manage.biz.entity.IotUrl;
|
|
|
import com.middle.platform.manage.biz.mapper.IotUrlMapper;
|
|
|
+import com.middle.platform.redis.service.CacheService;
|
|
|
import jakarta.annotation.Resource;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
@@ -34,7 +35,57 @@ public class IotUrlService {
|
|
|
@Resource
|
|
|
private IotProductService iotProductService;
|
|
|
private final DynamicTopicApi dynamicTopicApi;
|
|
|
- private String topicPrefix = "/device/%s/*/";
|
|
|
+ private final CacheService cacheService;
|
|
|
+ private String topicPrefix = "/iot/%s/+/";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 校验topic
|
|
|
+ *
|
|
|
+ * @param iotUrlPara
|
|
|
+ */
|
|
|
+ private void checkTopic(IotUrlPara iotUrlPara) {
|
|
|
+ //校验前缀是否为 /iot/{productKey}/*/
|
|
|
+ IotProduct query = iotProductService.query(iotUrlPara.getProductId());
|
|
|
+ if (Objects.isNull(query)) {
|
|
|
+ throw new BusinessException("产品不存在");
|
|
|
+ }
|
|
|
+ //产品key
|
|
|
+ String code = query.getCode();
|
|
|
+ if (!StringUtils.startsWith(iotUrlPara.getUrl(), String.format(topicPrefix, code))) {
|
|
|
+ throw new BusinessException("topic必须以+" + String.format(topicPrefix, code) + "为前缀");
|
|
|
+ }
|
|
|
+ //todo topic合规校验
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅地址,添加缓存
|
|
|
+ *
|
|
|
+ * @param url topic地址
|
|
|
+ * @param func func+permission
|
|
|
+ */
|
|
|
+ private void subUrl(String url, String func) {
|
|
|
+ ThreadTask.addJob(() -> {
|
|
|
+ //订阅topic
|
|
|
+ dynamicTopicApi.saveTopic(new TopicDto(url, 0));
|
|
|
+ //写入 CacheConstant.TOPIC_CACHE缓存
|
|
|
+ cacheService.setTopic(url, func);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 取消订阅的topic,并删除缓存的topic
|
|
|
+ *
|
|
|
+ * @param url
|
|
|
+ */
|
|
|
+ private void removeUrl(String url) {
|
|
|
+ ThreadTask.addJob(() -> {
|
|
|
+ dynamicTopicApi.removeTopic(new TopicDto(url, 0));
|
|
|
+ //删除原topic
|
|
|
+ cacheService.delTopic(url);
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* 新增topic
|
|
|
@@ -42,6 +93,7 @@ public class IotUrlService {
|
|
|
* @param iotUrlPara
|
|
|
* @return
|
|
|
*/
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
public Object saveUrl(IotUrlPara iotUrlPara) {
|
|
|
//topi规则校验
|
|
|
checkTopic(iotUrlPara);
|
|
|
@@ -55,31 +107,13 @@ public class IotUrlService {
|
|
|
iotUrl.setRemark(iotUrlPara.getRemark());
|
|
|
iotUrlMapper.insert(iotUrl);
|
|
|
//新增订阅的topic
|
|
|
- if (Objects.equals(iotUrlPara.getType(), UrlType.MQTT) && (Objects.equals(iotUrlPara.getPermission(), TopicType.PUB) || Objects.equals(iotUrlPara.getPermission(), TopicType.BOTH))) {
|
|
|
- dynamicTopicApi.saveTopic(new TopicDto(iotUrlPara.getUrl(), 0));
|
|
|
+ if (Objects.equals(iotUrlPara.getType(), UrlType.MQTT)
|
|
|
+ && (Objects.equals(iotUrlPara.getPermission(), TopicType.PUB) || Objects.equals(iotUrlPara.getPermission(), TopicType.BOTH))) {
|
|
|
+ subUrl(iotUrl.getUrl(), iotUrl.getFunc() + String.valueOf(iotUrl.getPermission()));
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 校验topic
|
|
|
- *
|
|
|
- * @param iotUrlPara
|
|
|
- */
|
|
|
- private void checkTopic(IotUrlPara iotUrlPara) {
|
|
|
- //校验前缀是否为 /device/{productKey}/*/
|
|
|
- IotProduct query = iotProductService.query(iotUrlPara.getProductId());
|
|
|
- if (Objects.isNull(query)) {
|
|
|
- throw new BusinessException("产品不存在");
|
|
|
- }
|
|
|
- //产品key
|
|
|
- String code = query.getCode();
|
|
|
- if (!StringUtils.startsWith(iotUrlPara.getUrl(), String.format(topicPrefix, code))) {
|
|
|
- throw new BusinessException("topic必须以+" + String.format(topicPrefix, code) + "为前缀");
|
|
|
- }
|
|
|
- //todo topic合规校验
|
|
|
-
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* 修改topic
|
|
|
@@ -109,9 +143,8 @@ public class IotUrlService {
|
|
|
iotUrlMapper.updateById(iotUrl);
|
|
|
//取消订阅原topic 订阅topic
|
|
|
if (Objects.equals(iotUrlPara.getType(), UrlType.MQTT) && (Objects.equals(iotUrlPara.getPermission(), TopicType.PUB) || Objects.equals(iotUrlPara.getPermission(), TopicType.BOTH))) {
|
|
|
- dynamicTopicApi.removeTopic(new TopicDto(iotUrlPara.getUrl(), 0));
|
|
|
- dynamicTopicApi.saveTopic(new TopicDto(iotUrlPara.getUrl(), 0));
|
|
|
-
|
|
|
+ removeUrl(iotUrlCheck.getUrl());
|
|
|
+ subUrl(iotUrl.getUrl(), iotUrl.getFunc() + String.valueOf(iotUrl.getPermission()));
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
@@ -133,7 +166,7 @@ public class IotUrlService {
|
|
|
iotUrlMapper.update(iotUrl, new LambdaQueryWrapper<IotUrl>().eq(IotUrl::getId, id).eq(IotUrl::getDelFlag, Global.UN_DEL));
|
|
|
//取消订阅
|
|
|
if (Objects.equals(iotUrlCheck.getType(), UrlType.MQTT) && (Objects.equals(iotUrlCheck.getPermission(), TopicType.PUB) || Objects.equals(iotUrlCheck.getPermission(), TopicType.BOTH))) {
|
|
|
- dynamicTopicApi.removeTopic(new TopicDto(iotUrlCheck.getUrl(), 0));
|
|
|
+ removeUrl(iotUrlCheck.getUrl());
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
@@ -167,9 +200,13 @@ public class IotUrlService {
|
|
|
iotUrl.setType(iotProduct.getReportProtocol());
|
|
|
iotUrl.setRemark(value.getDesc());
|
|
|
iotUrlMapper.insert(iotUrl);
|
|
|
+ subUrl(iotUrl.getUrl(), iotUrl.getFunc() + String.valueOf(iotUrl.getPermission()));
|
|
|
}
|
|
|
}
|
|
|
+ //http
|
|
|
+ if (Objects.equals(iotProduct.getReportProtocol(), UrlProtocol.HTTP)) {
|
|
|
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|