Collections.shuffleのRandomについて

  • このエントリーをはてなブックマークに追加

Javaのjava.util.Collectionsにはshuffleという便利なメソッドがありますが、shuffle(List<?> list)はマルチスレッドで遅くなります。JDK8u121の実装では次のようになっています。

public static void shuffle(List<?> list) {
Random rnd = r;
if (rnd == null)
r = rnd = new Random(); // harmless race.
shuffle(list, rnd);
}
private static Random r;

java.util.Randomを共有して処理する事になるので、複数のスレッドでCollections.shuffleを呼び出すと処理が詰まるわけです。Collectionsにはshuffle(List<?> list, Random rnd)というメソッドがあるので、次のように使うのが良いと思います。

Collections.shuffle(list, ThreadLocalRandom.current())

別件ですが、複数のスレッドで同じListに対してCollections.shuffleを呼び出すと壊れます。(そんな事をする人は少ないと思いますが)

どれくらい遅いのか?

JMHで簡単に計ってみました。

Benchmark                               Mode  Cnt        Score   Error  Units
Shuffle.random_list100 thrpt 2 122031.999 ops/s
Shuffle.random_list1000 thrpt 2 11964.147 ops/s
Shuffle.random_list10000 thrpt 2 1306.136 ops/s
Shuffle.thread_local_random_list100 thrpt 2 2370353.238 ops/s
Shuffle.thread_local_random_list1000 thrpt 2 200793.729 ops/s
Shuffle.thread_local_random_list10000 thrpt 2 21029.163 ops/s
@Fork(1)
@Warmup(iterations=3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Measurement(iterations=2, time=10, timeUnit=TimeUnit.SECONDS)
@Threads(4)
@State(Scope.Thread)
public class Shuffle {

private List<Integer> list100 = new ArrayList<Integer>(100);
private List<Integer> list1000 = new ArrayList<Integer>(1000);
private List<Integer> list10000 = new ArrayList<Integer>(10000);
@Setup
public void setup() {
for (int i = 0; i < 100; i++) {
list100.add(ThreadLocalRandom.current().nextInt());
}
for (int i = 0; i < 1000; i++) {
list1000.add(ThreadLocalRandom.current().nextInt());
}
for (int i = 0; i < 10000; i++) {
list10000.add(ThreadLocalRandom.current().nextInt());
}
}

@Benchmark
public void random_list100() {
Collections.shuffle(list100);
}

@Benchmark
public void random_list1000() {
Collections.shuffle(list1000);
}

@Benchmark
public void random_list10000() {
Collections.shuffle(list10000);
}

@Benchmark
public void thread_local_random_list100() {
Collections.shuffle(list100, ThreadLocalRandom.current());
}

@Benchmark
public void thread_local_random_list1000() {
Collections.shuffle(list1000, ThreadLocalRandom.current());
}

@Benchmark
public void thread_local_random_list10000() {
Collections.shuffle(list10000, ThreadLocalRandom.current());
}

}

RubyのAWS-SDKとForkでハマったこと

  • このエントリーをはてなブックマークに追加

RubyでKinesis Streamからレコードを取得する処理を書いていたのですが、通信が入れ替わったりして別のShardのShardIteratorが混じってくる現象に悩まされていました。

結論から言うと、AWS-SDKは内部でグローバルなConnection Poolを作ります。親プロセスで1回でもAWS-SDKを使って通信をすると、Forkした後に子供にも内部のConnection Poolがそのまま継承されます。
親で作られたCollectionPool=FDが複数の子供で共有されてしまうので、それぞれのプロセスで通信が混線してしまったという訳です。

解決策としては、子プロセスの最初に次のコードを実行する必要があります。

Seahorse::Client::NetHttp::ConnectionPool.pools.each do |pool|
pool.empty!
end

AWS-SDKのバージョンがv2.9.5以降であれば次のコードで同じコードが実行されます。

Aws.empty_connection_pools!

関連チケットです https://github.com/aws/aws-sdk-ruby/issues/1438

他にも使用していたLevelDBやSequelでもForkSafeではないのでそれぞれ対策が必要です。

LevelDBの場合

こんな感じで書き込みが止まってしまいました。
こっちは色々と掃除しても何故か発生してしまうので、親プロセスで使うなという感じ。

#0  0x00007f1e7929a64d in poll () from /lib64/libc.so.6
#1 0x00007f1e7a06c69c in ?? () from /usr/lib64/libruby.so.2.3
#2 0x00007f1e79cb4dc5 in start_thread () from /lib64/libpthread.so.0
#3 0x00007f1e792a4c9d in clone () from /lib64/libc.so.6
Thread 1 (Thread 0x7f1e7a56b740 (LWP 75934)):
#0 0x00007f1e79cb86d5 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f1e6db494ed in leveldb::port::CondVar::Wait() () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#2 0x00007f1e6db1f4aa in leveldb::DBImpl::MakeRoomForWrite(bool) () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#3 0x00007f1e6db22d38 in leveldb::DBImpl::Write(leveldb::WriteOptions const&, leveldb::WriteBatch*) () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#4 0x00007f1e6db1f829 in leveldb::DB::Put(leveldb::WriteOptions const&, leveldb::Slice const&, leveldb::Slice const&) () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#5 0x00007f1e6db1f869 in leveldb::DBImpl::Put(leveldb::WriteOptions const&, leveldb::Slice const&, leveldb::Slice const&) () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#6 0x00007f1e6db1c7a8 in leveldb_put () from /data/app/smart-ad-rtm/vendor/bundle/ruby/2.3/gems/leveldb-0.1.9/ext/leveldb/libleveldb.so
#7 0x00007f1e6dd5fcec in ffi_call_unix64 () from /usr/lib64/libffi.so.6

https://github.com/google/leveldb/issues/169#issuecomment-260458097

Sequelの場合

接続を切りましょう

DB.disconnect

http://www.rubydoc.info/github/jeremyevans/sequel/Sequel%2FDatabase%3Adisconnect

まとめ

ライブラリの中身を気にしながらForkを考えるのは難しい。

PostgreSQLの関数の引数は100個まで

  • このエントリーをはてなブックマークに追加

100個以上指定するとこんなエラーが出るみたいです。(PostgreSQL 9.5.3で確認)
解決策としては、1つのARRAYとして渡せと。

org.postgresql.util.PSQLException: ERROR: cannot pass more than 100 arguments to a function

そんな無茶な設計をするんじゃないよって事ですが、可変長引数を引数を受け取るこんな関数を定義していました。

CREATE OR REPLACE FUNCTION my_function(VARIADIC ids bigint[]) 
RETURNS TABLE (
fuga_id BIGINT,
hoge BIGINT,
fuga NUMERIC
) AS $$
SELECT fuga_id, hoge, SUM(fuga)
FROM xxxx
WHERE fuga_id = ANY ($1)
$$ LANGUAGE SQL;

この関数をこんな感じで叩いていました。

SELECT * FROM my_function(1);
SELECT * FROM my_function(1,2,3,4,5);

参考: http://stackoverflow.com/questions/17421265/postgresql-how-to-pass-more-than-100-arguments-to-a-function

DynamoDBでHTTPS通信をやめたい

  • このエントリーをはてなブックマークに追加

DynamoDBに限った話ではなく、AWS-SDK全般に言える事なのですが、デフォルトはHTTPS通信を行います。

これはClientConfigurationクラスの設定によるものなのですが、SSL処理はまぁ比較的重いものです。特に高負荷環境においては結構気になるものです。
DynamoDBを例に取ると、AmazonDynamoDBClient.setRegion(日本リージョン)をした時にendpointとしてhttps://dynamodb.ap-northeast-1.amazonaws.comを設定します。この時にClientConfiguration.getProtocol()でHTTPSのスキーマを設定しています。
なので、setRegionする前に、ClientConfiguration.setRegion(Protocol.HTTP)をすればHTTPで通信する事ができます。

これを調べていた過程で気になったのは、EC2インスタンスからアクセスした場合、Internal通信なのかPublic通信なのか。ものによっては(RDSとかは)Internal通信になるとかあるのですがDynamoDBの場合は、Publicアクセスっぽい。Publicインターフェースを持っている/いないに関わらず、DNSはPublicIPを返してきます。

DynamoDBにはVPC Endpointが無いのがなぁ..
https://forums.aws.amazon.com/message.jspa?messageID=701530

(SDKのソースを見てたらS3はHTTPSじゃないとダメだよってassertがあるっぽい)

参考: https://docs.aws.amazon.com/ja_jp/general/latest/gr/rande.html

Electronの代替技術

  • このエントリーをはてなブックマークに追加

Web+DBにも特集が組まれたElectronが気になっています。
WEBの技術でGUIクライアントアプリが作れるのですが、どうにもNode.jsというのが..

個人的にはQt5とかWPF/C#とかで書きたいのですが、それだとWindowsだけになっちゃうのが辛い。
似たようなものでNW.jsを見つけました。
が、これだったらElectronの方がよさそう。

でもNode.JSかぁ..(´・ω・`)

PostgreSQLでSQLの勉強

  • このエントリーをはてなブックマークに追加

色々な事を考えていたら、ハマったのでその記録。

Postgresのバージョンは9.5.3です。(正確にはPipelineDBの0.9.5です)
例えば、以下のテーブルがあります。

CREATE TABLE imp   (campaign_id BIGINT, campaign_creative_id BIGINT,   imp_count BIGINT, CONSTRAINT   pk_imp1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE vimp (campaign_id BIGINT, campaign_creative_id BIGINT, vimp_count BIGINT, CONSTRAINT pk_vimp1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE click (campaign_id BIGINT, campaign_creative_id BIGINT, click_count BIGINT, CONSTRAINT pk_click1 PRIMARY KEY (campaign_id, campaign_creative_id));
CREATE TABLE conv (campaign_id BIGINT, campaign_creative_id BIGINT, conv_count BIGINT, CONSTRAINT pk_conv1 PRIMARY KEY (campaign_id, campaign_creative_id));

これを次のようにまとめたいとします。なんとなくやりたい事は理解してもらえると思います。

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
;

期待する結果としては次のようになります。

INSERT INTO imp VALUES(100, 1000, 111);
INSERT INTO imp VALUES(100, 1001, 221);
INSERT INTO imp VALUES(101, 1000, 333);
INSERT INTO vimp VALUES(100, 1000, 30);
INSERT INTO click VALUES(102, 1004, 5);
INSERT INTO conv VALUES(102, 1004, 1);
INSERT INTO conv VALUES(103, 1005, 2);
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
102 | 1004 | 0 | 0 | 5 | 1
103 | 1005 | 0 | 0 | 0 | 2

ただし欲しい情報は全部ではないので、campaign_idで絞り込みます。
最終的にはFUNCTIONにして可変長引数としてbigint[]を受け取り、それをcampaign_id = ANY(campaign_ids)みたいな事をしますが、問題を簡単にするためにひとまず1件のcampaign_idを指定します。

いくつかのパターンを試す

ダミーデータで実行してもあまり意味が無いので、手元にあるテーブルを使います。
テーブル名が上記のサンプルからがらりと変わりますが、適宜脳内変換をして頂ければ…

愚直にWHERE句に条件を書く

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
WHERE
campaign_id = 102

imp.campaign_idではなくcampaign_id
WHERE句に書いたフィルタリングは結合の後に実行されます。

 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
102 | 1004 | 0 | 0 | 5 | 1
Hash Full Join  (cost=257.71..361.29 rows=8 width=96) (actual time=0.051..0.056 rows=1 loops=1)
Hash Cond: ((COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id) = conv.campaign_id) AND (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id) = conv.campaign_creative_id))
Filter: (COALESCE(COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id), conv.campaign_id) = 102)
Rows Removed by Filter: 3
-> Merge Full Join (cost=243.22..335.01 rows=1570 width=72) (actual time=0.024..0.028 rows=4 loops=1)
Merge Cond: ((click.campaign_id = (COALESCE(imp.campaign_id, vimp.campaign_id))) AND (click.campaign_creative_id = (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))))
-> Index Scan using pk_click1 on click (cost=0.15..71.70 rows=1570 width=24) (actual time=0.003..0.003 rows=1 loops=1)
-> Sort (cost=243.06..246.99 rows=1570 width=48) (actual time=0.019..0.020 rows=3 loops=1)
Sort Key: (COALESCE(imp.campaign_id, vimp.campaign_id)), (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=0.30..159.72 rows=1570 width=48) (actual time=0.006..0.010 rows=3 loops=1)
Merge Cond: ((imp.campaign_id = vimp.campaign_id) AND (imp.campaign_creative_id = vimp.campaign_creative_id))
-> Index Scan using pk_imp1 on imp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.002 rows=3 loops=1)
-> Index Scan using pk_vimp1 on vimp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.003 rows=1 loops=1)
-> Hash (cost=14.37..14.37 rows=8 width=24) (actual time=0.013..0.013 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> Bitmap Heap Scan on conv (cost=4.21..14.37 rows=8 width=24) (actual time=0.011..0.011 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_conv1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.008..0.008 rows=1 loops=1)
Index Cond: (campaign_id = 102)
Planning time: 0.295 ms
Execution time: 0.119 ms

WHERE句に全部条件を書いてみる

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN click
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN conv
USING (campaign_id, campaign_creative_id)
WHERE
imp.campaign_id = 102
OR vimp.campaign_id = 102
OR click.campaign_id = 102
OR conv.campaign_id = 102
;

※今回のケースではANDで繋げるのはNG。とてもひどいことになった気がする。

Merge Full Join  (cost=418.50..510.92 rows=31 width=96) (actual time=0.034..0.035 rows=1 loops=1)
Merge Cond: ((conv.campaign_id = (COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id))) AND (conv.campaign_creative_id = (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id))))
Filter: ((imp.campaign_id = 102) OR (vimp.campaign_id = 102) OR (click.campaign_id = 102) OR (conv.campaign_id = 102))
Rows Removed by Filter: 4
-> Index Scan using pk_conv1 on conv (cost=0.15..71.70 rows=1570 width=24) (actual time=0.006..0.006 rows=2 loops=1)
-> Sort (cost=418.35..422.28 rows=1570 width=72) (actual time=0.022..0.022 rows=4 loops=1)
Sort Key: (COALESCE(COALESCE(imp.campaign_id, vimp.campaign_id), click.campaign_id)), (COALESCE(COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id), click.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=243.22..335.01 rows=1570 width=72) (actual time=0.015..0.018 rows=4 loops=1)
Merge Cond: ((click.campaign_id = (COALESCE(imp.campaign_id, vimp.campaign_id))) AND (click.campaign_creative_id = (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))))
-> Index Scan using pk_click1 on click (cost=0.15..71.70 rows=1570 width=24) (actual time=0.001..0.001 rows=1 loops=1)
-> Sort (cost=243.06..246.99 rows=1570 width=48) (actual time=0.012..0.012 rows=3 loops=1)
Sort Key: (COALESCE(imp.campaign_id, vimp.campaign_id)), (COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id))
Sort Method: quicksort Memory: 25kB
-> Merge Full Join (cost=0.30..159.72 rows=1570 width=48) (actual time=0.005..0.008 rows=3 loops=1)
Merge Cond: ((imp.campaign_id = vimp.campaign_id) AND (imp.campaign_creative_id = vimp.campaign_creative_id))
-> Index Scan using pk_imp1 on imp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.002..0.003 rows=3 loops=1)
-> Index Scan using pk_vimp1 on vimp (cost=0.15..71.70 rows=1570 width=24) (actual time=0.001..0.002 rows=1 loops=1)
Planning time: 0.194 ms
Execution time: 0.084 ms

JOINにONを使って、そこで絞り込む

https://www.postgresql.jp/document/9.5/html/queries-table-expressions.html

この理由はON句の中の制約は結合の前に処理され、一方WHERE句の中の制約は結合の後に処理されることによります。 これは内部結合には影響がありませんが、外部結合には大きな影響があります。

JOINは結合前にフィルタリングされる。

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (imp.campaign_id = click.campaign_id AND imp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (imp.campaign_id = conv.campaign_id AND imp.campaign_creative_id = conv.campaign_creative_id)
;

あれ..この時点で結果が違うぞ..

 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
102 | 1004 | 0 | 0 | 5 | 0
102 | 1004 | 0 | 0 | 0 | 1
103 | 1005 | 0 | 0 | 0 | 2

こうでした。

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (vimp.campaign_id = click.campaign_id AND vimp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (click.campaign_id = conv.campaign_id AND click.campaign_creative_id = conv.campaign_creative_id)
;
 campaign_id | campaign_creative_id | imp_count | vimp_count | click_count | conv_count
-------------+----------------------+-----------+------------+-------------+------------
102 | 1004 | 0 | 0 | 5 | 1
100 | 1000 | 111 | 30 | 0 | 0
100 | 1001 | 221 | 0 | 0 | 0
101 | 1000 | 333 | 0 | 0 | 0
103 | 1005 | 0 | 0 | 0 | 2

これを元にフィルタリング。

SELECT
COALESCE(imp.campaign_id, vimp.campaign_id, click.campaign_id, conv.campaign_id) as campaign_id,
COALESCE(imp.campaign_creative_id, vimp.campaign_creative_id, click.campaign_creative_id, conv.campaign_creative_id) as campaign_creative_id,
COALESCE(imp.imp_count, 0) as imp_count,
COALESCE(vimp.vimp_count, 0) as vimp_count,
COALESCE(click.click_count, 0) as click_count,
COALESCE(conv.conv_count, 0) as conv_count
FROM
imp
FULL OUTER JOIN vimp
ON (imp.campaign_id = 102 AND vimp.campaign_id = 102 AND imp.campaign_id = vimp.campaign_id AND imp.campaign_creative_id = vimp.campaign_creative_id)
FULL OUTER JOIN click
ON (vimp.campaign_id = 102 AND click.campaign_id = 102 AND vimp.campaign_id = click.campaign_id AND vimp.campaign_creative_id = click.campaign_creative_id)
FULL OUTER JOIN conv
ON (click.campaign_id = 102 AND conv.campaign_id = 102 AND click.campaign_id = conv.campaign_id AND click.campaign_creative_id = conv.campaign_creative_id)
;

期待する結果にならないな.. FULL OUTER JOINは難しい。
そもそも何をしたいかと言うと、WHERE句に書くとJOINした後のフィルタリングなので、JOINする前に必要な情報だけに削ってからJOINしたい。
という事は、campaign_id=102で絞った各テーブルを結合する事になる。

WITH構文

WITH 
fimp AS (SELECT * FROM imp WHERE campaign_id = 102),
fvimp AS (SELECT * FROM vimp WHERE campaign_id = 102),
fclick AS (SELECT * FROM click WHERE campaign_id = 102),
fconv AS (SELECT * FROM conv WHERE campaign_id = 102)
SELECT
COALESCE(fimp.campaign_id, fvimp.campaign_id, fclick.campaign_id, fconv.campaign_id) as campaign_id,
COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id, fclick.campaign_creative_id, fconv.campaign_creative_id) as campaign_creative_id,
COALESCE(fimp.imp_count, 0) as imp_count,
COALESCE(fvimp.vimp_count, 0) as vimp_count,
COALESCE(fclick.click_count, 0) as click_count,
COALESCE(fconv.conv_count, 0) as conv_count
FROM
fimp
FULL OUTER JOIN fvimp
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN fclick
USING (campaign_id, campaign_creative_id)
FULL OUTER JOIN fconv
USING (campaign_id, campaign_creative_id)
;
Hash Full Join  (cost=58.33..58.70 rows=8 width=96) (actual time=0.071..0.075 rows=1 loops=1)
Hash Cond: ((COALESCE(COALESCE(fimp.campaign_id, fvimp.campaign_id), fclick.campaign_id) = fconv.campaign_id) AND (COALESCE(COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id), fclick.campaign_creative_id) = fconv.campaign_creative_id))
CTE fimp
-> Bitmap Heap Scan on imp (cost=4.21..14.37 rows=8 width=24) (actual time=0.004..0.004 rows=0 loops=1)
Recheck Cond: (campaign_id = 102)
-> Bitmap Index Scan on pk_imp1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.003..0.003 rows=0 loops=1)
Index Cond: (campaign_id = 102)
CTE fvimp
-> Bitmap Heap Scan on vimp (cost=4.21..14.37 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
Recheck Cond: (campaign_id = 102)
-> Bitmap Index Scan on pk_vimp1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.014..0.014 rows=0 loops=1)
Index Cond: (campaign_id = 102)
CTE fclick
-> Bitmap Heap Scan on click (cost=4.21..14.37 rows=8 width=24) (actual time=0.003..0.003 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_click1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.002..0.002 rows=1 loops=1)
Index Cond: (campaign_id = 102)
CTE fconv
-> Bitmap Heap Scan on conv (cost=4.21..14.37 rows=8 width=24) (actual time=0.010..0.010 rows=1 loops=1)
Recheck Cond: (campaign_id = 102)
Heap Blocks: exact=1
-> Bitmap Index Scan on pk_conv1 (cost=0.00..4.21 rows=8 width=0) (actual time=0.008..0.008 rows=1 loops=1)
Index Cond: (campaign_id = 102)
-> Hash Full Join (cost=0.56..0.86 rows=8 width=72) (actual time=0.042..0.042 rows=1 loops=1)
Hash Cond: ((COALESCE(fimp.campaign_id, fvimp.campaign_id) = fclick.campaign_id) AND (COALESCE(fimp.campaign_creative_id, fvimp.campaign_creative_id) = fclick.campaign_creative_id))
-> Hash Full Join (cost=0.28..0.51 rows=8 width=48) (actual time=0.028..0.028 rows=0 loops=1)
Hash Cond: ((fimp.campaign_id = fvimp.campaign_id) AND (fimp.campaign_creative_id = fvimp.campaign_creative_id))
-> CTE Scan on fimp (cost=0.00..0.16 rows=8 width=24) (actual time=0.004..0.004 rows=0 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 8kB
-> CTE Scan on fvimp (cost=0.00..0.16 rows=8 width=24) (actual time=0.014..0.014 rows=0 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.005..0.005 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> CTE Scan on fclick (cost=0.00..0.16 rows=8 width=24) (actual time=0.003..0.003 rows=1 loops=1)
-> Hash (cost=0.16..0.16 rows=8 width=24) (actual time=0.017..0.017 rows=1 loops=1)
Buckets: 1024 Batches: 1 Memory Usage: 9kB
-> CTE Scan on fconv (cost=0.00..0.16 rows=8 width=24) (actual time=0.014..0.015 rows=1 loops=1)
Planning time: 0.513 ms
Execution time: 0.347 ms

あれ..随分遅くなった..
件数が少ないからかしら。

データを多く投入してみる

各テーブルに10万件

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO imp SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO vimp SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO click SELECT * FROM tmp;

WITH RECURSIVE tmp(c1,c2,c3) AS (
SELECT 1001, 10000,floor(random()*10000000)
UNION ALL
SELECT c1+1, c2+2,floor(random()*10000000)
FROM tmp WHERE c1 < 1000 + 100000
)
INSERT INTO conv SELECT * FROM tmp;

なんだかんだでWITH構文(CTE)使う方法が一番良いっぽい。
最終的には、WITHで絞り込んだ情報をJOINすると、240msくらいから0.1msくらいまで高速にできました。

とはいえ、やりたいことはこれではなく、この続きでまた躓くことになるのです。
(PostgresではなくPipelineDBの不具合によって..)

参照

Kinesis Analyticsを実データで試して気になったことメモ

  • このエントリーをはてなブックマークに追加

サンプルは普通に動きますが、サンプルを動かしただけでは実際にハマりどころが分からないので、実際のデータを使ってみるよ。
調べながら書いた自分用のメモなので、非常に見にくいと思います。

https://aws.amazon.com/jp/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/
Kinesis Analyticsというものが発表されまして、Kinesisに対してContinuous Queryを実行できるものです。
端的に言えばNorikraみたいなものです。

PipelineDBにも似ている気はしますが、PipelineDBの最大の特徴である結果を保持しておいてSQLで取り出すという事は出来ないので、個人的には競合にはならなそうですね。
お手軽感で言えばConsumerを既に持っているのであれば、PipelineDBの方に軍配が上がります。(PipelineDBはkinesis connectorも実はあります)
まだ日本リージョンでは使えないので、OregonにKinesis Streamを作り、一部のproductionのデータをそちらにもコピーするようにしました。

気になった事を列挙していきます。

  • Kinesisに流れるデータは、CSVかJSONである必要がある
  • Source Streamを選択すると、実データを元にスキーマを推定して定義を作成する
    • 流れているIdがたまたま小さいと、TINYINTやSMALLINTとして扱われてしまう
  • error_stream という物が存在する
    • 定義したスキーマにそぐわないものは、このstreamに流れる
    • 他にもContinuous QueryでのエラーやRecordのOutOfOrderで流れるらしい
  • スキーマで定義できる型は次の通り
  • スキーマの名前に、
    • hwなどの1文字はNG
      • 属性名が1文字の場合はエラーが出て作成できない.. 😨
      • [a-zA-Z][a-zA-Z0-9_]+ このパターンじゃないと駄目
    • SQL:2008の予約語やKinesisの予約語もNG
      • 列名にtimestmap, rank とか駄目
  • 日本語が文字化ける
    • Raw stream sample ってタブを見ると、きちんと日本語が表示されている
    • スキーマ設定には、Record Formatという項目があって、UTF-8と書かれている(変更はできない模様)
    • Amazon Kinesis Analytics supports only Java single-byte CHARACTER SETs.Documentに書いてある
    • つまりマルチバイト文字駄目って事?
  • Timestamp型として定義する場合、Sourceは文字列, UNIXTIME 等何がOKなのか?
    • UNIXTIMEはNG
    • 文字列だったらOKだった
    • TIMEZONEは常にUTC
  • 定義したスキーマとは別に次の項目が付与される
    • PARTITION_KEY VARCHAR(512)
    • SEQUENCE_NUMBER VARCHAR(128)
    • SHARD_ID VARCHAR(64)
    • APPROXIMATE_ARRIVAL_TIME TIMESTAMP
  • Source Streamへの書き込みを止めていると、Save schema and update samplesのボタンを押してもNo rows in source streamと言われる事がある。
    • 一度通ったスキーマでも、次更新しようとするとNo rows in source streamって言われる。
  • 謎の5.0.1というバージョンが
  • S3のデータをJOIN
    • サンプルにはCompanyNameっていうS3のデータとJOINしているっぽいものがある
    • Consoleから設定はできない。APIを使う必要がある。
      • Amazon Kinesis Analytics console does not support managing reference data sources for your applications. You can use the AWS CLI to add reference data source to your application.
    • S3のデータ、マルチバイト文字列たぶん駄目だと思うけど、まだ試していない。
      • 後述の方法で登録はしてみたものの、エラーが出る。
  • 1個のApplicationにReferenceは1つまで
    • つまり、複数のS3のファイルとJOINする事はできない
  • ポチポチ弄ってると、ApplicationのStateがずっとUpdatingのままになる
    • 20分ほど放置しても、ずっと変わらず。
    • 何も操作できない..😨
  • PUMPのSQLで列名を参照する時にエラーになる
    • Schema定義の時に列名を全て大文字で定義しないと参照できない
      • じゃぁValidationの正規表現、小文字を許すべきではないのでは..😨
    • ダブルクォートで括れば良い

ここまでで、自分のデータでクエリをかけるところまではできた。

が、IAM Roleに権限を追加しても、未だにS3のファイル参照は出来ていない。
ひとまず調査はここまでにして、気になった事をサポートに投げています。

S3のデータのデータを取り込む

awscliを最新の状態にする(試した時は 1.10.56)

s3://bucket*/tamtam-test/kinesis_analytics/devices.csv

deviceType,name
IPHONE,あいぽん
IPOD,あいぽっど
IPAD,あいぱっど
ANDROID,あんどろいど
ANDROID_TABLET,あんどろいどタブレット
# version-idを見る
version_id=$(aws kinesisanalytics describe-application --application-name tamtam-test1 | jq .ApplicationDetail.ApplicationVersionId)

aws kinesisanalytics add-application-reference-data-source \
--application-name tamtam-test1 \
--current-application-version-id ${version_id} \
--reference-data-source "$(cat devices-input.json)"

devices-input.jsonの中身です。一部マスクしてます。
エラーが出て良くわからない場合は、--debugを付けて実行すると良いです。

{
"TableName": "Devices",
"S3ReferenceDataSource": {
"BucketARN": "arn:aws:s3:::***bucket**",
"FileKey": "tamtam-test/kinesis_analytics/devices.csv",
"ReferenceRoleARN": "arn:aws:iam::**account id**:role/service-role/kinesis-analytics-tamtam-test1"
},
"ReferenceSchema": {
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\n",
"RecordColumnDelimiter": ","
}
}
},
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"Name": "deviceType",
"Mapping": "deviceType",
"SqlType": "VARCHAR(64)"
},
{
"Name": "name",
"Mapping": "name",
"SqlType": "VARCHAR(64)"
}
]
}
}

Real-time Analyticsのタブには出てくるんだけど、中身を見ようとするとエラーになる。

参考) http://docs.aws.amazon.com/kinesisanalytics/latest/dev/app-add-reference-data.html

PUMPとか


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (deviceModel VARCHAR(128), deviceModelCount INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
deviceModel,
COUNT(*) AS deviceModelCount
FROM "SOURCE_SQL_STREAM_001"
GROUP BY deviceModel, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

これで参照できない。
どうやら参照する時に内部的に大文字に変換していて、スキーマ定義の列が大文字ではないので、そんな列はないぞと言われる。
いくつかのパターンを試してみましたが、スキーマ定義は大文字必須のようだ。

追記: 2016/08/14
サポートの人から教えて貰いまして、ダブルクォートで括らないと大文字に変換されるそうです。ダブルクォートで括れば、スキーマ定義が小文字のままでも参照できます。
http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html

追記: 2016/09/04
何も操作できなかった問題は、GUIで色々と弄っていたらIAM Role(のポリシー)のPermissionがおかしくなってました。
しかも1つのApplicationに複数のポリシーが紐付いていて、その状態だと画面を開く度にポリシーのバージョンが更新されていってしまう。。
これはもうGUIの不具合という事で片付けて、深追いしないことにしました。

メモ

CURRENT_ROW_TIMESTAMP
W3C_LOG_PARSE

RANDOM_CUT_FOREST
http://jmlr.org/proceedings/papers/v48/guha16.pdf

IDFAが起動する度に変わる件について

  • このエントリーをはてなブックマークに追加

iOSアプリのテストをしていて、起動時にIDFAが毎回変わっている気がする。
これではターゲティングが出来ない😨 という事で、調べてみたところ、以下のページを見つけました。

http://dev.tapjoy.com/faq/testflight-idfa-and-testing-tapjoy/
http://stackoverflow.com/questions/26648840/ios-testflight-beta-app-get-new-advertising-identifier-in-each-run

Namely, each time a Testflight-distributed app asks for the the Identifier for Advertisers (IDFA), it will get a different IDFA.

TestFlightの場合、IDFAを取得する度に異なるIDが返ってくると.. まじかよっ😨
アプリでは起動時にIDFAを取得してキャッシュしていたので、起動する度に変わっていたように見えていただけなのね。

Stream Processing Casual Talksで発表しました

  • このエントリーをはてなブックマークに追加

こちらの勉強会です。
http://connpass.com/event/35264/

参加者がガチ勢ばっかりだったので、マジびびる..
途中で海外のMeetup方式になってマジびびる…

スライドはここに置いておきます。
http://www.slideshare.net/tamrin69/pipelinedb

streamのpostgres上の実装とか、continuous viewはviewなので実体が別に_mrelテーブル(とシーケンステーブル)が存在するとか言いたかったのですが、時間が無くなってしまい殆ど駆け足でした。
知りたい事があれば個別に聞いてください。
また、弊社のランチを食べながら話をしたいという方もTwitterでコンタクトくださいませ。

一部の人にはStream Processingよりも重複排除の方が引っかかったようで、発表後や懇親会ではそういうお話が出来て楽しかったです。

個人的には、予測モデルの生成のような難しい事をするのであればそれに適したミドルウェアを使えば良いと思いますが、単純にKPIを素早く出したいだけだったらPipelineDBが優れているなと思いました。
(そもそも同じ土俵で戦う製品では無いし)

主催者の方、および会場提供のY!さんありがとうございました。
ほんとガチな人しか居なかったし、発表もガチな内容しか無かったので楽しかったです。

Lavernaの開発環境を整える

  • このエントリーをはてなブックマークに追加

LavernaはElectronベースのOSSです。
web-sequence図やdot形式の図が描画できたら良いなぁと思って、ちょっと開発環境を整えてみます。

READMEにある通りですが、

npm install -g bower
npm install -g gulp

git clone git@github.com:Laverna/laverna.git
cd laverna

npm install
bower install
(cd test && bower install)

みたいな感じです。

起動する時は、以下のように起動します。

electron .

この場合はブラウザが立ち上がりますが、アプリとして起動する場合は、以下のように--noServerを付けます。

electron . --noServer

一番最初に --noServerを付けて起動するとアプリの中身が何も表示されなかったので、最初は普通に起動する必要があるっぽいです。
開発モード(Inspectorとか表示される)をONにして起動する場合は、--devをつけます。

electron . --noServer --dev

修正したソースが反映されない

Macの場合、~/Library/Application Support/laverna/にキャッシュが保存されているので、これを消します。
Localeを編集した時、一向に反映される気配が無くて、インストールし直してもビルドし直しても駄目で、これが原因でした。