Browse Source

新增:

1.epmet-data-open模块
修改:
1.epmet-message服务,增加针对工作人员/机构的创建/更新事件适配
dev
wxz 4 years ago
parent
commit
932932ea48
  1. 10
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java
  2. 11
      epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java
  3. 40
      epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java
  4. 12
      epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java
  5. 15
      epmet-module/open-data-worker/open-data-worker-client/pom.xml
  6. 258
      epmet-module/open-data-worker/open-data-worker-server/pom.xml
  7. 26
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java
  8. 161
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java
  9. 39
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java
  10. 61
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java
  11. 61
      epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java
  12. 138
      epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml
  13. 14
      epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql
  14. 176
      epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml
  15. 18
      epmet-module/open-data-worker/pom.xml
  16. 3
      epmet-module/pom.xml

10
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/ConsomerGroupConstants.java

@ -45,4 +45,14 @@ public interface ConsomerGroupConstants {
*/
String POINT_OPERATION_LOG_GROUP = "point_operation_log_group";
/**
* 开放的对接数据(中间库) 组织变更事件监听器分组
*/
String OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP = "open_data_org_change_event_listener_group";
/**
* 开放的对接数据(中间库) 工作人员变更事件监听器分组
*/
String OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP = "open_data_staff_change_event_listener_group";
}

11
epmet-commons/epmet-commons-rocketmq/src/main/java/com/epmet/commons/rocketmq/constants/TopicConstants.java

@ -13,6 +13,7 @@ public interface TopicConstants {
* 项目变动
*/
String PROJECT_CHANGED = "project_changed";
/**
* 小组成就
*/
@ -27,4 +28,14 @@ public interface TopicConstants {
* 积分系统话题
*/
String POINT = "point";
/**
* 组织信息变更
*/
String ORG = "org";
/**
* 工作人员信息变更
*/
String STAFF = "staff";
}

40
epmet-module/epmet-message/epmet-message-client/src/main/java/com/epmet/constant/SystemMessageType.java

@ -25,4 +25,44 @@ public interface SystemMessageType {
*/
String POINT_RULE_CHANGED = "point_rule_changed";
/**
* 网格新增
*/
String GRID_CREATE = "grid_create";
/**
* 网格信息变更
*/
String GRID_CHANGE = "grid_change";
/**
* agency机构新增
*/
String AGENCY_CREATE = "agency_create";
/**
* agency机构信息变更
*/
String AGENCY_CHANGE = "agency_change";
/**
* 部门新增
*/
String DEPARTMENT_CREATE = "department_create";
/**
* 部门信息变更
*/
String DEPARTMENT_CHANGE = "department_change";
/**
* 工作人员新增
*/
String STAFF_CREATE = "staff_create";
/**
* 工作人员变更
*/
String STAFF_CHANGE = "staff_change";
}

12
epmet-module/epmet-message/epmet-message-server/src/main/java/com/epmet/service/impl/SystemMessageServiceImpl.java

@ -79,6 +79,18 @@ public class SystemMessageServiceImpl implements SystemMessageService {
case SystemMessageType.POINT_RULE_CHANGED:
topic = TopicConstants.POINT;
break;
case SystemMessageType.GRID_CREATE:
case SystemMessageType.GRID_CHANGE:
case SystemMessageType.AGENCY_CREATE:
case SystemMessageType.AGENCY_CHANGE:
case SystemMessageType.DEPARTMENT_CREATE:
case SystemMessageType.DEPARTMENT_CHANGE:
topic = TopicConstants.ORG;
break;
case SystemMessageType.STAFF_CREATE:
case SystemMessageType.STAFF_CHANGE:
topic = TopicConstants.STAFF;
break;
}
return topic;
}

15
epmet-module/open-data-worker/open-data-worker-client/pom.xml

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>open-data-worker</artifactId>
<groupId>com.epmet</groupId>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>open-data-worker-client</artifactId>
</project>

258
epmet-module/open-data-worker/open-data-worker-server/pom.xml

