Mersenne Twister でランダムに行をシャッフル (4)

「これで終わり - ny23の日記」で id:s-yata 氏にコメントを書いているうちに,(3) よりはまだこちらの方が良いかと思ったので,一応書いておきます.名前の付け方は適当です.入力ファイルの引数を省略すると stdin から読み込むよう改良しました.

// mt-shuf___.cc
#include <unistd.h>
#include <sys/resource.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <algorithm>
#include <utility>
#include <vector>

// random number generator
#ifdef USE_MT
#include <tr1/random>
// Functor handed to std::random_shuffle and used to pick a temporary file:
// a default-constructed (hence default-seeded) TR1 Mersenne Twister engine
// paired with a uniform_int <size_t> distribution.
struct rand_ {
  typedef std::tr1::mt19937               engine_type;
  typedef std::tr1::uniform_int <size_t>  dist_type;
  std::tr1::variate_generator <engine_type, dist_type> gen;
  rand_ () : gen (engine_type (), dist_type ()) {}
  // forwards max to the generator as the bound of the draw
  long operator () (long max) { return gen (max); }
};
#else
// Fallback functor built on random (); no seeding is done here.
struct rand_ {
  // remainder of one random () draw divided by max
  long operator() (long max) const {
    const long draw = random ();
    return draw % max;
  }
};
#endif

// Compile-time defaults; each one can be overridden with -D<NAME>=<value>.

// Default value of the -S option (1 MiB). For a named input file, main
// divides the file size by this value to choose the number of temporary
// files.
#ifndef BUF_SIZE       // default chunk size for shuffling (1M)
#define BUF_SIZE (1 << 20)
#endif

// File-descriptor limit assumed when getrlimit () cannot supply one.
#ifndef OPEN_MAX       // default # file descriptors
#define OPEN_MAX 20
#endif

// Size in bytes of the buffer passed to fgets (); a line that does not fit
// makes the program exit with an error.
#ifndef MAX_LINE_LEN   // maximum length of lines
#define MAX_LINE_LEN 65536
#endif

// Default value of the -T option.
#ifndef TMP_DIR
#define TMP_DIR "/tmp" // default directory to store temporary files
#endif

// Print the lines held in one temporary file to stdout in random order.
//
//   fp   : stream to read back; it is repositioned to its beginning first
//   buf  : scratch buffer with room for at least `size` bytes
//   size : number of bytes to load from fp
//   gen  : random number functor handed to std::random_shuffle
//
// Every line is written followed by '\n'; a final line that lacks its
// newline is still emitted in full. The process exits with status 1 if the
// stream cannot be repositioned or fewer than `size` bytes can be read.
void shuffle_lines (FILE * fp, char * buf, size_t size, rand_ &gen) {
  typedef std::pair <const char *, size_t> line_t; // (start, length w/o '\n')
  // load (partial) data on memory; repositioning also flushes pending
  // writes, which is required before switching from writing to reading
  if (std::fseek (fp, 0, SEEK_SET) != 0) {
    std::fprintf (stderr, "ERROR: cannot rewind temporary file\n");
    std::exit (1);
  }
  const size_t nread = std::fread (buf, sizeof (char), size, fp);
  if (nread != size) {
    std::fprintf (stderr, "ERROR: cannot read back temporary file\n");
    std::exit (1);
  }
  // split into lines; the search for '\n' is bounded by the end of the
  // loaded data, so an unterminated tail cannot run past the buffer
  std::vector <line_t> lns;
  const char * const end = buf + nread;
  for (const char * p = buf; p != end; ) {
    const void * hit = std::memchr (p, '\n', static_cast <size_t> (end - p));
    const char * nl  = hit ? static_cast <const char *> (hit) : end;
    lns.push_back (line_t (p, static_cast <size_t> (nl - p)));
    p = (nl == end) ? end : nl + 1;
  }
  // shuffle
  std::random_shuffle (lns.begin (), lns.end (), gen);
  // emit the lines in their shuffled order
  for (std::vector <line_t>::const_iterator it = lns.begin ();
       it != lns.end (); ++it) {
    std::fwrite (it->first, sizeof (char), it->second, stdout);
    std::fputc ('\n', stdout);
  }
}

