非線形分類器のミニバッチ分散オンライン学習器を実装してみた

ソフトウェアをいろいろ更新した - ny23の日記 で紹介した,

がようやく学会で発表されたようなので,非線形学習版の実装を置いておく.RBF カーネルを用いたオンライン学習器をミニバッチ+並列化したもの.実装したのはもう二ヶ月ぐらい前(論文の pdf を著者が公開した直後)なのだけど,流石に学会発表前に実装を出すのは気が引けたので公開を先延ばしにしていた.

// mpa1.cc ; GNU GPL version 2 copyright@id:ny23
#include <err.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <cmath>
#include <vector>
#include <numeric>
#include <map>
#include <algorithm>
#include <chrono>
#include <omp.h>

static const size_t MAX_LINE_LEN = 1 << 20;

typedef std::vector <std::pair <long, double> >  x_t;
template <typename T> using ex_base_t = std::pair <T, x_t>;
typedef ex_base_t <int>     ex_t;
typedef ex_base_t <double>  sv_t;
typedef std::vector <std::vector <std::pair <size_t, double> > >  f2ss_t;
struct result_t { int pp, pn, np, nn; };

// RBF kernel: exp (-gamma * |x-s|^2); assuming |s|^2 = |x|^2= 1
// note: linear kernel for negative gamma value
inline double kernel (const double dot, const double gamma)
{ return gamma < 0 ? dot : std::exp (-gamma * (2.0 - 2 * dot)); }

// compute margin for given example
double margin (const x_t& x, const f2ss_t& f2ss, const std::vector <double>& alph, const double gamma) {
  double m = 0.0;
  std::vector <double> dot (alph.size (), 0);
  for (const auto& f : x) {
    if (f.first >= static_cast <long> (f2ss.size ())) break;
    for (const auto& s : f2ss[f.first])
      dot[s.first] += f.second * s.second;
  }
  for (size_t i = 0; i < dot.size (); ++i)
    m += alph[i] * kernel (dot[i], gamma), dot[i] = 0.0;
  return m;
}

// read example from line and return label
int read_example (char*& p, x_t& x) {
  int y = static_cast <int> (std::strtol (p, &p, 10));
  double norm = 0.0;
  while (*++p && *p != '\n') {
    const long   i = std::strtol (p, &p, 10); ++p;
    const double v = std::strtod (p, &p);
    x.emplace_back (i, v);
    norm += v * v;
  }
  norm = std::sqrt (norm);
  for (auto& fn : x) fn.second /= norm;
  return y;
}

// read examples
std::vector <ex_t> read_data (const char* train) {
  char line[MAX_LINE_LEN];
  FILE* reader = std::fopen (train, "r");
  std::vector <ex_t> ex;
  for (x_t x; std::fgets (line, MAX_LINE_LEN, reader) != NULL; x.clear ()) {
    char* p = line;
    const int y = read_example (p, x);
    ex.emplace_back (y, x);
  }
  std::fclose (reader);
  return ex;
}

#ifndef USE_PERCEPTRON
double inner_product (const x_t& xi, const x_t& xj) {
  double dot = 0;
  auto it (xi.begin ()), jt (xj.begin ());
  while (it != xi.end () && jt != xj.end ()) {
    if (it->first == jt->first)
      dot += it->second * jt->second, ++it, ++jt;
    else if (it->first < jt->first)
      ++it;
    else
      ++jt;
  }
  return dot;
}

// simple QP-solver by Hildreth (1959)
std::vector <double> hildreth (const std::vector <ex_t>& ex, std::vector <std::pair <size_t, double> >& ls, const double gamma, const double c) {
  static const size_t MAX_ITER = 10000;
  static const double EPS      = 1e-8;
  static const double ZERO     = 1e-16;
  const size_t nc = ls.size (); // # constraints
  std::vector <bool>   is_computed (nc, false);
  std::vector <double> t (nc, 0.0), F (nc), kkt (nc);
  std::vector <std::vector <double> > A (nc, std::vector <double> (nc));
  //
  std::sort (ls.begin (), ls.end ()); // just for assuring the same results
  for (size_t i = 0; i < nc; ++i) {
    const ex_t&  xi  = ex[ls[i].first];
    const double dot = inner_product (xi.second, xi.second);
    A[i][i] = xi.first * xi.first * kernel (dot, gamma);
    kkt[i] = F[i] = ls[i].second;
  }
  auto max_kkt = std::max_element (kkt.begin (), kkt.end ());
  size_t max_i = max_kkt - kkt.begin ();
  for (size_t iter = 0; *max_kkt >= EPS && iter < MAX_ITER; ++iter) {
    double diff_t = A[max_i][max_i] <= ZERO ? 0 : F[max_i] / A[max_i][max_i];
    double try_t  = t[max_i] + diff_t;
    double add_t  = try_t < 0 ? -1.0 * t[max_i] : (try_t > c ? c - t[max_i] : diff_t);
    // double add_t  = try_t < 0 ? -1.0 * t[max_i] : diff_t;
    t[max_i] += add_t;
    if (! is_computed[max_i]) {
      const ex_t& yx = ex[ls[max_i].first];
      for (size_t i = 0; i < nc; i++) {
        if (i == max_i) continue;
        const ex_t&  yxi  = ex[ls[i].first];
        const double dot = inner_product (yx.second, yxi.second);
        A[max_i][i] = yx.first * yxi.first * kernel (dot, gamma);
        is_computed[max_i] = true;
      }
    }
    for (size_t i = 0; i < nc; ++i) {
      F[i] -= add_t * A[max_i][i];
      kkt[i] = t[i] > c - ZERO ? -F[i] : (t[i] > ZERO ? std::fabs (F[i]) : F[i]);
      // kkt[i] = t[i] > ZERO ? std::fabs (F[i]) : F[i];
    }
    max_kkt = std::max_element (kkt.begin (), kkt.end ());
    max_i = max_kkt - kkt.begin ();
  }
  return t;
}
#endif

// distributed passive aggressive
void train (std::vector <ex_t> &ex, f2ss_t& f2ss, std::vector <double>& alph, const long iter, const double gamma, const double c, const long m, const long n) {
  const int batch_num = ex.size () / m + (ex.size () % m ? 1 : 0);
  //
  std::map <const x_t, size_t> smap; // avoid to register identical SV
  // std::vector <size_t> smap (ex.size (), 0);
  std::vector <double> alpha; // for averaging
  std::vector <std::pair <size_t, double> > ls (std::max (n, 1L));
  size_t nround = 0;
  for (int i = 0; i < iter; ++i) {
    // std::random_shuffle (ex.begin (), ex.end ());
    for (int j = 0; j < batch_num; ++j) { // mini-batch training
      const long nex = std::min (m, static_cast <long> (ex.size ()) - j * m);
      ls.clear ();
#pragma omp parallel for num_threads (n) if (n)
      for (long k = j * m; k < j * m + nex; ++k) { // distributed training
        double m = margin (ex[k].second, f2ss, alph, gamma);
        m *= ex[k].first;
#pragma omp critical
#ifdef USE_PERCEPTRON
        if (m <= 0) ls.emplace_back (k, 0);
#else
        if (m <  1) ls.emplace_back (k, 1 - m);
#endif
      }
      ++nround;
      if (ls.empty ()) continue;
#ifdef USE_PERCEPTRON
      std::vector <double> t (ls.size (), 1.0 / ls.size ());
#else
      std::vector <double> t (hildreth (ex, ls, gamma, c));
#endif
      for (size_t k = 0; k < ls.size (); ++k) {
        if (std::fpclassify (t[k]) == FP_ZERO) continue;
        const ex_t& s = ex[ls[k].first];
        auto itb = smap.emplace (s.second, smap.size ());
        if (itb.second) {
          if (! s.second.empty ()) { // maybe empty feature vector
            if (f2ss.size () <= static_cast <size_t> (s.second.back ().first))
              f2ss.resize (s.second.back ().first + 1, f2ss_t::value_type ());
            for (const auto &f: s.second)
              f2ss[f.first].emplace_back (itb.first->second, f.second);
          }
          alph.push_back  (s.first * t[k]);
          alpha.push_back (s.first * t[k] * nround);
        } else {
          alph[itb.first->second]  += s.first * t[k];
          alpha[itb.first->second] += s.first * t[k] * nround;
        }
      }
    }
    std::fprintf (stderr, ".");
  }
  // end with averaging
  for (size_t i = 0; i < alph.size (); ++i)
    alph[i] -= alpha[i] / (nround + 1);
}