@ -0,0 +1,258 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>open-data-worker</artifactId>
<groupId>com.epmet</groupId>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>open-data-worker-server</artifactId>
<dependencies>
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-commons-tools</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-commons-mybatis</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>${spring.boot.admin.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- 替换Feign原生httpclient -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
<version>10.3.0</version>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>com.epmet</groupId>
<artifactId>epmet-commons-rocketmq</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
<sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
<resources>
<resource>
<filtering>true</filtering>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>dev</id>
<properties>
<server.port>8117</server.port>
<spring.profiles.active>dev</spring.profiles.active>
<!-- 数据库配置-->
<spring.datasource.druid.url>
<![CDATA[jdbc:mysql://192.168.1.140:3306/epmet_open_data?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai]]>
</spring.datasource.druid.url>
<spring.datasource.druid.username>epmet_open_data_user</spring.datasource.druid.username>
<spring.datasource.druid.password>EpmEt-db-UsEr</spring.datasource.druid.password>
<!-- redis配置 -->
<spring.redis.index>0</spring.redis.index>
<spring.redis.host>192.168.1.140</spring.redis.host>
<spring.redis.port>6379</spring.redis.port>
<spring.redis.password>123456</spring.redis.password>
<!-- nacos -->
<nacos.register-enabled>true</nacos.register-enabled>
<nacos.server-addr>192.168.1.140:8848</nacos.server-addr>
<nacos.discovery.namespace>1fecc730-5e6e-464c-aae9-7567944e7936</nacos.discovery.namespace>
<nacos.config.namespace></nacos.config.namespace>
<nacos.config.group></nacos.config.group>
<nacos.config-enabled>false</nacos.config-enabled>
<nacos.ip/>
<!--是否开启服务列表变更监听-->
<nacos.service-list-changed-listening.enable>true</nacos.service-list-changed-listening.enable>
<!--flyway migration 数据库迁移工具-->
<spring.flyway.enabled>false</spring.flyway.enabled>
<!--钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
<id>local</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<server.port>8117</server.port>
<spring.profiles.active>local</spring.profiles.active>
<!-- 数据库配置-->
<spring.datasource.druid.url>
<![CDATA[jdbc:mysql://192.168.1.140:3306/epmet_open_data?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai]]>
</spring.datasource.druid.url>
<spring.datasource.druid.username>epmet_open_data_user</spring.datasource.druid.username>
<spring.datasource.druid.password>EpmEt-db-UsEr</spring.datasource.druid.password>
<!-- redis配置 -->
<spring.redis.index>0</spring.redis.index>
<spring.redis.host>192.168.1.140</spring.redis.host>
<spring.redis.port>6379</spring.redis.port>
<spring.redis.password>123456</spring.redis.password>
<!-- nacos -->
<nacos.register-enabled>false</nacos.register-enabled>
<nacos.server-addr>192.168.1.140:8848</nacos.server-addr>
<nacos.discovery.namespace>1fecc730-5e6e-464c-aae9-7567944e7936</nacos.discovery.namespace>
<nacos.config.namespace></nacos.config.namespace>
<nacos.config.group></nacos.config.group>
<nacos.config-enabled>false</nacos.config-enabled>
<nacos.ip/>
<!--是否开启服务列表变更监听-->
<nacos.service-list-changed-listening.enable>false</nacos.service-list-changed-listening.enable>
<!--flyway migration 数据库迁移工具-->
<spring.flyway.enabled>false</spring.flyway.enabled>
<!--钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>false</rocketmq.enable>
<rocketmq.nameserver>192.168.1.140:9876;192.168.1.141:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
<id>test</id>
<!--<activation>
<activeByDefault>true</activeByDefault>
</activation>-->
<properties>
<server.port>8117</server.port>
<spring.profiles.active>test</spring.profiles.active>
<!-- 数据库配置-->
<spring.datasource.druid.url>
<![CDATA[jdbc:mysql://rm-m5ef9t617j6o5eup7.mysql.rds.aliyuncs.com:3306/epmet_open_data?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai]]>
</spring.datasource.druid.url>
<spring.datasource.druid.username>epmet</spring.datasource.druid.username>
<spring.datasource.druid.password>elink@833066</spring.datasource.druid.password>
<!-- redis配置 -->
<spring.redis.index>0</spring.redis.index>
<spring.redis.host>r-m5eoz5b6tkx09y6bpz.redis.rds.aliyuncs.com</spring.redis.host>
<spring.redis.port>6379</spring.redis.port>
<spring.redis.password>EpmEtrEdIs!q@w</spring.redis.password>
<!-- nacos -->
<nacos.register-enabled>true</nacos.register-enabled>
<nacos.server-addr>192.168.10.150:8848</nacos.server-addr>
<nacos.discovery.namespace>67e3c350-533e-4d7c-9f8f-faf1b4aa82ae</nacos.discovery.namespace>
<nacos.config.namespace></nacos.config.namespace>
<nacos.config.group></nacos.config.group>
<nacos.config-enabled>false</nacos.config-enabled>
<nacos.ip/>
<!--是否开启服务列表变更监听-->
<nacos.service-list-changed-listening.enable>true</nacos.service-list-changed-listening.enable>
<!--flyway migration 数据库迁移工具-->
<spring.flyway.enabled>true</spring.flyway.enabled>
<!--钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=e894e5690f9d6a527722974c71548ff6c0fe29bd956589a09e21b16442a35ed4</dingTalk.robot.webHook>
<dingTalk.robot.secret>SECfcc020bdc83bb17a2c00f39977b1fbc409ef4188c7beaea11c5caa90eeaf87fd</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.10.161:9876</rocketmq.nameserver>
</properties>
</profile>
<profile>
<id>prod</id>
<properties>
<server.port>8117</server.port>
<spring.profiles.active>prod</spring.profiles.active>
<!-- 数据库配置-->
<spring.datasource.druid.url>
<![CDATA[jdbc:mysql://rm-m5e3vzs2637224wj9.mysql.rds.aliyuncs.com:3306/epmet_open_data?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai]]>
</spring.datasource.druid.url>
<spring.datasource.druid.username>epmet_open_data_user</spring.datasource.druid.username>
<spring.datasource.druid.password>EpmEt-db-UsEr</spring.datasource.druid.password>
<!-- redis配置 -->
<spring.redis.index>0</spring.redis.index>
<spring.redis.host>r-m5ez3n1j0qc3ykq2ut.redis.rds.aliyuncs.com</spring.redis.host>
<spring.redis.port>6379</spring.redis.port>
<spring.redis.password>EpmEtclOUdrEdIs!Q2w</spring.redis.password>
<!-- nacos -->
<nacos.register-enabled>true</nacos.register-enabled>
<nacos.server-addr>192.168.11.180:8848</nacos.server-addr>
<nacos.discovery.namespace>bd205d23-e696-47be-b995-916313f86e99</nacos.discovery.namespace>
<nacos.config.namespace></nacos.config.namespace>
<nacos.config.group></nacos.config.group>
<nacos.config-enabled>false</nacos.config-enabled>
<nacos.ip/>
<!--是否开启服务列表变更监听-->
<nacos.service-list-changed-listening.enable>true</nacos.service-list-changed-listening.enable>
<!--flyway migration 数据库迁移工具-->
<spring.flyway.enabled>true</spring.flyway.enabled>
<!--生产钉钉 机器人地址-->
<dingTalk.robot.webHook>https://oapi.dingtalk.com/robot/send?access_token=a5f66c3374b1642fe2142dbf56d5997e280172d4e8f2b546c9423a68c82ece6c</dingTalk.robot.webHook>
<dingTalk.robot.secret>SEC95f4f40b533ad379ea6a6d1af6dd37029383cfe1b7cd96dfac2678be2c1c3ed1</dingTalk.robot.secret>
<!--rocketmq-->
<rocketmq.enable>true</rocketmq.enable>
<rocketmq.nameserver>192.168.11.187:9876;192.168.11.184:9876</rocketmq.nameserver>
</properties>
</profile>
</profiles>
</project>

26
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/OpenDataApplication.java

@ -0,0 +1,26 @@
package com.epmet.opendata;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
/**
* @Description OpenData服务启动类
* 注意此处要使用@ComponentScan的原因是启动类处于包com.epmet.opendata下想用com.epmet.commons包下的一些Spring对象需要
* 手动指定扫描包否则无法扫描
* @author wxz
* @date 2021.10.13 15:16:05
*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@ServletComponentScan
@ComponentScan(value = { "com.epmet.opendata", "com.epmet.commons" })
public class OpenDataApplication {
public static void main(String[] args) {
SpringApplication.run(OpenDataApplication.class, args);
}
}

161
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/config/NacosServiceListListenerRegisterer.java

@ -0,0 +1,161 @@
package com.epmet.opendata.config;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author wxz
* @Description Nacos服务列表刷新监听注册器
* @date 2021.09.22 14:33:11
*/
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "spring.cloud.nacos.discovery.serviceListChangedListening", name = "enable", havingValue = "true", matchIfMissing = false)
public class NacosServiceListListenerRegisterer {
public static final String REFRESH_SERVER_LIST_METHOD_NAME = "restOfInit";
// 服务列表拉取间隔:10s
public static final long SERVICE_LIST_PULLING_DELAY_SECONDS = 10;
private NamingService namingService;
private ScheduledExecutorService executor;
@Autowired
private NacosDiscoveryProperties discoveryProperties;
@Autowired
private SpringClientFactory springClientFactory;
// 监听中的服务列表
private List<String> observingServers = new ArrayList<>(33);
@PostConstruct
public void init() {
namingService = discoveryProperties.namingServiceInstance();
// 启动监听
executor = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("NacosServiceListWatchingRegisterer");
return thread;
}
});
// 立即启动,并15s刷新一次服务列表,用于新服务列表的发现
ScheduledFuture<?> future = executor.scheduleAtFixedRate(new EpmetNacosServiceListListener(), 0, SERVICE_LIST_PULLING_DELAY_SECONDS, TimeUnit.SECONDS);
}
public class EpmetNacosServiceListListener implements Runnable {
@Override
public void run() {
doRefreshServerList();
}
/**
* @param
* @return
* @description 执行刷新
* @author wxz
* @date 2021.09.22 16:04:49
*/
private synchronized void doRefreshServerList() {
ListView<String> serviceListView = null;
try {
serviceListView = namingService.getServicesOfServer(1, 100);
//启动监听
if (serviceListView == null || serviceListView.getCount() == 0) {
log.info("【Nacos服务列表定时刷新】当前无任何可添加监听的服务");
return;
}
List<String> serviceList = serviceListView.getData();
log.info("【Nacos服务列表定时刷新】Nacos服务端服务列表: {}", serviceList);
for (String service : serviceList) {
try {
// 如果该服务已经在监听列表中存在了,则不再注册监听。注:不能取消空服务的监听,因为空服务随时可能恢复运行,需要实时监听。
if (observingServers.contains(service)) {
continue;
}
namingService.subscribe(service, new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
log.info("【Nacos服务列表刷新监听】收到事件:{}:[{}]", namingEvent.getServiceName(), namingEvent.getInstances());
doRefreshServerList(service);
}
}
});
// 将该服务加入到监听列表中
observingServers.add(service);
} catch (NacosException e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
log.error("【Nacos服务列表定时刷新】订阅ApplicationContext的刷新事件失败,错误信息:{}", errorStackTrace);
}
}
} catch (NacosException e) {
String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
log.error("【Nacos服务列表定时刷新】链接Nacos服务端失败,错误信息:{}", errorStackTrace);
}
}
/**
* @param serviceName
* @return
* @description 刷新ServerList
* @author wxz
* @date 2021.09.22 09:29:16
*/
private void doRefreshServerList(String serviceName) {
// 刷新方式1:反射调用DynamicServerListLoadBalancer中的restOfInit()方法。该方法原来只执行一次,此处不推荐用
//ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName);
//if (loadBalancer instanceof ZoneAwareLoadBalancer) {
// ZoneAwareLoadBalancer zaLoadBalancer = (ZoneAwareLoadBalancer) loadBalancer;
// IClientConfig clientConfig = springClientFactory.getClientConfig(serviceName);
// try {
// Method restOfInitMethod = zaLoadBalancer.getClass().getSuperclass().getDeclaredMethod(REFRESH_SERVER_LIST_METHOD_NAME, IClientConfig.class);
// restOfInitMethod.setAccessible(true);
// restOfInitMethod.invoke(zaLoadBalancer, clientConfig);
// } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
// String errorStackTrace = ExceptionUtils.getErrorStackTrace(e);
// log.error("【LoadBalancer刷新服务列表】失败:{}", errorStackTrace);
// }
//}
// 刷新方式2:DynamicServerListLoadBalancer#updateListOfServers()该方法为ribbon定时刷新服务列表的时候真正调用的方法,但是加了@VisibleForTesting
// 暂且 1 try
ILoadBalancer loadBalancer = springClientFactory.getLoadBalancer(serviceName);
if (loadBalancer instanceof DynamicServerListLoadBalancer) {
DynamicServerListLoadBalancer dslb = (DynamicServerListLoadBalancer) loadBalancer;
dslb.updateListOfServers();
}
}
}
}

39
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/RocketMQConsumerRegister.java

@ -0,0 +1,39 @@
package com.epmet.opendata.mq;
import com.epmet.commons.rocketmq.constants.ConsomerGroupConstants;
import com.epmet.commons.rocketmq.constants.TopicConstants;
import com.epmet.commons.rocketmq.register.MQAbstractRegister;
import com.epmet.commons.rocketmq.register.MQConsumerProperties;
import com.epmet.opendata.mq.listener.OpenDataOrgChangeEventListener;
import com.epmet.opendata.mq.listener.OpenDataStaffChangeEventListener;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
/**
* @Description 如果rocketmq.enable=true这里必须实现 实例化
* @author wxz
* @date 2021.07.14 17:13:41
*/
@Component
public class RocketMQConsumerRegister extends MQAbstractRegister {
@Override
public void registerAllListeners(String env, MQConsumerProperties consumerProperties) {
// 客户初始化监听器注册
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_ORG_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.ORG,
"*",
new OpenDataOrgChangeEventListener());
register(consumerProperties,
ConsomerGroupConstants.OPEN_DATA_STAFF_CHANGE_EVENT_LISTENER_GROUP,
MessageModel.CLUSTERING,
TopicConstants.STAFF,
"*",
new OpenDataStaffChangeEventListener());
// ...其他监听器类似
}
}

61
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataOrgChangeEventListener.java

@ -0,0 +1,61 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @Description 系统对接中间库组织信息变更监听器
* @author wxz
* @date 2021.10.13 15:21:48
*/
public class OpenDataOrgChangeEventListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String tags = messageExt.getTags();
logger.info("【开放数据事件监听器】-组织信息变更-收到消息内容:{},操作:{}", msg, tags);
DistributedLock distributedLock = null;
RLock lock = null;
try {
//distributedLock = SpringContextUtils.getBean(DistributedLock.class);
//lock = distributedLock.getLock(String.format("lock:open_data_org:%s", orgId),
// 30L, 30L, TimeUnit.SECONDS);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-组织信息变更-同步信息到中间库失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
//distributedLock.unLock(lock);
}
}
}

