1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j;
@Slf4j public class WebSocketServer {
public void start() { EventLoopGroup boss = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); EventLoopGroup worker = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try { ServerBootstrap b = new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast(new WebSocketFrameHandler()); } }); ChannelFuture f = b.bind(8000).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { log.error(e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }
public static void main(String[] args) { new WebSocketServer().start(); } }
@Slf4j class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) { if (frame instanceof TextWebSocketFrame) { String text = ((TextWebSocketFrame) frame).text(); log.info("收到文本消息: {}", text); ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端收到: " + text));
} else if (frame instanceof BinaryWebSocketFrame) { log.info("收到二进制消息, 长度={}", frame.content().readableBytes()); ctx.channel().writeAndFlush( new BinaryWebSocketFrame(frame.content().retain()) );
} else { log.debug("忽略的消息类型: {}", frame.getClass().getSimpleName()); } }
@Override public void handlerAdded(ChannelHandlerContext ctx) { Channel incoming = ctx.channel(); channels.add(incoming); log.info("新连接: {}", incoming.id().asShortText()); }
@Override public void handlerRemoved(ChannelHandlerContext ctx) { Channel outcoming = ctx.channel(); channels.remove(outcoming); log.info("断开连接: {}", ctx.channel().id().asShortText()); } }
|