2022-09-05

Amazon Athenaをgolangで使ってみる

1010real / okamotti

はじめに

こんにちは。フロントエンド/SETエンジニアの@1010realです。 最近はTypeScriptよりも、goを書いている時間のほうが増えてきました。

今回は、新機能の開発において、Amazon Athenaをgolangから使う機会があったので、それについてまとめてみました。

Athenaとは?

Amazon AthenaはAmazon S3内のデータに対し、RDSライクにSQLクエリを実行して結果を取得できるクエリサービスです。社内のエンジニアに聞いたところ、割と枯れているサービスらしいですね。 私は恥ずかしながら、存じ上げませんでした。

特徴

以下のような特徴を持っています。 - サーバレス(S3に対して直接クエリを実行できる) - Amazon RDSのようにデータベースが常時立ち上がっている必要がない - クエリでスキャンしたデータに対する従量課金

また今回開発した機能に対するユースケースは以下のようなものでした。 - 高頻度で常に送られるログを貯めて置かなければならない - ログを取り出す頻度はそれほど多くない(必要になったときのみ。週に1度あれば多い方) - 最新のログから数ヶ月前までを一気に取得したい(データを検索できる必要がある)

上記を加味した上で社内のSREチームと協議した結果、S3にデータを貯めておいて、必要なときにAthenaで検索したら、要件を満たした上でコストも抑えられるのではないかということで採用に至りました。

Athenaの挙動について

設定について解説する前に、Athenaにおける、SQLクエリの実行から結果出力までの挙動をまとめておきます。 Athenaではクエリが発行されると、非同期で処理を行い、結果をS3に書き込みます。実行完了を待って、出力されたファイルにアクセスすることでクエリ結果を確認できます。

使い方

データさえ用意できれば、あとは超簡単です。 1. S3にデータを保存します。 CSVでもJSONでもParquetでも良いです。GUIでテーブル作成時に指定できる形式はこんな感じです。 athena-data-format 2. クエリ結果の保存先(S3バケット)を指定します。(※1.で保存したデータとは別のバケットを指定) athena-create-table 3. テーブルを作成します。 GUI or SQLでテーブルを作成します。GUIの場合はAmazon Athena > クエリエディタ > エディタタブ内の作成ボタンから、S3バケットデータを選択して作成を開始します。 athena-create-table-gui 一部の複雑なテーブル定義はSQLからしか作れないものもありますが、GUIから作成しても最終的にSQLが作成されて実行されるので、そこから編集すればよいかと思います。 SQLでテーブルを作るには以下のように、クエリエディタ内でCREATE文を貼って実行すればOKです。 athena-outputlocation-setting 4. クエリを実行します。 athena-select-query 5. 出力先S3バケットを確認します。デフォルトではUnsavedフォルダ配下に日付で区切られて結果が保存されていると思います。 athena-output-csv

aws-sdk-goで利用する

ここまでの設定が問題なくできていて、クエリ結果が取得できていれば、golangからAthenaを使う準備は完了です。 取り急ぎ、動かしてみたいという方は、以下と同じコードをこちらに 上げてあるので、README.md観ながら動かしてみてください。 AWSのクレデンシャルを環境変数にexportした後、OUTPUT_LOCATION と SQL(queryString)を適切に設定すればOKなはずです。(regionだけap-northeast-1 でハードコードしちゃったので必要ならそこも変更してください。

サンプルコード

package main

import (
    "errors"
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

// todo: replace output location (one of S3 URI)
const OUTPUT_LOCATION = "s3://xxxxx"

// todo: replace Athena Query (SQL for Athena)
var queryString = strings.Join([]string{
    "SELECT *",
    "FROM \"database_name_for_athena\".\"table_name_for_athena\"",
    "LIMIT 100;"}, " ")

var (
  client *athena.Athena
)

func main() {

    // Athenaクライアントを初期化 (1)
    client = initAthenaClient()

    // athenaクエリ実行 (2)
    resultConf := &athena.ResultConfiguration{
        OutputLocation: aws.String(OUTPUT_LOCATION),
    }

    input := &athena.StartQueryExecutionInput{
        QueryString: &queryString,
        ResultConfiguration: resultConf,
    }

    sqeOutput, err := client.StartQueryExecution(input)
    if err != nil {
        fmt.Println(err.Error())
    }

    // 実行完了を待つ(ステータスを監視) (3)
    executionInput := &athena.GetQueryExecutionInput{
        QueryExecutionId: sqeOutput.QueryExecutionId,
    }
L:

    for {
        gqeOutput, err := client.GetQueryExecution(executionInput)
        if err != nil {
            fmt.Println(err.Error())
        }
        fmt.Printf("%s\n", *gqeOutput.QueryExecution.Status.State) // for debug

        // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
        switch *gqeOutput.QueryExecution.Status.State {
            case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
                time.Sleep(1 * time.Second)
            case athena.QueryExecutionStateSucceeded:
                break L
            case athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled:
            default:
                fmt.Println(errors.New(gqeOutput.String()))
        }
    }

  // 結果を取得 (4)
    var (
        token *string = nil
        maxResult int64 = 50
    )

    for {
        gqrinput := &athena.GetQueryResultsInput{MaxResults: &maxResult, NextToken: token, QueryExecutionId: sqeOutput.QueryExecutionId}

        results, err := client.GetQueryResults(gqrinput)
        if err != nil {
            fmt.Println(err.Error())
        }

        parsedResults, err := parseResults(results, token)
        if err != nil {
            fmt.Println(err.Error())
        }

        // do something for parsedResults
        for _, v := range parsedResults {
            fmt.Println(v)
        }

        // NextTokenがnilなら終了(全結果取得済み)
        token = results.NextToken
        if token == nil {
            break
        }
    }
}

func initAthenaClient() *athena.Athena {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),
        os.Getenv("AWS_SECRET_ACCESS_KEY"),
        "",
    )
    conf := aws.Config{
        Region: aws.String("ap-northeast-1"),
        Credentials: cred,
    }
    sess := session.Must(session.NewSession(&conf))
    return athena.New(sess)
}

