今天遇到了一個廣告網絡比較現實的需求,如下:
最為一個廣告服務端,可以從publisher的app接收到很多的加載廣告的請求。。。這個時候可以將這些請求的數據發給一些中間的機構(exchange),然后由他們返回廣告的數據。。。因為請求量較大,而且要保證延遲不能太高,所以這里與這些中間機構進行通信的時候就只能采用長連接的方式了,不能每次請求都生成一次連接來進行http請求。。。
其實以前一般用netty開發都是作為服務端來的大致思路如下:
(1)創建一個eventLoopGroup,用于維護nio的io事件
(2)創建一個niosocketchanel,然后將其注冊到eventLoopGroup上面去,并未channel設置初始化的handler
(3)調用channel的connect方法發起與遠端的連接請求
(4)當鏈接建立以后,剛剛提到的初始化handler將會用于響應,為channel添加http的decode與encode的handler。。
(5)最后就可以開始進行http通信了。。
當然這里就不要主動的去斷開channel了,斷開還是讓對方的服務器去做吧,或者超時,或者什么的。。反正客戶端不會主動斷開。。。
其實只要上面的步驟想出來,接下來寫代碼用netty來實現一個基于長連接的http客戶端還算是很簡單 的。。直接上代碼吧:
[java] view plain copypackage fjs;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
public class Fjs {
public static AtomicInteger number = new AtomicInteger(0);
public static AtomicLong time = new AtomicLong(0);
public static void doIt(Channel channel) {
if (number.get() 《 50) {
number.incrementAndGet();
time.set(System.currentTimeMillis());
QueryStringEncoder encoder = new QueryStringEncoder(“http://www.baidu.com/oapi/reqAd.jsp?pub=923875870&adspace=65826983&adcount=1&response=HTML&devip=22.56.22.66&user=900&format=IMG&position=top&height=&width=&device=Mozilla%2F5.0%20%28Linux%3B%20Android%204.2.1%3B%20en-us%3B%20Nexus%204%20Build%2FJOP40D%29%20AppleWebKit%2F535.19%20%28KHTML%2C%20like%20Gecko%29%20Chrome%2F18.0.1025.166%20Mobile%20Safari%2F535.19&beacon=TRUE&phpsnip=104”);
URI uriGet = null;
try {
uriGet = new URI(encoder.toString());
} catch (URISyntaxException e) {
System.out.println(“我擦,,,,”);
}
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriGet.toASCIIString());
channel.pipeline().write(request);
channel.flush();
} else {
System.out.println(“over”);
}
}
public static void main(String args[]) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
NioSocketChannel channel = new NioSocketChannel(); //創建一個channel,待會用它來發起鏈接
channel.pipeline().addFirst(new InitHandler()); //為這個channel添加一個初始化的handler,用于響應待會channel建立成功
group.register(channel); //注冊這個channel
channel.connect(new InetSocketAddress(“www.baidu.com”, 80)); //調用connect方法
Thread.currentThread().sleep(Long.MAX_VALUE);
}
public static class InitHandler implements ChannelInboundHandler {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
}
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
// 當連接建立成功之后會調用這個方法初始化channel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
ctx.channel().pipeline().remove(this); //嗯,當前這個handler對這個channel就算是沒有用了,可以移除了。。。
ctx.channel().pipeline().addFirst(new HttpClientCodec()); //添加一個http協議的encoder與decoder
ctx.channel().pipeline().addLast(new ReponseHandler()); //添加用于處理http返回信息的handler
Fjs.doIt(ctx.channel());
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println(“disconnect ” + System.currentTimeMillis() / 1000);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
System.out.println(“read ” + System.currentTimeMillis() / 1000);
}
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// TODO Auto-generated method stub
}
public void channelWritabilityChanged(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
System.out.println(“error ” + System.currentTimeMillis() / 1000);
}
}
}
這部分的內容基本上就囊括了上面提到的所有的步驟。。。而且寫了一個發起http請求的靜態方法。。。到這里基本上一個基于長連接的http客戶端就算差不多了。。。最后再來一個響應httpresponse的handler吧:
[java] view plain copypackage fjs;
import java.nio.charset.Charset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
public class ReponseHandler extends ChannelInboundHandlerAdapter{
ByteBuf buf = Unpooled.buffer();
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
DefaultHttpResponse response = (DefaultHttpResponse)msg;
}
if (msg instanceof HttpContent) {
DefaultLastHttpContent chunk = (DefaultLastHttpContent)msg;
buf.writeBytes(chunk.content());
if (chunk instanceof LastHttpContent) {
long now = System.currentTimeMillis();
long before = Fjs.time.get();
System.out.println(((double) now - (double)before) / 1000);
String xml = buf.toString(Charset.forName(“UTF-8”));
buf.clear();
Fjs.doIt(ctx.channel());
}
}
}
}
評論
查看更多