import logging
import json

import pandas as pd  # also imported inside the DoFn so it is available on Dataflow workers

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.runners import DataflowRunner

import google.auth
# Project ID taken from the active Google Cloud credentials.
project_auth = google.auth.default()[1]

# Options for a streaming Dataflow job; staging_location must be a gs:// path.
pipeline_options = PipelineOptions(
    flags=[],
    streaming=True,
    project=project_auth,
    region="us-central1",
    staging_location="gs://data604-project-g3-data/staging",
)
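# Optional sanity check (illustrative, not required): get_all_options() returns the
# resolved option dictionary, which is handy for confirming project, region, and
# staging_location before submitting the job.
# print(pipeline_options.get_all_options())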
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# BigQuery schema for the app-usage table.
table_schema = {
    "fields": [
        {"name": "time", "type": "TIMESTAMP", "mode": "NULLABLE"},
        {"name": "package_name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "last_time_used", "type": "TIMESTAMP", "mode": "NULLABLE"},
        {"name": "foreground_time_ms", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "user_id", "type": "STRING", "mode": "NULLABLE"},
    ],
}
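# Hedged example of one incoming Pub/Sub message body (field values are made up):
# timestamps arrive as epoch milliseconds and the raw field is named total_fg_time,
# which the DoFn below renames to foreground_time_ms to match the schema.
# {"time": 1732745772000, "package_name": "com.example.app",
#  "last_time_used": 1732742172000, "total_fg_time": 60000, "user_id": "user_001"}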
table = "data604-project-g3:Footprint_data.app_usage"  # PROJECT:DATASET.TABLE
class transformations(beam.DoFn):
    """Converts epoch-millisecond timestamps and renames total_fg_time to foreground_time_ms."""

    def process(self, element):
        # Imported inside process so the dependency is available on Dataflow workers.
        import pandas as pd

        app_usage = pd.DataFrame([element])
        try:
            app_usage["time"] = pd.to_datetime(app_usage["time"], unit="ms")
            app_usage["last_time_used"] = pd.to_datetime(app_usage["last_time_used"], unit="ms")
            app_usage = app_usage.rename(columns={"total_fg_time": "foreground_time_ms"})
        except Exception as e:
            logger.error(f"Error processing record {element}: {e}")
            # Drop records that fail transformation instead of writing them with raw values.
            return
        yield app_usage.to_dict(orient="records")[0]
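# Hedged local check (sample values are illustrative): the DoFn can be exercised
# outside Beam by calling process() directly to verify the conversion and rename.
# sample = {"time": 1700000000000, "package_name": "com.example.app",
#           "last_time_used": 1700000000000, "total_fg_time": 60000, "user_id": "u1"}
# print(next(transformations().process(sample)))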
app_usage_p = beam.Pipeline(DataflowRunner(), options=pipeline_options)
app_usage_topic = "projects/data604-project-g3/topics/apps_usage_fake_streaming"
app_usage_pubsub = (
    app_usage_p
    | "Read Topic" >> ReadFromPubSub(topic=app_usage_topic)
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(size=3600))  # 1-hour fixed windows
    | "To Dict" >> beam.Map(json.loads)
    | "Transformations" >> beam.ParDo(transformations())
)
app_usage_pubsub | "Write To BigQuery" >> WriteToBigQuery(
    table=table,
    schema=table_schema,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
)
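# Hedged development tip (a sketch, not part of the submitted job): the same transforms
# can be previewed locally with Beam's interactive runner before going to Dataflow, e.g.:
#   from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
#   import apache_beam.runners.interactive.interactive_beam as ib
#   p = beam.Pipeline(InteractiveRunner(), options=pipeline_options)
#   preview = (p | ReadFromPubSub(topic=app_usage_topic)
#                | beam.Map(json.loads)
#                | beam.ParDo(transformations()))
#   ib.show(preview)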
result = app_usage_p.run()
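# Hedged note: for a streaming pipeline, run() returns a DataflowPipelineResult almost
# immediately while the job keeps running in the Dataflow service. The handle can be
# used to poll or stop the job, e.g.:
#   result.state     # current job state
#   result.cancel()  # stop the streaming job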
# Output from the run (trimmed): a DataflowPipelineResult for streaming job
# 2024-11-27_14_16_52-17763565330462161054 in us-central1, project data604-project-g3.