61
epmet-module/open-data-worker/open-data-worker-server/src/main/java/com/epmet/opendata/mq/listener/OpenDataStaffChangeEventListener.java

@ -0,0 +1,61 @@
package com.epmet.opendata.mq.listener;
import com.epmet.commons.tools.distributedlock.DistributedLock;
import com.epmet.commons.tools.exception.ExceptionUtils;
import com.epmet.commons.tools.exception.RenException;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @Description 系统对接中间库工作人员信息变更监听器
* @author wxz
* @date 2021.10.13 15:21:48
*/
public class OpenDataStaffChangeEventListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
msgs.forEach(msg -> consumeMessage(msg));
} catch (Exception e) {
logger.error(ExceptionUtils.getErrorStackTrace(e));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void consumeMessage(MessageExt messageExt) {
// msg即为消息体
// tags为SystemMessageType.java中的项,为具体的操作,此处拿到tags,判断是创建还是变更,来做响应的后续操作即可
String msg = new String(messageExt.getBody());
String tags = messageExt.getTags();
logger.info("【开放数据事件监听器】-工作人员信息变更-收到消息内容:{}, 操作:{}", msg, tags);
DistributedLock distributedLock = null;
RLock lock = null;
try {
//distributedLock = SpringContextUtils.getBean(DistributedLock.class);
//lock = distributedLock.getLock(String.format("lock:open_data_staff:%s", staffId),
// 30L, 30L, TimeUnit.SECONDS);
} catch (RenException e) {
// 如果是我们手动抛出的异常,说明在业务可控范围内。目前不需要MQ重试
logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
} catch (Exception e) {
// 不是我们自己抛出的异常,可以让MQ重试
logger.error("【开放数据事件监听器】-工作人员信息变更-初始化客户组织失败:".concat(ExceptionUtils.getErrorStackTrace(e)));
throw e;
} finally {
//distributedLock.unLock(lock);
}
}
}

