In [15]:
import logging
import json
from datetime import datetime
import pandas as pd 
import time
import traceback

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam import (
    DoFn, 
    io,
    ParDo, 
    PTransform,
    WindowInto, 
    WithKeys,
)

from apache_beam.runners import DataflowRunner

import google.auth
In [16]:
project_auth = google.auth.default()[1] 


pipeline_options = pipeline_options.PipelineOptions(
    flags = {},
    streaming = True, 
    project = project_auth, 
    region= 'us-central1', 
    staging_location = "%s/staging" % "data604-project-g3-data"
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
In [18]:
table_schema = { 
    "fields":[ 
        {'name': "time", 'type':"TIMESTAMP", 'mode':"NULLABLE"}, 
        {'name': "package_name", 'type':"STRING", 'mode':"NULLABLE"}, 
        {'name': "last_time_used",'type': "TIMESTAMP", 'mode':"NULLABLE"},
        {'name': "foreground_time_ms",'type': "INTEGER", 'mode':"NULLABLE"}, 
        {'name': "user_id", 'type':"STRING", 'mode':"NULLABLE"}, 
    ],
}

table = "data604-project-g3:Footprint_data.app_usage"
In [17]:
class transformations(beam.DoFn):
    def process(self, element):
        import pandas as pd 
        
        app_usage = pd.DataFrame([element])
        
        try: 
            app_usage['time'] = pd.to_datetime(app_usage['time'], unit='ms') 
            app_usage['last_time_used'] = pd.to_datetime(app_usage['last_time_used'] , unit = 'ms')
            app_usage = app_usage.rename(columns={'total_fg_time':"foreground_time_ms"})
        except Exception as e: 
            logger.error(f"Error processing record {element}: {e}")
            #return not sure if we should keep this or not 
        
        yield app_usage.to_dict(orient='records')[0]
In [20]:
app_usage_p = beam.Pipeline(
    DataflowRunner(),
    options = pipeline_options,
)

app_usage_topic = "projects/data604-project-g3/topics/apps_usage_fake_streaming"

app_usage_pubsub = (app_usage_p | "Read Topic" >> ReadFromPubSub(topic=app_usage_topic)
                                | "Window" >> beam.WindowInto(beam.window.FixedWindows(size=3600))
                                | "To Dict" >> beam.Map(json.loads)
                                | "Transformations" >> beam.ParDo(transformations()))

app_usage_pubsub | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=table_schema,
                                  create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND)

app_usage_p.run()
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
No cache_root detected. Defaulting to staging_location gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff for cache location.
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING:apache_beam.io.gcp.gcsio:Unexpected error occurred when checking soft delete policy for gs://dataflow-staging-us-central1-18675e2485afe7315f18de97c3880eff
WARNING: The directory '/home/jupyter/.cache/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag.
Out[20]:
<DataflowPipelineResult <Job
 clientRequestId: '20241127221651567027-5878'
 createTime: '2024-11-27T22:16:52.660779Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2024-11-27_14_16_52-17763565330462161054'
 location: 'us-central1'
 name: 'beamapp-root-1127221651-565917-89gabpgk'
 projectId: 'data604-project-g3'
 stageStates: []
 startTime: '2024-11-27T22:16:52.660779Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7f203295d780>
In [ ]: