MichaelFreeman

Learning Netty

Channel

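An open connection to an entity (for example a hardware device, a file, or a network socket) that can perform I/O operations such as reads and writes. It can be viewed as the carrier of inbound and outbound data.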

source code

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

    /**
     * Returns the globally unique identifier of this {@link Channel}.
     */
    ChannelId id();

    /**
     * Return the {@link EventLoop} this {@link Channel} was registered to.
     */
    EventLoop eventLoop();

    /**
     * Returns the parent of this channel.
     *
     * @return the parent channel.
     *         {@code null} if this channel does not have a parent channel.
     */
    Channel parent();

    /**
     * Returns the configuration of this channel.
     */
    ChannelConfig config();

    /**
     * Returns {@code true} if the {@link Channel} is open and may get active later
     */
    boolean isOpen();

    /**
     * Returns {@code true} if the {@link Channel} is registered with an {@link EventLoop}.
     */
    boolean isRegistered();

    /**
     * Return {@code true} if the {@link Channel} is active and so connected.
     */
    boolean isActive();

    /**
     * Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.
     */
    ChannelMetadata metadata();

    /**
     * Returns the local address where this channel is bound to. The returned
     * {@link SocketAddress} is supposed to be down-cast into more concrete
     * type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     *
     * @return the local address of this channel.
     *         {@code null} if this channel is not bound.
     */
    SocketAddress localAddress();

    /**
     * Returns the remote address where this channel is connected to. The
     * returned {@link SocketAddress} is supposed to be down-cast into more
     * concrete type such as {@link InetSocketAddress} to retrieve the detailed
     * information.
     *
     * @return the remote address of this channel.
     *         {@code null} if this channel is not connected.
     *         If this channel is not connected but it can receive messages
     *         from arbitrary remote addresses (e.g. {@link DatagramChannel},
     *         use {@link DatagramPacket#recipient()} to determine
     *         the origination of the received message as this method will
     *         return {@code null}.
     */
    SocketAddress remoteAddress();

    /**
     * Returns the {@link ChannelFuture} which will be notified when this
     * channel is closed. This method always returns the same future instance.
     */
    ChannelFuture closeFuture();

    /**
     * Returns {@code true} if and only if the I/O thread will perform the
     * requested write operation immediately. Any write requests made when
     * this method returns {@code false} are queued until the I/O thread is
     * ready to process the queued write requests.
     */
    boolean isWritable();

    /**
     * Get how many bytes can be written until {@link #isWritable()} returns {@code false}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code false} then 0.
     */
    long bytesBeforeUnwritable();

    /**
     * Get how many bytes must be drained from underlying buffers until {@link #isWritable()} returns {@code true}.
     * This quantity will always be non-negative. If {@link #isWritable()} is {@code true} then 0.
     */
    long bytesBeforeWritable();

    /**
     * Returns an <em>internal-use-only</em> object that provides unsafe operations.
     */
    Unsafe unsafe();

    /**
     * Return the assigned {@link ChannelPipeline}.
     */
    ChannelPipeline pipeline();

    /**
     * Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
     */
    ByteBufAllocator alloc();

    @Override
    Channel read();

    @Override
    Channel flush();
}

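Callbacks

A callback can be understood as a method that is handed to another piece of code so that it can be invoked later, at the appropriate time.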

demo

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ConnectHandler extends ChannelInboundHandlerAdapter {
    // Netty calls channelActive(ChannelHandlerContext) when a new connection has been established
    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println(
                "Client " + ctx.channel().remoteAddress() + " connected");
    }
}
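
The handler above only works because Netty calls channelActive for us. To make the callback idea itself visible, here is a minimal plain-Java sketch with no Netty dependency. ToyAcceptor, whenConnected and simulateConnection are hypothetical names invented for this illustration; they are not Netty APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a framework that accepts connections.
// It is NOT a Netty class; it only illustrates the callback idea.
class ToyAcceptor {
    private final List<Consumer<String>> callbacks = new ArrayList<>();

    // The caller hands over a lambda or method reference (the callback).
    void whenConnected(Consumer<String> callback) {
        callbacks.add(callback);
    }

    // Later, when the event actually happens, the acceptor calls back.
    void simulateConnection(String remoteAddress) {
        for (Consumer<String> callback : callbacks) {
            callback.accept(remoteAddress);
        }
    }
}

class CallbackDemo {
    public static void main(String[] args) {
        ToyAcceptor acceptor = new ToyAcceptor();
        // Registering the callback does not run it.
        acceptor.whenConnected(addr -> System.out.println("Client " + addr + " connected"));
        // The callback runs only once the acceptor decides to invoke it.
        acceptor.simulateConnection("127.0.0.1:9000");
    }
}
```

The point to notice is the inversion of control: the code that registers the callback never calls it directly, which is the same relationship ConnectHandler has with Netty.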

Future

source code

public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * Returns {@code true} if this {@link ChannelFuture} is a void future and so not allow to call any of the
     * following methods:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
     */
    boolean isVoid();
}

demo

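public static void addingChannelFutureListener() {
    Channel channel = CHANNEL_FROM_SOMEWHERE; // placeholder: obtain the Channel from your application
    ByteBuf someMessage = SOME_MSG_FROM_SOMEWHERE; // placeholder: the message to send
    //...
    // writeAndFlush() rather than write(): a write that is never flushed stays queued,
    // so its future would not complete
    io.netty.channel.ChannelFuture future = channel.writeAndFlush(someMessage);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(io.netty.channel.ChannelFuture f) {
            // Runs once the operation has finished; on failure, report the cause and close the channel
            if (!f.isSuccess()) {
                f.cause().printStackTrace();
                f.channel().close();
            }
        }
    });
}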
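
The listener pattern in the demo can also be tried out without a live Channel. The sketch below uses the JDK's CompletableFuture as a stand-in for ChannelFuture, with whenComplete playing roughly the role of addListener. That mapping is an analogy chosen for illustration, not a statement about how Netty is implemented, and FutureListenerDemo and observe are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FutureListenerDemo {
    // Attaches a listener to the future and returns the log that the
    // listener will fill in once the operation completes.
    static List<String> observe(CompletableFuture<Void> future) {
        List<String> log = new ArrayList<>();
        future.whenComplete((ignored, cause) -> {
            if (cause != null) {
                log.add("failed: " + cause.getMessage());
            } else {
                log.add("succeeded");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // The "write" has been started but has not finished yet.
        CompletableFuture<Void> write = new CompletableFuture<>();
        List<String> log = observe(write);
        System.out.println("before completion: " + log);

        // Simulate the I/O layer reporting a failure later on.
        write.completeExceptionally(new IOException("connection reset"));
        System.out.println("after completion: " + log);
    }
}
```

Registering the listener returns immediately and the calling thread is never blocked; this is the main difference from calling sync() or await() on the future, which park the caller until the operation finishes.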

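Events and ChannelHandler

Inbound events

  • A connection has become active or inactive
  • Data has been read
  • User events
  • Error events

Outbound events

  • Opening or closing a connection to a remote peer
  • Writing or flushing data to a socket

Each event is dispatched to a user-implemented method of a ChannelHandler class.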
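
To make "each event is dispatched to a handler method" concrete, here is a toy plain-Java model of the inbound side. Everything in it (ToyInboundHandler, ToyEvent, ToyDispatcher, RecordingHandler) is a hypothetical name made up for this sketch. Netty itself routes events through the ChannelPipeline rather than a switch statement, so read this as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an inbound handler. Every method has
// a no-op default, so an implementation overrides only what it cares about.
interface ToyInboundHandler {
    default void active() {}
    default void inactive() {}
    default void read(Object msg) {}
    default void userEvent(Object evt) {}
    default void error(Throwable cause) {}
}

// Hypothetical event kinds mirroring the inbound list above.
enum ToyEvent { ACTIVE, INACTIVE, READ, USER_EVENT, ERROR }

class ToyDispatcher {
    // Routes one event to the handler method assigned to it.
    static void dispatch(ToyInboundHandler h, ToyEvent type, Object payload) {
        switch (type) {
            case ACTIVE:     h.active(); break;
            case INACTIVE:   h.inactive(); break;
            case READ:       h.read(payload); break;
            case USER_EVENT: h.userEvent(payload); break;
            case ERROR:      h.error((Throwable) payload); break;
        }
    }
}

// A user-written handler that reacts to three of the five event kinds.
class RecordingHandler implements ToyInboundHandler {
    final List<String> seen = new ArrayList<>();

    @Override public void active() { seen.add("active"); }
    @Override public void read(Object msg) { seen.add("read:" + msg); }
    @Override public void error(Throwable cause) { seen.add("error:" + cause.getMessage()); }
}

class DispatchDemo {
    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        ToyDispatcher.dispatch(handler, ToyEvent.ACTIVE, null);
        ToyDispatcher.dispatch(handler, ToyEvent.READ, "hello");
        ToyDispatcher.dispatch(handler, ToyEvent.USER_EVENT, "ignored by this handler");
        ToyDispatcher.dispatch(handler, ToyEvent.ERROR, new RuntimeException("boom"));
        System.out.println(handler.seen);
    }
}
```

The no-op defaults play a role similar to the adapter base classes such as ChannelInboundHandlerAdapter used in the ConnectHandler demo: a handler overrides only the events it is interested in and ignores the rest.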