import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.runners import DataflowRunner
import google.auth

# google.auth.default() returns (credentials, project_id); keep the project id.
project_auth = google.auth.default()[1]
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
pipeline_options = PipelineOptions(
    flags=[],
    project=project_auth,
    region='us-central1',
    # staging_location must be a GCS URI; the bare bucket name lacked the gs:// scheme.
    staging_location='gs://data604-project-g3-data/staging',
)
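# Equivalent alternative (a sketch, not used below): PipelineOptions also accepts
# CLI-style flags, which is handy when the same script is launched from a shell:
# pipeline_options = PipelineOptions([
#     '--project=%s' % project_auth,
#     '--region=us-central1',
#     '--staging_location=gs://data604-project-g3-data/staging',
# ])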
table_schema = {
    'fields': [
        {'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'phone_number', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'incoming', 'type': 'BOOLEAN', 'mode': 'NULLABLE'},
        {'name': 'registered', 'type': 'BOOLEAN', 'mode': 'NULLABLE'},
        {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ]
}
table = "data604-project-g3:Footprint_data.calls"
def define_key(element):
    # Imported inside the function so the DoFn pickles cleanly for Dataflow workers.
    import json
    # Drop the timestamp so otherwise-identical calls collapse onto one key, then
    # use the remaining record, serialized with stable key order, as the grouping
    # key (sort_keys guards against field-order differences between input lines).
    del element['time']
    return (json.dumps(element, sort_keys=True), element)

def expand(element):
    import json
    # Invert define_key: rebuild the record from its key and attach the count.
    key, value = element
    dictionary = json.loads(key)
    dictionary['count'] = value
    return dictionary

def parse_csv(line):
    # Helper for CSV inputs; not used by the JSON pipeline below.
    import csv
    return next(csv.reader([line]))
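# Quick local sanity check of the key/count/expand round trip on a made-up record
# (field values are illustrative assumptions, not real data):
_sample = {'user_id': 'u123', 'phone_number': '+15551234567',
           'incoming': True, 'registered': False, 'time': '2024-11-27T22:58:00Z'}
_key, _ = define_key(dict(_sample))
assert expand((_key, 2)) == {'user_id': 'u123', 'phone_number': '+15551234567',
                             'incoming': True, 'registered': False, 'count': 2}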
call_data_p = beam.Pipeline(DataflowRunner(), options=pipeline_options)
call_data = (
    call_data_p
    | "Read" >> beam.io.ReadFromText('gs://data604-project-g3-data/calls.json')
    | "Parse JSON" >> beam.Map(json.loads)
    | "Define_Key" >> beam.Map(define_key)
    | 'Count elements per key' >> beam.combiners.Count.PerKey()
    | 'Turn key back to content' >> beam.Map(expand)
)
call_data | "Write To BigQuery" >> WriteToBigQuery(
    table=table,
    schema=table_schema,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
)
call_data_p.run()
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn'>)
WARNING: The directory '/home/jupyter/.cache/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag.
INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/submission_environment_dependencies.txt...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/submission_environment_dependencies.txt in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job clientRequestId: '20241127230020917062-9738' createTime: '2024-11-27T23:00:21.527581Z' currentStateTime: '1970-01-01T00:00:00Z' id: '2024-11-27_15_00_21-2354144757739365416' location: 'us-central1' name: 'beamapp-root-1127230020-915980-p7pejyxo' projectId: 'data604-project-g3' stageStates: [] startTime: '2024-11-27T23:00:21.527581Z' steps: [] tempFiles: [] type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2024-11-27_15_00_21-2354144757739365416]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2024-11-27_15_00_21-2354144757739365416
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2024-11-27_15_00_21-2354144757739365416?project=data604-project-g3
<DataflowPipelineResult <Job clientRequestId: '20241127230020917062-9738' createTime: '2024-11-27T23:00:21.527581Z' currentStateTime: '1970-01-01T00:00:00Z' id: '2024-11-27_15_00_21-2354144757739365416' location: 'us-central1' name: 'beamapp-root-1127230020-915980-p7pejyxo' projectId: 'data604-project-g3' stageStates: [] startTime: '2024-11-27T23:00:21.527581Z' steps: [] tempFiles: [] type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7fdf2498a7a0>
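# Optional follow-up (a sketch, not run here): run() on the Dataflow runner
# returns a DataflowPipelineResult immediately after submission, so the notebook
# can block until the batch job reaches a terminal state instead of returning:
# dataflow_result = call_data_p.run()
# dataflow_result.wait_until_finish()  # polls the Dataflow service until done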
# DirectRunner version of the same pipeline. The pipeline is built and run
# exactly once: inside a `with beam.Pipeline(...)` block the pipeline also runs
# automatically on exit, so an extra pipeline.run() there would submit the job
# twice and append the counts to BigQuery twice.
pipeline = beam.Pipeline(runner='DirectRunner')
lines = pipeline | beam.io.ReadFromText('gs://data604-project-g3-data/calls.json')
parsed_lines = lines | beam.Map(json.loads)
keyed_lines = parsed_lines | "Define_Key" >> beam.Map(define_key)
counted = keyed_lines | 'Count elements per key' >> beam.combiners.Count.PerKey()
expanded = counted | 'Turn key back to content' >> beam.Map(expand)
expanded | "Write To BigQuery" >> WriteToBigQuery(
    table=table,
    schema=table_schema,
    custom_gcs_temp_location="gs://data604-project-g3-data/.cache",
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
)
result = pipeline.run()
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn'>)
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.io.gcp.bigquery_file_loads:Load job has 1 files. Job name is beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0.
INFO:apache_beam.io.gcp.bigquery_file_loads:Triggering job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 to load data to BigQuery table <TableReference datasetId: 'Footprint_data' projectId: 'data604-project-g3' tableId: 'calls'>. Schema: {'fields': [{'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'phone_number', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'incoming', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'registered', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}. Additional parameters: {}. Source format: NEWLINE_DELIMITED_JSON
INFO:apache_beam.io.gcp.bigquery_tools:Started BigQuery job: <JobReference jobId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0' location: 'US' projectId: 'data604-project-g3'> bq show -j --format=prettyjson --project_id=data604-project-g3 beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: RUNNING
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: DONE
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.io.gcp.bigquery_file_loads:Load job has 1 files. Job name is beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0.
INFO:apache_beam.io.gcp.bigquery_file_loads:Triggering job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 to load data to BigQuery table <TableReference datasetId: 'Footprint_data' projectId: 'data604-project-g3' tableId: 'calls'>. Schema: {'fields': [{'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'phone_number', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'incoming', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'registered', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}. Additional parameters: {}. Source format: NEWLINE_DELIMITED_JSON
INFO:apache_beam.io.gcp.bigquery_tools:Started BigQuery job: <JobReference jobId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0' location: 'US' projectId: 'data604-project-g3'> bq show -j --format=prettyjson --project_id=data604-project-g3 beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: RUNNING
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: DONE
result
<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fdf2cdb8eb0>
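# A hedged follow-up sketch: confirm the rows landed by counting them in the
# destination table. Assumes the google-cloud-bigquery client library is
# available in this environment; the table is the one written by `table` above.
from google.cloud import bigquery

bq_client = bigquery.Client(project=project_auth)
count_query = 'SELECT COUNT(*) AS n FROM `data604-project-g3.Footprint_data.calls`'
for row in bq_client.query(count_query).result():
    print('rows in Footprint_data.calls: %d' % row.n)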