프로젝트를 진행하는데에 있어 소켓통신을 활용해 데이터를 주고 받아야 하는 환경이 있어 미리 학습을 하며 기록합니다.

목차

  1. Netty의 기본적인 흐름
  2. Config
  3. NettyServerSocket
  4. NettyChannelInitializer
    3-1. Decoder
  5. Handler
  6. Chat Service

Netty 란?


유지관리 가능한 고성능 프로토콜 서버 및 클라이언트의 신속한 개발을 위한 비동기 이벤트 기반 네트워크 애플리케이션 프레임워크 입니다.

  • Netty의 공식문서를 참고하여 프로젝트를 만들기전 간단한 코드를 통해 확인해보고 갑시다!
    • 준비단계에서 알 수 있는 것은 보통 프로그래밍을 하게되면 hello world를 찍어본다. 하지만 netty는 처음시작이 hello world가 아닌 discard server이다.
  • discard server란?
    • 수신된 데이터를 응답없이 폐기하는 프로토콜이다.
    • 기본적으로 ChannelInboundHandlerAdapter를 상속 받아 채널로 들어오는 통신 정보에 대한 설정을 하게 된다.



package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}


  • Spring boot환경에서 간단한 소켓통신 구성을 해보겠습니다. 아래는 해당 구성을 진행하는데 필요한 기술 스펙입니다.

    • Spring boot : 3.1.2 version
    • Netty-All: 4.1.94.final
    • lombok
    • Java: 17 version


  • 만들어지는 프로젝트의 기본 패키지 구성은 위의 이미지와 같습니다.


Netty의 기본적인 흐름



기본적으로 Netty의 흐름은 위의 그림과 같은 흐름입니다. 이 때 우리의 채팅서버는 config -> NettyServerSocket -> NettyChannelInitializer -> TestHandler -> TestDecoder 등의 구성으로 이루어져 있습니다. 해당 흐름도와 관계를 보며 어떻게 이루어지는 것인지 알아보도록 하겠습니다.


1. Config

해당 config에는 기본 흐름중 첫번째인 Bootstrap부분이 포함되어 있고 네티 서버의 설정을 도와주는 녀석입니다.



/**
 *  ApplicationStartupTask
 *  서버가 실행이 되고 Netty server를 시작하는 역할
 */

@Component
@RequiredArgsConstructor
public class ApplicationStartupTask implements ApplicationListener<ApplicationReadyEvent> {
    // NettyServerSocket 의존성 주입
    private final NettyServerSocket nettyServerSocket;

    // (1)
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        nettyServerSocket.start();
    }
}


  • 주석 해석 및 정리
    • (1): 어플리케이션이 완전히 준비된 후 실행되는 이벤트 핸들러,
      시작될 때 별도의 초기화 작업이 필요한 경우 이런 패턴을 사용하여 가능



/**
 *  NettyConfiguration
 *  Netty의 전반적인 설정이 들어있는 클래스입니다.
 *  호스트라던지 포트등 기본 값으로 들어가는 값들의 설정과 연결에 관한 설정입니다.
 * 
 */
 
 @Configuration
@RequiredArgsConstructor
public class NettyConfiguration {

    @Value("${server.host}")
    private String host;
    @Value("${server.port}")
    private int port;
    @Value("${server.netty.boss-count}")
    private int bossCount;
    @Value("${server.netty.worker-count}")
    private int workerCount;
    @Value("${server.netty.keep-alive}")
    private boolean keepAlive;
    @Value("${server.netty.backlog}")
    private int backlog;

    @Bean
    public ServerBootstrap serverBootstrap(NettyChannelInitializer nettyChannelInitializer) {
        // ServerBootstrap : 서버 설정을 도와주는 class
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup(), workerGroup()) // (1) 
                .channel(NioServerSocketChannel.class)    // (2)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childHandler(nettyChannelInitializer);   // (3)
        serverBootstrap.option(ChannelOption.SO_BACKLOG, backlog);

        return serverBootstrap;
    }

    @Bean(destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount); // (4)
    }

    @Bean(destroyMethod = "shutdownGracefully") // (5)
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount); // (6)
    }

    @Bean
    public InetSocketAddress tcpPort() {
        // (7)
        return new InetSocketAddress(host, port); // 적절한 포트 번호와 호스트 정보로 변경해주세요.
    }
}


  • 주석 해석 및 정리
    • (1): 루프 그룹 설정
    • (2): 비동기 네트워킹을 위한 채널을 설정
    • (3): 사용자 정의 채널 초기화 클래스 설정
    • (4): 들어오는 연결을 수락하는 역할을 담당함. 거기서 사용 될 스레드 수를 정의
    • (5): 스프링 컨테이너가 종료될 때 해당 메서드가 호출되도록 한다.
      네티의 이벤트 루프가 안전히 종료되게 해줍니다.
    • (6): 연결된 클라이언트와의 데이터 통신을 처리하는 역할을 담당 . 거기서 사용될 스레드수를 정의
    • (7): 여기서 포트 번호와 호스트 정보를 생성하여 반환합니다.