type RowData map[string]string

func parseResults(res *athena.GetQueryResultsOutput, token *string) ([]RowData, error) {
    rds := []RowData{}
    rns := make([]string, len(res.ResultSet.ResultSetMetadata.ColumnInfo))

    for i, meta := range res.ResultSet.ResultSetMetadata.ColumnInfo {
        rns[i] = *meta.Name
    }

    for i, row := range res.ResultSet.Rows {
        if i == 0 && token == nil {
            // tokenなし(初回)リクエストの場合、header行が先頭に入ってくるため無視する
            continue
        }

        rd := RowData{}
        for j, data := range row.Data {
            rd[rns[j]] = *data.VarCharValue
        }
        rds = append(rds, rd)
    }

    return rds, nil
}

実行結果

result-aws-sdk-go-athena ※出力されるmapデータは異なると思います。

コードの解説(簡単に)

(1) Athenaクライアントを初期化

credentialを作成し、 athena.New() します。戻り値がclient(*athena.Athena)です。

(2) Athenaクエリ実行

client.StartQueryExecution(input)で、Athenaクエリを実行します。 これに渡す引数inputを実行するSQLとOutputLocationから作っています。

(3) 実行完了を待つ(ステータスを監視)

記事の最初に書きましたが、Athenaは処理を非同期に行い、完了するとCSVが出力される仕組みなので、処理の終了を待ちます。 client.GetQueryExecution(executionInput)で、ステータスを取得することができます。これに渡す引数は、(2)でAthenaクエリを実行した際の戻り値に含まれるQueryExecutionIdを用いて作ります。

(4) Athenaクライアントを初期化

client.GetQueryResults(gqrinput) で結果を取得します。これに渡す引数には、以下が含まれます。 - MaxResults: 一度に取得する件数 - Token: どこから取得するかを示すtoken。最初の呼び出しはnil - QueryExecutionId: Athenaクエリ実行時の識別ID 出力されたCSVの件数がMaxResultsを超えていた場合は、結果と一緒にNextTokenが取得できるので、取得したデータに対する処理が終わったら、このNextTokenを設定して呼び出すと、次のmaxResults件分のデータが取得できます。 全て取得済みであれば、NextTokenはnilで返ってきます。

TIPS

機能開発をする上で得たTIPSを以下に記録しておきます。

データのフラット化

ちなみに、今回使ったデータソースの中にはこんなデータがひたすら入っています sample data

measureDetailsの形式を観てもらいたいのですが、オブジェクトの配列となっていますが、このようなデータをフラット化してテーブル化することもAthenaでは可能です。 他にも、ネストされた配列のフラット化などもできるので、インプットデータの整形などのコストも低くなります。

FYI: https://dev.classmethod.jp/articles/glue-flatten-obj-array/ https://docs.aws.amazon.com/ja_jp/athena/latest/ug/flattening-arrays.html https://docs.aws.amazon.com/ja_jp/athena/latest/ug/rows-and-structs.html

権限が複雑

S3がデータソース、出力先なのと、データカタログとしてGlueも使用しているため、Athena, Glue, S3に対する各種権限が必要です。

本当に必要な権限はこれだけらしいです。 https://paihu.hatenablog.com/entry/2018/11/13/105134

通常のJSONファイルは扱えない

改行区切りのJSONデータなら扱える様です(JSON Lines形式と呼ぶらしい)

まとめ

いろんなフォーマットに対応しているし、実行するのに他のインフラも必要ないので、とにかく分析したいデータを突っ込んで、データベースとテーブルを定義してSQL叩けば、取りたい情報取れちゃうし、出力したCSVをインプットとしてまた使えるので、割と色々できるなと感じています。 もし活用できそうな機会があれば、試してみると良いかもしれません。

最新の記事