提交 a13ddd26 authored 作者: zhangshuang's avatar zhangshuang

Merge branch 'master' of git.yfzx.zjtys.com.cn:912-system/data/DataSwitchSys

...@@ -8,6 +8,7 @@ class FtpApplicationTests { ...@@ -8,6 +8,7 @@ class FtpApplicationTests {
@Test @Test
void contextLoads() { void contextLoads() {
} }
} }
package com.zjty.switches.controller; package com.zjty.switches.controller;
import com.alibaba.fastjson.JSONObject;
import com.zjty.switches.service.SwitchService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import java.util.Date;
/** /**
* @author LJJ cnljj1995@gmail.com * @author LJJ cnljj1995@gmail.com
...@@ -12,9 +15,33 @@ import org.springframework.web.bind.annotation.RestController; ...@@ -12,9 +15,33 @@ import org.springframework.web.bind.annotation.RestController;
@RestController @RestController
@RequestMapping("/test") @RequestMapping("/test")
public class TestController { public class TestController {
@Autowired
private SwitchService switchService;
@GetMapping("/asd") @GetMapping("/asd")
public ResponseEntity test() { public ResponseEntity test() {
return ResponseEntity.ok("success"); return ResponseEntity.ok("success");
} }
@PostMapping("/insert")
public ResponseEntity insert(@RequestBody JSONObject jsonObject){
String a=jsonObject.get("id").toString();
int id=Integer.parseInt(a);
Date start = new Date();
switchService.testData(id);
Date end = new Date();
// double se = (end.getTime() - start.getTime())/1000;
// System.out.printf("need time++++++++++++++++++++++++++++++++:" + se);
return ResponseEntity.ok(end.getTime() - start.getTime());
}
@PostMapping("/delete")
public ResponseEntity deleted(@RequestBody JSONObject jsonObject){
String a=jsonObject.get("id").toString();
int id=Integer.parseInt(a);
Date start = new Date();
switchService.delectData(id);
Date end = new Date();
double se = end.getTime() - start.getTime();
return ResponseEntity.ok(end.getTime() - start.getTime());
}
} }
...@@ -5,6 +5,7 @@ import com.alibaba.otter.canal.client.CanalConnectors; ...@@ -5,6 +5,7 @@ import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.Message;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -14,6 +15,8 @@ import org.springframework.jdbc.core.JdbcTemplate; ...@@ -14,6 +15,8 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List; import java.util.List;
@Component @Component
@Slf4j @Slf4j
...@@ -56,18 +59,24 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -56,18 +59,24 @@ public class CanalClientExample implements CommandLineRunner {
try { try {
connector.connect(); connector.connect();
// connector.subscribe(".*\\..*"); //所有表 // connector.subscribe(".*\\..*"); //所有表
// connector.subscribe("canaltest\\..*"); //canaltest下所有表 // connector.subscribe("canaltest.student,canaltest2\\..*"); //canaltest下所有表
connector.subscribe(regex); //canaltest下所有表 connector.subscribe(regex); //canaltest下所有表
connector.rollback(); connector.rollback();
int totalEmptyCount = 120; int totalEmptyCount = 120;
long se = 0;
while (true) { while (true) {
Date start = new Date();
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId(); long batchId = message.getId();
int size = message.getEntries().size(); int size = message.getEntries().size();
if (batchId == -1 || size == 0) { if (batchId == -1 || size == 0) {
emptyCount++; emptyCount++;
System.out.println("empty count : " + emptyCount); // System.out.println("empty count : " + emptyCount);
try { try {
if (emptyCount > 10){
se = 0;
}
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
...@@ -76,10 +85,16 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -76,10 +85,16 @@ public class CanalClientExample implements CommandLineRunner {
emptyCount = 0; emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries()); printEntry(message.getEntries());
Date end = new Date();
se = se + (end.getTime() - start.getTime());
System.out.println("need time++++++++++++++++++++++++++++++++:" + se);
} }
connector.ack(batchId); // 提交确认 connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据 // connector.rollback(batchId); // 处理失败, 回滚数据
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -112,17 +127,17 @@ public class CanalClientExample implements CommandLineRunner { ...@@ -112,17 +127,17 @@ public class CanalClientExample implements CommandLineRunner {
CanalEntry.EventType eventType = rowChage.getEventType(); CanalEntry.EventType eventType = rowChage.getEventType();
String tableName = entry.getHeader().getTableName(); String tableName = entry.getHeader().getTableName();
String dbName = entry.getHeader().getSchemaName(); String dbName = entry.getHeader().getSchemaName();
System.out.println("-------------------------"+dbName); // System.out.println("-------------------------"+dbName);
int dataCount = 0; int dataCount = 0;
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (!tableName.contains("change_ds_")){ if (!tableName.contains("change_ds_")){
tableName = "change_ds_"+tableName; tableName = "change_ds_"+tableName;
}else { }else {
tableName = tableName.split("_")[2]; tableName = "change_"+tableName.split("_")[2];
} }
sql = getSql(eventType, tableName, rowData, dbName); sql = getSql(eventType, tableName, rowData, dbName);
System.out.println(sql); // System.out.println(sql);
switchService.getChangeData(sql, tableName, dbName); switchService.getChangeData(sql, tableName, dbName);
} }
......
...@@ -8,4 +8,6 @@ public interface SwitchService { ...@@ -8,4 +8,6 @@ public interface SwitchService {
* @param dbName 操作数据库名称 * @param dbName 操作数据库名称
*/ */
void getChangeData(String sql, String tableName, String dbName); void getChangeData(String sql, String tableName, String dbName);
void testData(int num);
void delectData(int num);
} }
...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Service; ...@@ -9,6 +9,7 @@ import org.springframework.stereotype.Service;
import java.sql.Connection; import java.sql.Connection;
import java.sql.Statement; import java.sql.Statement;
import java.util.Date;
@Service @Service
@Slf4j @Slf4j
...@@ -54,5 +55,34 @@ public class SwitchServiceImpl implements SwitchService { ...@@ -54,5 +55,34 @@ public class SwitchServiceImpl implements SwitchService {
} }
} }
@Override
public void testData(int num){
for (int i = 0; i < num; i++){
String name = "张三" + i;
String sql = "INSERT INTO change_ds_Student (id,age,name) VALUES ( " + i + "," +i + ",'" + name + "')";
executeSQL(sql);
}
}
@Override
public void delectData(int num) {
try {
for (int i = 0; i < num; i++) {
String sql = "delete from change_ds_Student where id=" + i;
executeSQL(sql);
String sql1 = "delete from change_Student where id=" + i;
if (connection == null) {
connection = jdbcConnet.connetToTest3();
}
Statement st = connection.createStatement();
st.execute(sql1);
}
}catch (Exception e){
e.printStackTrace();
}
}
} }
logging.file=./log/canalDemo.log logging.file=./log/canalDemo.log
server.port=8094
spring.main.allow-bean-definition-overriding=true spring.main.allow-bean-definition-overriding=true
##连接中心数据库数据库mysql ##连接中心数据库数据库mysql
spring.datasource.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 spring.datasource.url=jdbc:mysql://localhost:3336/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root spring.datasource.username=root
spring.datasource.password=root spring.datasource.password=root
##前置数据库 ##前置数据库
url=jdbc:mysql://localhost:3306/canaltest3?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 url=jdbc:mysql://localhost:3336/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
driver=com.mysql.jdbc.Driver driver=com.mysql.jdbc.Driver
dbusername=root dbusername=root
dbpassword=root dbpassword=root
...@@ -19,4 +21,4 @@ destination=example ...@@ -19,4 +21,4 @@ destination=example
username="" username=""
password="" password=""
##subscribe.regex=canaltest\\..* ##subscribe.regex=canaltest\\..*
subscribe.regex=canaltest\\..*,canaltest2\\..* subscribe.regex=canaltest.student,canaltest2\\..*
\ No newline at end of file \ No newline at end of file
package com.zjty.switches;
import com.zjty.switches.service.CanalClientExample;
import com.zjty.switches.service.SwitchService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestData {
@Autowired
private SwitchService switchService;
@Test
public void testInsert(){
switchService.testData(12);
}
}
logging.file=./log/canalDemo.log logging.file=./log/canalDemo.log
server.port=8099
spring.main.allow-bean-definition-overriding=true spring.main.allow-bean-definition-overriding=true
##连接中心数据库数据库mysql ##连接中心数据库数据库mysql
spring.datasource.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 spring.datasource.url=jdbc:mysql://localhost:3336/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root spring.datasource.username=root
spring.datasource.password=ljj123456 spring.datasource.password=root
##前置数据库 ##前置数据库
url=jdbc:mysql://localhost:3306/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8 url=jdbc:mysql://localhost:3336/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
driver=com.mysql.jdbc.Driver driver=com.mysql.jdbc.Driver
dbusername=root dbusername=root
dbpassword=ljj123456 dbpassword=root
###canal ###canal
canal.adderss=127.0.0.1 canal.adderss=127.0.0.1
...@@ -19,7 +21,7 @@ destination=example ...@@ -19,7 +21,7 @@ destination=example
username="" username=""
password="" password=""
##subscribe.regex=canaltest\\..* ##subscribe.regex=canaltest\\..*
subscribe.regex=canaltest\\..*,canaltest2\\..* subscribe.regex=canaltest.student,canaltest2\\..*
## ftp config ## ftp config
spring.servlet.multipart.max-file-size=200MB spring.servlet.multipart.max-file-size=200MB
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论