138
epmet-module/open-data-worker/open-data-worker-server/src/main/resources/bootstrap.yml

@ -0,0 +1,138 @@
server:
port: @server.port@
version: @version@
servlet:
context-path: /opendata
spring:
main:
allow-bean-definition-overriding: true
application:
name: open-data-server
#环境 dev|test|prod
profiles:
active: @spring.profiles.active@
jackson:
time-zone: GMT+8
date-format: yyyy-MM-dd HH:mm:ss
redis:
database: @spring.redis.index@
host: @spring.redis.host@
port: @spring.redis.port@
password: @spring.redis.password@
timeout: 30s
datasource:
druid:
#MySQL
driver-class-name: com.mysql.cj.jdbc.Driver
url: @spring.datasource.druid.url@
username: @spring.datasource.druid.username@
password: @spring.datasource.druid.password@
cloud:
nacos:
discovery:
server-addr: @nacos.server-addr@
#nacos的命名空间ID,默认是public
namespace: @nacos.discovery.namespace@
#不把自己注册到注册中心的地址
register-enabled: @nacos.register-enabled@
ip: @nacos.ip@
serviceListChangedListening:
enable: @nacos.service-list-changed-listening.enable@
config:
enabled: @nacos.config-enabled@
server-addr: @nacos.server-addr@
namespace: @nacos.config.namespace@
group: @nacos.config.group@
file-extension: yaml
#指定共享配置,且支持动态刷新
# ext-config:
# - data-id: datasource.yaml
# group: ${spring.cloud.nacos.config.group}
# refresh: true
# - data-id: common.yaml
# group: ${spring.cloud.nacos.config.group}
# refresh: true
# 数据迁移工具flyway
flyway:
enabled: @spring.flyway.enabled@
locations: classpath:db/migration
url: @spring.datasource.druid.url@
user: @spring.datasource.druid.username@
password: @spring.datasource.druid.password@
baseline-on-migrate: true
baseline-version: 0
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: ALWAYS
mybatis-plus:
mapper-locations: classpath:/mapper/**/*.xml
#实体扫描,多个package用逗号或者分号分隔
typeAliasesPackage: com.epmet.entity
global-config:
#数据库相关配置
db-config:
#主键类型 AUTO:"数据库ID自增", INPUT:"用户输入ID", ID_WORKER:"全局唯一ID (数字类型唯一ID)", UUID:"全局唯一ID UUID";
id-type: ID_WORKER
#字段策略 IGNORED:"忽略判断",NOT_NULL:"非 NULL 判断"),NOT_EMPTY:"非空判断"
field-strategy: NOT_NULL
#驼峰下划线转换
column-underline: true
banner: false
#原生配置
configuration:
map-underscore-to-camel-case: true
cache-enabled: false
call-setters-on-nulls: true
jdbc-type-for-null: 'null'
feign:
hystrix:
enabled: true
client:
config:
default:
loggerLevel: BASIC
okhttp:
enabled: true
hystrix:
command:
default:
execution:
isolation:
thread:
timeoutInMilliseconds: 60000 #缺省为1000
ribbon:
ReadTimeout: 300000
ConnectTimeout: 300000
#pageHelper分页插件
pagehelper:
helper-dialect: mysql
reasonable: false #分页合理化配置,例如输入页码为-1,则自动转化为最小页码1
#feign 日志需要该配置
logging:
level:
com.epmet: debug
dingTalk:
robot:
webHook: @dingTalk.robot.webHook@
secret: @dingTalk.robot.secret@
rocketmq:
# 是否开启mq
enable: @rocketmq.enable@
name-server: @rocketmq.nameserver@

