Netty 基本用法

示例一:WebSocket 服务端(WebSocketServer.java)

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

/**
*
* @author yingzheng
* @date 2025-09-18
*/
@Slf4j
public class WebSocketServer {

public void start() {
EventLoopGroup boss = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
EventLoopGroup worker = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());

try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new WebSocketFrameHandler());
}
});
ChannelFuture f = b.bind(8000).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
log.error(e.getMessage());
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

public static void main(String[] args) {
new WebSocketServer().start();
}
}

/**
 * Handles WebSocket frames for one connection: text and binary frames are
 * echoed back to the sender, everything else is only logged at debug level.
 * Every channel this handler is attached to is tracked in a shared
 * {@link ChannelGroup}.
 */
@Slf4j
class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** All channels that currently have this handler installed. */
    private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Registers the channel in the shared group and logs its short id. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        Channel joined = ctx.channel();
        channels.add(joined);
        log.info("新连接: {}", joined.id().asShortText());
    }

    /** Removes the channel from the shared group and logs its short id. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel departed = ctx.channel();
        channels.remove(departed);
        log.info("断开连接: {}", departed.id().asShortText());
    }

    /**
     * Dispatches an inbound frame by its concrete type.
     *
     * @param ctx   context of this handler
     * @param frame the received WebSocket frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof TextWebSocketFrame) {
            replyToText(ctx, (TextWebSocketFrame) frame);
            return;
        }
        if (frame instanceof BinaryWebSocketFrame) {
            echoBinary(ctx, frame);
            return;
        }
        // Other types (Ping, Pong, Close) are left to WebSocketServerProtocolHandler.
        log.debug("忽略的消息类型: {}", frame.getClass().getSimpleName());
    }

    /** Logs a text message and sends an acknowledgement containing the same text. */
    private void replyToText(ChannelHandlerContext ctx, TextWebSocketFrame textFrame) {
        String payload = textFrame.text();
        log.info("收到文本消息: {}", payload);
        TextWebSocketFrame reply = new TextWebSocketFrame("服务端收到: " + payload);
        ctx.channel().writeAndFlush(reply);
    }

    /** Logs the size of a binary message and echoes its content back unchanged. */
    private void echoBinary(ChannelHandlerContext ctx, WebSocketFrame binaryFrame) {
        log.info("收到二进制消息, 长度={}", binaryFrame.content().readableBytes());
        // retain() keeps the content alive for the outgoing frame; the inbound frame
        // itself is released by SimpleChannelInboundHandler after channelRead0 returns.
        BinaryWebSocketFrame echo = new BinaryWebSocketFrame(binaryFrame.content().retain());
        ctx.channel().writeAndFlush(echo);
    }
}

示例二:时间协议客户端(TimeClient.java)

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Date;

/**
*
* @author yingzheng
* @date 2025-09-14
*/

/**
 * Client-side handler that reads a 32-bit time value from the server, prints it
 * as a {@link Date} and closes the connection.
 *
 * <p>TCP is a byte stream, so the four bytes of the reply may arrive split across
 * several reads. Incoming bytes are therefore accumulated in a handler-owned
 * buffer and decoded only once all four are available.
 */
class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /** Number of bytes in the time value. */
    private static final int TIME_LENGTH = 4;

    /** Seconds between 1900-01-01 and 1970-01-01 (the RFC 868 epoch offset). */
    private static final long EPOCH_OFFSET_SECONDS = 2208988800L;

    /** Accumulates reply bytes until a full value has been received. */
    private ByteBuf buf;

    /** Allocates the accumulation buffer when the handler joins a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(TIME_LENGTH);
    }

    /** Releases the accumulation buffer when the handler leaves the pipeline. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (buf != null) {
            buf.release();
            buf = null;
        }
    }

    /**
     * Appends the received bytes to the accumulation buffer and, once a complete
     * value is present, prints the decoded time and closes the channel.
     *
     * @param ctx context of this handler
     * @param msg the received message; expected to be a {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            buf.writeBytes(m);
        } finally {
            // The bytes have been copied; the inbound buffer is no longer needed.
            m.release();
        }

        if (buf.readableBytes() >= TIME_LENGTH) {
            long currentTimeMillis = (buf.readUnsignedInt() - EPOCH_OFFSET_SECONDS) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

/**
 * Command-line client that connects to 127.0.0.1:8000, installs a
 * {@link TimeClientHandler} on the connection and waits until it is closed.
 */
public class TimeClient {

    /**
     * Connects to the server and blocks until the connection has been closed.
     *
     * @param args ignored
     * @throws Exception if connecting or waiting for the close fails
     */
    public static void main(String[] args) throws Exception {
        final String serverHost = "127.0.0.1";
        final int serverPort = 8000;

        EventLoopGroup ioGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());

        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(ioGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // Wait for the connection to be established, then for it to be closed.
            Channel connection = bootstrap.connect(serverHost, serverPort).sync().channel();
            connection.closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}

示例三:时间协议服务端(DiscardServer.java)

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
*
* @author yingzheng
* @date 2025-09-14
*/

/**
 * Inbound handler that, despite its name, writes every received message
 * straight back to the peer (an echo).
 *
 * <p>An earlier discard-only variant, previously kept here as commented-out code,
 * printed the received bytes as US-ASCII and released the message instead of
 * echoing it.
 */
class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Echoes the received message back to the sender.
     *
     * @param ctx context of this handler
     * @param msg the received message, passed on to the outbound side unchanged
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.writeAndFlush(msg);
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of this handler
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

/**
 * Server-side handler that sends the current time as a 32-bit value as soon as
 * a connection becomes active, then closes the connection.
 *
 * <p>The value is the number of seconds since 1900-01-01, i.e. the Unix time in
 * seconds plus 2208988800.
 */
class TimeHandler extends ChannelInboundHandlerAdapter {

    /**
     * Writes the 4-byte time value and closes the channel once the write completes.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        // Close only after the asynchronous write has finished, not immediately.
        ctx.writeAndFlush(time).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Prints the failure and closes the connection so a broken channel is not
     * left open.
     *
     * @param ctx   context of this handler
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

/**
 * Netty server that installs a {@link TimeHandler} on every accepted connection.
 */
public class DiscardServer {

    /** TCP port to listen on. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public DiscardServer(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel is closed.
     * Both event loop groups are asked to shut down before the method returns.
     *
     * @throws Exception if binding or waiting for the close fails
     */
    public void run() throws Exception {
        EventLoopGroup acceptGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
        EventLoopGroup ioGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(acceptGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeHandler());
                        }
                    });

            // Wait for the bind to complete, then for the server channel to close.
            Channel serverChannel = bootstrap.bind(port).sync().channel();
            serverChannel.closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
            acceptGroup.shutdownGracefully();
        }
    }

    /**
     * Runs the server on port 8000 and prints any failure to standard error.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final DiscardServer instance = new DiscardServer(8000);
        try {
            instance.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}