提交 08c93622 authored 作者: xc's avatar xc

xc

上级 41163a20
......@@ -47,5 +47,11 @@
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.zjty.switches.controller;
import com.alibaba.fastjson.JSONObject;
import com.zjty.switches.exchange.ChangeData;
import com.zjty.switches.service.SwitchService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
......@@ -30,8 +31,6 @@ public class TestController {
Date start = new Date();
switchService.testData(id);
Date end = new Date();
// double se = (end.getTime() - start.getTime())/1000;
// System.out.printf("need time++++++++++++++++++++++++++++++++:" + se);
return ResponseEntity.ok(end.getTime() - start.getTime());
}
@PostMapping("/delete")
......@@ -41,7 +40,23 @@ public class TestController {
Date start = new Date();
switchService.delectData(id);
Date end = new Date();
double se = end.getTime() - start.getTime();
return ResponseEntity.ok(end.getTime() - start.getTime());
}
@PostMapping("/update")
public ResponseEntity update(@RequestBody JSONObject jsonObject){
String a=jsonObject.get("id").toString();
int id=Integer.parseInt(a);
Date start = new Date();
switchService.updateData(id);
Date end = new Date();
return ResponseEntity.ok(end.getTime() - start.getTime());
}
@PostMapping("/add")
public ResponseEntity addObj(@RequestBody ChangeData changeData) {
System.out.println(changeData);
return ResponseEntity.ok(changeData);
}
}
package com.zjty.switches.exchange;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ChangeData {
/**
* 操作类型,insert update delete
*/
private String eventType;
/**
* 表名
*/
private String tableName;
/**
* 字段名,value
*/
private List<Columns> columns;
}
package com.zjty.switches.exchange;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Columns {
/**
* 字段名
*/
private String name;
/**
* 值
*/
private String value;
/**
* 是否主键
*/
private boolean isKey;
}
package com.zjty.switches.service;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.zjty.switches.exchange.ChangeData;
import com.zjty.switches.exchange.Columns;
import com.zjty.switches.utils.HttpUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
......@@ -14,8 +18,10 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Component
......@@ -47,7 +53,7 @@ public class CanalClientExample implements CommandLineRunner {
/**
*数据备份
*/
public void canal() {
private void canal() {
// 创建链接
// CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
// 11111), "example", "", "");
......@@ -88,7 +94,7 @@ public class CanalClientExample implements CommandLineRunner {
Date end = new Date();
se = se + (end.getTime() - start.getTime());
System.out.println("need time++++++++++++++++++++++++++++++++:" + se);
// System.out.println("need time++++++++++++++++++++++++++++++++:" + se);
}
......@@ -111,6 +117,8 @@ public class CanalClientExample implements CommandLineRunner {
*/
private void printEntry(List<CanalEntry.Entry> entrys) {
String sql = null;
// System.out.println(entrys);
// System.out.println("===========================================");
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
......@@ -126,33 +134,49 @@ public class CanalClientExample implements CommandLineRunner {
CanalEntry.EventType eventType = rowChage.getEventType();
String tableName = entry.getHeader().getTableName();
String dbName = entry.getHeader().getSchemaName();
// System.out.println("-------------------------"+dbName);
int dataCount = 0;
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (!tableName.contains("change_ds_")){
tableName = "change_ds_"+tableName;
}else {
tableName = "change_"+tableName.split("_")[2];
}
sql = getSql(eventType, tableName, rowData, dbName);
// System.out.println(sql);
switchService.getChangeData(sql, tableName, dbName);
// System.out.println(rowData);
getChangeData(eventType, tableName, rowData);
}
}
}
private void getChangeData(CanalEntry.EventType eventType, String tableName, CanalEntry.RowData rowData) {
List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
if (columns.size() == 0 || StringUtils.isBlank(tableName)) {
return;
}
ChangeData changeData = new ChangeData();
List<Columns> list = new ArrayList<>();
changeData.setEventType(eventType.toString());
changeData.setTableName(tableName);
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
Columns columns1 = new Columns();
columns1.setName(column.getName());
columns1.setValue(column.getValue());
columns1.setKey(column.getIsKey());
list.add(columns1);
}
changeData.setColumns(list);
String jsonStr = JSONObject.toJSONString(changeData);
System.out.println(jsonStr);
try {
HttpUtils.sendPost("http://localhost:8086/producer/receive", jsonStr);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 解析canal数据,合成sql语句
* @param eventType 操作类型
......
......@@ -10,4 +10,5 @@ public interface SwitchService {
void getChangeData(String sql, String tableName, String dbName);
void testData(int num);
void delectData(int num);
void updateData(int num);
}
......@@ -70,13 +70,26 @@ public class SwitchServiceImpl implements SwitchService {
for (int i = 0; i < num; i++) {
String sql = "delete from change_ds_Student where id=" + i;
executeSQL(sql);
String sql1 = "delete from change_Student where id=" + i;
// String sql1 = "delete from change_Student where id=" + i;
//
// if (connection == null) {
// connection = jdbcConnet.connetToTest3();
// }
// Statement st = connection.createStatement();
// st.execute(sql1);
//
}
}catch (Exception e){
e.printStackTrace();
}
}
if (connection == null) {
connection = jdbcConnet.connetToTest3();
}
Statement st = connection.createStatement();
st.execute(sql1);
@Override
public void updateData(int num) {
try {
for (int i = 0; i < num; i++) {
String sql = "update change_ds_Student set name='李四' where id=" + i;
executeSQL(sql);
}
}catch (Exception e){
......
package com.zjty.switches.utils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
public class HttpUtils {
public static String sendPost(String url, String jsonArgs) throws UnsupportedEncodingException {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
//设置请求体
HttpHeaders headers = new HttpHeaders();
MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
//设置请求数据
HttpEntity<String> formEntity = new HttpEntity<>(jsonArgs, headers);
String result = restTemplate.postForObject(url, formEntity, String.class);
return result;
}
public static String sendGet(String url) throws UnsupportedEncodingException {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
//restTemplate.getForObject(url, String.class);
return restTemplate.getForObject(url, String.class);
}
public static String sendGet3(String url) throws UnsupportedEncodingException {
SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
requestFactory.setConnectTimeout(1000 * 2);
requestFactory.setReadTimeout(1000 * 2);
RestTemplate restTemplate = new RestTemplate(requestFactory);
restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
//restTemplate.getForObject(url, String.class);
return restTemplate.getForObject(url, String.class);
}
public static String sendGet2(String url, String s) throws UnsupportedEncodingException {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
String result = restTemplate.getForObject(url, String.class, s);
restTemplate.getForObject(url, String.class, s);
return result;
}
}
logging.file=./log/canalDemo.log
server.port=8094
server.port=8080
spring.main.allow-bean-definition-overriding=true
##连接中心数据库数据库mysql
spring.datasource.url=jdbc:mysql://localhost:3336/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.url=jdbc:mysql://localhost:3306/canaltest2?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
##前置数据库
url=jdbc:mysql://localhost:3336/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
url=jdbc:mysql://localhost:3306/canaltest?useSSL=false&serverTimezone=UTC&characterEncoding=UTF-8
driver=com.mysql.jdbc.Driver
dbusername=root
dbpassword=root
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论