14
epmet-module/open-data-worker/open-data-worker-server/src/main/resources/init_db.sql

@ -0,0 +1,14 @@
create database epmet_open_data default character set utf8mb4 collate utf8mb4_general_ci;
-- 开发环境的脚本
CREATE USER epmet_open_data_user@'%' IDENTIFIED BY 'EpmEt-db-UsEr';
GRANT ALL ON `epmet_open_data`.* TO 'epmet_open_data_user'@'%';
flush privileges;
-- 测试环境的脚本
-- 生产环境的数据

176
epmet-module/open-data-worker/open-data-worker-server/src/main/resources/logback-spring.xml

@ -0,0 +1,176 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml"/>
<property name="log.path" value="logs/open-data"/>
<springProperty scope="context" name="appname" source="spring.application.name"/>
<springProperty scope="context" name="webHook" source="dingTalk.robot.webHook"/>
<springProperty scope="context" name="secret" source="dingTalk.robot.secret"/>
<!-- 日志上下文名称 -->
<contextName>${appname}</contextName>
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN"
value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!--1. 输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<!-- 设置字符集 -->
<charset>UTF-8</charset>
</encoder>
</appender>
<!--2. 输出到文档-->
<!-- 2.1 level为 DEBUG 日志,时间滚动输出 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文档的路径及文档名 -->
<file>${log.path}/debug.log</file>
<!--日志文档输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${log.path}/debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文档保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文档只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 2.2 level为 INFO 日志,时间滚动输出 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文档的路径及文档名 -->
<file>${log.path}/info.log</file>
<!--日志文档输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${log.path}/info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文档保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文档只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 2.3 level为 WARN 日志,时间滚动输出 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文档的路径及文档名 -->
<file>${log.path}/warn.log</file>
<!--日志文档输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文档保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文档只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 2.4 level为 ERROR 日志,时间滚动输出 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文档的路径及文档名 -->
<file>${log.path}/error.log</file>
<!--日志文档输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%contextName] [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文档保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文档只记录ERROR级别的 发送钉钉短信 -->
<filter class="com.epmet.commons.tools.filter.LogMsgSendFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
<webHook>${webHook}</webHook>
<secret>${secret}</secret>
<appName>${appname}</appName>
</filter>
<!-- 此日志文档只记录ERROR级别的 -->
<!--<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>-->
</appender>
<!-- 开发、测试环境 -->
<springProfile name="dev,test,local">
<logger name="org.springframework.web" level="INFO"/>
<logger name="org.springboot.sample" level="INFO"/>
<logger name="com.epmet.dao" level="INFO"/>
<logger name="com.epmet.dao" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="DEBUG_FILE"/>
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="WARN_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</springProfile>
<!-- 生产环境 -->
<springProfile name="prod">
<logger name="org.springframework.web" level="INFO"/>
<logger name="org.springboot.sample" level="INFO"/>
<logger name="com.epmet.dao" level="INFO"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="DEBUG_FILE"/>
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="WARN_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</springProfile>
</configuration>

18
epmet-module/open-data-worker/pom.xml

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>epmet-module</artifactId>
<groupId>com.epmet</groupId>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>open-data-worker</artifactId>
<packaging>pom</packaging>
<modules>
<module>open-data-worker-client</module>
<module>open-data-worker-server</module>
</modules>
</project>

3
epmet-module/pom.xml

@ -44,6 +44,7 @@
<module>epmet-point</module>
<module>epmet-ext</module>
<module>data-aggregator</module>
</modules>
<module>open-data-worker</module>
</modules>
</project>

Loading…
Cancel
Save