首页 > 编程 > Java > 正文

java使用多线程读取超大文件

2019-11-26 08:39:43
字体:
来源:转载
供稿:网友

接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

  (文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.util.Observable; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午3:12 * 读取文件 */public class ReadFile extends Observable {  private int bufSize = 1024; // 换行符 private byte key = "/n".getBytes()[0]; // 当前行数 private long lineNum = 0; // 文件编码,默认为gb2312 private String encode = "gb2312"; // 具体业务逻辑监听器 private ReaderFileListener readerListener;  public void setEncode(String encode) { this.encode = encode; }  public void setReaderListener(ReaderFileListener readerListener) { this.readerListener = readerListener; }  /** * 获取准确开始位置 * @param file * @param position * @return * @throws Exception */ public long getStartNum(File file, long position) throws Exception { long startNum = position; FileChannel fcin = new RandomAccessFile(file, "r").getChannel(); fcin.position(position); try {  int cache = 1024;  ByteBuffer rBuffer = ByteBuffer.allocate(cache);  // 每次读取的内容  byte[] bs = new byte[cache];  // 缓存  byte[] tempBs = new byte[0];  String line = "";  while (fcin.read(rBuffer) != -1) {  int rSize = rBuffer.position();  rBuffer.rewind();  rBuffer.get(bs);  rBuffer.clear();  byte[] newStrByte = bs;  // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面  if (null != tempBs) {   int tL = tempBs.length;   newStrByte = new byte[rSize + tL];   System.arraycopy(tempBs, 0, newStrByte, 0, tL);   System.arraycopy(bs, 0, newStrByte, tL, rSize);  }  // 获取开始位置之后的第一个换行符  int endIndex = indexOf(newStrByte, 0);  if (endIndex != -1) {   return startNum + endIndex;  }  tempBs = substring(newStrByte, 0, newStrByte.length);  startNum += 1024;  } } catch (Exception e) {  e.printStackTrace(); } finally {  fcin.close(); } return position; }  /** * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾 * @param fullPath * @param start * @param end * @throws Exception */ public void readFileByLine(String fullPath, long start, long end) throws Exception { File fin = new File(fullPath); if (fin.exists()) {  FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();  fcin.position(start);  try {  ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);  // 每次读取的内容  byte[] bs = new byte[bufSize];  // 缓存  byte[] tempBs = new byte[0];  String line = "";  // 当前读取文件位置  long nowCur = start;  while (fcin.read(rBuffer) != -1) {   nowCur += bufSize;    int rSize = rBuffer.position();   rBuffer.rewind();   rBuffer.get(bs);   rBuffer.clear();   byte[] newStrByte = bs;   // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面   if (null != tempBs) {   int tL = tempBs.length;   newStrByte = new byte[rSize + tL];   System.arraycopy(tempBs, 0, newStrByte, 0, tL);   System.arraycopy(bs, 0, newStrByte, tL, rSize);   }   // 是否已经读到最后一位   boolean isEnd = false;   // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置   if (end > 0 && nowCur > end) {   // 缓存长度 - 当前已经读取位数 - 最后位数   int l = newStrByte.length - (int) (nowCur - end);   newStrByte = substring(newStrByte, 0, l);   isEnd = true;   }   int fromIndex = 0;   int endIndex = 0;   // 每次读一行内容,以 key(默认为/n) 作为结束符   while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {   byte[] bLine = substring(newStrByte, fromIndex, endIndex);   line = new String(bLine, 0, bLine.length, encode);   lineNum++;   // 输出一行内容,处理方式由调用方提供   readerListener.outLine(line.trim(), lineNum, false);   fromIndex = endIndex + 1;   }   // 将未读取完成的内容放到缓存中   tempBs = substring(newStrByte, fromIndex, newStrByte.length);   if (isEnd) {   break;   }  }  // 将剩下的最后内容作为一行,输出,并指明这是最后一行  String lineStr = new String(tempBs, 0, tempBs.length, encode);  readerListener.outLine(lineStr.trim(), lineNum, true);  } catch (Exception e) {  e.printStackTrace();  } finally {  fcin.close();  }  } else {  throw new FileNotFoundException("没有找到文件:" + fullPath); } // 通知观察者,当前工作已经完成 setChanged(); notifyObservers(start+"-"+end); }  /** * 查找一个byte[]从指定位置之后的一个换行符位置 * * @param src * @param fromIndex * @return * @throws Exception */ private int indexOf(byte[] src, int fromIndex) throws Exception {  for (int i = fromIndex; i < src.length; i++) {  if (src[i] == key) {  return i;  } } return -1; }  /** * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[] * * @param src * @param fromIndex * @param endIndex * @return * @throws Exception */ private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception { int size = endIndex - fromIndex; byte[] ret = new byte[size]; System.arraycopy(src, fromIndex, ret, 0, size); return ret; } }

读文件线程

/** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午4:50 * To change this template use File | Settings | File Templates. */public class ReadFileThread extends Thread {  private ReaderFileListener processPoiDataListeners; private String filePath; private long start; private long end;  public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) { this.setName(this.getName()+"-ReadFileThread"); this.start = start; this.end = end; this.filePath = file; this.processPoiDataListeners = processPoiDataListeners; }  @Override public void run() { ReadFile readFile = new ReadFile(); readFile.setReaderListener(processPoiDataListeners); readFile.setEncode(processPoiDataListeners.getEncode());// readFile.addObserver(); try {  readFile.readFileByLine(filePath, start, end + 1); } catch (Exception e) {  e.printStackTrace(); } }}

具体业务逻辑监听

/** * Created with Okey * User: Okey * Date: 13-3-14 * Time: 下午3:19 * NIO逐行读数据回调方法 */public abstract class ReaderFileListener {  // 一次读取行数,默认为500 private int readColNum = 500;  private String encode;  private List<String> list = new ArrayList<String>();  /** * 设置一次读取行数 * @param readColNum */ protected void setReadColNum(int readColNum) { this.readColNum = readColNum; }  public String getEncode() { return encode; }  public void setEncode(String encode) { this.encode = encode; }  /** * 每读取到一行数据,添加到缓存中 * @param lineStr 读取到的数据 * @param lineNum 行号 * @param over 是否读取完成 * @throws Exception */ public void outLine(String lineStr, long lineNum, boolean over) throws Exception { if(null != lineStr)  list.add(lineStr); if (!over && (lineNum % readColNum == 0)) {  output(list);  list.clear(); } else if (over) {  output(list);  list.clear(); } }  /** * 批量输出 * * @param stringList * @throws Exception */ public abstract void output(List<String> stringList) throws Exception; }

线程调度

import java.io.File;import java.io.FileInputStream;import java.io.IOException; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-1 * Time: 下午6:03 * To change this template use File | Settings | File Templates. */public class BuildData { public static void main(String[] args) throws Exception { File file = new File("E://1396341974289.csv"); FileInputStream fis = null; try {  ReadFile readFile = new ReadFile();  fis = new FileInputStream(file);  int available = fis.available();  int maxThreadNum = 50;  // 线程粗略开始位置  int i = available / maxThreadNum;  for (int j = 0; j < maxThreadNum; j++) {  // 计算精确开始位置  long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);  long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;  // 具体监听实现  ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");  new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();  } } catch (IOException e) {  e.printStackTrace(); } catch (Exception e) {  e.printStackTrace(); } }}

现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表