result_t test (const std::vector <ex_t>& ex, const f2ss_t& f2ss, const std::vector <double>& alph, const double gamma, const long n) {
  int pp (0), pn (0), np (0), nn (0);
#pragma omp parallel for num_threads (n) if (n) reduction (+:pp,pn,np,nn)
  for (size_t i = 0; i < ex.size (); ++i) { // distributed testing
    const double m = margin (ex[i].second, f2ss, alph, gamma);
#pragma omp critical
    if (m >= 0) if (ex[i].first > 0) ++pp; else ++np;
    else        if (ex[i].first > 0) ++pn; else ++nn;
  }
  return { pp, pn, np, nn };
}

int main (int argc, char** argv) {
  if (argc < 7) ::errx (1, "Usage %s train test gamma c iter n", argv[0]);
  const double gamma = std::strtod (argv[3], NULL); // gamma in RBF kernel
  const double c     = std::strtod (argv[4], NULL); // ignored in perceptron
  const long   iter  = std::strtol (argv[5], NULL, 10);
  const long   m     = std::strtol (argv[6], NULL, 10); // size of mini-batch
  const long   n     = std::strtol (argv[7], NULL, 10); // # processors
  if (m < n) ::errx (1, "# proc must be larger than # batch");
  //
  f2ss_t f2ss;
  std::vector <double> alph; // coefficient of SVs
  {
    // read training examples
    std::fprintf (stderr, "read (train): ");
    auto start = std::chrono::steady_clock::now ();
    std::vector <ex_t> ex = read_data (argv[1]);
    auto d = std::chrono::steady_clock::now () - start;
    std::fprintf (stderr, "%.3fs\n", std::chrono::duration <double> (d).count ());
    // distributed training
    std::fprintf (stderr, "train: ");
    start = std::chrono::steady_clock::now ();
    train (ex, f2ss, alph, iter, gamma, c, m, n);
    d = std::chrono::steady_clock::now () - start;
    std::fprintf (stderr, "%.3fs\n", std::chrono::duration <double> (d).count ());
  }
  if (std::strcmp (argv[2], "-") == 0) return 0;
  {
    // read test examples
    std::fprintf (stderr, "read (test): ");
    auto start = std::chrono::steady_clock::now ();
    std::vector <ex_t> ex = read_data (argv[2]);
    auto d = std::chrono::steady_clock::now () - start;
    std::fprintf (stderr, "%.3fs\n", std::chrono::duration <double> (d).count ());
    // distributed testing
    std::fprintf (stderr, "test: ");
    start = std::chrono::steady_clock::now ();
    result_t r = test (ex, f2ss, alph, gamma, n);
    d = std::chrono::steady_clock::now () - start;
    std::fprintf (stderr, "%.3fs\n", std::chrono::duration <double> (d).count ());
    std::fprintf (stderr, "#SV: %ld\n", alph.size ());
    std::fprintf (stderr, "acc. %2.3f%% (pp %d) (pn %d) (np %d) (nn %d)\n",
                  (r.pp + r.nn) * 100.0 / (r.pp + r.pn + r.np + r.nn),
                  r.pp, r.pn, r.np, r.nn);
  }
  return 0;
}

特徴ベクトルを正規化しているのは,RBF カーネル内積だけから計算できるようにするため.これで転置インデクスを用いた高速化手法が適用しやすくなる.正規化しなくても高速化はできるが,その場合,特徴ベクトルのノルムを別途保持する必要が出てくる.200行以内に収めたかったが,solver がこれ以上短くできなかったので諦めた.ちょっと長いかな.学習を並列化してテストを並列化しないのは片手落ちなので,テストも OpenMP で並列化.
ijcnn1 データセットで評価した結果は以下.

> g++-4.8 -Wall -O2 -g -std=c++11 -fopenmp mpak1.cc -o mpak1

// Intel(R) Xeon(R) CPU E7- 4830  @ 2.13GHz; 32CPU 512GM RAM
// 1 CPU (baseline)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 1 0  
read (train): 0.351s
train: .....218.311s
read (test): 0.282s
test: 88.880s
#SV: 14396
acc. 99.146% (pp 8253) (pn 459) (np 324) (nn 82665)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 1 0  307.29s user 0.15s system 99% cpu 5:07.85 total

// 2 CPU (batch size = 256)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 2 
read (train): 0.349s
train: .....116.360s
read (test): 0.422s
test: 44.532s
#SV: 12933
acc. 99.144% (pp 8268) (pn 444) (np 341) (nn 82648)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 2  306.66s user 0.04s system 189% cpu 2:41.69 total

