提交 db3a4e0c authored 作者: 黄夏豪's avatar 黄夏豪

feat(base): 修改了socket无法使用的BUG

refactor(base):修复了由于增加了TestData导致的无法编译的问题
上级 ed0709c1
......@@ -166,9 +166,14 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
......
......@@ -3,15 +3,18 @@ package org.matrix;
import lombok.extern.slf4j.Slf4j;
import org.matrix.actuators.datasource.DataSourceDTO;
import org.matrix.actuators.datasource.IDataSourceService;
import org.matrix.socket.SocketServer;
import org.matrix.util.SpringUtils;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* Hello world!
......@@ -51,7 +54,5 @@ public class BaseBootApplication implements CommandLineRunner {
public void run(String... args) throws Exception {
log.info("[初始化] 初始化数据源" + driverClassName);
dataSourceService.add(new DataSourceDTO("初始数据源",driverClassName,url,username,password));
//在spring容器启动后,取到已经初始化的SocketServer,启动Socket服务
new SocketServer(8009).start();
}
}
......@@ -13,6 +13,7 @@ import org.matrix.actuators.httpclient.HttpRequestDetail;
import org.matrix.actuators.httpclient.HttpResponseDetail;
import org.matrix.actuators.usecase.CaseActuator;
import org.matrix.database.entity.DynamicVariable;
import org.matrix.database.entity.TestCase;
import org.matrix.database.entity.TestCaseBTO;
import org.matrix.database.service.IConnectService;
import org.matrix.database.service.IDynamicVariableService;
......@@ -20,6 +21,8 @@ import org.matrix.database.service.ITestCaseService;
import org.matrix.database.service.ITestDataService;
import org.matrix.enums.DynamicVarType;
import org.matrix.exception.GlobalException;
import org.matrix.socket.LogQueue;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
......@@ -265,3 +268,4 @@ public class SqlExpActuator implements Actuator {
return sqlRegObjects;
}
}
......@@ -11,6 +11,7 @@ import org.matrix.actuators.checkpoint.CheckPoint;
import org.matrix.actuators.checkpoint.CheckPointResult;
import org.matrix.actuators.httpclient.HttpRequestDetail;
import org.matrix.actuators.httpclient.HttpResponseDetail;
import org.matrix.database.entity.TestCaseBTO;
import org.matrix.database.entity.TestData;
import org.matrix.socket.LogQueue;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -39,13 +40,15 @@ public class CaseActuator implements Actuator {
/**
* 执行测试用例
*/
public TestCaseExecuteResult executeTestCase(TestCase testCase, TestData testData, Long envId, Long projectId) {
public TestCaseExecuteResult executeTestCase(TestCaseBTO testCaseBTO, Long envId, Long projectId) {
LogQueue.add(Thread.currentThread().getId(),String.format("[用例解析器] 当前线程ID为: %S", Thread.currentThread().getId()));
//todo 李迪凡 执行前置动作
//执行测试用例的本体内容
HttpResponseDetail baseTestCaseResponseDetail = null;
TestCase testCase = testCaseBTO.getTestCase();
TestData testData = testCaseBTO.getTestData();
if (testCase.getType().equals(TestCaseTypeEnum.HTTP.getValue())) {
HttpRequestDetail httpRequestDetail = JSON.parseObject(testCase.getDetail(), HttpRequestDetail.class);
HttpRequestDetail httpRequestDetail = JSON.parseObject(testData.getDetail(), HttpRequestDetail.class);
baseTestCaseResponseDetail = httpClientActuator.sendHttpRequest(httpRequestDetail,envId,projectId);
}
//todo 李迪凡 执行测试后动作
......
package org.matrix.socket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.matrix.actuators.usecase.CaseActuator;
import org.matrix.actuators.usecase.TestCaseExecuteResult;
import org.matrix.database.entity.TestCase;
import org.matrix.util.SpringUtils;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.matrix.socket.SocketHandler.*;
import static org.matrix.socket.SocketPool.add;
/**
* @author zhoujian
* 自定义封装的连接的客户端
*/
@Slf4j
@Data
public class ClientSocket implements Runnable{
private Socket socket;
private DataInputStream inputStream;
private DataOutputStream outputStream;
private Long key;
private String message;
private CaseActuator caseActuator;
ClientSocket(){
this.caseActuator = SpringUtils.getBean("caseActuator");
}
@Override
public void run() {
this.key = Thread.currentThread().getId();
add(this);
System.out.println("-----key:"+key);
while (true){
System.out.println("1111111");
byte[] bytes = new byte[1024];
try {
this.getInputStream().read(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
}
// System.out.println("意外停止了");
//接受socket消息,根据前端提交的消息来判断,是否应该执行下一步
// while (true){
// String message = onMessage(this);
// System.out.println("接受到了客户端传来的消息");
// System.out.println(message);
// String json = "{\n" +
// " \"url\":\"http://127.0.0.1:8080/test/sendMessage\",\n" +
// " \"method\":\"GET\",\n" +
// " \"requestType\":\"QUERY\",\n" +
// " \"headers\":[\n" +
// " {\n" +
// " \"name\":\"cookie\",\n" +
// " \"value\":\"123456\"\n" +
// " }\n" +
// " ],\n" +
// " \"requestBodies\":[\n" +
// " {\n" +
// " \"key\":\"tableName\",\n" +
// " \"type\":\"TEXT\",\n" +
// " \"value\":\"张三\"\n" +
// " }\n" +
// " ]\n" +
// "}";
// System.out.println(json);
// TestCase testCase = new TestCase();
// testCase.setName("name");
// testCase.setType(1);
// testCase.setDetail(json);
// testCase.setAbnormalCheckpoint(1);
// testCase.setContainCheckpoint("张三,李四");
// testCase.setNoContainCheckpoint("张三,李四");
// testCase.setJsonpathCheckpoint("contains({$..category},'${componentName}[0]')");
// TestCaseExecuteResult testCaseExecuteResult = caseActuator.executeTestCase(testCase,1L,1L);
// System.out.println(testCaseExecuteResult);
// if (!StringUtils.isEmpty(message)){
// break;
// }
// }
// //每5秒进行一次客户端连接,判断是否需要释放资源
// while (true){
// try {
// TimeUnit.SECONDS.sleep(5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// if (isSocketClosed(this)){
// log.info("客户端已关闭,其Key值为:{}", this.getKey());
// //关闭对应的服务端资源
// close(this);
// break;
// }
// }
}
}
\ No newline at end of file
package org.matrix.socket;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Component
public class HttpAuthHandler extends TextWebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
System.out.println(message);
session.sendMessage(new TextMessage("收到了"));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
}
}
......@@ -24,11 +24,11 @@ public class LogQueue {
LOG_MAP.put(threadId,logList);
}
//将log发送出去
sendMessage(threadId,log);
// sendMessage(threadId,log);
}
public static void sendMessage(Long threadId,String log){
SocketHandler.sendMessage(SocketPool.get(threadId),log);
}
// public static void sendMessage(Long threadId,String log){
// SocketHandler.sendMessage(SocketPool.get(threadId),log);
// }
}
package org.matrix.socket;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import static org.matrix.socket.SocketPool.*;
/**
* Socket操作处理类
* @author huangxiahao
*/
@Slf4j
public class SocketHandler{
/**
* 将连接的Socket注册到Socket池中
* @param socket
* @return
*/
public static ClientSocket register(Socket socket){
ClientSocket clientSocket = new ClientSocket();
clientSocket.setSocket(socket);
try {
clientSocket.setInputStream(new DataInputStream(socket.getInputStream()));
clientSocket.setOutputStream(new DataOutputStream(socket.getOutputStream()));
return clientSocket;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* 向指定客户端发送信息
* @param clientSocket
* @param message
*/
public static void sendMessage(ClientSocket clientSocket, String message){
try {
clientSocket.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("发送信息异常:{}", e.getMessage());
close(clientSocket);
}
}
/**
* 获取指定客户端的上传信息
*/
public static String onMessage(ClientSocket clientSocket){
byte[] bytes = new byte[1024];
try {
clientSocket.getInputStream().read(bytes);
return new String(bytes, StandardCharsets.UTF_8);
} catch (IOException e) {
e.printStackTrace();
close(clientSocket);
}
return null;
}
/**
* 指定Socket资源回收
*/
public static void close(ClientSocket clientSocket){
log.info("进行资源回收");
if (clientSocket != null){
log.info("开始回收socket相关资源,其Key为{}", clientSocket.getKey());
remove(clientSocket.getKey());
Socket socket = clientSocket.getSocket();
try {
socket.shutdownInput();
socket.shutdownOutput();
} catch (IOException e) {
log.error("关闭输入输出流异常,{}", e.getMessage());
}finally {
try {
socket.close();
} catch (IOException e) {
log.error("关闭socket异常{}", e.getMessage());
}
}
}
}
/**
* 发送数据包,判断数据连接状态
* @param clientSocket
* @return
*/
public static boolean isSocketClosed(ClientSocket clientSocket){
try {
clientSocket.getSocket().sendUrgentData(1);
return false;
} catch (IOException e) {
return true;
}
}
}
\ No newline at end of file
......@@ -7,22 +7,22 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class SocketPool {
private static final ConcurrentHashMap<Long, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();
public static void add(ClientSocket clientSocket){
if (clientSocket != null && clientSocket.getKey()!=null) {
ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
}
}
public static void remove(Long key){
if (key!=null) {
ONLINE_SOCKET_MAP.remove(key);
}
}
public static ClientSocket get(Long key){
return ONLINE_SOCKET_MAP.get(key);
}
// private static final ConcurrentHashMap<Long, ClientSocket> ONLINE_SOCKET_MAP = new ConcurrentHashMap<>();
//
//
// public static void add(ClientSocket clientSocket){
// if (clientSocket != null && clientSocket.getKey()!=null) {
// ONLINE_SOCKET_MAP.put(clientSocket.getKey(), clientSocket);
// }
// }
//
// public static void remove(Long key){
// if (key!=null) {
// ONLINE_SOCKET_MAP.remove(key);
// }
// }
//
// public static ClientSocket get(Long key){
// return ONLINE_SOCKET_MAP.get(key);
// }
}
\ No newline at end of file
package org.matrix.socket;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.matrix.socket.SocketHandler.register;
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class SocketServer extends Thread{
private Integer port;
private boolean started;
private ServerSocket serverSocket;
private ExecutorService executorService = Executors.newCachedThreadPool();
public SocketServer(Integer port) {
this.port = port;
}
@Override
public void run() {
try {
serverSocket = new ServerSocket( port);
started = true;
log.info("Socket服务已启动,占用端口: {}", serverSocket.getLocalPort());
} catch (IOException e) {
log.error("端口冲突,异常信息:{}", e);
System.exit(0);
}
while (started){
try {
Socket socket = serverSocket.accept();
socket.setKeepAlive(true);
ClientSocket register = register(socket);
log.info("客户端已连接");
if (register != null){
executorService.submit(register);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
package org.matrix.socket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private HttpAuthHandler httpAuthHandler;
// @Autowired
// private MyInterceptor myInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(httpAuthHandler, "myWS")
.setAllowedOrigins("*");
}
}
\ No newline at end of file
......@@ -42,14 +42,8 @@ public class AppTest
String uuid = UUID.randomUUID().toString();
outputStream.write(uuid.getBytes());
while (true){
byte[] buff = new byte[1024];
inputStream.read(buff);
String buffer = new String(buff, "utf-8");
if (StringUtils.isNotBlank(buffer)){
System.out.println(buffer);
outputStream.write(uuid.getBytes());
}
String uuid1 = UUID.randomUUID().toString();
outputStream.write(uuid1.getBytes());
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论