Google Cloud Composer using google-cloud-bigquery python client library
Google Cloud Composer using google-cloud-bigquery python client library
I'm trying to run a DAG in Google Cloud Composer in which the first component is to use a http GET request to call an API and then use the python-client library to insert the json into a BigQuery table. I am trying to run this function: https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.insert_rows_json.html
import requests
import datetime
import ast
import numpy as np
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator
import google.cloud.bigquery as bigquery
client = bigquery.Client(project = 'is-flagship-data-api-sand')
dataset_id = 'Mobile_Data_Test'
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table('sample_wed')
table = client.get_table(table_ref)
def get_localytics_data():
profiles_requests_command = "https://%s:%s@api.localytics.com/v1/exports/profiles/%d/profile"%(api_key,api_secret,28761)
res_profiles = requests.get(profiles_requests_command)
if res_profiles.status_code == 200:
data = res_profiles.content
data_split = data.split('n')[:-1]
data_split_ast = [ast.literal_eval(x) for x in data_split]
#take out characters from the beginning to have neat columns
data_split_ast_pretty = [dict(zip(map(lambda x: x[4:], item.keys()), item.values())) for item in data_split_ast]
#add current date
current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
for item in data_split_ast_pretty:
item['DateCreated'] = current_time
random_sample = list(np.random.choice(data_split_ast_pretty,5))
print random_sample
client.insert_rows_json(table = table, json_rows = random_sample)
else:
pass
run_api = python_operator.PythonOperator(task_id='call_api',
python_callable=get_localytics_data)
I added the PYPI Packages of :
requests ===2.19.1
numpy ===1.12.0
google-cloud-bigquery ===1.4.0
I get the error of : Broken DAG: [/home/airflow/gcs/dags/composer_test_july30_v2.py] 'Client' object has no attribute 'get_table'
in the Airflow UI Console.
All the code shown works locally but will not work using Cloud Composer.
By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.
What version of Cloud Composer are you using? You can get this from describing the environment.
– Trevor Edwards
17 mins ago