網絡編程:Java數據報之失序和丟包

發表于:2007-06-10來源:作者:點擊數: 標簽:
習慣了TCP編程,認為UDP可以包辦這些問題是錯誤的。一個UDP應用程序要承擔 可靠性 方面的全部工作,包括報文的丟失、重復、時延、亂序以及連接失效等問題。 通常我們在可靠性好,傳輸時延小的局域網上 開發 測試 ,一些問題不容易暴露,但在大型互聯網上卻會

 

 

 

 

 

 

 

 

習慣了TCP編程,認為UDP可以包辦這些問題是錯誤的。一個UDP應用程序要承擔可靠性方面的全部工作,包括報文的丟失、重復、時延、亂序以及連接失效等問題。

  通常我們在可靠性好,傳輸時延小的局域網上開發測試,一些問題不容易暴露,但在大型互聯網上卻會出現錯誤。

  UDP協議把遞送的可靠性責任推到了上層即應用層,下面簡單編寫了幾個類來專門處理兩個問題:亂序和丟包。

  四個類:DataPacket 類,PacketHeader類,PacketBody類 ,DataEntry類,位于同一個文件DataPacket .java中。

  DataPacket 類相當于一個門面模式,提供給外部使用,通信數據也在這個類中處理。

package com.skysoft.pcks;

import java.io.*;
import java.net.*;
import java.util.*;

public class  DataPacket {
  InputStream is;
  OutputStream os;
  PacketHeader header;
  PacketBody body;
  ArrayList al;
  public static final int DataSwapSize = 64532;

  /**
   * 在接收數據報使用
   */
  public DataPacket() {
    header = new PacketHeader();
    body = new PacketBody();
    al = new ArrayList();
  }
  /**
   * 在發送數據報時使用,它調用報文分割操作.
   * @param file String  硬盤文件
   */
  public DataPacket(String file) {
    this();
    try {
      is = new FileInputStream(file);
      header.CalcHeaderInfo(is.available());
      this.madeBody();
      is.close();
      //this.Gereratedata();
    }
    catch (FileNotFoundException ex) {
      ex.printStackTrace();
    }
    catch (IOException ex1) {
      ex1.printStackTrace();
    }
  }
  /**
   * 在發送數據報時使用,它調用報文分割操作.
   * @param url URL url地址
   */
  public DataPacket(URL url) {
    this();
    try {
      //is = url.openStream();
      URLConnection conn=url.openConnection();
      is=conn.getInputStream();
      int total=conn.getContentLength();
      header.CalcHeaderInfo(total);
      this.madeBody();
      //System.out.println(total+":"+total);
      is.close();
    }
    catch (IOException ex) {
      ex.printStackTrace();
    }
  }
  /**
   * 為發送構造分組,使用PackageHeader處理了報頭格式,并為分組編序號.
   */
  private void madeBody() {
    al.clear();
    byte[] buffer;
    DataEntry de;
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        //is.skip(i * body.BODY_BUFFER_SIZE);
        header.ArrageSort(i);
        de = new DataEntry(PacketBody.BODY_BUFFER_SIZE);
        de.setSn(i);
        de.setStreamsize(header.getStreamsize());
        de.setFragmentcounter(header.getFragmentcounter());
        if (header.isWTailFragment(i)) {
          buffer = new byte[header.getMinfragment()];
          is.read(buffer, 0, buffer.length);
          header.setActByteSize(header.getMinfragment());
          de.setActByteSize(header.getMinfragment());
        }
        else {
          buffer = new byte[body.BODY_BUFFER_SIZE];
          is.read(buffer, 0, buffer.length);
        }
        //System.out.println("length-------"+i+" "+body.getBody().length+" "+header.getMinfragment());
        body.setBody(buffer);
        //System.out.println("length:" + i + " " + header.toString());
        bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
        bos.write(body.getBody(), 0, body.getBody().length);
        de.setBytes(bos.toByteArray());
        al.add(de);
      }
      catch (IOException ex) {
        ex.printStackTrace();
      }
    }
  }
  /**
   * 為發送構造分組,沒有考慮報頭格式,也沒有為分組編序號.
   */
  private void madeBody1() {
    al.clear();
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        if (header.isWTailFragment(i))
          is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                  header.getMinfragment());
        else
          is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                  body.BODY_BUFFER_SIZE);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
        bos.write(body.getBody(), header.HEADER_BUFFER_SIZE,
                  body.getBody().length);
        al.add(bos);
      }
      catch (IOException ex) {
        ex.printStackTrace();
      }
    }
  }
  /**
   * 在接收到報文后,對此報文執行組裝,并處理報文丟失和亂序情況.
   * @param b1 byte[]
   */
  public void Add(byte[] b1) {
    byte[] buffer = (byte[]) b1.clone();
    handlerText(buffer);
    DataEntry de = new DataEntry(buffer, header.getActByteSize());
    de.setSn(header.getSn());
    de.setStreamsize(header.getStreamsize());
    de.setFragmentcounter(header.getFragmentcounter());
    al.add(de);
  }
  private void handlerText(byte[] buffer) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(buffer, 0, header.HEADER_BUFFER_SIZE);
    byte[] b=new byte[header.HEADER_BUFFER_SIZE];
    System.arraycopy(buffer,0,b,0,b.length);
    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    InputStreamReader isr = new InputStreamReader(bais);
    BufferedReader br = new BufferedReader(isr);
    try {
      header = new PacketHeader(br.readLine());
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
  }
 
  private String calFileSize(int size) {
    return size / 1024 + "K";
  }

  public ArrayList getDataPackets() {
    return al;
  }
/**
 * 是否接收完畢,通過序號是否等于最大段數來判斷,這也許有問題,比如,正好是最后一個段丟失了,這樣
 * 這個包整個就丟失了.
 * @return
 */
  public boolean isFull() {
    return this.header.getSn() == this.header.getFragmentcounter() - 1 ? true : false;
  }
/**
 * 判斷是否只有一個段.
 * @return
 */
  public boolean isZero() {
    return this.header.getSn() == 0 ? true : false;
  }
/**
 * 該函數執行報文組裝,不考慮丟失的報文.
 * @return
 */
  private ByteArrayOutputStream fetchDataPackets() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer = null;
    DataEntry de;
    for (int i = 0; i < al.size(); i++) {
      try {
        de = this.getSnData(i);
        buffer = de.getByte();
        if (header.getStreamsize() == de.getStreamsize()) {
          bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize());
          System.out.println(de.toString() + " -- fetchDataPackets");
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
    return bos;
  }

  /**
   * 該函數執行報文組裝,對于丟失的報文,寫入空報文.
   * @return ByteArrayOutputStream
   */
  private ByteArrayOutputStream fetchDataPackets_sn() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer;
    DataEntry de;
    for (int i = 0; i < header.getFragmentcounter(); i++) {
      try {
        de = this.getSnData(i);
        if (de == null) {
          de = seachDeData(i);
        }
        buffer = de.getByte();
        //System.out.println(de.getSn() + ":" + i);
        //handlerText(buffer);
        //bos.write(buffer, header.HEADER_BUFFER_SIZE,
        //          buffer.length - header.HEADER_BUFFER_SIZE);
        if (header.getStreamsize() == de.getStreamsize()) {
          bos.write(de.getByte(), header.HEADER_BUFFER_SIZE,
                    de.getActByteSize());
          //System.out.println(de.toString());
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
    return bos;
  }

  /**
   * 對緩沖的數據包進行排序處理,即按順序提取同一幀的數據,如果沒有找到該序號的幀,則返回空值.
   * @param sn int 要找的幀序號.
   * @return DataEntry
   */
  private DataEntry getSnData(int sn) {
    DataEntry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      if (header.getStreamsize() == de.getStreamsize()) {
        if (sn == de.getSn())
          break;
        else
          de = null;
      }
    }
    return de;
  }

  /**
   * 按序號開始向前或者是向后尋找最近的幀片段,日后可以增加請求重發功能,通過開一個通信連接.
   * @param sn int
   * @return DataEntry
   */
  private DataEntry seachDeData(int sn) {
    DataEntry de = null;
    int initvalue, minvalue = 10000;
    DataEntry back, fore = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      if (header.getStreamsize() == de.getStreamsize()) {
        initvalue = Math.abs(de.getSn() - sn);
        if (de.getFragmentcounter() != de.getSn() && initvalue < minvalue) {
          minvalue = initvalue;
          fore = de;
        }
      }
    }
    return fore;
  }

  /**
   * 除去最后一幀外,隨機抽取一幀.
   * @return DataEntry
   */
  private DataEntry seachDeData() {
    DataEntry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      System.out.println("sky ::::" + de.getFragmentcounter() + ":" + de.getSn() +
                         ":" + i);
      if (header.getStreamsize() == de.getStreamsize()) {
        if (de.getFragmentcounter() != de.getSn()) {
          break;
        }
      }
    }
    return de;
  }
  /**
   * 生成組裝完的結果數據.因為用圖像來做測試,所以令其返回圖像.
   * @return Image
   */
  public java.awt.Image Gereratedata() {
     ByteArrayInputStream bis;
     java.awt.image.BufferedImage bimage = null;
     try {
       byte[] b = fetchDataPackets_sn().toByteArray();
       //fetchDataPackets_old1()
       bis = new ByteArrayInputStream(b);
       bimage = javax.imageio.ImageIO.read(bis);

     }
     catch (Exception ex1) {
       ex1.printStackTrace();
     }
     return bimage;
  }

  public static void main(String args[]) {
    DataPacket dp = new DataPacket("e:\\nature\\14.jpg");
  }
}
/**
 * 數據實體,充當臨時處理場所.
 * @author Administrator
 *
 */
class DataEntry {
  byte[] bytes;
  int fragmentcounter, sn, actbytesize;
  long streamsize;
  int minfragment;

  public DataEntry() {

  }

  public DataEntry(int size) {
    this.actbytesize = size;
  }

  public DataEntry(byte[] b, int i) {
    this.bytes = b;
    this.actbytesize = i;
  }

  public byte[] getByte() {
    return this.bytes;
  }

