提交 5b84c715 authored 作者: xc's avatar xc

xc

上级 89e639e5
package com.zjty.switches.config;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
@ComponentScan
@Configuration
public class DataSourceConfig {
@Primary
@Bean(name = "remote1DataSource")
@Qualifier("remote1DataSource")
@ConfigurationProperties(prefix = "spring.datasource.remote1")
public DataSource primaryDataSource() {
return DruidDataSourceBuilder.create()
.build();
}
@Bean(name = "remote2DataSource")
@Qualifier("remote2DataSource")
@ConfigurationProperties(prefix = "spring.datasource.remote2")
public DataSource secondDataSource() {
return DruidDataSourceBuilder.create()
.build();
}
}
package com.zjty.switches.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@Component
public class JdbcConnet {
@Value("${driver}")
private String driver;
@Value("${url}")
private String url;
@Value("${dbusername}")
private String username;
@Value("${dbpassword}")
private String password;
public Connection connetToTest3(){
try {
Class.forName(driver);
return DriverManager.getConnection(url, username, password);
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public void closeConnet(Connection connection){
try{
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package com.zjty.switches.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
@SuppressWarnings({"SpringAutowiredFieldsWarningInspection", "SpringComponentScan"})
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
entityManagerFactoryRef = "entityManagerFactoryLocation",
transactionManagerRef = "transactionManagerLocation",
basePackages = {"com.ex"}
)
@EnableConfigurationProperties(JpaProperties.class)
public class Remote1DataConfig {
@Autowired
@Qualifier("remote1DataSource")
private DataSource locationDataSource;
@Autowired
private JpaProperties jpaProperties;
@Autowired
private HibernateProperties properties;
@Primary
@Bean(name = "entityManagerLocation")
public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
return entityManagerFactoryLocation(builder).getObject().createEntityManager();
}
@Primary
@Bean(name = "entityManagerFactoryLocation")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryLocation(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(locationDataSource)
.properties(properties.determineHibernateProperties(jpaProperties.getProperties(), new
HibernateSettings()))
.packages("com.zjm.synchronous.exchange.entity")
.persistenceUnit("locationPersistenceUnit")
.build();
}
private Map<String, String> getVendorProperties() {
return jpaProperties.getProperties();
}
@Primary
@Bean(name = "transactionManagerLocation")
public PlatformTransactionManager transactionManagerLocation(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(entityManagerFactoryLocation(builder).getObject());
}
}
package com.zjty.switches.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
@SuppressWarnings("ALL")
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
entityManagerFactoryRef = "entityManagerFactoryRemote2",
transactionManagerRef = "transactionManagerRemote2",
basePackages = {"com.ex"}
)
@EnableConfigurationProperties(JpaProperties.class)
public class Remote2DataConfig {
@Autowired
@Qualifier("remote2DataSource")
private DataSource remoteDataSource;
@Autowired
private HibernateProperties properties;
@Bean(name = "entityManagerRemote2")
public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
return entityManagerFactoryRemote2(builder).getObject().createEntityManager();
}
@Bean(name = "entityManagerFactoryRemote2")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryRemote2(EntityManagerFactoryBuilder builder) {
return builder
.dataSource(remoteDataSource)
.properties(properties.determineHibernateProperties(jpaProperties.getProperties(), new
HibernateSettings()))
.packages(
"com.zjm.synchronous.bas.base.ls.entity")
.persistenceUnit("remotePersistenceUnit2")
.build();
}
@Autowired
private JpaProperties jpaProperties;
private Map<String, String> getVendorProperties() {
return jpaProperties.getProperties();
}
@Bean(name = "transactionManagerRemote2")
public PlatformTransactionManager transactionManagerRemote(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(entityManagerFactoryRemote2(builder).getObject());
}
}
...@@ -95,10 +95,7 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -95,10 +95,7 @@ public class CanalClientExample implements CommandLineRunner {
* @param entrys * @param entrys
*/ */
private void printEntry(List<CanalEntry.Entry> entrys) { private void printEntry(List<CanalEntry.Entry> entrys) {
int errorCount = 0;
String sql = null; String sql = null;
try {
for (CanalEntry.Entry entry : entrys) { for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue; continue;
...@@ -119,21 +116,21 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -119,21 +116,21 @@ public class CanalClientExample implements CommandLineRunner {
int dataCount = 0; int dataCount = 0;
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (!tableName.contains("change_ds_")){
tableName = "change_ds_"+tableName;
}else {
tableName = tableName.split("_")[2];
}
sql = getSql(eventType, tableName, rowData, dbName); sql = getSql(eventType, tableName, rowData, dbName);
System.out.println(sql); System.out.println(sql);
switchService.executeSQL(sql); switchService.getChangeData(sql, tableName, dbName);
dataCount++;
log.info("执行SQL成功:" + dataCount + "次," + sql);
} }
} }
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
errorCount++;
log.info("执行SQL失败:" + errorCount + "次," + sql);
} }
}
private static void printColumn(List<CanalEntry.Column> columns) { private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) { for (CanalEntry.Column column : columns) {
...@@ -195,7 +192,8 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -195,7 +192,8 @@ public class CanalClientExample implements CommandLineRunner {
} }
String format = "INSERT INTO %s (%s) VALUES (%s)"; String format = "INSERT INTO %s (%s) VALUES (%s)";
return String.format(format,dnName + "." + tableName,keys,values); return String.format(format,tableName,keys,values);
// return String.format(format,dnName + "." + tableName,keys,values);
} }
/** /**
...@@ -222,7 +220,7 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -222,7 +220,7 @@ public class CanalClientExample implements CommandLineRunner {
sets += column.getName() + "=" + getValue(column); sets += column.getName() + "=" + getValue(column);
} }
String format = "UPDATE %s SET %s WHERE %s"; String format = "UPDATE %s SET %s WHERE %s";
return String.format(format,dbName + "." + tableName,sets,where); return String.format(format,tableName,sets,where);
} }
/** /**
...@@ -242,7 +240,7 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -242,7 +240,7 @@ public class CanalClientExample implements CommandLineRunner {
} }
} }
String format = "DELETE FROM %s WHERE %s"; String format = "DELETE FROM %s WHERE %s";
return String.format(format,dbName + "." + tableName,where); return String.format(format,tableName,where);
} }
private String getValue(CanalEntry.Column column){ private String getValue(CanalEntry.Column column){
......
package com.zjty.switches.service; package com.zjty.switches.service;
public interface SwitchService { public interface SwitchService {
void executeSQL(String sql); /**
* 执行sql语句
* @param sql sql语句
* @param tableName 操作表名称
* @param dbName 操作数据库名称
*/
void getChangeData(String sql, String tableName, String dbName);
} }
package com.zjty.switches.service.impl; package com.zjty.switches.service.impl;
import com.zjty.switches.config.JdbcConnet;
import com.zjty.switches.service.SwitchService; import com.zjty.switches.service.SwitchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.Statement;
@Service @Service
@Slf4j
public class SwitchServiceImpl implements SwitchService { public class SwitchServiceImpl implements SwitchService {
@Autowired @Autowired
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
@Override @Autowired
public void executeSQL(String sql) { private JdbcConnet jdbcConnet;
private Connection connection ;
private void executeSQL(String sql) {
jdbcTemplate.execute(sql); jdbcTemplate.execute(sql);
} }
@Override
public void getChangeData(String sql, String tableName, String dbName) {
String format = "%s同步到 %s %s: %s";
String addr = "";
try {
if (tableName.contains("change_ds_")){
//数据中心的数据
addr = "数据中心";
executeSQL(sql);
}else {
//前置数据库数据
addr = "前置数据库";
if (connection == null){
// JdbcConnet jdbcConnet = new JdbcConnet();
connection = jdbcConnet.connetToTest3();
}
Statement st = connection.createStatement();
st.execute(sql);
}
log.info(String.format(format,dbName, addr, "成功",sql));
}catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
log.info(String.format(format, dbName, addr, "失败",sql));
}
}
} }
logging.file=./log/canalDemo.log logging.file=./log/canalDemo.log
spring.main.allow-bean-definition-overriding=true spring.main.allow-bean-definition-overriding=true
##备份子数据库mysql ##连接中心数据库数据库mysql
spring.datasource.remote1.url=jdbc:mysql://localhost:3306/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 spring.datasource.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.remote1.driver-class-name=com.mysql.jdbc.Driver spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.remote1.username=root spring.datasource.username=root
spring.datasource.remote1.password=root spring.datasource.password=root
##前置数据库
pring.datasource.remote2.driver-class-name=com.mysql.cj.jdbc.Driver url=jdbc:mysql://localhost:3306/canaltest3?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.remote2.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 driver=com.mysql.jdbc.Driver
spring.datasource.remote2.username=root dbusername=root
spring.datasource.remote2.password=root dbpassword=root
###canal ###canal
canal.adderss=127.0.0.1 canal.adderss=127.0.0.1
...@@ -18,4 +18,5 @@ canal.port=11111 ...@@ -18,4 +18,5 @@ canal.port=11111
destination=example destination=example
username="" username=""
password="" password=""
subscribe.regex=canaltest\\..* ##subscribe.regex=canaltest\\..*
\ No newline at end of file subscribe.regex=canaltest\\..*,canaltest2\\..*
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论