Mersenne Twister でランダムに行をシャッフル (2)
昨日の Mersenne Twister でランダムに行をシャッフル - ny23の日記の続き.折角なので,参考ページにあったアルゴリズムを実装してみた.ファイルの内容をランダムにばら撒き,ローカルで random_shuffle し,ランダムにファイルを選んで先頭から出力する.shuffling の厳密性は Random Permutations on Distributed, External and Hierarchical Memory のような手順で示せそう(Mersenne Twister でランダムに行をシャッフル (3) - ny23の日記で示して,更に無駄を省いて改良した).
// mt-shuf_.cc #include <unistd.h> #include <sys/resource.h> #include <cstdio> #include <cstdlib> #include <cstring> #include <vector> #include <algorithm> // random number generator #ifdef USE_MT #include <tr1/random> struct rand_ { std::tr1::variate_generator <std::tr1::mt19937, std::tr1::uniform_int <size_t> > gen; rand_ () : gen (std::tr1::mt19937 (), std::tr1::uniform_int <size_t> ()) {}; long operator () (long max) { return gen (max); } }; #else struct rand_ { long operator() (long max) const { return random () % max; } }; #endif #ifndef BUF_SIZE // default chunk size for shuffling (1M) #define BUF_SIZE (1 << 20) #endif #ifndef OPEN_MAX // default # file descriptors #define OPEN_MAX 20 #endif #ifndef MAX_LINE_LEN // maximum length of lines #define MAX_LINE_LEN 65536 #endif #ifndef TMP_DIR #define TMP_DIR "/tmp" // default directory to store temporary files #endif void shuffle_lines (FILE * fp, char * buf, size_t size, rand_ &gen) { // load (partial) data on memory std::fseek (fp, 0, SEEK_SET); // flush here std::fread (&buf[0], sizeof (char), size, fp); // shuffle std::vector <const char *> lns; for (char *p (&buf[0]), *end (&buf[size]); p != end; *p = '\0', ++p) { lns.push_back (p); while (*p != '\n') ++p; } std::random_shuffle (lns.begin (), lns.end (), gen); // overwrite & recover position std::fseek (fp, 0, SEEK_SET); for (std::vector <const char *>::iterator it = lns.begin (); it != lns.end (); ++it) std::fprintf (fp, "%s\n", *it); std::fseek (fp, 0, SEEK_SET); } int main (int argc, char** argv) { if (argc < 2) { std::fprintf (stderr, "Usage: %s [-S size] [-T tmp_dir] in\n", argv[0]); std::exit (1); } // random number generator rand_ gen; // handle options size_t buf_size = BUF_SIZE; const char * tmp_dir = TMP_DIR; char * in = 0; for (int i = 1; i < argc; ++i) { if (std::strcmp (argv[i], "-S") == 0) { buf_size = 1; char &c = argv[++i][std::strlen (argv[i])-1]; char * err; switch (c) { case 'G': buf_size <<= 10; case 'M': buf_size <<= 10; case 'K': buf_size <<= 10; c = '\0'; default: buf_size *= std::strtol (argv[i], &err, 10); if (*err) { std::fprintf (stderr, "invalid size in -S argument: %s\n", argv[i]); std::exit (1); } } } else if (std::strcmp (argv[i], "-T") == 0) { tmp_dir = argv[++i]; } else { in = argv[i]; } } if (! in) std::fprintf (stderr, "no file specified\n"); char tmp_fn[std::strlen (tmp_dir) + 12]; // divide & conquer // prepare bin FILE * fp = std::fopen (in, "rb"); if (! fp) { std::fprintf (stderr, "no such file: %s\n", in); std::exit (1); } struct rlimit rlim; size_t ntmp_limit = (getrlimit (RLIMIT_NOFILE, &rlim) == 0 ? rlim.rlim_cur : OPEN_MAX) - 4; // set # temporary files std::fseek (fp, 0, SEEK_END); size_t ntmp = std::min (std::ftell (fp) / buf_size + 1, ntmp_limit); std::fseek (fp, 0, SEEK_SET); std::vector <FILE*> tmpfps; tmpfps.reserve (ntmp); for (size_t i = 0; i < ntmp; ++i) { std::sprintf (&tmp_fn[0], "%s/shufXXXXXX", tmp_dir); tmpfps.push_back (fdopen (mkstemp (tmp_fn), "w+")); unlink (tmp_fn); } std::fclose (fp); // divide char line[MAX_LINE_LEN]; fp = std::fopen (in, "r"); while ((std::fgets (&line[0], MAX_LINE_LEN, fp)) != NULL) { FILE * writer = tmpfps[gen (ntmp)]; size_t read = std::strlen (&line[0]); if (line[read-1] != '\n') { if (std::feof (fp)) { std::fwrite (&line[0], sizeof (char), read, writer); std::fprintf (writer, "\n"); std::fprintf (stderr, "WARNING: line feeder is appended\n"); } else { std::fprintf (stderr, "ERROR: Buffer Overflow; increase MAX_LINE_LEN\n"); std::exit (1); } } else { std::fwrite (&line[0], sizeof (char), read, writer); } } // conquer size_t size_max = 0; for (std::vector <FILE*>::iterator it = tmpfps.begin (); it != tmpfps.end (); ++it) size_max = std::max (size_max, static_cast <size_t> (std::ftell (*it))); char * buf = new char[size_max]; for (std::vector <FILE*>::iterator it = tmpfps.begin (); it != tmpfps.end (); ++it) shuffle_lines (*it, &buf[0], std::ftell (*it), gen); delete [] buf; while (! tmpfps.empty ()) { size_t bin = gen (tmpfps.size ()); FILE * reader = tmpfps[bin]; if (std::fgets (&line[0], MAX_LINE_LEN, reader) != NULL) { std::fwrite (&line[0], sizeof (char), std::strlen (line), stdout); } else { std::fclose (reader); tmpfps.erase (tmpfps.begin () + bin, tmpfps.begin () + bin + 1); } } return 0; }; // g++ -DUSE_MT -O2 -march=core2 -m64 -o mt-shuf_ mt-shuf_.cc
同時に開けるファイル数に上限 (`ulimit -n` - 4; STDIN, STDOUT, STDERR & reader/writer) があるので,一時ファイル数に制限を与えている(もっとスマートなやり方があるとは思うが).さっさと実験してしまおう(以下 mt-shuf より右はデータサイズ以上のメモリを消費).一時ファイルが(可能な限り)残らないように改良.
N=#lines (D=size) | sort -R | sort -n | mt-shuf_| mt-shuf | GNU shuf | ----------------------------------------------------------------------| 10^5 ( 2.35MB) | 0.96s | 0.22s | 0.07s | 0.03s | 0.03s | 10^6 ( 23.35MB) | 12.43s | 2.69s | 0.64s | 0.34s | 0.46s | 10^7 ( 234.08MB) | 146.43s | 30.98s | 6.60s | 4.30s | 5.94s | 10^8 (2,341.42MB) | 1750.84s | 385.07s | 126.59s | 51.02s | NA |
10^8 で速度が落ちているのは,ulimit -n が 256 なため一時ファイルのサイズが大きくなったからのようだ (ulimit -n 1024 すると 72.08s).これなら sort の代替として十分使えるかな.