// Parse the value of the -S option: a positive decimal integer optionally
// followed by one of the suffixes K, M or G (multiples of 2^10, 2^20, 2^30).
// Returns 0 when the text is malformed, not positive, or too large.
static size_t parse_size_ (const char * arg) {
  char * end = 0;
  const long n = std::strtol (arg, &end, 10);
  if (end == arg || n <= 0) return 0;
  size_t shift = 0;
  switch (*end) {
    case 'G': shift = 30; ++end; break;
    case 'M': shift = 20; ++end; break;
    case 'K': shift = 10; ++end; break;
    default:  break;
  }
  if (*end != '\0') return 0; // trailing garbage
  const size_t value = static_cast <size_t> (n);
  if (value > (static_cast <size_t> (-1) >> shift)) return 0; // overflow
  return value << shift;
}

// Shuffle the lines of a file (or of stdin when no file is named).
//
// Usage: prog [-S size] [-T tmp_dir] [in]
//
// Each input line is appended to a randomly chosen temporary file; the
// temporary files are then loaded one at a time, shuffled in memory and
// printed to stdout. Returns 0 on success; exits with status 1 on invalid
// options or I/O failures.
int main (int argc, char** argv) {
  // random number generator
  rand_ gen;
  // handle options
  size_t       buf_size = BUF_SIZE;
  const char * tmp_dir  = TMP_DIR;
  const char * in       = 0;
  for (int i = 1; i < argc; ++i) {
    const char * arg = argv[i];
    if (std::strcmp (arg, "-h") == 0) {
      std::fprintf (stderr, "Usage: %s [-S size] [-T tmp_dir] [in]\n", argv[0]);
      return (0);
    }
    const bool is_size = std::strcmp (arg, "-S") == 0;
    const bool is_tmp  = std::strcmp (arg, "-T") == 0;
    if (! is_size && ! is_tmp) {
      in = arg;
      continue;
    }
    // both options take a value; never read past the end of argv
    if (i + 1 >= argc) {
      std::fprintf (stderr, "option %s requires an argument\n", arg);
      std::exit (1);
    }
    const char * value = argv[++i];
    if (is_size) {
      buf_size = parse_size_ (value);
      if (buf_size == 0) { // would otherwise divide by zero below
        std::fprintf (stderr, "invalid size in -S argument: %s\n", value);
        std::exit (1);
      }
    } else {
      tmp_dir = value;
    }
  }
  // prepare bin
  FILE * fp = in ? std::fopen (in, "rb") : stdin;
  if (! fp) {
    std::fprintf (stderr, "no such file: %s\n", in);
    std::exit (1);
  }
  // descriptors kept back from the temporary files (presumably stdin,
  // stdout, stderr and the input file)
  const size_t reserved_fds = 4;
  size_t fd_limit = OPEN_MAX;
  struct rlimit rlim;
  if (getrlimit (RLIMIT_NOFILE, &rlim) == 0 && rlim.rlim_cur != RLIM_INFINITY)
    fd_limit = static_cast <size_t> (rlim.rlim_cur);
  if (fd_limit <= reserved_fds) {
    std::fprintf (stderr, "too few file descriptors available: %lu\n",
                  static_cast <unsigned long> (fd_limit));
    std::exit (1);
  }
  // set # temporary files
  size_t ntmp = fd_limit - reserved_fds;
  if (in) {
    // one temporary file per buf_size bytes of input, up to the limit;
    // if the size cannot be determined the limit itself is used
    long file_size = -1;
    if (std::fseek (fp, 0, SEEK_END) == 0)
      file_size = std::ftell (fp);
    if (file_size >= 0)
      ntmp = std::min (static_cast <size_t> (file_size) / buf_size + 1, ntmp);
    std::rewind (fp);
  }
  // create the temporary files; they are unlinked right away so that they
  // disappear as soon as they are closed
  std::vector <char> tmp_fn (std::strlen (tmp_dir) + sizeof ("/shufXXXXXX"));
  std::vector <FILE*> tmpfps;
  tmpfps.reserve (ntmp);
  for (size_t i = 0; i < ntmp; ++i) {
    std::sprintf (&tmp_fn[0], "%s/shufXXXXXX", tmp_dir);
    const int fd = mkstemp (&tmp_fn[0]);
    if (fd < 0) {
      std::fprintf (stderr, "cannot create temporary file in %s\n", tmp_dir);
      std::exit (1);
    }
    unlink (&tmp_fn[0]);
    FILE * tmp = fdopen (fd, "w+");
    if (! tmp) {
      close (fd);
      std::fprintf (stderr, "cannot open temporary file in %s\n", tmp_dir);
      std::exit (1);
    }
    tmpfps.push_back (tmp);
  }
  // divide: scatter the input lines over the temporary files
  char line[MAX_LINE_LEN];
  while ((std::fgets (&line[0], MAX_LINE_LEN, fp)) != NULL) {
    FILE * writer = tmpfps[gen (ntmp)];
    const size_t len = std::strlen (&line[0]);
    const bool terminated = len > 0 && line[len - 1] == '\n';
    if (! terminated && ! std::feof (fp)) {
      std::fprintf (stderr, "ERROR: Buffer Overflow; increase MAX_LINE_LEN\n");
      std::exit (1);
    }
    if (std::fwrite (&line[0], sizeof (char), len, writer) != len) {
      std::fprintf (stderr, "ERROR: cannot write to temporary file\n");
      std::exit (1);
    }
    if (! terminated) { // last line of the input lacks its newline
      std::fprintf (writer, "\n");
      std::fprintf (stderr, "WARNING: line feeder is appended\n");
    }
  }
  std::fclose (fp);
  // conquer: size the buffer for the largest temporary file
  std::vector <size_t> sizes;
  sizes.reserve (tmpfps.size ());
  size_t size_max = 0;
  for (std::vector <FILE*>::iterator it = tmpfps.begin ();
       it != tmpfps.end (); ++it) {
    const long pos = std::ftell (*it);
    if (pos < 0) {
      std::fprintf (stderr, "ERROR: cannot determine temporary file size\n");
      std::exit (1);
    }
    sizes.push_back (static_cast <size_t> (pos));
    size_max = std::max (size_max, sizes.back ());
  }
  // at least one byte so that &buf[0] is valid even for empty input
  std::vector <char> buf (size_max ? size_max : 1);
  for (size_t i = 0; i < tmpfps.size (); ++i) {
    shuffle_lines (tmpfps[i], &buf[0], sizes[i], gen);
    std::fclose (tmpfps[i]); // releases the unlinked file
  }
  return 0;
}
// g++ -DUSE_MT -O2 -march=core2 -m64 -o mt-shuf___ mt-shuf___.cc
N=#lines (D=size) |mt-shuf___| 
------------------------------
10^5 (    2.35MB) |    0.05s |
10^6 (   23.35MB) |    0.47s |
10^7 (  234.08MB) |    4.75s |
10^8 (2,341.42MB) |   69.98s |

「Mersenne Twister でランダムに行をシャッフル (2) - ny23の日記」に比べると,一時ファイルへの Sequential Write / Random Read がない分高速.ulimit -n 1024 とすると,10^8 でも 51.38s とオンメモリ版 (mt-shuf; 51.02s) と同程度の速度.オンメモリ版は一定以上サイズが大きくなると global な random_shuffle でキャッシュミスが頻発するので遅くなるが,local に random_shuffle する場合,処理単位をキャッシュサイズ以下に抑えることができればキャッシュミスは防げるため差が縮まっているのだと思われる.