// 4 CPU (batch size = 256)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 4
read (train): 0.358s
train: .....53.695s
read (test): 0.599s
test: 20.578s
#SV: 12933
acc. 99.144% (pp 8268) (pn 444) (np 341) (nn 82648)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 4  290.27s user 0.04s system 385% cpu 1:15.26 total

// 8 CPU (batch size = 256)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 8
read (train): 0.349s
train: .....27.922s
read (test): 0.586s
test: 10.714s
#SV: 12933
acc. 99.144% (pp 8268) (pn 444) (np 341) (nn 82648)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 8  307.43s user 0.05s system 776% cpu 39.602 total

// 16 CPU (batch size = 256)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 16
read (train): 0.349s
train: .....15.029s
read (test): 0.618s
test: 6.251s
#SV: 12933
acc. 99.144% (pp 8268) (pn 444) (np 341) (nn 82648)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 16  325.13s user 0.07s system 1459% cpu 22.280 total

// 32 CPU (batch size = 256)
> time ./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 32
read (train): 0.352s
train: .....12.206s
read (test): 0.284s
test: 4.773s
#SV: 12933
acc. 99.144% (pp 8268) (pn 444) (np 341) (nn 82648)
./mpak1 ijcnn1 ijcnn1.t 20.0 0.5 5 256 32  520.24s user 0.13s system 2948% cpu 17.650 total

というわけで綺麗にスレッド分散の台数効果*1が出ている.また,バッチサイズを大きめにとることでオンライン学習にバッチ学習のフレーバーが入ってサポートベクタの数が減っている点は注目に値する.PA-I による(カーネルを用いた)非線形学習ではサポートベクタ数が増えがちなので,これは嬉しい.C++11 で線形分類器の分散オンライン学習器を実装してみた - ny23の日記で実装を紹介したIterative Parameter Mixingに比べると,線形学習を高速化するのにはあまり向かないが*2非線形学習や構造学習のような訓練例の分類コストの大きい学習はより効率的に高速化できそうだ.
しかし,この並列化手法もそうだけれど,自分の専門分野で「高速化」を謳う論文は,計算量が非線形であるなど,極端に遅いモデル*3を速くする,というものであることが多い.実用的には,最速のモデル*4を速くしないと解ける問題の規模は変わらないので,あまりインパクトがないと感じるのだけど(どうなのだろう).
決定的なモデルでほとんどの問題が解けるタスクで,必要以上に解空間を広げて「遅く」したモデルなんて,速くできて当たり前ではないか.計算の枝刈りができまくるのが目に見えている.なんだか自分で生み出した問題を自分で解いている感じがしてしまうのだが(しょうがないか)*5.低速・高精度の手法を提案→高速化で論文二本書くのは「身から出た錆」研究みたいでなんだかやるせない.最初から高速・高精度の手法(解空間を広げても遅くならないよう工夫)を提案したいな.
[追記]この研究の場合,構造学習を並列化するなら,古典的な Viterbi ベースの構造化パーセプトロン(や PA-I)でなくて,近年提案されているより高速なオンライン構造学習手法を高速化して欲しい.また,この記事で紹介したカーネルを用いた非線形分類器にしても,素性空間を陽に写像して線形分類器に変換・あるいは近似することで学習・分類を高速化手法があるので,本当はそちらで実験する方が望ましい*6(分類コストが下がってパラメタ更新コストが上がるので,台数効果に影響が出ると思われる).

*1:コア数と同じ数のスレッドを生成した場合に台数効果が出ていないのは,キャッシュなどでリソースの競合が起きているためと考えられる.

*2:線形学習ではパラメタ更新のコストが分類コストとほとんど同じなため,分類のみ並列化してもあまり意味が無い.学習が進むにつれパラメタの更新頻度は減ってくるので,全く意味が無いということはないと思うが.

*3:動的計画法による大域最適化とか.分類器で言えば,非線形分類や構造分類など.

*4:点予測や,構文解析の Shift-Reduce モデルなど.分類器で言えば,線形分類.

*5:とはいえ自分も人のことは言えない.ほとんど文脈自由文法で記述できる自然言語に対して,一部の事象を説明するために解析の計算量が大きい形式文法を導入して解析を高速化してみたり.

*6:RBF カーネルの場合は近似しかできないので,厳密解法を並列化するのにはかろうじてまだ価値があるが.