2. NettyServerSocket


  • Netty 서버의 시작 및 종료를 관리하는 역할을 합니다.
  • 해당 클래스는 채널 이니셜라이져가 연결이 될 때 항상 초기화를 해줍니다.


/**
 *  NettyServerSocket
 * 
 * 
 */ 

@Slf4j
@RequiredArgsConstructor
@Component
public class NettyServerSocket {
    private final ServerBootstrap serverBootstrap;
    private final InetSocketAddress tcpPort;
    private Channel serverChannel;

    // (1)
    public void start() {
        try {
            // (2)
            ChannelFuture serverChannelFuture = serverBootstrap.bind(tcpPort).sync();

            // (3)
            serverChannel = serverChannelFuture.channel().closeFuture().sync().channel();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @PreDestroy // (4)
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
            serverChannel.parent().closeFuture();
        }
    }
}


  • 주석 해석 및 정리
    • (1): ServerBootStrap 을 사용하여 서버를 시작 주입된
      tcpPort를 사용하여 지정된 주소의 포트에서 서버를 바인딩
    • (2): bind 호출후 sync를 호출하여 작업이 완료 될 때까지 대기 한다.
    • (3): 필드에 바인딩 작업의 결과로 반환된 channel을 저장
    • (4): 스프링 컨테이너에서 빈이 파괴되기 직전에 호출되는 메서드를 표시


3. NettyChannelInitializer


  • 새로운 연결이 수립될 때 마다 채널 파이프라인을 초기화해주는 역할을 합니다.
  • 덕분에 클라이언트와의 1:1 관계의 연결이 여러개 이루어 질 수 있습니다.


/**
 *  NettyChannelInitializer
 * 
 */ 
@Component
@RequiredArgsConstructor
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final TestHandler testHandler;

    // (1)
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // (2)
        ChannelPipeline pipeline = ch.pipeline();

        TestDecoder testDecoder = new TestDecoder();
        // (3)
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8), testDecoder, testHandler);

    }
}


  • 주석 해석 및 정리
    • (1): 새로운 연결이 들어올 때마다 호출된다.
    • (2): 채널 파이프라인을 들고옵니다.
    • (3): StringDecoder 는 바이트 데이터를 UTF-8로 변경합니다.
      testDecoder는 특정 길이만큼 읽고 문자열로 변환한다. testHandler는 연결관리 및 메시지 처리


3.1 . Decoder


  • 해당 부분은 채널 이니셜라이져에 들어가는 부분입니다.
  • 어떠한 형식으로 데이터들을 주고 받을지 정하는 부분입니다.



/**
 *  TestDecoder
 * 
 */

@Slf4j
@Component
@RequiredArgsConstructor
public class TestDecoder extends ByteToMessageDecoder {
    private final int DATA_LENGTH = 2048;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        log.info("readableBytes: {}" , in.readableBytes());
        if (in.readableBytes() < DATA_LENGTH) {
            return;
        }

        // (1)
        ByteBuf buffer = in.readBytes(DATA_LENGTH);
        String message = buffer.toString(StandardCharsets.UTF_8);
        out.add(message); // String 객체를 out에 추가
        buffer.release(); // (2)
    }
}


  • 주석 해석 및 정리
    • (1): 버퍼의 내용을 String 객체로 변환
    • (2): 버퍼의 참조 카운트를 감소시켜 메모리 누수를 방지


4. Handler


  • Netty 서버의 가장 핵심적인 부분입니다.
  • 클라이언트와 연결을 수립하고 데이터를 쓰고 읽는 등의 중요한 로직들이 담겨져있습니다.



/**
 *  TestHandler
 *  Netty server 의 핵심부분 클라이언트와의 연결 수립, 데이터 읽기 및 쓰기 , 예외처리등의 로직이 담겨있다.
 */

@Slf4j
@Component
@ChannelHandler.Sharable // (1)
public class TestHandler extends ChannelInboundHandlerAdapter {

    private final ChatService chatService;

    public TestHandler(ChatService chatService) {
        this.chatService = chatService;
    }

    // 클라이언트가 서버에 연결 될 때 호출된다.
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        chatService.addClient(ctx); // (2)
        log.info("Client connected: " + ctx.channel().remoteAddress());
    }

    // 클라이언트가 서버에서 연결이 종료될 때 호출이 됩니다.
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        chatService.removeClient(ctx); // (3)
        log.info("Client disconnected: " + ctx.channel().remoteAddress());
    }

    // 클라이언트로부터 메세지를 읽을 때 호출이 된다.
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // (4)
        log.info("channelRead called");
        try {
            String receivedMessage = (String) msg;
            chatService.sendRequestToClient(ctx, receivedMessage);
            log.info("received message: {}", receivedMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    // 채널에서 예외가 발생할 때 호출이된다.
    // 클라이언트와의 연결을 닫아버리고 예외정보를 출력합니다.
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        cause.printStackTrace();
    }
}


  • 주석 해석 및 정리
    • (1): 여러 채널에서 핸들러 인스턴스를 공유할 수 있음을 나타낸다.
    • (2): 해당 서비스에 클라이언트를 등록 로그에 연결정보를 출력
    • (3): 클라이언트를 서비스에서 제거하고 로그에 연결해제 정보 출력
    • (4): object 인 메세지를 String 타입으로 변환하고
      sendRequestToClient 메서드를 호출하여 메세지를 보낸 클라이언트에게 답을 하는 형식입니다.