  public void setBytes(byte[] b) {
    this.bytes = b;
  }

  public void setStreamsize(long size) {
    this.streamsize = size;
  }

  public long getStreamsize() {
    return this.streamsize;
  }

  public int getMinfragment() {
    return minfragment;
  }

  public synchronized void setSn(int i) {
    this.sn = i;
  }

  public synchronized int getSn() {
    return sn;
  }

  public synchronized int getFragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setFragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setActByteSize(int size) {
    actbytesize = size;
  }

  public int getActByteSize() {
    return actbytesize;
  }

  public String toString() {
    return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn +
        "::" + this.actbytesize + " recv DataEntry";
  }
}
/**
 * 報頭,處理報頭格式
 * @author Administrator
 *
 */
class PacketHeader implements Serializable{
  public static final int HEADER_BUFFER_SIZE = 1024;
  int fragmentcounter, sn;
  int actbytesize = PacketBody.BODY_BUFFER_SIZE;
  byte[] header; //= new byte[HEADER_BUFFER_SIZE];
  long streamsize;
  int minfragment;

  public PacketHeader() {

  }

  public PacketHeader(long l) {
    this.setStreamsize(l);

  }

  public PacketHeader(String s) {
    String[] tm = s.split("::");
    this.setActByteSize(Integer.parseInt(tm[3]));
    this.setSn(Integer.parseInt(tm[2]));
    this.setFragmentcounter(Integer.parseInt(tm[1]));
    this.setStreamsize(Long.parseLong(tm[0]));
  }

  /**
   * 根據文件的段的順序生成數據頭.
   * @param sn 文件序列
   */
  public void ArrageSort(int sn) {
    this.setSn(sn);
    this.setByte();
  }

  public void CalcHeaderInfo(long l) {
    this.setStreamsize(l);
    CalcHeaderInfo();
  }
  /**
   * 計算流要被分成的片段數量,并得出最小片段余量.
   */
  public void CalcHeaderInfo() {
    fragmentcounter = Math.round( (float) streamsize /
                                 PacketBody.BODY_BUFFER_SIZE);
    float critical = (float) streamsize / PacketBody.BODY_BUFFER_SIZE;
    if (critical - fragmentcounter < 0.5 && critical - fragmentcounter > 0)
      fragmentcounter++;
    minfragment = (int) (streamsize % PacketBody.BODY_BUFFER_SIZE);
  }

  public byte[] getHeader() {
    Long it = new Long(this.streamsize);
    return new byte[] {it.byteValue()};
  }

  public byte[] getByte() {
    return header; //this.toString().getBytes();
  }
  /**
   * 生成報頭字節,首先取得數據包頭 流尺寸::段片數::段順序::段實際尺寸 的字節形式,
   * 然后加入回車換行符號,對于1024字節中剩余的部分一律寫入元素為0的字節數組.
   */
  public void setByte() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer = this.toByte();
    try {
      bos.write(buffer);
      bos.write("\r\n".getBytes());
      bos.write(new byte[PacketHeader.HEADER_BUFFER_SIZE - buffer.length], 0,
                PacketHeader.HEADER_BUFFER_SIZE - buffer.length);
      header = bos.toByteArray();
    }
    catch (IOException ex) {
      ex.printStackTrace();
    }
  }

  public void setStreamsize(long size) {
    this.streamsize = size;
  }

  public long getStreamsize() {
    return this.streamsize;
  }

  public int getMinfragment() {
    return minfragment;
  }

  public synchronized void setSn(int i) {
    this.sn = i;
  }

  public int getSn() {
    return sn;
  }

  public int getFragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setFragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setActByteSize(int size) {
    actbytesize = size;
    setByte();
  }

  public int getActByteSize() {
    return actbytesize;
  }
  /**
   * 數據包頭的格式為:流尺寸::段片數::段順序::段實際尺寸
   * 報頭字節長度是可變化的,比如,可以加入流的具體信息如:流所屬文件的名稱,文件類型以及一些其他信息.
   * @return String
   */
  public String toString() {
    return streamsize + "::" + this.fragmentcounter + "::" + this.getSn() +
        "::" + this.getActByteSize();
  }

  public byte[] toByte() {
    return this.toString().getBytes();
  }
  /**
   * 是否為尾段
   * @param i int
   * @return boolean
   */
  public boolean isWTailFragment(int i) {
    return (i == fragmentcounter - 1) ? true : false;
  }

}
/**
 * 用戶數據區
 * @author Administrator
 *
 */
class PacketBody implements Serializable{
  public static final int BODY_BUFFER_SIZE = 63508; //65508
  byte[] body;

  public PacketBody() {
  }

  public void setBody(byte[] b) {
    this.body = b;
  }

  public byte[] getBody() {
    return body;
  }
}

  這個數據處理類,將在接下來使用





原文轉自:http://www.anti-gravitydesign.com

評論列表(網友評論僅供網友表達個人看法,并不表明本站同意其觀點或證實其描述)
国产97人人超碰caoprom_尤物国产在线一区手机播放_精品国产一区二区三_色天使久久综合给合久久97