Netty를 활용한 양방향 소켓 통신 구성
최해혁 사원
프로젝트를 진행하는데에 있어 소켓통신을 활용해 데이터를 주고 받아야 하는 환경이 있어 미리 학습을 하며 기록합니다.
목차
Netty 란?
유지관리 가능한 고성능 프로토콜 서버 및 클라이언트의 신속한 개발을 위한 비동기 이벤트 기반 네트워크 애플리케이션 프레임워크 입니다.
- Netty의 공식문서를 참고하여 프로젝트를 만들기전 간단한 코드를 통해 확인해보고 갑시다!
- 준비단계에서 알 수 있는 것은 보통 프로그래밍을 하게되면 hello world를 찍어본다. 하지만 netty는 처음시작이 hello world가 아닌 discard server이다.
- discard server란?
- 수신된 데이터를 응답없이 폐기하는 프로토콜이다.
- 기본적으로 ChannelInboundHandlerAdapter를 상속 받아 채널로 들어오는 통신 정보에 대한 설정을 하게 된다.
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
Spring boot환경에서 간단한 소켓통신 구성을 해보겠습니다. 아래는 해당 구성을 진행하는데 필요한 기술 스펙입니다.
- Spring boot : 3.1.2 version
- Netty-All: 4.1.94.final
- lombok
- Java: 17 version
- 만들어지는 프로젝트의 기본 패키지 구성은 위의 이미지와 같습니다.
Netty의 기본적인 흐름
기본적으로 Netty의 흐름은 위의 그림과 같은 흐름입니다. 이 때 우리의 채팅서버는 config -> NettyServerSocket -> NettyChannelInitializer -> TestHandler -> TestDecoder 등의 구성으로 이루어져 있습니다. 해당 흐름도와 관계를 보며 어떻게 이루어지는 것인지 알아보도록 하겠습니다.
1. Config
해당 config에는 기본 흐름중 첫번째인 Bootstrap부분이 포함되어 있고 네티 서버의 설정을 도와주는 녀석입니다.
/**
* ApplicationStartupTask
* 서버가 실행이 되고 Netty server를 시작하는 역할
*/
@Component
@RequiredArgsConstructor
public class ApplicationStartupTask implements ApplicationListener<ApplicationReadyEvent> {
// NettyServerSocket 의존성 주입
private final NettyServerSocket nettyServerSocket;
// (1)
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
nettyServerSocket.start();
}
}
- 주석 해석 및 정리
- (1): 어플리케이션이 완전히 준비된 후 실행되는 이벤트 핸들러,
시작될 때 별도의 초기화 작업이 필요한 경우 이런 패턴을 사용하여 가능
/**
* NettyConfiguration
* Netty의 전반적인 설정이 들어있는 클래스입니다.
* 호스트라던지 포트등 기본 값으로 들어가는 값들의 설정과 연결에 관한 설정입니다.
*
*/
@Configuration
@RequiredArgsConstructor
public class NettyConfiguration {
@Value("${server.host}")
private String host;
@Value("${server.port}")
private int port;
@Value("${server.netty.boss-count}")
private int bossCount;
@Value("${server.netty.worker-count}")
private int workerCount;
@Value("${server.netty.keep-alive}")
private boolean keepAlive;
@Value("${server.netty.backlog}")
private int backlog;
@Bean
public ServerBootstrap serverBootstrap(NettyChannelInitializer nettyChannelInitializer) {
// ServerBootstrap : 서버 설정을 도와주는 class
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup(), workerGroup()) // (1)
.channel(NioServerSocketChannel.class) // (2)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(nettyChannelInitializer); // (3)
serverBootstrap.option(ChannelOption.SO_BACKLOG, backlog);
return serverBootstrap;
}
@Bean(destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup() {
return new NioEventLoopGroup(bossCount); // (4)
}
@Bean(destroyMethod = "shutdownGracefully") // (5)
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(workerCount); // (6)
}
@Bean
public InetSocketAddress tcpPort() {
// (7)
return new InetSocketAddress(host, port); // 적절한 포트 번호와 호스트 정보로 변경해주세요.
}
}
- 주석 해석 및 정리
- (1): 루프 그룹 설정
- (2): 비동기 네트워킹을 위한 채널을 설정
- (3): 사용자 정의 채널 초기화 클래스 설정
- (4): 들어오는 연결을 수락하는 역할을 담당함. 거기서 사용 될 스레드 수를 정의
- (5): 스프링 컨테이너가 종료될 때 해당 메서드가 호출되도록 한다.
네티의 이벤트 루프가 안전히 종료되게 해줍니다.- (6): 연결된 클라이언트와의 데이터 통신을 처리하는 역할을 담당 . 거기서 사용될 스레드수를 정의
- (7): 여기서 포트 번호와 호스트 정보를 생성하여 반환합니다.
2. NettyServerSocket
- Netty 서버의 시작 및 종료를 관리하는 역할을 합니다.
- 해당 클래스는 채널 이니셜라이져가 연결이 될 때 항상 초기화를 해줍니다.
/**
* NettyServerSocket
*
*
*/
@Slf4j
@RequiredArgsConstructor
@Component
public class NettyServerSocket {
private final ServerBootstrap serverBootstrap;
private final InetSocketAddress tcpPort;
private Channel serverChannel;
// (1)
public void start() {
try {
// (2)
ChannelFuture serverChannelFuture = serverBootstrap.bind(tcpPort).sync();
// (3)
serverChannel = serverChannelFuture.channel().closeFuture().sync().channel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@PreDestroy // (4)
public void stop() {
if (serverChannel != null) {
serverChannel.close();
serverChannel.parent().closeFuture();
}
}
}
- 주석 해석 및 정리
- (1): ServerBootStrap 을 사용하여 서버를 시작 주입된
tcpPort를 사용하여 지정된 주소의 포트에서 서버를 바인딩- (2): bind 호출후 sync를 호출하여 작업이 완료 될 때까지 대기 한다.
- (3): 필드에 바인딩 작업의 결과로 반환된 channel을 저장
- (4): 스프링 컨테이너에서 빈이 파괴되기 직전에 호출되는 메서드를 표시
3. NettyChannelInitializer
- 새로운 연결이 수립될 때 마다 채널 파이프라인을 초기화해주는 역할을 합니다.
- 덕분에 클라이언트와의 1:1 관계의 연결이 여러개 이루어 질 수 있습니다.
/**
* NettyChannelInitializer
*
*/
@Component
@RequiredArgsConstructor
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
private final TestHandler testHandler;
// (1)
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// (2)
ChannelPipeline pipeline = ch.pipeline();
TestDecoder testDecoder = new TestDecoder();
// (3)
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8), testDecoder, testHandler);
}
}
- 주석 해석 및 정리
- (1): 새로운 연결이 들어올 때마다 호출된다.
- (2): 채널 파이프라인을 들고옵니다.
- (3): StringDecoder 는 바이트 데이터를 UTF-8로 변경합니다.
testDecoder는 특정 길이만큼 읽고 문자열로 변환한다. testHandler는 연결관리 및 메시지 처리
3.1 . Decoder
- 해당 부분은 채널 이니셜라이져에 들어가는 부분입니다.
- 어떠한 형식으로 데이터들을 주고 받을지 정하는 부분입니다.
/**
* TestDecoder
*
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TestDecoder extends ByteToMessageDecoder {
private final int DATA_LENGTH = 2048;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
log.info("readableBytes: {}" , in.readableBytes());
if (in.readableBytes() < DATA_LENGTH) {
return;
}
// (1)
ByteBuf buffer = in.readBytes(DATA_LENGTH);
String message = buffer.toString(StandardCharsets.UTF_8);
out.add(message); // String 객체를 out에 추가
buffer.release(); // (2)
}
}
- 주석 해석 및 정리
- (1): 버퍼의 내용을 String 객체로 변환
- (2): 버퍼의 참조 카운트를 감소시켜 메모리 누수를 방지
4. Handler
- Netty 서버의 가장 핵심적인 부분입니다.
- 클라이언트와 연결을 수립하고 데이터를 쓰고 읽는 등의 중요한 로직들이 담겨져있습니다.
/**
* TestHandler
* Netty server 의 핵심부분 클라이언트와의 연결 수립, 데이터 읽기 및 쓰기 , 예외처리등의 로직이 담겨있다.
*/
@Slf4j
@Component
@ChannelHandler.Sharable // (1)
public class TestHandler extends ChannelInboundHandlerAdapter {
private final ChatService chatService;
public TestHandler(ChatService chatService) {
this.chatService = chatService;
}
// 클라이언트가 서버에 연결 될 때 호출된다.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
chatService.addClient(ctx); // (2)
log.info("Client connected: " + ctx.channel().remoteAddress());
}
// 클라이언트가 서버에서 연결이 종료될 때 호출이 됩니다.
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
chatService.removeClient(ctx); // (3)
log.info("Client disconnected: " + ctx.channel().remoteAddress());
}
// 클라이언트로부터 메세지를 읽을 때 호출이 된다.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// (4)
log.info("channelRead called");
try {
String receivedMessage = (String) msg;
chatService.sendRequestToClient(ctx, receivedMessage);
log.info("received message: {}", receivedMessage);
} catch (Exception e) {
e.printStackTrace();
}
}
// 채널에서 예외가 발생할 때 호출이된다.
// 클라이언트와의 연결을 닫아버리고 예외정보를 출력합니다.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
- 주석 해석 및 정리
- (1): 여러 채널에서 핸들러 인스턴스를 공유할 수 있음을 나타낸다.
- (2): 해당 서비스에 클라이언트를 등록 로그에 연결정보를 출력
- (3): 클라이언트를 서비스에서 제거하고 로그에 연결해제 정보 출력
- (4): object 인 메세지를 String 타입으로 변환하고
sendRequestToClient 메서드를 호출하여 메세지를 보낸 클라이언트에게 답을 하는 형식입니다.
위의 과정을 진행하고 나면 Netty 의 서버구성은 완료됩니다.
양방향 소켓 통신으로 채팅테스트를 위해 아래의 과정은 채팅을 위한 클래스와 메서드들이기 때문에 참고사항입니다.
Chat Service
@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {
private final ClientMappingService clientMappingService;
private final Map<ChannelHandlerContext, Integer> clientsOrderMap = new ConcurrentHashMap<>();
private final Map<ChannelHandlerContext, ClientState> clientStates = new ConcurrentHashMap<>();
private ChannelHandlerContext currentChattingClient;
private int orderCounter = 0;
private enum ClientState {
NORMAL, CHATTING
}
// 클라이언트 연결 시 호출
public void addClient(ChannelHandlerContext ctx) {
clientMappingService.addClient(ctx);
clientsOrderMap.put(ctx, ++orderCounter);
listClients();
}
// 클라이언트 연결 종료 시 호출
public void removeClient(ChannelHandlerContext ctx) {
clientsOrderMap.remove(ctx);
}
// 연결된 클라이언트 목록 출력
public void listClients() {
if (clientsOrderMap.isEmpty()) {
log.info("현재 연결된 클라이언트가 없습니다.");
return;
}
log.info("연결된 클라이언트 목록: \n");
for (Map.Entry<ChannelHandlerContext, Integer> entry : clientsOrderMap.entrySet()) {
ChannelHandlerContext ctx = entry.getKey();
int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
int order = entry.getValue();
log.info("Port: " + port + ", Order: " + order + "\n");
}
}
// 특정 클라이언트에게 메시지 전송
public void sendRequestToClient(ChannelHandlerContext ctx, String receivedMessage) {
log.info("sendRequest: {}", receivedMessage);
int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
String requestMessage = "What do you want? (Port: " + port + ", Order: " + clientsOrderMap.get(ctx) + ")\n";
ByteBuf buf = Unpooled.copiedBuffer(requestMessage, CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
// 30초마다 클라이언트의 연결 상태 로깅
@Scheduled(fixedDelay = 30000)
public void logClientsStatus() {
for (Map.Entry<ChannelHandlerContext, Integer> entry : clientsOrderMap.entrySet()) {
ChannelHandlerContext ctx = entry.getKey();
int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
System.out.println("I'm live. Port: " + port + ", Order: " + entry.getValue());
}
}
// 10초마다 모든 클라이언트에게 현재 시각과 포트 뿌리기
@Scheduled(fixedRate = 10000)
public void sendTimeToClients() {
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
for (ChannelHandlerContext ctx : clientsOrderMap.keySet()) {
int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
String message = "Current Time: " + currentTime + ", Your Port: " + port + "\n";
ByteBuf buf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
}
}
- 아래의 사진은 서버가 시작된 후 클라이언트와 연결될 당시의 콘솔 화면입니다.
- 다음 사진은 30초마다 클라이언트와의 연결상태를 로그로 프린트한 모습입니다.
- 여기서 확인되는 Port는 클라이언트의 port입니다.
- Order 부분은 서버와 연결된 순서를 나타냅니다.
- 다음은 클라이언트에서 서버에 메세지가 오면 나오는 서버측 콘솔 화면입니다.
- 클라이언트에서 받은 메세지를 다시 클라이언트에게 What do want? 라는 메세지를 붙여 다시 발송합니다
- 이 때 메세지를 보냈던 해당 클라이언트에게 메세지가 전송됩니다.
- 해당 클라이언트 포트번호와 자신이 서버에 몇번째로 연결됬는지 까지 같이 보냅니다.
- 다음 화면은 서버와 연결이 된 클라이언트 측 화면으로 10초마다 현재시간을 서버측에서 메세지로 보내줍니다.
- 자신이 보낸 메세지에 서버측에서 답이 올 때도 항상 현재시간을 10초에 한번씩 뿌려주는 건 멈추지 않습니다.