Mersenne Twister でランダムに行をシャッフル (2)

昨日の Mersenne Twister でランダムに行をシャッフル - ny23の日記の続き.折角なので,参考ページにあったアルゴリズムを実装してみた.ファイルの内容をランダムにばら撒き,ローカルで random_shuffle し,ランダムにファイルを選んで先頭から出力する.shuffling の厳密性は Random Permutations on Distributed, External and Hierarchical Memory のような手順で示せそう(Mersenne Twister でランダムに行をシャッフル (3) - ny23の日記で示して,更に無駄を省いて改良した).

// mt-shuf_.cc
#include <unistd.h>
#include <sys/resource.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <algorithm>

// random number generator
#ifdef USE_MT
#include <tr1/random>
struct rand_ {
  std::tr1::variate_generator <std::tr1::mt19937,
                               std::tr1::uniform_int <size_t> > gen;
  rand_ () : gen (std::tr1::mt19937 (), std::tr1::uniform_int <size_t> ()) {};
  long operator () (long max) { return gen (max); }
};
#else
struct rand_ { long operator() (long max) const { return random () % max; } };
#endif

#ifndef BUF_SIZE       // default chunk size for shuffling (1M)
#define BUF_SIZE (1 << 20)
#endif

#ifndef OPEN_MAX       // default # file descriptors
#define OPEN_MAX 20
#endif

#ifndef MAX_LINE_LEN   // maximum length of lines
#define MAX_LINE_LEN 65536
#endif

#ifndef TMP_DIR
#define TMP_DIR "/tmp" // default directory to store temporary files
#endif

void shuffle_lines (FILE * fp, char * buf, size_t size, rand_ &gen) {
  // load (partial) data on memory
  std::fseek (fp, 0, SEEK_SET); // flush here
  std::fread (&buf[0], sizeof (char), size, fp);
  // shuffle
  std::vector <const char *> lns;
  for (char *p (&buf[0]), *end (&buf[size]); p != end; *p = '\0', ++p) {
    lns.push_back (p);
    while (*p != '\n') ++p;
  }
  std::random_shuffle (lns.begin (), lns.end (), gen);
  // overwrite & recover position
  std::fseek (fp, 0, SEEK_SET);
  for (std::vector <const char *>::iterator it = lns.begin ();
       it != lns.end (); ++it)
    std::fprintf (fp, "%s\n", *it);
  std::fseek (fp, 0, SEEK_SET);
}

