データの抽出
- English
- 日本語
重要 :このガイドは、Next-Gen WAF コンソールにアクセスできる Next-Gen WAF のお客様のみが対象です。
Next-Gen WAF は、攻撃や異常を含むリクエストを保存しますが、いくつかの条件が適用されます。このようなデータを一括で抽出して自社システムに取り込みたい場合、Fastly は最新データのフィードを利用できるリクエストフィード API エンドポイントを提供します。これは、たとえば1時間ごとの cron での呼び出しに適しています。
この機能は通常、セキュリティ・オペレーション・センター (SOC) チームが、Datadog や ELK などの商用システムのセキュリティ情報イベント管理 (SIEM) ソリューションにデータを自動的にインポートするために使用します。
データ抽出と検索の比較
Fastly では、リクエストデータを検索するための API エンドポイントが複数に分離されています。これはデータの一括抽出ではなく、特定の基準を満たすリクエストを特定することを意図した仕様です。
| 機能 | データ抽出 | 検索 |
| エンドポイントパス | /feed/requests | /requests |
| 返されたリクエスト | シグナル別にフィルタリングされたデータすべて | クエリシンタックス別に指定されたデータのみ |
| 最大件数 | 一度に1000件のすべてのリクエスト | 1000 件のリクエスト |
| 時間枠 | limit クエリパラメータに従ってページネーションされた、指定期間内のすべてのリクエスト | 一度に最大7日分のリクエスト |
| データ保持 | リクエストログデータ : 最大7日間。指標データ : 最大30日間。実際の保持期間は、権限に応じて異なります。 | リクエストログデータ : 最大7日間。指標データ : 最大30日間。実際の保持期間は、権限に応じて異なります。 |
注意 : エンドポイントが異なれば、保持アクセス権も異なる場合があります。
時間の制限
このエンドポイントを使用する場合、以下の制限が適用されます。
untilパラメータの最大値は過去5分です。これは、データパイプラインが受信リクエストを処理するために必要な時間です。詳細は以下をご覧ください。fromパラメータの最小値は過去24時間5分です。fromおよびuntilパラメータは、分単位で設定する必要があります。fromおよびuntilパラメータには、秒単位の詳細情報を含む Unix タイムスタンプが必要です (例:1445437680)。
遅延データ
実行中のすべてのエージェントのデータを収集して集計し、システムにデータを取り込み、分析して強化する時間を確保するために、5分間の遅延を設けています。この5分間の遅延は、タイムリーかつ完全なデータとトレードオフの関係にあります。
ページネーション
このエンドポイントは、一度に1,000件のリクエスト、または limit クエリパラメータで指定されたサイズのデータを返します。指定された時間内に1,000件を超えるリクエスト (デフォルト) または limit パラメータで定義された件数を超えるリクエストが含まれる場合、次のバッチを取得するための next URL が提供されます。各 next URL は、生成されてから1分間有効です。
取得したデータのサイズは、場合によって大きな差があります。URL のサイズ制限を超えないようにするには、Content-Type を application/x-www-form-urlencoded にして、 next パラメータとその値を POST リクエストの POST パラメータとして送信します。
並べ替え順序
データウェアハウジングの実装により、指定された時間内はこのエンドポイントから完全なデータが返されますが、ソートされる保証はありません。指定された時間内のすべてのデータが蓄積されたら、必要に応じて timestamp フィールドを使用して順序を並べ替えることができます。
レート制限
このエンドポイントへの同時接続の制限 :
各サイト (ワークスペース) につき2件
各企業 (アカウント) につき5件
使用例
このエンドポイントを使用する一般的な方法は、毎時5分に実行され、過去1時間分のデータを取得する cron を設定することです。以下の例では、過去1時間分の開始タイムスタンプと終了タイムスタンプを計算し、その結果を使用して API を呼び出します。
Python
import requestsimport osimport jsonimport calendarfrom datetime import datetime, timedelta, timezone
# Set up environment variablesNGWAF_EMAIL = os.getenv('NGWAF_USER_EMAIL')NGWAF_TOKEN = os.getenv('NGWAF_TOKEN')NGWAF_CORP = os.getenv('CORP_NAME')NGWAF_SITE = os.getenv('SITE_NAME')
if not NGWAF_EMAIL or not NGWAF_TOKEN or not NGWAF_CORP or not NGWAF_SITE: raise EnvironmentError("Please set NGWAF_EMAIL, NGWAF_TOKEN, NGWAF_CORP, and NGWAF_SITE environment variables.")
# Base URL for the APIbase_url = 'https://dashboard.signalsciences.net/api/v0'
# Set up headers with authenticationheaders = { 'x-api-user': NGWAF_EMAIL, 'x-api-token': NGWAF_TOKEN}
# Calculate UTC timestamps for the previous full houruntil_time = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)from_time = until_time - timedelta(hours=1)until_time = calendar.timegm(until_time.utctimetuple())from_time = calendar.timegm(from_time.utctimetuple())
# Set up the initial URL for the GET requestget_url = f'{base_url}/corps/{NGWAF_CORP}/sites/{NGWAF_SITE}/feed/requests?from={from_time}&until={until_time}'
# Debugging: print the URL and timestampsprint(f"Fetching data from: {get_url}")print(f"from_time: {from_time}, until_time: {until_time}")
def fetch_paginated_data(url): data_list = [] next_value = None
response_raw = requests.get(url, headers=headers) if response_raw.status_code != 200: raise RuntimeError(f"Failed to fetch data from {url}. Status Code: {response_raw.status_code}")
response = response_raw.json() data_list.extend(response.get('data', []))
next_uri = response.get('next', {}).get('uri', '')
while next_uri: next_value = next_uri.split('next=')[-1] # All subsequent requests are POSTs with pagination post_url = f'{base_url}/corps/{NGWAF_CORP}/sites/{NGWAF_SITE}/feed/requests' post_data = {'next': next_value} headers['Content-Type'] = 'application/x-www-form-urlencoded' post_response_raw = requests.post(post_url, headers=headers, data=post_data)
if post_response_raw.status_code != 200: raise RuntimeError(f"Failed to fetch paginated data from {post_url}. Status Code: {post_response_raw.status_code}")
post_response = post_response_raw.json() data_list.extend(post_response.get('data', []))
next_uri = post_response.get('next', {}).get('uri', '') if not next_uri: break
next_value = next_uri.split('next=')[-1]
return data_list
# Fetch datadata = fetch_paginated_data(get_url)
# Output the data or save to a file, etc.print(json.dumps(data, indent=4))