diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
index 890d19acf1..b4dba548b7 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
@@ -45,4 +45,14 @@ public interface ConsomerGroupConstants {
*/
String POINT_OPERATION_LOG_GROUP = "point_operation_log_group";
+ /**
+ * 开放的对接数据(中间库) 组织变更事件监听器分组
+ */
+ String OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP = "open_data_org_change_event_listener_group";
+
+ /**
+ * 开放的对接数据(中间库) 工作人员变更事件监听器分组
+ */
+ String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group";
+
}
diff --git a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
index 8fe36aa53a..927fa19198 100644
--- a/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
+++ b/epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
@@ -13,6 +13,7 @@ public interface TopicConstants {
* 项目变动
*/
String PROJECT_CHANGED = "project_changed";
+
/**
* 小组成就
*/
@@ -27,4 +28,14 @@ public interface TopicConstants {
* 积分系统话题
*/
String POINT = "point";
+
+ /**
+ * 组织信息变更
+ */
+ String ORG = "org";
+
+ /**
+ * 工作人员信息变更
+ */
+ String STAFF = "staff";
}
diff --git a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
index 626b081436..e1c59f003e 100644
--- a/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
+++ b/epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
@@ -25,4 +25,44 @@ public interface SystemMessageType {
*/
String POINT_RULE_CHANGED = "point_rule_changed";
+ /**
+ * 网格新增
+ */
+ String GRID_CREATE = "grid_create";
+
+ /**
+ * 网格信息变更
+ */
+ String GRID_CHANGE = "grid_change";
+
+ /**
+ * agency机构新增
+ */
+ String AGENCY_CREATE = "agency_create";
+
+ /**
+ * agency机构信息变更
+ */
+ String AGENCY_CHANGE = "agency_change";
+
+ /**
+ * 部门新增
+ */
+ String DEPARTMENT_CREATE = "department_create";
+
+ /**
+ * 部门信息变更
+ */
+ String DEPARTMENT_CHANGE = "department_change";
+
+ /**
+ * 工作人员新增
+ */
+ String STAFF_CREATE = "staff_create";
+
+ /**
+ * 工作人员变更
+ */
+ String STAFF_CHANGE = "staff_change";
+
}
diff --git a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
index 9c56520419..f7aacf297c 100644
--- a/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
+++ b/epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
@@ -79,6 +79,18 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.POINT_RULE_CHANGED:
topic = TopicConstants.POINT;
break;
+ case SystemMessageType.GRID_CREATE:
+ case SystemMessageType.GRID_CHANGE:
+ case SystemMessageType.AGENCY_CREATE:
+ case SystemMessageType.AGENCY_CHANGE:
+ case SystemMessageType.DEPARTMENT_CREATE:
+ case SystemMessageType.DEPARTMENT_CHANGE:
+ topic = TopicConstants.ORG;
+ break;
+ case SystemMessageType.STAFF_CREATE:
+ case SystemMessageType.STAFF_CHANGE:
+ topic = TopicConstants.STAFF;
+ break;
}
return topic;
}
diff --git a/epmet-module/open-data-worker/open-data-worker-client/pom.xml b/epmet-module/open-data-worker/open-data-worker-client/pom.xml
new file mode 100644
index 0000000000..14be115052
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-client/pom.xml
@@ -0,0 +1,15 @@
+
+
+
+ open-data-worker
+ com.epmet
+ 2.0.0
+
+ 4.0.0
+
+ open-data-worker-client
+
+
+
\ No newline at end of file
diff --git a/epmet-module/open-data-worker/open-data-worker-server/pom.xml b/epmet-module/open-data-worker/open-data-worker-server/pom.xml
new file mode 100644
index 0000000000..289b030dc7
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/pom.xml
@@ -0,0 +1,258 @@
+
+
+
+ open-data-worker
+ com.epmet
+ 2.0.0
+
+ 4.0.0
+
+ open-data-worker-server
+
+
+
+ com.epmet
+ epmet-commons-tools
+ 2.0.0
+
+
+ com.epmet
+ epmet-commons-mybatis
+ 2.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.springframework
+ spring-context-support
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ de.codecentric
+ spring-boot-admin-starter-client
+ ${spring.boot.admin.version}
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-discovery
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-alibaba-nacos-config
+
+
+
+ io.github.openfeign
+ feign-httpclient
+ 10.3.0
+
+
+
+
+ com.epmet
+ epmet-commons-rocketmq
+ 2.0.0
+
+
+
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ true
+
+
+
+ ${project.basedir}/src/main/java
+
+
+ true
+ ${basedir}/src/main/resources
+
+
+
+
+
+
+ dev
+
+ 8117
+ dev
+
+
+
+
+
+ epmet_open_data_user
+ EpmEt-db-UsEr
+
+ 0
+ 192.168.1.140
+ 6379
+ 123456
+
+ true
+ 192.168.1.140:8848
+ 1fecc730-5e6e-464c-aae9-7567944e7936
+
+
+ false
+
+
+ true
+
+
+ false
+
+
+ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
+ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+
+ true
+ 192.168.1.140:9876;192.168.1.141:9876
+
+
+
+ local
+
+ true
+
+
+ 8117
+ local
+
+
+
+
+
+ epmet_open_data_user
+ EpmEt-db-UsEr
+
+ 0
+ 192.168.1.140
+ 6379
+ 123456
+
+ false
+ 192.168.1.140:8848
+ 1fecc730-5e6e-464c-aae9-7567944e7936
+
+
+ false
+
+
+ false
+
+
+ false
+
+
+ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
+ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+
+ false
+ 192.168.1.140:9876;192.168.1.141:9876
+
+
+
+ test
+
+
+ 8117
+ test
+
+
+
+
+
+ epmet
+ elink@833066
+
+ 0
+ r-m5eoz5b6tkx09y6bpz.redis.rds.aliyuncs.com
+ 6379
+ EpmEtrEdIs!q@w
+
+ true
+ 192.168.10.150:8848
+ 67e3c350-533e-4d7c-9f8f-faf1b4aa82ae
+
+
+ false
+
+
+ true
+
+
+ true
+
+
+ https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4
+ SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd
+
+
+ true
+ 192.168.10.161:9876
+
+
+
+ prod
+
+ 8117
+ prod
+
+
+
+
+
+ epmet_open_data_user
+ EpmEt-db-UsEr
+
+ 0
+ r-m5ez3n1j0qc3ykq2ut.redis.rds.aliyuncs.com
+ 6379
+ EpmEtclOUdrEdIs!Q2w
+
+ true
+ 192.168.11.180:8848
+ bd205d23-e696-47be-b995-916313f86e99
+
+
+ false
+
+
+ true
+
+
+ true
+
+
+ https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c
+ SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1
+
+
+ true
+ 192.168.11.187:9876;192.168.11.184:9876
+
+
+
+
+
\ No newline at end of file
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java
new file mode 100644
index 0000000000..bb8ff976ac
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java
@@ -0,0 +1,26 @@
+package com.epmet.opendata;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.cloud.openfeign.EnableFeignClients;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * @Description OpenData服务启动类
+ * 注意:此处要使用@ComponentScan的原因是,启动类处于包com.epmet.opendata下,想用com.epmet.commons包下的一些Spring对象,需要
+ * 手动指定扫描包,否则无法扫描
+ * @author wxz
+ * @date 2021.10.13 15:16:05
+*/
+@SpringBootApplication
+@EnableDiscoveryClient
+@EnableFeignClients
+@ServletComponentScan
+@ComponentScan(value = { "com.epmet.opendata", "com.epmet.commons" })
+public class OpenDataApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(OpenDataApplication.class, args);
+ }
+}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java
new file mode 100644
index 0000000000..a25220cbfb
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java
@@ -0,0 +1,161 @@
+package com.epmet.opendata.config;
+
+import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
+import com.alibaba.nacos.api.naming.pojo.ListView;
+import com.epmet.commons.tools.exception.ExceptionUtils;
+import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
+import com.netflix.loadbalancer.ILoadBalancer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author wxz
+ * @Description Nacos服务列表刷新监听注册器
+ * @date 2021.09.22 14:33:11
+ */
+@Slf4j
+@Configuration
+@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceListChangedListening", name = "enable", havingValue = "true", matchIfMissing = false)
+public class NacosServiceListListenerRegisterer {
+
+ public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit";
+ // 服务列表拉取间隔:10s
+ public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10;
+
+ private NamingService namingService;
+
+ private ScheduledExecutorService executor;
+
+ @Autowired
+ private NacosDiscoveryProperties discoveryProperties;
+
+ @Autowired
+ private SpringClientFactory springClientFactory;
+
+ // 监听中的服务列表
+ private List observingServers = new ArrayList<>(33);
+
+ @PostConstruct
+ public void init() {
+ namingService = discoveryProperties.namingServiceInstance();
+ // 启动监听
+ executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("NacosServiceListWatchingRegisterer");
+ return thread;
+ }
+ });
+
+ // 立即启动,并15s刷新一次服务列表,用于新服务列表的发现
+ ScheduledFuture> future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public class EpmetNacosServiceListListener implements Runnable {
+
+ @Override
+ public void run() {
+ doRefreshServerList();
+ }
+
+ /**
+ * @param
+ * @return
+ * @description 执行刷新
+ * @author wxz
+ * @date 2021.09.22 16:04:49
+ */
+ private synchronized void doRefreshServerList() {
+ ListView serviceListView = null;
+ try {
+ serviceListView = namingService.getServicesOfServer(1, 100);
+ //启动监听
+ if (serviceListView == null || serviceListView.getCount() == 0) {
+ log.info("【Nacos服务列表定时刷新】当前无任何可添加监听的服务");
+ return;
+ }
+ List serviceList = serviceListView.getData();
+ log.info("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList);
+
+ for (String service : serviceList) {
+ try {
+
+ // 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。
+ if (observingServers.contains(service)) {
+ continue;
+ }
+
+ namingService.subscribe(service, new EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ if (event instanceof NamingEvent) {
+ NamingEvent namingEvent = (NamingEvent) event;
+ log.info("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances());
+ doRefreshServerList(service);
+ }
+ }
+ });
+
+ // 将该服务加入到监听列表中
+ observingServers.add(service);
+ } catch (NacosException e) {
+ String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
+ log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace);
+ }
+ }
+
+ } catch (NacosException e) {
+ String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
+ log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace);
+ }
+ }
+
+ /**
+ * @param serviceName
+ * @return
+ * @description 刷新ServerList
+ * @author wxz
+ * @date 2021.09.22 09:29:16
+ */
+ private void doRefreshServerList(String serviceName) {
+ // 刷新方式1:反射调用DynamicServerListLoadBalancer中的restOfInit()方法。该方法原来只执行一次,此处不推荐用
+ //ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName);
+ //if (loadBalancer instanceof ZoneAwareLoadBalancer) {
+ // ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer;
+ // IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName);
+ // try {
+ // Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class);
+ // restOfInitMethod.setAccessible(true);
+ // restOfInitMethod.invoke(zaLoadBalancer, clientConfig);
+ // } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ // String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
+ // log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace);
+ // }
+ //}
+
+ // 刷新方式2:DynamicServerListLoadBalancer#updateListOfServers()该方法为ribbon定时刷新服务列表的时候真正调用的方法,但是加了@VisibleForTesting
+ // 暂且 1 try
+ ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName);
+ if (loadBalancer instanceof DynamicServerListLoadBalancer) {
+ DynamicServerListLoadBalancer dslb = (DynamicServerListLoadBalancer) loadBalancer;
+ dslb.updateListOfServers();
+ }
+ }
+ }
+
+}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
new file mode 100644
index 0000000000..ede4eeb83a
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
@@ -0,0 +1,39 @@
+package com.epmet.opendata.mq;
+
+import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
+import com.epmet.commons.rocketmq.constants.TopicConstants;
+import com.epmet.commons.rocketmq.register.MQAbstractRegister;
+import com.epmet.commons.rocketmq.register.MQConsumerProperties;
+import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
+import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.springframework.stereotype.Component;
+
+/**
+ * @Description 如果rocketmq.enable=true,这里必须实现,且 实例化
+ * @author wxz
+ * @date 2021.07.14 17:13:41
+*/
+@Component
+public class RocketMQConsumerRegister extends MQAbstractRegister {
+
+ @Override
+ public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
+ // 客户初始化监听器注册
+ register(consumerProperties,
+ ConsomerGroupConstants.OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP,
+ MessageModel.CLUSTERING,
+ TopicConstants.ORG,
+ "*",
+ new OpenDataOrgChangeEventListener());
+
+ register(consumerProperties,
+ ConsomerGroupConstants.OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP,
+ MessageModel.CLUSTERING,
+ TopicConstants.STAFF,
+ "*",
+ new OpenDataStaffChangeEventListener());
+
+ // ...其他监听器类似
+ }
+}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
new file mode 100644
index 0000000000..17cc9eca72
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
@@ -0,0 +1,61 @@
+package com.epmet.opendata.mq.listener;
+
+import com.epmet.commons.tools.distributedlock.DistributedLock;
+import com.epmet.commons.tools.exception.ExceptionUtils;
+import com.epmet.commons.tools.exception.RenException;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.redisson.api.RLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * @Description 系统对接中间库,组织信息变更监听器
+ * @author wxz
+ * @date 2021.10.13 15:21:48
+*/
+public class OpenDataOrgChangeEventListener implements MessageListenerConcurrently {
+
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ try {
+ msgs.forEach(msg -> consumeMessage(msg));
+ } catch (Exception e) {
+ logger.error(ExceptionUtils.getErrorStackTrace(e));
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ private void consumeMessage(MessageExt messageExt) {
+ // msg即为消息体
+ // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
+ String msg = new String(messageExt.getBody());
+ String tags = messageExt.getTags();
+
+ logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
+
+ DistributedLock distributedLock = null;
+ RLock lock = null;
+ try {
+ //distributedLock = SpringContextUtils.getBean(DistributedLock.class);
+ //lock = distributedLock.getLock(String.format("lock:open_data_org:%s", orgId),
+ // 30L, 30L, TimeUnit.SECONDS);
+ } catch (RenException e) {
+ // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
+ logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ } catch (Exception e) {
+ // 不是我们自己抛出的异常,可以让MQ重试
+ logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ throw e;
+ } finally {
+ //distributedLock.unLock(lock);
+ }
+ }
+}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
new file mode 100644
index 0000000000..b8f0be40bd
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
@@ -0,0 +1,61 @@
+package com.epmet.opendata.mq.listener;
+
+import com.epmet.commons.tools.distributedlock.DistributedLock;
+import com.epmet.commons.tools.exception.ExceptionUtils;
+import com.epmet.commons.tools.exception.RenException;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.redisson.api.RLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * @Description 系统对接中间库,工作人员信息变更监听器
+ * @author wxz
+ * @date 2021.10.13 15:21:48
+*/
+public class OpenDataStaffChangeEventListener implements MessageListenerConcurrently {
+
+ private Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ try {
+ msgs.forEach(msg -> consumeMessage(msg));
+ } catch (Exception e) {
+ logger.error(ExceptionUtils.getErrorStackTrace(e));
+ return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+
+ private void consumeMessage(MessageExt messageExt) {
+ // msg即为消息体
+ // tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
+ String msg = new String(messageExt.getBody());
+ String tags = messageExt.getTags();
+
+ logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}", msg, tags);
+
+ DistributedLock distributedLock = null;
+ RLock lock = null;
+ try {
+ //distributedLock = SpringContextUtils.getBean(DistributedLock.class);
+ //lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId),
+ // 30L, 30L, TimeUnit.SECONDS);
+ } catch (RenException e) {
+ // 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
+ logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ } catch (Exception e) {
+ // 不是我们自己抛出的异常,可以让MQ重试
+ logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
+ throw e;
+ } finally {
+ //distributedLock.unLock(lock);
+ }
+ }
+}
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml
new file mode 100644
index 0000000000..59302e27eb
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml
@@ -0,0 +1,138 @@
+server:
+ port: @server.port@
+ version: @version@
+ servlet:
+ context-path: /opendata
+
+spring:
+ main:
+ allow-bean-definition-overriding: true
+ application:
+ name: open-data-server
+ #环境 dev|test|prod
+ profiles:
+ active: @spring.profiles.active@
+ jackson:
+ time-zone: GMT+8
+ date-format: yyyy-MM-dd HH:mm:ss
+ redis:
+ database: @spring.redis.index@
+ host: @spring.redis.host@
+ port: @spring.redis.port@
+ password: @spring.redis.password@
+ timeout: 30s
+ datasource:
+ druid:
+ #MySQL
+ driver-class-name: com.mysql.cj.jdbc.Driver
+ url: @spring.datasource.druid.url@
+ username: @spring.datasource.druid.username@
+ password: @spring.datasource.druid.password@
+ cloud:
+ nacos:
+ discovery:
+ server-addr: @nacos.server-addr@
+ #nacos的命名空间ID,默认是public
+ namespace: @nacos.discovery.namespace@
+ #不把自己注册到注册中心的地址
+ register-enabled: @nacos.register-enabled@
+ ip: @nacos.ip@
+ serviceListChangedListening:
+ enable: @nacos.service-list-changed-listening.enable@
+ config:
+ enabled: @nacos.config-enabled@
+ server-addr: @nacos.server-addr@
+ namespace: @nacos.config.namespace@
+ group: @nacos.config.group@
+ file-extension: yaml
+ #指定共享配置,且支持动态刷新
+# ext-config:
+# - data-id: datasource.yaml
+# group: ${spring.cloud.nacos.config.group}
+# refresh: true
+# - data-id: common.yaml
+# group: ${spring.cloud.nacos.config.group}
+# refresh: true
+
+ # 数据迁移工具flyway
+ flyway:
+ enabled: @spring.flyway.enabled@
+ locations: classpath:db/migration
+ url: @spring.datasource.druid.url@
+ user: @spring.datasource.druid.username@
+ password: @spring.datasource.druid.password@
+ baseline-on-migrate: true
+ baseline-version: 0
+
+
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
+ endpoint:
+ health:
+ show-details: ALWAYS
+
+mybatis-plus:
+ mapper-locations: classpath:/mapper/**/*.xml
+ #实体扫描,多个package用逗号或者分号分隔
+ typeAliasesPackage: com.epmet.entity
+ global-config:
+ #数据库相关配置
+ db-config:
+ #主键类型 AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
+ id-type: ID_WORKER
+ #字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断"
+ field-strategy: NOT_NULL
+ #驼峰下划线转换
+ column-underline: true
+ banner: false
+ #原生配置
+ configuration:
+ map-underscore-to-camel-case: true
+ cache-enabled: false
+ call-setters-on-nulls: true
+ jdbc-type-for-null: 'null'
+
+feign:
+ hystrix:
+ enabled: true
+ client:
+ config:
+ default:
+ loggerLevel: BASIC
+ okhttp:
+ enabled: true
+
+hystrix:
+ command:
+ default:
+ execution:
+ isolation:
+ thread:
+ timeoutInMilliseconds: 60000 #缺省为1000
+
+ribbon:
+ ReadTimeout: 300000
+ ConnectTimeout: 300000
+
+#pageHelper分页插件
+pagehelper:
+ helper-dialect: mysql
+ reasonable: false #分页合理化配置,例如输入页码为-1,则自动转化为最小页码1
+
+#feign 日志需要该配置
+logging:
+ level:
+ com.epmet: debug
+
+dingTalk:
+ robot:
+ webHook: @dingTalk.robot.webHook@
+ secret: @dingTalk.robot.secret@
+
+rocketmq:
+ # 是否开启mq
+ enable: @rocketmq.enable@
+ name-server: @rocketmq.nameserver@
\ No newline at end of file
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql
new file mode 100644
index 0000000000..ac7d9c109c
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql
@@ -0,0 +1,14 @@
+create database epmet_open_data default character set utf8mb4 collate utf8mb4_general_ci;
+
+-- 开发环境的脚本
+CREATE USER epmet_open_data_user@'%' IDENTIFIED BY 'EpmEt-db-UsEr';
+GRANT ALL ON `epmet_open_data`.* TO 'epmet_open_data_user'@'%';
+flush privileges;
+
+
+
+-- 测试环境的脚本
+
+
+
+-- 生产环境的数据
\ No newline at end of file
diff --git a/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml
new file mode 100644
index 0000000000..bcbff37445
--- /dev/null
+++ b/epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml
@@ -0,0 +1,176 @@
+
+
+
+
+
+
+
+
+
+
+
+
+ ${appname}
+
+
+
+
+
+
+
+
+ debug
+
+
+ ${CONSOLE_LOG_PATTERN}
+
+ UTF-8
+
+
+
+
+
+
+
+ ${log.path}/debug.log
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n
+ UTF-8
+
+
+
+
+ ${log.path}/debug-%d{yyyy-MM-dd}.%i.log
+
+ 100MB
+
+
+ 15
+
+
+
+ debug
+ ACCEPT
+ DENY
+
+
+
+
+
+
+ ${log.path}/info.log
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n
+ UTF-8
+
+
+
+
+ ${log.path}/info-%d{yyyy-MM-dd}.%i.log
+
+ 100MB
+
+
+ 15
+
+
+
+ info
+ ACCEPT
+ DENY
+
+
+
+
+
+
+ ${log.path}/warn.log
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n
+ UTF-8
+
+
+
+ ${log.path}/warn-%d{yyyy-MM-dd}.%i.log
+
+ 100MB
+
+
+ 15
+
+
+
+ warn
+ ACCEPT
+ DENY
+
+
+
+
+
+
+ ${log.path}/error.log
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n
+ UTF-8
+
+
+
+ ${log.path}/error-%d{yyyy-MM-dd}.%i.log
+
+ 100MB
+
+
+ 15
+
+
+
+ ERROR
+ ACCEPT
+ DENY
+ ${webHook}
+ ${secret}
+ ${appname}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/epmet-module/open-data-worker/pom.xml b/epmet-module/open-data-worker/pom.xml
new file mode 100644
index 0000000000..97fe762b08
--- /dev/null
+++ b/epmet-module/open-data-worker/pom.xml
@@ -0,0 +1,18 @@
+
+
+
+ epmet-module
+ com.epmet
+ 2.0.0
+
+ 4.0.0
+
+ open-data-worker
+ pom
+
+ open-data-worker-client
+ open-data-worker-server
+
+
\ No newline at end of file
diff --git a/epmet-module/pom.xml b/epmet-module/pom.xml
index 56b1452eec..8b9efdfdb8 100644
--- a/epmet-module/pom.xml
+++ b/epmet-module/pom.xml
@@ -44,6 +44,7 @@
epmet-point
epmet-ext
data-aggregator
-
+ open-data-worker
+