提交 9563427f authored 作者: 黄承天's avatar 黄承天

fix(user):完成了增量同步的功能(未启动)

上级 25ed9221
package org.matrix.remote.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* @author c
* @since 2022-03-21
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("kt_change")
public class Change {
private String type;
private String table;
private String key;
}
package org.matrix.remote.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.matrix.remote.entity.Change;
import org.springframework.stereotype.Repository;
/**
* @author c
*/
@DS("zentao")
@Repository
public interface ChangeMapper extends BaseMapper<Change> {
}
package org.matrix.remote.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.matrix.local.entity.Project;
import org.matrix.local.entity.User;
import org.matrix.local.entity.UserProject;
import org.matrix.local.mapper.KsProjectMapper;
import org.matrix.local.mapper.KsUserMapper;
import org.matrix.local.mapper.KsUserProjectMapper;
......@@ -17,18 +14,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.util.Objects.nonNull;
/**
* 禅道数据收集工具
* 禅道数据全量同步工具
*/
@Service
public class DataCollector {
public class DataAllCollector {
@Autowired
private ZtUserMapper ztUserMapper;
......@@ -42,6 +34,8 @@ public class DataCollector {
private KsProjectMapper ksProjectMapper;
@Autowired
private KsUserProjectMapper ksUserProjectMapper;
@Autowired
private TransHelper transHelper;
private List<ZtUser> ztUsers = new ArrayList<>();
private List<ZtUserView> ztUserViews = new ArrayList<>();
......@@ -97,7 +91,7 @@ public class DataCollector {
private void saveUser() {
ksUserMapper.delete(Wrappers.lambdaQuery());
ztUsers.stream()
.map(this::user)
.map(transHelper::user)
.forEach(ksUserMapper::insert);
}
......@@ -107,7 +101,7 @@ public class DataCollector {
private void saveProject() {
ksProjectMapper.delete(Wrappers.lambdaQuery());
ztProjects.stream()
.map(this::project)
.map(transHelper::project)
.forEach(ksProjectMapper::insert);
}
......@@ -117,44 +111,8 @@ public class DataCollector {
private void saveUserProject() {
ksUserProjectMapper.delete(Wrappers.lambdaQuery());
ztUserViews.stream()
.flatMap(ztUserView -> userProjects(ztUserView).stream())
.flatMap(ztUserView -> transHelper.userProjects(ztUserView).stream())
.forEach(ksUserProjectMapper::insert);
}
//------------------------private------------------------------------//
private User user(ZtUser ztUser) {
return new User(Long.valueOf(ztUser.getId()), ztUser.getRealname(), ztUser.getAccount(), ztUser.getPassword());
}
private Project project(ZtProject ztProject) {
return new Project(Long.valueOf(ztProject.getId()), ztProject.getName());
}
private List<UserProject> userProjects(ZtUserView ztUserView) {
Long userId = ksUserMapper.selectOne(Wrappers.<User>lambdaQuery().eq(User::getAccount, ztUserView.getAccount())).getId();
return toList(ztUserView.getProjects()).stream()
.map(Integer::parseInt)
.map(id -> ztProjectMapper.selectById(id))
.filter(ztProject -> Objects.equals("0", ztProject.getDeleted()))
.map(this::findProjectId)
.map(projectId -> new UserProject(null, userId, projectId))
.collect(Collectors.toList());
}
private Long findProjectId(ZtProject ztProject) {
if (nonNull(ztProject.getProject()) && ztProject.getProject() > 0) {
return Long.valueOf(ztProject.getProject());
} else {
return Long.valueOf(ztProject.getId());
}
}
private List<String> toList(String content) {
List<String> result = Arrays.stream(content.split(",")).collect(Collectors.toList());
result.remove(0);
return result;
}
}
package org.matrix.remote.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.matrix.local.entity.Project;
import org.matrix.local.entity.User;
import org.matrix.local.entity.UserProject;
import org.matrix.local.mapper.KsProjectMapper;
import org.matrix.local.mapper.KsUserMapper;
import org.matrix.local.mapper.KsUserProjectMapper;
import org.matrix.remote.entity.Change;
import org.matrix.remote.entity.ZtProject;
import org.matrix.remote.entity.ZtUser;
import org.matrix.remote.entity.ZtUserView;
import org.matrix.remote.mapper.ChangeMapper;
import org.matrix.remote.mapper.ZtProjectMapper;
import org.matrix.remote.mapper.ZtUserMapper;
import org.matrix.remote.mapper.ZtUserViewMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Objects;
import static java.util.Objects.nonNull;
/**
* 禅道数据增量同步工具(基于触发器和专用表)
*/
@Slf4j
@Service
public class DataTriggerCollector {
@Autowired
private ChangeMapper changeMapper;
@Autowired
private ZtUserMapper ztUserMapper;
@Autowired
private ZtUserViewMapper ztUserViewMapper;
@Autowired
private ZtProjectMapper ztProjectMapper;
@Autowired
private KsUserMapper ksUserMapper;
@Autowired
private KsProjectMapper ksProjectMapper;
@Autowired
private KsUserProjectMapper ksUserProjectMapper;
@Autowired
private TransHelper transHelper;
/**
* 执行一次增量同步
*/
public void execute() {
boolean hasChanges = changeMapper.selectCount(Wrappers.lambdaQuery()) > 0;
if (hasChanges) {
List<Change> changes = changeMapper.selectList(Wrappers.lambdaQuery());
changes.forEach(this::handle);
}
}
//----------------------------------------private-----------------------------------------
private void handle(Change change) {
String table = change.getTable();
log.info("发现数据变更 数据类型:{} 操作类型:{} 主键:{}", change.getTable(), change.getType(), change.getKey());
switch (table) {
case "user":
handleUser(change);
return;
case "project":
handleProject(change);
return;
case "userview":
handleUserView(change);
return;
default:
}
changeMapper.delete(Wrappers.lambdaQuery(change));
}
private void handleUser(Change change) {
Integer id = Integer.valueOf(change.getKey());
ZtUser ztUser = ztUserMapper.selectById(id);
String type = change.getType();
switch (type) {
case "insert":
if (nonNull(ztUser)) {
User user = transHelper.user(ztUser);
ksUserMapper.insert(user);
}
return;
case "update":
if (nonNull(ztUser)) {
User user = transHelper.user(ztUser);
ksUserMapper.updateById(user);
}
return;
case "delete":
ksUserMapper.deleteById(id);
return;
default:
}
}
private void handleProject(Change change) {
Integer id = Integer.valueOf(change.getKey());
ZtProject ztProject = ztProjectMapper.selectById(id);
String type = change.getType();
switch (type) {
case "insert":
if (nonNull(ztProject)) {
Project project = transHelper.project(ztProject);
ksProjectMapper.insert(project);
}
return;
case "update":
if (nonNull(ztProject)) {
Project project = transHelper.project(ztProject);
ksProjectMapper.updateById(project);
}
return;
case "delete":
ksProjectMapper.deleteById(id);
return;
default:
}
}
private void handleUserView(Change change) {
String account = change.getKey();
ZtUserView ztUserView = ztUserViewMapper.selectOne(Wrappers.<ZtUserView>lambdaQuery().eq(ZtUserView::getAccount, account));
String type = change.getType();
switch (type) {
case "insert":
case "update":
if (nonNull(ztUserView)) {
List<UserProject> userProjects = transHelper.userProjects(ztUserView);
userProjects.stream().findAny()
.map(UserProject::getUserId)
.ifPresent(userId -> ksUserProjectMapper.delete(Wrappers.<UserProject>lambdaQuery().eq(UserProject::getUserId, userId)));
userProjects.forEach(ksUserProjectMapper::insert);
}
return;
case "delete":
if (nonNull(ztUserView)) {
Long userId = ksUserMapper.selectOne(Wrappers.<User>lambdaQuery().eq(User::getAccount, ztUserView.getAccount())).getId();
ksUserProjectMapper.delete(Wrappers.<UserProject>lambdaQuery().eq(UserProject::getUserId, userId));
}
return;
default:
}
}
}
package org.matrix.remote.service;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.matrix.local.entity.Project;
import org.matrix.local.entity.User;
import org.matrix.local.entity.UserProject;
import org.matrix.local.mapper.KsProjectMapper;
import org.matrix.local.mapper.KsUserMapper;
import org.matrix.local.mapper.KsUserProjectMapper;
import org.matrix.remote.entity.ZtProject;
import org.matrix.remote.entity.ZtUser;
import org.matrix.remote.entity.ZtUserView;
import org.matrix.remote.mapper.ZtProjectMapper;
import org.matrix.remote.mapper.ZtUserMapper;
import org.matrix.remote.mapper.ZtUserViewMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static java.util.Objects.nonNull;
@Service
public class TransHelper {
@Autowired
private ZtUserMapper ztUserMapper;
@Autowired
private ZtUserViewMapper ztUserViewMapper;
@Autowired
private ZtProjectMapper ztProjectMapper;
@Autowired
private KsUserMapper ksUserMapper;
@Autowired
private KsProjectMapper ksProjectMapper;
@Autowired
private KsUserProjectMapper ksUserProjectMapper;
public User user(ZtUser ztUser) {
return new User(Long.valueOf(ztUser.getId()), ztUser.getRealname(), ztUser.getAccount(), ztUser.getPassword());
}
public Project project(ZtProject ztProject) {
return new Project(Long.valueOf(ztProject.getId()), ztProject.getName());
}
public List<UserProject> userProjects(ZtUserView ztUserView) {
Long userId = ksUserMapper.selectOne(Wrappers.<User>lambdaQuery().eq(User::getAccount, ztUserView.getAccount())).getId();
return toList(ztUserView.getProjects()).stream()
.map(Integer::parseInt)
.map(id -> ztProjectMapper.selectById(id))
.filter(ztProject -> Objects.equals("0", ztProject.getDeleted()))
.map(this::findProjectId)
.map(projectId -> new UserProject(null, userId, projectId))
.collect(Collectors.toList());
}
public Long findProjectId(ZtProject ztProject) {
if (nonNull(ztProject.getProject()) && ztProject.getProject() > 0) {
return Long.valueOf(ztProject.getProject());
} else {
return Long.valueOf(ztProject.getId());
}
}
public List<String> toList(String content) {
List<String> result = Arrays.stream(content.split(",")).collect(Collectors.toList());
result.remove(0);
return result;
}
}
package org.matrix.remote.service;
import com.baomidou.dynamic.datasource.annotation.DS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
/**
* 用于增量同步的初始化操作
* 1.对需要监听数据的表进行添加触发器
* 2.与创建记录数据变化的表
* <p>
* 触发器的内容是对记录用的表里插入该条数据变化的大致内容
* 此后可通过读取该表来进行增量同步
*
* PS:选择性使用
*/
@SuppressWarnings("SqlNoDataSourceInspection")
@DS("zentao")
@Service
public class TriggerInitial {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* 创建数据变化表
* 该表有3个字段
* type:该条变化数据的操作类型(INSERT、UPDATE、DELETE)
* table:该条变化数据属于哪个表
* key:该条变化数据的主键值(可以根据该主键值在原表查出具体数据再做同步更新、删除操作则只需指定主键值)
*/
public void createChangeTable() {
String dropChangeTableSQL = "DROP TABLE IF EXISTS `zentao`.`kt_change`;\n";
jdbcTemplate.execute(dropChangeTableSQL);
String createChangeTableSQL = "CREATE TABLE `zentao`.`kt_change` (\n" +
" `type` VARCHAR(11) NULL,\n" +
" `table` VARCHAR(255) NULL,\n" +
" `data_key` VARCHAR(255) NULL);\n";
jdbcTemplate.execute(createChangeTableSQL);
}
/**
* 创建User表的触发器
*/
public void createUserTrigger() {
String dropInsertTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_user_AFTER_INSERT`;\n";
String createInsertTrigger = "CREATE TRIGGER `zentao`.`zt_user_AFTER_INSERT` AFTER INSERT ON `zt_user` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `content`) values ('insert','user',new.id);\n" +
"END;";
jdbcTemplate.execute(dropInsertTrigger);
jdbcTemplate.execute(createInsertTrigger);
String dropUpdateTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_user_AFTER_UPDATE`;\n";
String createUpdateTrigger = "CREATE TRIGGER `zentao`.`zt_user_AFTER_UPDATE` AFTER UPDATE ON `zt_user` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('update','user',new.id);\n" +
"END;";
jdbcTemplate.execute(dropUpdateTrigger);
jdbcTemplate.execute(createUpdateTrigger);
String dropDeleteTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_user_AFTER_DELETE`;\n";
String createDeleteTrigger = "CREATE TRIGGER `zentao`.`zt_user_AFTER_DELETE` AFTER DELETE ON `zt_user` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('delete','user',old.id);\n" +
"END;";
jdbcTemplate.execute(dropDeleteTrigger);
jdbcTemplate.execute(createDeleteTrigger);
}
/**
* 创建Project表的触发器
*/
public void createProjectTrigger() {
String dropInsertTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_project_AFTER_INSERT`;\n";
String createInsertTrigger = "CREATE TRIGGER `zentao`.`zt_project_AFTER_INSERT` AFTER INSERT ON `zt_project` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `content`) values ('insert','project',new.id);\n" +
"END;";
jdbcTemplate.execute(dropInsertTrigger);
jdbcTemplate.execute(createInsertTrigger);
String dropUpdateTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_project_AFTER_UPDATE`;\n";
String createUpdateTrigger = "CREATE TRIGGER `zentao`.`zt_project_AFTER_UPDATE` AFTER UPDATE ON `zt_project` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('update','project',new.id);\n" +
"END;";
jdbcTemplate.execute(dropUpdateTrigger);
jdbcTemplate.execute(createUpdateTrigger);
String dropDeleteTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_project_AFTER_DELETE`;\n";
String createDeleteTrigger = "CREATE TRIGGER `zentao`.`zt_project_AFTER_DELETE` AFTER DELETE ON `zt_project` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('delete','project',old.id);\n" +
"END;";
jdbcTemplate.execute(dropDeleteTrigger);
jdbcTemplate.execute(createDeleteTrigger);
}
/**
* 创建UserView表的触发器
*/
public void createUserViewTrigger() {
String dropInsertTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_userview_AFTER_INSERT`;\n";
String createInsertTrigger = "CREATE TRIGGER `zentao`.`zt_userview_AFTER_INSERT` AFTER INSERT ON `zt_userview` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `content`) values ('insert','userview',new.account);\n" +
"END;";
jdbcTemplate.execute(dropInsertTrigger);
jdbcTemplate.execute(createInsertTrigger);
String dropUpdateTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_userview_AFTER_UPDATE`;\n";
String createUpdateTrigger = "CREATE TRIGGER `zentao`.`zt_userview_AFTER_UPDATE` AFTER UPDATE ON `zt_userview` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('update','userview',new.account);\n" +
"END;";
jdbcTemplate.execute(dropUpdateTrigger);
jdbcTemplate.execute(createUpdateTrigger);
String dropDeleteTrigger = "DROP TRIGGER IF EXISTS `zentao`.`zt_userview_AFTER_DELETE`;\n";
String createDeleteTrigger = "CREATE TRIGGER `zentao`.`zt_userview_AFTER_DELETE` AFTER DELETE ON `zt_userview` FOR EACH ROW\n" +
"BEGIN\n" +
"insert into `zentao`.`kt_change` (`type`,`table`, `key`) values ('delete','userview',old.account);\n" +
"END;";
jdbcTemplate.execute(dropDeleteTrigger);
jdbcTemplate.execute(createDeleteTrigger);
}
}
package org.matrix.remote.task;
import lombok.extern.slf4j.Slf4j;
import org.matrix.remote.service.DataCollector;
import org.matrix.remote.service.DataAllCollector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Scheduled;
......@@ -16,7 +16,7 @@ public class DataCollectorTask implements CommandLineRunner {
@Autowired
private DataCollector dataCollector;
private DataAllCollector dataAllCollector;
/**
* 同步禅道取的用户和项目数据
......@@ -25,13 +25,12 @@ public class DataCollectorTask implements CommandLineRunner {
@Scheduled(cron = "0 0 */1 * * ?")
public void collect() {
log.info("同步禅道用户项目数据...");
dataCollector.execute();
dataAllCollector.execute();
log.info("同步结束....");
}
@Override
public void run(String... args) {
collect();
}
}
......@@ -2,7 +2,8 @@ package org.matrix.ktzentao;
import org.junit.jupiter.api.Test;
import org.matrix.ZentaoApplication;
import org.matrix.remote.service.DataCollector;
import org.matrix.remote.service.DataAllCollector;
import org.matrix.remote.service.TriggerInitial;
import org.matrix.remote.task.DataCollectorTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
......@@ -11,12 +12,15 @@ import org.springframework.boot.test.context.SpringBootTest;
class KtZentaoApplicationTests {
@Autowired
DataCollector dataCollector;
DataAllCollector dataAllCollector;
@Autowired
DataCollectorTask dataCollectorTask;
@Autowired
TriggerInitial triggerInitial;
@Test
void test() {
dataCollectorTask.collect();
triggerInitial.createProjectTrigger();
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论