위의 과정을 진행하고 나면 Netty 의 서버구성은 완료됩니다.
양방향 소켓 통신으로 채팅테스트를 위해 아래의 과정은 채팅을 위한 클래스와 메서드들이기 때문에 참고사항입니다.

Chat Service


@Slf4j
@Service
@RequiredArgsConstructor
public class ChatService {

    private final ClientMappingService clientMappingService;
    private final Map<ChannelHandlerContext, Integer> clientsOrderMap = new ConcurrentHashMap<>();
    private final Map<ChannelHandlerContext, ClientState> clientStates = new ConcurrentHashMap<>();
    private ChannelHandlerContext currentChattingClient;
    private int orderCounter = 0;

    private enum ClientState {
        NORMAL, CHATTING
    }

    // 클라이언트 연결 시 호출
    public void addClient(ChannelHandlerContext ctx) {
        clientMappingService.addClient(ctx);
        clientsOrderMap.put(ctx, ++orderCounter);
        listClients();
    }

    // 클라이언트 연결 종료 시 호출
    public void removeClient(ChannelHandlerContext ctx) {
        clientsOrderMap.remove(ctx);
    }

    // 연결된 클라이언트 목록 출력
    public void listClients() {
        if (clientsOrderMap.isEmpty()) {
            log.info("현재 연결된 클라이언트가 없습니다.");
            return;
        }

        log.info("연결된 클라이언트 목록: \n");
        for (Map.Entry<ChannelHandlerContext, Integer> entry : clientsOrderMap.entrySet()) {
            ChannelHandlerContext ctx = entry.getKey();
            int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
            int order = entry.getValue();
            log.info("Port: " + port + ", Order: " + order + "\n");
        }
    }

    // 특정 클라이언트에게 메시지 전송
    public void sendRequestToClient(ChannelHandlerContext ctx, String receivedMessage) {
        log.info("sendRequest: {}", receivedMessage);
        int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
        String requestMessage = "What do you want? (Port: " + port + ", Order: " + clientsOrderMap.get(ctx) + ")\n";
        ByteBuf buf = Unpooled.copiedBuffer(requestMessage, CharsetUtil.UTF_8);
        ctx.writeAndFlush(buf);
    }

    // 30초마다 클라이언트의 연결 상태 로깅
    @Scheduled(fixedDelay = 30000)
    public void logClientsStatus() {
        for (Map.Entry<ChannelHandlerContext, Integer> entry : clientsOrderMap.entrySet()) {
            ChannelHandlerContext ctx = entry.getKey();
            int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
            System.out.println("I'm live. Port: " + port + ", Order: " + entry.getValue());
        }
    }

    // 10초마다 모든 클라이언트에게 현재 시각과 포트 뿌리기
    @Scheduled(fixedRate = 10000)
    public void sendTimeToClients() {
        String currentTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
        for (ChannelHandlerContext ctx : clientsOrderMap.keySet()) {
            int port = ((InetSocketAddress) ctx.channel().remoteAddress()).getPort();
            String message = "Current Time: " + currentTime + ", Your Port: " + port + "\n";
            ByteBuf buf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);
            ctx.writeAndFlush(buf);
        }
    }
}


  • 아래의 사진은 서버가 시작된 후 클라이언트와 연결될 당시의 콘솔 화면입니다.




  • 다음 사진은 30초마다 클라이언트와의 연결상태를 로그로 프린트한 모습입니다.
    • 여기서 확인되는 Port는 클라이언트의 port입니다.
    • Order 부분은 서버와 연결된 순서를 나타냅니다.




  • 다음은 클라이언트에서 서버에 메세지가 오면 나오는 서버측 콘솔 화면입니다.
  • 클라이언트에서 받은 메세지를 다시 클라이언트에게 What do want? 라는 메세지를 붙여 다시 발송합니다
    • 이 때 메세지를 보냈던 해당 클라이언트에게 메세지가 전송됩니다.
    • 해당 클라이언트 포트번호와 자신이 서버에 몇번째로 연결됬는지 까지 같이 보냅니다.




  • 다음 화면은 서버와 연결이 된 클라이언트 측 화면으로 10초마다 현재시간을 서버측에서 메세지로 보내줍니다.
    • 자신이 보낸 메세지에 서버측에서 답이 올 때도 항상 현재시간을 10초에 한번씩 뿌려주는 건 멈추지 않습니다.