GA4をAPI経由でスプレッドシートにアップするスクリプト、Python版も作成しました。作成した時期はGAS版よりもこちらが先になります。
いろんな方のスクリプトを調べながら試行錯誤しながら作ったので、元ネタは何だったかわからなくなってしまいました。
大変申し訳ないです。
やってることはGAS版と同じで、測定期間も前月の初日〜末日にしていますが、この時は「○○日前〜○○日前」の形にわざわざ変換してから設定しています。
理由は不明。なんでこんなことしたんだろ?
# 今日を取得
today = datetime.datetime.today()
# 当月1日の値を出す
thismonth = datetime.datetime(today.year, today.month, 1)
# 前月1日の値を出す (2024/1/3修正)
# lastmonth_1st = datetime.datetime(today.year, today.month -1, 1)
if today.month == 1:
lastmonth_1st = datetime.datetime(today.year -1, 12, 1)
else:
lastmonth_1st = datetime.datetime(today.year, today.month -1, 1)
# 前月末日の値を出す
lastmonth_end = thismonth + datetime.timedelta(days=-1)
class GoogleAnalytics4Report:
def __init__(self, propertyId):
self._propertyId = propertyId
self._metrics = []
self._dimensions = []
self._dateRange = [startdate, enddate]
self._filters = None
self._result = []
フィルタ処理(↓下記スクリプト)はAPIでは行わず、Python に取り込んだ後のデータで行いました。面倒くさかったのでw
こういうとき「Pythonって楽でいいなー」と思います。
GASの場合はデータ取得・送信時のデータ変換があるため、GAS上でのデータ加工は、初級レベルの自分にはかなり面倒に感じました。
さてところで、フィルタ2つめ「ランディングページが /movie/ を含む」を設定していて初めて知ったのですが、Pythonの「.str.contains」って「含む」じゃなくて「正規表現一致」だったんですね。こいつはありがたい!
# フィルタ処理はAPI上ではなく、Python取り込み後のデータで実施
df = df[df['デフォルトチャネルグループ'].str.contains('Organic')]
df = df[df['ランディングページ'].str.contains('\/movie\/')]
df = df[df['セッション'] >= 10]
あと、Python版では実行ログも出力するようにしました。なんとなく。
# スクリプト実行ログ
sh_log = ss.worksheet("スクリプト実行ログ")
next_row2 = next_available_row(sh_log)
log_text = 'スクリプト実行 : ' + str(datetime.datetime.now())
sh_log.update_cell(next_row2, 1, log_text)
それ以外はだいたいGAS版と同じ考え方です。
全体では、下記のようになります。
以下の注意書きは前回のGAS版のときにも書きましたが、一応こちらにも書いておきます。
スクリプトを実行すると、シート名「YYYY-MM-DD〜YYYY-MM-DD」の新規シートを作成し、前月のレポートを出力します。
もし同名のシートが作成済みの場合は、そのまま上書きしてしまうのでご注意ください。
(同一シートの末尾にデータ追加していくスクリプトも作成可能)なお、定期実行される場合は、毎月3日0時ぐらいに設定するのをお勧めします。データ反映に24時間以上かかっている事例を複数見ていますので、くれぐれもご注意ください。
▼今回作成したスクリプト(全体)
#!/usr/local/bin/python3.7
# coding:utf-8
from google.analytics.data_v1beta.types import DateRange
from google.analytics.data_v1beta.types import Dimension
from google.analytics.data_v1beta.types import Filter
from google.analytics.data_v1beta.types import FilterExpression
from google.analytics.data_v1beta.types import FilterExpressionList
from google.analytics.data_v1beta.types import Metric
from google.analytics.data_v1beta.types import RunReportRequest
from google.analytics.data_v1beta import BetaAnalyticsDataClient
import datetime
import time
import pandas as pd
import os
import gspread
from google.oauth2.service_account import Credentials
from gspread_dataframe import get_as_dataframe, set_with_dataframe
# 今日を取得
today = datetime.datetime.today()
# 当月1日の値を出す
thismonth = datetime.datetime(today.year, today.month, 1)
# 前月1日の値を出す (2024/1/3修正)
# lastmonth_1st = datetime.datetime(today.year, today.month -1, 1)
if today.month == 1:
lastmonth_1st = datetime.datetime(today.year -1, 12, 1)
else:
lastmonth_1st = datetime.datetime(today.year, today.month -1, 1)
# 前月末日の値を出す
lastmonth_end = thismonth + datetime.timedelta(days=-1)
startdate = str((today - lastmonth_1st).days) + 'daysAgo'
enddate = str((today - lastmonth_end).days) + 'daysAgo'
class GoogleAnalytics4Report:
def __init__(self, propertyId):
self._propertyId = propertyId
self._metrics = []
self._dimensions = []
self._dateRange = [startdate, enddate]
self._filters = None
self._result = []
def setMetrics(self, metrics):
self._metrics = metrics
return self
def setDimensions(self, dimensions):
self._dimensions = dimensions
return self
def setDateRange(self, dateRange):
self._dateRange = dateRange
return self
def setFilters(self, **kwargs):
self._filters = kwargs
return self
def run(self, client):
def filterExprBuilder(filters):
if isinstance(filters, list):
return FilterExpressionList(expressions=[FilterExpression(filter=filter) for filter in filters])
return FilterExpression(filter=filters)
dimension_filter = None
if self._filters is not None:
dimension_filter = FilterExpression(
**dict(map(lambda filter: (filter[0], filterExprBuilder(filter[1]) if filter[0] != 'filter' else filter[1]), self._filters.items()))
)
request = RunReportRequest(
property=f"properties/{self._propertyId}",
metrics=[Metric(name=name) for name in self._metrics],
dimensions=[Dimension(name=name) for name in self._dimensions],
date_ranges=[DateRange(start_date=self._dateRange[0], end_date=self._dateRange[1])],
dimension_filter=dimension_filter
)
response = client.run_report(request)
print(f"{response.row_count} rows received")
self._result = [self._parseRow(row) for row in response.rows]
return self
def getResult(self):
return self._result
def getRecords(self):
columns = [('dimensions', col) for col in self._dimensions]
columns.extend([('metrics', col) for col in self._metrics])
return [dict([(name, item[i][name]) for i, name in columns]) for item in self._result]
def _parseRow(self, row):
dimensions = self._dimensions
metrics = self._metrics
return {
'dimensions':dict([(name, row.dimension_values[i].value) for i, name in enumerate(self._dimensions)]),
'metrics':dict([(name, float(row.metric_values[i].value)) for i, name in enumerate(self._metrics)])
}
# アカウント設定
# credential.jsonを/credentialに設置 && GA4 プロパティIDを設定
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/users/2/lolipop.jp-dp********/web/python/**********.json'
GA_PROPERTY_ID = **********
client = BetaAnalyticsDataClient()
# スプレッドシート設定
scope = ['https://www.googleapis.com/auth/spreadsheets','https://www.googleapis.com/auth/drive']
credentials = Credentials.from_service_account_file("/home/users/2/lolipop.jp-dp20084101/web/python/**********.json", scopes=scope)
gs = gspread.authorize(credentials)
SPREADSHEET_KEY = "******************************"
ss = gs.open_by_key(SPREADSHEET_KEY)
def next_available_row(sheet):
str_list = list(filter(None, sheet.col_values(1)))
return str(len(str_list)+1)
# レポート生成 → スプレッドシートへの送信
def job():
# GA4データ取得
ga4Report = GoogleAnalytics4Report(GA_PROPERTY_ID).setDimensions(
dimensions=['sessionDefaultChannelGroup','landingPage']
).setMetrics(
metrics=['sessions','engagementRate']
).setDateRange(
dateRange=[startdate, enddate]
).run(client)
df = pd.DataFrame.from_records(ga4Report.getRecords())
# ディメンション名、指標名を日本語化
df.columns = ['デフォルトチャネルグループ','ランディングページ','セッション', 'エンゲージメント率']
title = str(lastmonth_1st.strftime('%Y年%m月%d日')) + '〜' + str(lastmonth_end.strftime('%Y年%m月%d日'))
# フィルタ処理はAPI上ではなく、Python取り込み後のデータで実施
df = df[df['デフォルトチャネルグループ'].str.contains('Organic')]
df = df[df['ランディングページ'].str.contains('\/movie\/')]
df = df[df['セッション'] >= 10]
# スプシに送信(同名のシートがあったときはクリアして上書き)
try:
sh_ga4 = ss.add_worksheet(title=title,rows=1000,cols=20)
except:
sh_ga4 = ss.worksheet(title)
sh_ga4.clear()
set_with_dataframe(sh_ga4, df)
# スクリプト実行ログもつけてみた
sh_log = ss.worksheet("スクリプト実行ログ")
next_row2 = next_available_row(sh_log)
log_text = 'スクリプト実行 : ' + str(datetime.datetime.now())
sh_log.update_cell(next_row2, 1, log_text)
job()
GA4をBigQuery経由でデータ取得したバージョン(Python版)もありますので、後日アップします。