int main (int argc, char** argv) {
  if (argc < 2) {
    std::fprintf (stderr, "Usage: %s [-S size] [-T tmp_dir] in\n", argv[0]);
    std::exit (1);
  }
  // random number generator
  rand_ gen;
  // handle options
  size_t       buf_size = BUF_SIZE;
  const char * tmp_dir  = TMP_DIR;
  char       * in       = 0;
  for (int i = 1; i < argc; ++i) {
    if (std::strcmp (argv[i], "-S") == 0) {
      buf_size = 1;
      char &c = argv[++i][std::strlen (argv[i])-1];
      char * err;
      switch (c) {
        case 'G': buf_size <<= 10;
        case 'M': buf_size <<= 10;
        case 'K': buf_size <<= 10; c = '\0';
        default:  buf_size *= std::strtol (argv[i], &err, 10);
          if (*err)
            {
            std::fprintf (stderr, "invalid size in -S argument: %s\n", argv[i]);
            std::exit (1);
          }
      }
    } else if (std::strcmp (argv[i], "-T") == 0) {
      tmp_dir = argv[++i];
    } else {
      in = argv[i];
    }
  }
  if (! in) std::fprintf (stderr, "no file specified\n");
  char tmp_fn[std::strlen (tmp_dir) + 12];
  // divide & conquer
  // prepare bin
  FILE * fp = std::fopen (in, "rb");
  if (! fp) {
    std::fprintf (stderr, "no such file: %s\n", in);
    std::exit (1);
  }
  struct rlimit rlim;
  size_t ntmp_limit
    = (getrlimit (RLIMIT_NOFILE, &rlim) == 0 ? rlim.rlim_cur : OPEN_MAX) - 4;
  // set # temporary files
  std::fseek (fp, 0, SEEK_END);
  size_t ntmp = std::min (std::ftell (fp) / buf_size + 1, ntmp_limit);
  std::fseek (fp, 0, SEEK_SET);
  std::vector <FILE*> tmpfps;
  tmpfps.reserve (ntmp);
  for (size_t i = 0; i < ntmp; ++i) {
    std::sprintf (&tmp_fn[0], "%s/shufXXXXXX", tmp_dir);
    tmpfps.push_back (fdopen (mkstemp (tmp_fn), "w+"));
    unlink (tmp_fn);
  }
  std::fclose (fp);
  // divide
  char line[MAX_LINE_LEN];
  fp = std::fopen (in, "r");
  while ((std::fgets (&line[0], MAX_LINE_LEN, fp)) != NULL) {
    FILE * writer = tmpfps[gen (ntmp)];
    size_t read = std::strlen (&line[0]);
    if (line[read-1] != '\n') {
      if (std::feof (fp)) {
        std::fwrite (&line[0], sizeof (char), read, writer);
        std::fprintf (writer, "\n");
        std::fprintf (stderr, "WARNING: line feeder is appended\n");
      } else {
        std::fprintf (stderr, "ERROR: Buffer Overflow; increase MAX_LINE_LEN\n");
        std::exit (1);
      }
    } else {
      std::fwrite (&line[0], sizeof (char), read, writer);
    }
  }
  // conquer
  size_t size_max = 0;
  for (std::vector <FILE*>::iterator it = tmpfps.begin ();
       it != tmpfps.end (); ++it)
    size_max = std::max (size_max, static_cast <size_t> (std::ftell (*it)));
  char * buf = new char[size_max];
  for (std::vector <FILE*>::iterator it = tmpfps.begin ();
       it != tmpfps.end (); ++it)
    shuffle_lines (*it, &buf[0], std::ftell (*it), gen);
  delete [] buf;
  while (! tmpfps.empty ()) {
    size_t bin = gen (tmpfps.size ());
    FILE * reader = tmpfps[bin];
    if (std::fgets (&line[0], MAX_LINE_LEN, reader) != NULL) {
      std::fwrite (&line[0], sizeof (char), std::strlen (line), stdout);
    } else {
      std::fclose (reader);
      tmpfps.erase (tmpfps.begin () + bin, tmpfps.begin () + bin + 1);
    }
  }
  return 0;
};
// g++ -DUSE_MT -O2 -march=core2 -m64 -o mt-shuf_ mt-shuf_.cc

同時に開けるファイル数に上限 (`ulimit -n` - 4; STDIN, STDOUT, STDERR & reader/writer) があるので,一時ファイル数に制限を与えている(もっとスマートなやり方があるとは思うが).さっさと実験してしまおう(以下 mt-shuf より右はデータサイズ以上のメモリを消費).一時ファイルが(可能な限り)残らないように改良.

N=#lines (D=size) |  sort -R | sort -n | mt-shuf_| mt-shuf | GNU shuf |
----------------------------------------------------------------------|
10^5 (    2.35MB) |    0.96s |   0.22s |   0.07s |   0.03s |    0.03s |
10^6 (   23.35MB) |   12.43s |   2.69s |   0.64s |   0.34s |    0.46s |
10^7 (  234.08MB) |  146.43s |  30.98s |   6.60s |   4.30s |    5.94s |
10^8 (2,341.42MB) | 1750.84s | 385.07s | 126.59s |  51.02s |       NA |

10^8 で速度が落ちているのは,ulimit -n が 256 なため一時ファイルのサイズが大きくなったからのようだ (ulimit -n 1024 すると 72.08s).これなら sort の代替として十分使えるかな.