Kinesis Analyticsを実データで試して気になったことメモ

  • このエントリーをはてなブックマークに追加

サンプルは普通に動きますが、サンプルを動かしただけでは実際にハマりどころが分からないので、実際のデータを使ってみるよ。
調べながら書いた自分用のメモなので、非常に見にくいと思います。

https://aws.amazon.com/jp/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/
Kinesis Analyticsというものが発表されまして、Kinesisに対してContinuous Queryを実行できるものです。
端的に言えばNorikraみたいなものです。

PipelineDBにも似ている気はしますが、PipelineDBの最大の特徴である結果を保持しておいてSQLで取り出すという事は出来ないので、個人的には競合にはならなそうですね。
お手軽感で言えばConsumerを既に持っているのであれば、PipelineDBの方に軍配が上がります。(PipelineDBはkinesis connectorも実はあります)
まだ日本リージョンでは使えないので、OregonにKinesis Streamを作り、一部のproductionのデータをそちらにもコピーするようにしました。

気になった事を列挙していきます。

  • Kinesisに流れるデータは、CSVかJSONである必要がある
  • Source Streamを選択すると、実データを元にスキーマを推定して定義を作成する
    • 流れているIdがたまたま小さいと、TINYINTやSMALLINTとして扱われてしまう
  • error_stream という物が存在する
    • 定義したスキーマにそぐわないものは、このstreamに流れる
    • 他にもContinuous QueryでのエラーやRecordのOutOfOrderで流れるらしい
  • スキーマで定義できる型は次の通り
  • スキーマの名前に、
    • hwなどの1文字はNG
      • 属性名が1文字の場合はエラーが出て作成できない.. 😨
      • [a-zA-Z][a-zA-Z0-9_]+ このパターンじゃないと駄目
    • SQL:2008の予約語やKinesisの予約語もNG
      • 列名にtimestmap, rank とか駄目
  • 日本語が文字化ける
    • Raw stream sample ってタブを見ると、きちんと日本語が表示されている
    • スキーマ設定には、Record Formatという項目があって、UTF-8と書かれている(変更はできない模様)
    • Amazon Kinesis Analytics supports only Java single-byte CHARACTER SETs.Documentに書いてある
    • つまりマルチバイト文字駄目って事?
  • Timestamp型として定義する場合、Sourceは文字列, UNIXTIME 等何がOKなのか?
    • UNIXTIMEはNG
    • 文字列だったらOKだった
    • TIMEZONEは常にUTC
  • 定義したスキーマとは別に次の項目が付与される
    • PARTITION_KEY VARCHAR(512)
    • SEQUENCE_NUMBER VARCHAR(128)
    • SHARD_ID VARCHAR(64)
    • APPROXIMATE_ARRIVAL_TIME TIMESTAMP
  • Source Streamへの書き込みを止めていると、Save schema and update samplesのボタンを押してもNo rows in source streamと言われる事がある。
    • 一度通ったスキーマでも、次更新しようとするとNo rows in source streamって言われる。
  • 謎の5.0.1というバージョンが
  • S3のデータをJOIN
    • サンプルにはCompanyNameっていうS3のデータとJOINしているっぽいものがある
    • Consoleから設定はできない。APIを使う必要がある。
      • Amazon Kinesis Analytics console does not support managing reference data sources for your applications. You can use the AWS CLI to add reference data source to your application.
    • S3のデータ、マルチバイト文字列たぶん駄目だと思うけど、まだ試していない。
      • 後述の方法で登録はしてみたものの、エラーが出る。
  • 1個のApplicationにReferenceは1つまで
    • つまり、複数のS3のファイルとJOINする事はできない
  • ポチポチ弄ってると、ApplicationのStateがずっとUpdatingのままになる
    • 20分ほど放置しても、ずっと変わらず。
    • 何も操作できない..😨
  • PUMPのSQLで列名を参照する時にエラーになる
    • Schema定義の時に列名を全て大文字で定義しないと参照できない
      • じゃぁValidationの正規表現、小文字を許すべきではないのでは..😨
    • ダブルクォートで括れば良い

ここまでで、自分のデータでクエリをかけるところまではできた。

が、IAM Roleに権限を追加しても、未だにS3のファイル参照は出来ていない。
ひとまず調査はここまでにして、気になった事をサポートに投げています。

S3のデータのデータを取り込む

awscliを最新の状態にする(試した時は 1.10.56)

s3://bucket*/tamtam-test/kinesis_analytics/devices.csv

deviceType,name
IPHONE,あいぽん
IPOD,あいぽっど
IPAD,あいぱっど
ANDROID,あんどろいど
ANDROID_TABLET,あんどろいどタブレット
# version-idを見る
version_id=$(aws kinesisanalytics describe-application --application-name tamtam-test1 | jq .ApplicationDetail.ApplicationVersionId)

aws kinesisanalytics add-application-reference-data-source \
--application-name tamtam-test1 \
--current-application-version-id ${version_id} \
--reference-data-source "$(cat devices-input.json)"

devices-input.jsonの中身です。一部マスクしてます。
エラーが出て良くわからない場合は、--debugを付けて実行すると良いです。

{
"TableName": "Devices",
"S3ReferenceDataSource": {
"BucketARN": "arn:aws:s3:::***bucket**",
"FileKey": "tamtam-test/kinesis_analytics/devices.csv",
"ReferenceRoleARN": "arn:aws:iam::**account id**:role/service-role/kinesis-analytics-tamtam-test1"
},
"ReferenceSchema": {
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\n",
"RecordColumnDelimiter": ","
}
}
},
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"Name": "deviceType",
"Mapping": "deviceType",
"SqlType": "VARCHAR(64)"
},
{
"Name": "name",
"Mapping": "name",
"SqlType": "VARCHAR(64)"
}
]
}
}

Real-time Analyticsのタブには出てくるんだけど、中身を見ようとするとエラーになる。

参考) http://docs.aws.amazon.com/kinesisanalytics/latest/dev/app-add-reference-data.html

PUMPとか


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (deviceModel VARCHAR(128), deviceModelCount INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
deviceModel,
COUNT(*) AS deviceModelCount
FROM "SOURCE_SQL_STREAM_001"
GROUP BY deviceModel, FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') SECOND / 10 TO SECOND);

これで参照できない。
どうやら参照する時に内部的に大文字に変換していて、スキーマ定義の列が大文字ではないので、そんな列はないぞと言われる。
いくつかのパターンを試してみましたが、スキーマ定義は大文字必須のようだ。

追記: 2016/08/14
サポートの人から教えて貰いまして、ダブルクォートで括らないと大文字に変換されるそうです。ダブルクォートで括れば、スキーマ定義が小文字のままでも参照できます。
http://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html

追記: 2016/09/04
何も操作できなかった問題は、GUIで色々と弄っていたらIAM Role(のポリシー)のPermissionがおかしくなってました。
しかも1つのApplicationに複数のポリシーが紐付いていて、その状態だと画面を開く度にポリシーのバージョンが更新されていってしまう。。
これはもうGUIの不具合という事で片付けて、深追いしないことにしました。

メモ

CURRENT_ROW_TIMESTAMP
W3C_LOG_PARSE

RANDOM_CUT_FOREST
http://jmlr.org/proceedings/papers/v48/guha16.pdf