In [35]:
import logging
import json
from datetime import datetime
import pandas as pd 
import time
import traceback


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam import (
    DoFn, 
    io,
    ParDo, 
    PTransform,
    WindowInto, 
    WithKeys,
)

from apache_beam.runners import DataflowRunner
import csv
import google.auth
In [36]:
project_auth = google.auth.default()[1] 

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

pipeline_options = pipeline_options.PipelineOptions(
    flags = {},
    project = project_auth, 
    region= 'us-central1', 
    staging_location = "%s/staging" % "data604-project-g3-data"
)
In [37]:
table_schema = { 
    "fields":[ 
        {'name': "user_id", 'type':"STRING", 'mode':"NULLABLE"}, 
        {'name': "phone_number", 'type':"STRING", 'mode':"NULLABLE"}, 
        {'name': "incoming",'type': "BOOLEAN", 'mode':"NULLABLE"},
        {"name": "registered", "type": "BOOLEAN", "mode": "NULLABLE"},
        {'name': "count" , 'type' : "INTEGER" , 'mode' : 'NULLABLE'}
    ]
}
table = "data604-project-g3:Footprint_data.calls"
In [39]:
def define_key(element):
    import json
    del element['time']
    return (json.dumps(element), element)

def expand(element):
    import json
    key, value = element
    dictionary = json.loads(key)
    dictionary['count'] = value
    return dictionary

def parse_csv(line):
    import csv
    return next(csv.reader([line]))

call_data_p = beam.Pipeline(DataflowRunner(), options = pipeline_options)

call_data = (call_data_p | "Read" >> beam.io.ReadFromText('gs://data604-project-g3-data/calls.json')
                         | "Parse JSON" >> beam.Map(json.loads) 
                         | "Define_Key" >> beam.Map(define_key)
                         | 'Count elements per key' >> beam.combiners.Count.PerKey()
                         | 'Turn key back to content' >> beam.Map(expand)
            )


call_data | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=table_schema,
                                  create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=BigQueryDisposition.WRITE_TRUNCATE)


call_data_p.run()
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
No cache_root detected. Defaulting to staging_location gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff for cache location.
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn'>)
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn'>)
WARNING: The directory '/home/jupyter/.cache/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag.
INFO:apache_beam.runners.dataflow.dataflow_runner:Pipeline has additional dependencies to be installed in SDK worker container, consider using the SDK container image pre-building workflow to avoid repetitive installations. Learn more on https://cloud.google.com/dataflow/docs/guides/using-custom-containers#prebuild
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/submission_environment_dependencies.txt...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/submission_environment_dependencies.txt in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff/beamapp-root-1127230020-915980-p7pejyxo.1732748420.916173/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
 clientRequestId: '20241127230020917062-9738'
 createTime: '2024-11-27T23:00:21.527581Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-11-27_15_00_21-2354144757739365416'
 location: 'us-central1'
 name: 'beamapp-root-1127230020-915980-p7pejyxo'
 projectId: 'data604-project-g3'
 stageStates: []
 startTime: '2024-11-27T23:00:21.527581Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2024-11-27_15_00_21-2354144757739365416]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2024-11-27_15_00_21-2354144757739365416
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2024-11-27_15_00_21-2354144757739365416?project=data604-project-g3
Out[39]:
<DataflowPipelineResult <Job
 clientRequestId: '20241127230020917062-9738'
 createTime: '2024-11-27T23:00:21.527581Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-11-27_15_00_21-2354144757739365416'
 location: 'us-central1'
 name: 'beamapp-root-1127230020-915980-p7pejyxo'
 projectId: 'data604-project-g3'
 stageStates: []
 startTime: '2024-11-27T23:00:21.527581Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7fdf2498a7a0>
In [38]:
def define_key(element):
    import json
    del element['time']
    return (json.dumps(element), element)

def expand(element):
    import json
    key, value = element
    dictionary = json.loads(key)
    dictionary['count'] = value
    return dictionary

def parse_csv(line):
    import csv
    return next(csv.reader([line]))

with beam.Pipeline(runner='DirectRunner') as pipeline:
    lines = pipeline | beam.io.ReadFromText('gs://data604-project-g3-data/calls.json')
    parsed_lines = lines | beam.Map(json.loads)
    keyed_lines = parsed_lines | "Define_Key" >> beam.Map(define_key)
    counted = keyed_lines | 'Count elements per key' >> beam.combiners.Count.PerKey()
    result = counted | 'Turn key back to content' >> beam.Map(expand)
    result | "Write To BigQuery" >> WriteToBigQuery(
        table=table, schema=table_schema,
        custom_gcs_temp_location = "gs://data604-project-g3-data/.cache",
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND
    )
    result = pipeline.run()
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.UpdateDestinationSchema'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.TriggerCopyJobs'>)
WARNING:apache_beam.transforms.core:('No iterator is returned by the process method in %s.', <class 'apache_beam.io.gcp.bigquery_file_loads.DeleteTablesFn'>)
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.io.gcp.bigquery_file_loads:Load job has 1 files. Job name is beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0.
INFO:apache_beam.io.gcp.bigquery_file_loads:Triggering job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 to load data to BigQuery table <TableReference
 datasetId: 'Footprint_data'
 projectId: 'data604-project-g3'
 tableId: 'calls'>.Schema: {'fields': [{'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'phone_number', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'incoming', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'registered', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}. Additional parameters: {}. Source format: NEWLINE_DELIMITED_JSON
INFO:apache_beam.io.gcp.bigquery_tools:Started BigQuery job: <JobReference
 jobId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0'
 location: 'US'
 projectId: 'data604-project-g3'>
 bq show -j --format=prettyjson --project_id=data604-project-g3 beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: RUNNING
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_ebdf7a9f317345dd9af236a2dc818441_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: DONE
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.io.gcp.bigquery_file_loads:Load job has 1 files. Job name is beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0.
INFO:apache_beam.io.gcp.bigquery_file_loads:Triggering job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 to load data to BigQuery table <TableReference
 datasetId: 'Footprint_data'
 projectId: 'data604-project-g3'
 tableId: 'calls'>.Schema: {'fields': [{'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'phone_number', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'incoming', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'registered', 'type': 'BOOLEAN', 'mode': 'NULLABLE'}, {'name': 'count', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}. Additional parameters: {}. Source format: NEWLINE_DELIMITED_JSON
INFO:apache_beam.io.gcp.bigquery_tools:Started BigQuery job: <JobReference
 jobId: 'beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0'
 location: 'US'
 projectId: 'data604-project-g3'>
 bq show -j --format=prettyjson --project_id=data604-project-g3 beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: RUNNING
INFO:apache_beam.io.gcp.bigquery_tools:Job data604-project-g3:US.beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_914fb6b89d254c15a06ccd7b0e03af0c_236eb1a8c2973147fb3da8f1cd2ff4ec_pane0_partition0 status: DONE
In [18]:
result
Out[18]:
<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fdf2cdb8eb0>
In [ ]: