提交 46071f70 authored 作者: xc's avatar xc

xc

上级 481fc0a1
......@@ -10,6 +10,42 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>ds-switch</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.9</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.zjty.switches;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SwitchApplication {
public static void main(String[] args) {
SpringApplication.run(SwitchApplication.class, args);
}
}
package com.zjty.switches.config;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
@ComponentScan
@Configuration
public class DataSourceConfig {
@Primary
@Bean(name = "remote1DataSource")
@Qualifier("remote1DataSource")
@ConfigurationProperties(prefix = "spring.datasource.remote1")
public DataSource primaryDataSource() {
return DruidDataSourceBuilder.create()
.build();
}
@Bean(name = "remote2DataSource")
@Qualifier("remote2DataSource")
@ConfigurationProperties(prefix = "spring.datasource.remote2")
public DataSource secondDataSource() {
return DruidDataSourceBuilder.create()
.build();
}
}
package com.zjty.switches.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
@SuppressWarnings({"SpringAutowiredFieldsWarningInspection", "SpringComponentScan"})
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
entityManagerFactoryRef = "entityManagerFactoryLocation",
transactionManagerRef = "transactionManagerLocation",
basePackages = {"com.ex"}
)
@EnableConfigurationProperties(JpaProperties.class)
public class Remote1DataConfig {
@Autowired
@Qualifier("remote1DataSource")
private DataSource locationDataSource;
@Autowired
private JpaProperties jpaProperties;
@Autowired
private HibernateProperties properties;
@Primary
@Bean(name = "entityManagerLocation")
public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
return entityManagerFactoryLocation(builder).getObject().createEntityManager();
}
@Primary
@Bean(name = "entityManagerFactoryLocation")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryLocation(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(locationDataSource)
.properties(properties.determineHibernateProperties(jpaProperties.getProperties(), new
HibernateSettings()))
.packages("com.zjm.synchronous.exchange.entity")
.persistenceUnit("locationPersistenceUnit")
.build();
}
private Map<String, String> getVendorProperties() {
return jpaProperties.getProperties();
}
@Primary
@Bean(name = "transactionManagerLocation")
public PlatformTransactionManager transactionManagerLocation(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(entityManagerFactoryLocation(builder).getObject());
}
}
package com.zjty.switches.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
@SuppressWarnings("ALL")
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
entityManagerFactoryRef = "entityManagerFactoryRemote2",
transactionManagerRef = "transactionManagerRemote2",
basePackages = {"com.ex"}
)
@EnableConfigurationProperties(JpaProperties.class)
public class Remote2DataConfig {
@Autowired
@Qualifier("remote2DataSource")
private DataSource remoteDataSource;
@Autowired
private HibernateProperties properties;
@Bean(name = "entityManagerRemote2")
public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
return entityManagerFactoryRemote2(builder).getObject().createEntityManager();
}
@Bean(name = "entityManagerFactoryRemote2")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryRemote2(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(remoteDataSource)
.properties(properties.determineHibernateProperties(jpaProperties.getProperties(), new
HibernateSettings()))
.packages(
"com.zjm.synchronous.bas.base.ls.entity")
.persistenceUnit("remotePersistenceUnit2")
.build();
}
@Autowired
private JpaProperties jpaProperties;
private Map<String, String> getVendorProperties() {
return jpaProperties.getProperties();
}
@Bean(name = "transactionManagerRemote2")
public PlatformTransactionManager transactionManagerRemote(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(entityManagerFactoryRemote2(builder).getObject());
}
}
package com.zjty.switches.service;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
@Slf4j
public class CanalClientExample implements CommandLineRunner {
@Autowired
SwitchService switchService;
@Value("${canal.adderss}")
private String adderss;
@Value("${canal.port}")
private int port;
@Value("${destination}")
private String destination;
@Value("${username}")
private String username;
@Value("${password}")
private String password;
@Value("${subscribe.regex}")
private String regex;
@Override
public void run(String... args) throws Exception {
log.info("canal开始————");
canal();
}
/**
*数据备份
*/
public void canal() {
// 创建链接
// CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
// 11111), "example", "", "");
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(adderss,
port), destination, username, password);
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
// connector.subscribe(".*\\..*"); //所有表
// connector.subscribe("canaltest\\..*"); //canaltest下所有表
connector.subscribe(regex); //canaltest下所有表
connector.rollback();
int totalEmptyCount = 120;
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} catch (Exception e) {
e.printStackTrace();
}
finally {
connector.disconnect();
}
}
/**
* 提取canal数据,并添加到子数据库
* @param entrys
*/
private void printEntry(List<CanalEntry.Entry> entrys) {
int errorCount = 0;
String sql = null;
try {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
String tableName = entry.getHeader().getTableName();
String dbName = entry.getHeader().getSchemaName();
System.out.println("-------------------------"+dbName);
int dataCount = 0;
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
sql = getSql(eventType, tableName, rowData, dbName);
System.out.println(sql);
switchService.executeSQL(sql);
dataCount++;
log.info("执行SQL成功:" + dataCount + "次," + sql);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
errorCount++;
log.info("执行SQL失败:" + errorCount + "次," + sql);
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
/**
* 解析canal数据,合成sql语句
* @param eventType 操作类型
* @param tableName 修改表名
* @param rowData 修改数据
* @return sql语句
*/
private String getSql(CanalEntry.EventType eventType, String tableName, CanalEntry.RowData rowData, String dbname){
String sql = null;
switch (eventType) {
case INSERT:
sql = getInsertSql(tableName,rowData.getAfterColumnsList(), dbname);
break;
case UPDATE:
sql = getUpdateSql(tableName,rowData.getAfterColumnsList(), dbname);
break;
case DELETE:
sql = getDeleteSql(tableName,rowData.getBeforeColumnsList(), dbname);
break;
default:
break;
}
return sql;
}
/**
* 插入
* @param tableName
* @param columns
* @return
*/
private String getInsertSql(String tableName,List<CanalEntry.Column> columns, String dnName){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String keys = "";
String values = "";
// System.out.println(columns);
for(int i=0;i<columns.size();i++){
if(i != 0) {
keys += ",";
values += ",";
}
keys += columns.get(i).getName();
// if (columns.get(i).getMysqlType().equals("int")){
// values += Integer.parseInt(columns.get(i).getValue());
// }else {
// values += getValue(columns.get(i));
// }
values += getValue(columns.get(i));
}
String format = "INSERT INTO %s (%s) VALUES (%s)";
return String.format(format,dnName + "." + tableName,keys,values);
}
/**
*
* 更新
* @param tableName
* @param columns
* @return
*/
private String getUpdateSql(String tableName,List<CanalEntry.Column> columns, String dbName){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String sets = "";
String where = "";
for(CanalEntry.Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
if(!StringUtils.isBlank(sets)) {
sets += ",";
}
sets += column.getName() + "=" + getValue(column);
}
String format = "UPDATE %s SET %s WHERE %s";
return String.format(format,dbName + "." + tableName,sets,where);
}
/**
* 删除
* @param tableName
* @param columns
* @return
*/
private String getDeleteSql(String tableName,List<CanalEntry.Column> columns, String dbName){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String where = "";
for(CanalEntry.Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
}
}
String format = "DELETE FROM %s WHERE %s";
return String.format(format,dbName + "." + tableName,where);
}
private String getValue(CanalEntry.Column column){
if(column.getIsNull()){
return "null";
}
return String.format("'%s'",column.getValue());
}
}
package com.zjty.switches.service;
public interface SwitchService {
void executeSQL(String sql);
}
package com.zjty.switches.service.impl;
import com.zjty.switches.service.SwitchService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
@Service
public class SwitchServiceImpl implements SwitchService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public void executeSQL(String sql) {
jdbcTemplate.execute(sql);
}
}
logging.file=./log/canalDemo.log
spring.main.allow-bean-definition-overriding=true
##备份子数据库mysql
spring.datasource.remote1.url=jdbc:mysql://localhost:3306/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.remote1.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.remote1.username=root
spring.datasource.remote1.password=root
pring.datasource.remote2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.remote2.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.remote2.username=root
spring.datasource.remote2.password=root
###canal
canal.adderss=127.0.0.1
canal.port=11111
destination=example
username=""
password=""
subscribe.regex=canaltest\\..*
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论