Sorting a Large File with Limited Memory

Problem

A file holds user ID-card numbers, one per line. The file is 4 GB, and the program may use only 500 MB of memory. Sort the contents and write them to a new file.

Approach

  1. Read the large file line by line and split it into chunks small enough to fit in memory.
  2. Sort each chunk in memory and write it out as its own chunk file.
  3. Repeatedly compare the current line of every chunk and write the smallest (or largest) one to the final output file (a toy sketch of this merge follows the list).
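Step 3 is a k-way merge driven by a min-heap. Below is a toy in-memory sketch of just that merge idea (made-up data, with iterators standing in for chunk files); the real program further down streams lines from files instead.

import java.util.*;

public class MergeSketch {

    // Pairs a cursor with its current (smallest unread) value,
    // much like the ChunkReader in the full program below
    static class Cursor {
        Iterator<String> it;
        String current;

        Cursor(List<String> chunk) {
            it = chunk.iterator();
            current = it.hasNext() ? it.next() : null;
        }
    }

    public static void main(String[] args) {
        // Three already-sorted "chunks" standing in for sorted chunk files
        List<List<String>> chunks = List.of(
                List.of("111", "444", "777"),
                List.of("222", "555"),
                List.of("333", "666", "888"));

        // Min-heap ordered by each chunk's current line
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current));
        for (List<String> chunk : chunks) {
            Cursor c = new Cursor(chunk);
            if (c.current != null) heap.offer(c);
        }

        // Repeatedly emit the globally smallest line, then re-offer the
        // cursor if its chunk still has lines left
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            System.out.println(c.current);
            c.current = c.it.hasNext() ? c.it.next() : null;
            if (c.current != null) heap.offer(c);
        }
    }
}

With k chunks this performs O(N log k) comparisons over N total lines while holding only k lines in memory at a time.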

Analysis

  • Treat an ID-card number as an 18-character string of digits (the last character may be an X).
  • Each line is therefore 18 bytes.
  • The line separator at the end of every line also takes storage and has to be counted (worked numbers after this list).
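Worked numbers under those assumptions (a Linux "\n" line ending is assumed, so 19 bytes per line; on Windows "\r\n" would make it 20):

public class Numbers {
    public static void main(String[] args) {
        long fileSize = 4L * 1024 * 1024 * 1024;          // 4 GiB
        int bytesPerLine = 18 + 1;                        // 18 digits + '\n'
        System.out.println(fileSize / bytesPerLine + 1);  // 226050911 lines, same formula as GenData below
        long chunkSize = 50L * 1024 * 1024;               // 50 MB chunks
        System.out.println(fileSize / chunkSize + 1);     // about 82 chunk files
    }
}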

Generating the 4 GB input file

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Compile: javac -encoding utf-8 GenData.java
 * Run:     java GenData
 */
public class GenData {

    static long _B = 1;
    static long _KB = _B * 1024;
    static long _MB = _KB * 1024;
    static long _GB = _MB * 1024;

    public static void main(String[] args) {

        long fileSize = 4 * _GB;
        String fileName = "input.txt";

        // Treat an ID-card number as 18 digits:
        // 100000000000000000 -> 18 bytes (B)
        int digitalPerLine = 18;
        System.out.println("digitalPerLine = " + digitalPerLine);

        // Note: the line separator written after each line also counts
        // toward the file size
        long numberOfLines = fileSize / (digitalPerLine + System.lineSeparator().length()) + 1;
        System.out.println("numberOfLines = " + numberOfLines);

        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
            ThreadLocalRandom random = ThreadLocalRandom.current();

            for (int i = 0; i < numberOfLines; i++) {
                StringBuilder line = new StringBuilder();

                // Keep the first digit non-zero; not strictly necessary,
                // since the lines are compared as strings anyway
                line.append(random.nextInt(1, 10));
                for (int j = 1; j < digitalPerLine; j++) {
                    line.append(random.nextInt(10));
                }
                writer.write(line.toString());
                writer.newLine();
            }
        } catch (Exception e) {
            // Swallowing the exception silently would hide I/O failures
            e.printStackTrace();
        }
    }
}

Split, sort, merge

import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Compile: javac -encoding utf-8 Main.java
 * Run:     java -Xmx500m Main 50
 * The 50 is the chunk file size in MB.
 */
public class Main {

    static long _B = 1;
    static long _KB = _B * 1024;
    static long _MB = _KB * 1024;
    static long perChunkSize = 50 * _MB;

    public static void main(String[] args) {
        if (args.length != 0) {
            perChunkSize = Integer.parseInt(args[0]) * _MB;
            System.out.println("Chunk size set via argument to: " + (perChunkSize / _MB) + " MB");
        } else {
            System.out.println("Chunk size kept at default: " + (perChunkSize / _MB) + " MB");
        }

        String oriFileName = "input.txt";
        String outFileName = "out.txt";

        List<String> chunkFileNames = divideIntoSortedChunks(oriFileName, perChunkSize);
        mergeSortedChunks(chunkFileNames, outFileName);
    }

    private static List<String> divideIntoSortedChunks(String oriFileName, long availableMemory) {

        List<String> chunkFileNames = new ArrayList<>();

        List<String> tempChunk = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader(oriFileName))) {
            int numberOfChunk = 0;
            long tempChunkMemorySize = 0;

            String line;
            while ((line = reader.readLine()) != null) {

                long lineSize = line.length() + System.lineSeparator().length();

                if ((tempChunkMemorySize + lineSize >= availableMemory) && (!tempChunk.isEmpty())) {

                    System.out.println("Collected complete chunk [" + numberOfChunk + "]");
                    String chunkFileName = sortAndWriteChunk(numberOfChunk, tempChunk);

                    chunkFileNames.add(chunkFileName);

                    tempChunk.clear();
                    tempChunkMemorySize = 0;
                    numberOfChunk++;
                }
                tempChunk.add(line);
                tempChunkMemorySize += lineSize;
            }

            // Flush the last chunk, if there is one
            if (!tempChunk.isEmpty()) {
                String chunkFileName = sortAndWriteChunk(numberOfChunk, tempChunk);
                chunkFileNames.add(chunkFileName);
            }
            System.out.println("Processed [" + (numberOfChunk + 1) + "] chunks in total");

        } catch (Exception e) {
            e.printStackTrace();
        }

        return chunkFileNames;
    }

    private static String sortAndWriteChunk(int numberOfChunk, List<String> tempChunk) {
        System.out.println("Sorting chunk [" + numberOfChunk + "]");
        Collections.sort(tempChunk);
        System.out.println("Finished sorting chunk [" + numberOfChunk + "]");
        String chunkFileName = writeChunkToFile(tempChunk, numberOfChunk);

        System.out.println("Chunk [" + numberOfChunk + "] written successfully");
        return chunkFileName;
    }

    private static String getChunkFileName(int numberOfChunk) {
        return "tempChunk" + numberOfChunk + ".txt";
    }

    private static String writeChunkToFile(List<String> chunk, int numberOfChunk) {
        String chunkFileName = getChunkFileName(numberOfChunk);
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(chunkFileName))) {
            for (String line : chunk) {
                writer.write(line);
                writer.newLine();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return chunkFileName;
    }

    static class ChunkReader {
        BufferedReader reader;
        String currentLine;

        public static ChunkReader init(BufferedReader reader) {
            ChunkReader res = new ChunkReader();
            res.reader = reader;
            try {
                res.currentLine = reader.readLine();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return res;
        }
    }

    // Merge the sorted chunk files into the final output file
    private static void mergeSortedChunks(List<String> chunkFileNames, String outputFile) {

        System.out.println("Starting merge");
        PriorityQueue<ChunkReader> chunkReaderQueue = new PriorityQueue<>((o1, o2) -> {
            if (o1.currentLine == null && o2.currentLine == null) {
                return 0;
            } else if (o1.currentLine == null) {
                return 1;
            } else if (o2.currentLine == null) {
                return -1;
            }
            return o1.currentLine.compareTo(o2.currentLine);
        });

        List<ChunkReader> chunkReaders = new ArrayList<>();

        try {
            for (String chunkFileName : chunkFileNames) {
                ChunkReader chunkReader = ChunkReader.init(new BufferedReader(new FileReader(chunkFileName)));
                chunkReaders.add(chunkReader);
                chunkReaderQueue.offer(chunkReader);
            }

            try (BufferedWriter writer = new BufferedWriter(new FileWriter(outputFile))) {
                while (!chunkReaderQueue.isEmpty()) {

                    ChunkReader chunkReader = chunkReaderQueue.poll();
                    if (chunkReader == null || chunkReader.reader == null) continue;

                    String line = chunkReader.currentLine;

                    if (line != null) {
                        writer.write(line);
                        writer.newLine();
                        // Advance this chunk and put it back on the heap
                        chunkReader.currentLine = chunkReader.reader.readLine();
                        chunkReaderQueue.offer(chunkReader);
                    } else {
                        chunkReader.reader.close();
                    }
                }

            }

            for (ChunkReader chunkReader : chunkReaders) {
                chunkReader.reader.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Delete the temp files
        for (String chunkFileName : chunkFileNames) {
            File file = new File(chunkFileName);
            if (!file.exists()) {
                continue; // was `break`, which would have skipped deleting the remaining files
            }
            file.delete();
        }

        System.out.println("Merge finished");
    }

}
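The program above does not verify its own output. Not part of the original code, but a quick sanity check is to stream out.txt once and assert that every line compares greater than or equal to its predecessor:

import java.io.BufferedReader;
import java.io.FileReader;

public class CheckSorted {
    public static void main(String[] args) throws Exception {
        try (BufferedReader reader = new BufferedReader(new FileReader("out.txt"))) {
            String prev = null;
            String line;
            long count = 0;
            while ((line = reader.readLine()) != null) {
                if (prev != null && prev.compareTo(line) > 0) {
                    System.out.println("Out of order at line " + (count + 1));
                    return;
                }
                prev = line;
                count++;
            }
            System.out.println("Sorted, " + count + " lines");
        }
    }
}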

Issues

  • Running with java -Xmx500m, an OOM occurs once the chunk size exceeds roughly 100 MB; the exact threshold was not measured.
  • With the chunk size set to 50 MB the program runs to completion.
  • A look with jol suggests the likely cause is Java's low memory efficiency: space wasted on alignment padding, per-object headers on lots of small objects, and so on (a rough estimate follows below).

Well, to be honest, this part still puzzles me a bit...
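A rough per-line estimate makes the jol observation concrete. The chunking code counts 19 bytes per line, but on the heap each line is a String object plus its backing array plus an ArrayList slot. The sizes below are typical for a 64-bit HotSpot JVM with compressed oops and JDK 9+ compact strings; treat them as assumptions to verify with jol, not as measurements:

public class HeapEstimate {
    public static void main(String[] args) {
        // Verify on your own JVM with jol, e.g.
        // org.openjdk.jol.info.GraphLayout.parseInstance(s).totalSize()
        long stringObject = 24;  // String: header + hash + coder + reference to byte[], padded
        long arrayFor18   = 40;  // byte[18]: 16-byte header + 18 bytes data, padded to 8
        long listSlot     = 4;   // compressed reference held inside the ArrayList
        long perLineHeap  = stringObject + arrayFor18 + listSlot;  // ~68 bytes
        long perLineFile  = 18 + 1;                                // what the chunking code counts

        System.out.println("heap bytes per line ~ " + perLineHeap);
        // A "100 MB" chunk then needs roughly 100 * 68 / 19 ~ 358 MB of heap,
        // before Collections.sort allocates its temporary array of references
        System.out.println("heap for a 100 MB chunk ~ "
                + (100L * 1024 * 1024 / perLineFile * perLineHeap / (1024 * 1024)) + " MB");
    }
}

If this estimate is in the right ballpark, the 3-4x expansion between bytes on disk and bytes on the heap is exactly why 50 MB chunks fit under -Xmx500m while chunks above roughly 100 MB do not.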