Chapter 6 Data Engineering Spring 2021 Documentation

6.1 Completing the Pipeline

In order to connect the data collection process to the rest of the pipeline, it had to be able to read patient information and login credentials from the SQL database, as well as automatically append the collected biometric data to a database table. To accomplish this, the PythonBot.py code had to be able to connect to SQL. This was done by first importing the necessary SQLAlchemy packages and then establishing a connection to our specific database.

To import packages:

# using sqlalchemy
import sqlalchemy as sal
from sqlalchemy import create_engine
import pandas as pd

To establish a connection to the SQL database:

# establish connection URL
conn = "mysql+pymysql://{0}:{1}@{2}:{3}/{4}".format(
    'cb3i17t0aqn6a4ff', 'e2l4k9zn24shcj42', 'rnr56s6e2uk326pj.cbetxkdyhwsb.us-east-1.rds.amazonaws.com', '3306', 'lfry112yqr3k2dfr')
 
# create engine
engine = sal.create_engine(conn)

If another SQL database is used in the future, please note that 'cb3i17t0aqn6a4ff' corresponds to 'username', 'e2l4k9zn24shcj42' corresponds to 'password', 'rnr56s6e2uk326pj.cbetxkdyhwsb.us-east-1.rds.amazonaws.com' corresponds to 'address', '3306' corresponds to 'port', and 'lfry112yqr3k2dfr' corresponds to 'DB'. Change these values as appropriate in the code.
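
If it helps future maintenance, here is a minimal sketch of the same connection built from named variables, so that a database swap only touches one place (the values below are placeholders, not real credentials):

# placeholder credentials -- replace with the values for the new database
USERNAME = 'username'
PASSWORD = 'password'
ADDRESS = 'address'  # e.g. an RDS endpoint
PORT = '3306'        # default MySQL port
DB = 'DB'

conn = "mysql+pymysql://{0}:{1}@{2}:{3}/{4}".format(USERNAME, PASSWORD, ADDRESS, PORT, DB)
engine = sal.create_engine(conn)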

6.1.1 Reading in user data from patient table

Patient data was imported as a series of lists. The SQL patient table was read in as df1, which was used to form lists of data corresponding to individual patients' emails, passwords, IDs, and usernames.

# read the entire patient table into a dataframe
df1 = pd.read_sql_query("SELECT * FROM patient", engine)
emails = []
passwords = []
IDs = []
usernames = []
# split the table into one list per field
for i in range(len(df1)):
    emails.append(df1.email[i])
    passwords.append(df1.fitbit_password[i])
    IDs.append(df1.patient_id[i])
    usernames.append(df1.username[i])
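
An equivalent, more concise way to build the same lists, assuming the same column names in the patient table, is:

emails = df1['email'].tolist()
passwords = df1['fitbit_password'].tolist()
IDs = df1['patient_id'].tolist()
usernames = df1['username'].tolist()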

As was previously the case, this data was iterated through and plugged into the FitbitBot function.

6.1.2 Appending user data to SQL table

Once data was collected (collection functions can be found in BiometricPrevious_getDevice_v2.py), it needed to be appended to the SQL database. A new function was made to reformat the data to have the same column names as the SQL table.

def appendDataBase(DATA, ENGINE, USER, ID):
    # build a one-row dataframe whose columns match the SQL table
    obj = pd.DataFrame()

    obj['patient_id'] = [ID]
    obj['fbusername'] = [USER]
    obj['collection_date'] = [DATA.get("Date")]
    obj['steps'] = [DATA.get("Steps")]
    obj['floors_climbed'] = [DATA.get("Floors Climbed")]
    obj['total_miles'] = [DATA.get("Total Miles")]
    obj['lightly_active_miles'] = [DATA.get("Lightly Active Miles")]
    obj['moderately_active_miles'] = [DATA.get("Moderately Active Miles")]
    obj['very_active_miles'] = [DATA.get("Very Active Miles")]
    obj['sedentary_minutes'] = [DATA.get("Sedentary Minutes")]
    obj['lightly_active_minutes'] = [DATA.get("Lightly Active Minutes")]
    obj['fairly_active_minutes'] = [DATA.get("Fairly Active Minutes")]
    obj['very_active_minutes'] = [DATA.get("Very Active Minutes")]
    obj['hr30_100_minutes'] = [DATA.get("HR 30-100 Minutes")]
    obj['hr100_140_minutes'] = [DATA.get("HR 100-140 Minutes")]
    obj['hr140_170_minutes'] = [DATA.get("HR 140-170 Minutes")]
    obj['hr170_220_minutes'] = [DATA.get("HR 170-220 Minutes")]
    obj['average_resting_hr'] = [DATA.get("Average Resting HR")]
    obj['bmi'] = [DATA.get("BMI")]
    obj['sleep_efficiency'] = [DATA.get("Sleep Efficiency")]
    obj['weight'] = [DATA.get("Weight")]
    obj['minutes_asleep'] = [DATA.get("Minutes Asleep")]

    # append the row to the fitbit_data table
    obj.to_sql("fitbit_data", con=ENGINE, if_exists='append', index=False)

    return None

In this function, the argument DATA is a dictionary returned from BiometricPrevious_getDevice_v2.py, ENGINE is the connection engine to the SQL database, and USER and ID correspond to the user's Fitbit username and Merck trial userID. The to_sql call at the end exports the reformatted data to the "fitbit_data" table in the SQL database. The if_exists='append' argument appends to the table instead of overwriting it, and the index=False argument prevents pandas from writing the dataframe's index as a column in SQL.

More documentation on .to_sql can be found here: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
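
For illustration, a single call might look like the following; the dictionary values here are hypothetical, and any keys missing from DATA simply come through as None (NULL in SQL) because of .get():

# hypothetical one-day record in the key format expected by appendDataBase
sample = {
    "Date": "2021-04-01",
    "Steps": 8432,
    "Floors Climbed": 10,
    "Average Resting HR": 61,
    "Minutes Asleep": 412,
    # ...remaining keys as listed in appendDataBase...
}
appendDataBase(sample, engine, usernames[0], IDs[0])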

In order to pass the userID and Fitbit username to the SQL database, the FitbitBot function and function call had to be slightly altered.

class FitbitBot:
    def __init__(self, EMAIL, PASSWORD, DATE, USERNAME, ID):

        # Both the Client ID and Client Secret come from the Fitbit site after registering an app
        CLIENT_ID = '22BH28' #Mine:'22BKP3'
        CLIENT_SECRET = '78a4838804c1ff0983591e69196b1c46' #Mine:'1a42e97b6b4cc640572ae5cf10a7d0b0'
        # Authorization process: opens the Fitbit login page in a browser
        server = Oauth2.OAuth2Server(CLIENT_ID, CLIENT_SECRET)
        server.browser_authorize(EMAIL, PASSWORD)
        ACCESS_TOKEN = str(server.fitbit.client.session.token['access_token'])
        REFRESH_TOKEN = str(server.fitbit.client.session.token['refresh_token'])
        auth2_client = fitbit.Fitbit(CLIENT_ID, CLIENT_SECRET, oauth2=True, access_token=ACCESS_TOKEN,
                                     refresh_token=REFRESH_TOKEN)
        BiometricPrev = BiometricPrevious.FitbitModel1(auth2_client)
        bioDict, biometricDF = BiometricPrev.getBiometricData(DATE)  # returns a dictionary and a dataframe
        # leftover filename from the earlier CSV export (relies on the global loop index i)
        title = './CSV_Files/user' + str(i) + '_' + DATE + '.csv'
        appendDataBase(bioDict, engine, USERNAME, ID)
        print("Python Script Executed")

The USERNAME and ID arguments were added to the function, along with the appendDataBase call. BiometricPrev.getBiometricData(DATE) was also slightly altered to return both a dictionary and a dataframe.

Since the function's arguments were expanded, the function call also had to be adjusted to pass usernames[i] and IDs[i].

# collect data for the previous day
today = str((datetime.datetime.now() - datetime.timedelta(1)).strftime("%Y-%m-%d"))
# Run data extraction for every patient in the table
for i in range(len(emails)):
    FitbitBot(emails[i], passwords[i], today, usernames[i], IDs[i])
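
Note that one failed login or collection currently aborts the entire run. A minimal sketch that logs the failure and moves on to the next patient (the error handling is an addition, not part of the original script) would be:

for i in range(len(emails)):
    try:
        FitbitBot(emails[i], passwords[i], today, usernames[i], IDs[i])
    except Exception as exc:
        # skip this patient but keep collecting data for the rest
        print("Collection failed for patient {0}: {1}".format(IDs[i], exc))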

6.2 Automating the Pipeline

The process was automated in two different ways. The first used cron jobs, which work on UNIX-like (Mac, Linux) operating systems; this was used while the code lived on Scholar. However, Scholar periodically wiped cron jobs, which meant the automated process had to be continually re-established (and that defeated the purpose!). As a result, the code stopped being run on Scholar and was moved onto the Merck EC2 AWS server. That server is Windows-based, so Task Scheduler was used instead.

6.2.1 Cron Jobs (for Mac and Linux)

Open Terminal. To see a list of current cron jobs, type the following:

crontab -l

To create a crontab or edit an existing one:

crontab -e

Type i to enter INSERT mode, then enter your cron job command. For this project on Scholar, the following was used:

0 1 * * * cd /class/datamine/corporate/merck/DataEngineers/Fitbit && /class/datamine/apps/python/f2020-s2021/env/bin/python3.8 PythonBot.py

The first five fields are timing instructions. The first (0) corresponds to the minute, and the second (1) to the hour. The third corresponds to the day of the month, then the month, then the day of the week. An asterisk indicates that the job will run for every value of that field. As a result,

0 1 * * * 

will run every day at 1 am.
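
For reference, a few other schedules built from the same five fields (timing fields only; the command to run would follow on the same line):

30 2 * * *     runs every day at 2:30 am
0 1 * * 1      runs every Monday at 1 am
0 */6 * * *    runs every six hours, on the hour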

cd navigates to the directory containing the script. Note that if the Python interpreter and PythonBot.py were both on the PATH, the command could be as short as:

0 1 * * * PythonBot.py

However, since Python is not currently on the PATH on Scholar, the full paths to both the Python interpreter and the desired script (PythonBot.py) had to be spelled out in the command.

Press Esc to leave INSERT mode, then type :wq to write the file and quit. Type crontab -l in Terminal to confirm the setup.

6.2.2 Task Scheduler (Windows)

Windows provides a GUI for its scheduling program. To open it, click on the magnifying-glass search icon at the bottom of the screen (or right-click on the Windows icon and select 'Search') and type Task Scheduler. Open the application.

In the Actions sidebar, click on Create Task. Name the task and navigate to the Triggers tab. The Triggers tab tells Windows how often to run the program. Add a new trigger by clicking New and specify how often to run the program. The current project was made to run every day at 1 am, which was accomplished by selecting Daily, adjusting the start time to 1 am, and clicking OK.

Note: the EC2 instance is not set to EST. Make sure that the time you enter corresponds to your desired local time.
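
One quick way to check what the instance considers local time is to run the following in the same Python environment the task will use:

import datetime

print("local:", datetime.datetime.now())     # the clock Task Scheduler goes by
print("UTC:  ", datetime.datetime.utcnow())  # for comparison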

Now it's time to tell Task Scheduler what program to actually run. Click on the Actions tab and add a new action. Browse for the script to automate and click OK. The Conditions and Settings tabs offer more customization but are not needed for the purposes of this project. Click OK to save the task.
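
Note that for a .py script, a common setup is to point the action at the Python interpreter and pass the script as an argument; with hypothetical paths, the action fields would look something like:

Program/script:  C:\Python38\python.exe
Add arguments:   PythonBot.py
Start in:        C:\Merck\Fitbit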

The added task should show up on the main page of the GUI. Confirm that it appears and that its status is Enabled. If its status is Disabled, right-click on the task and select Enable.

The program is now ready to run automatically!

6.3 Exploring Apple HealthKit and XCode

Our team decided to start looking into creating the data acquisition script for the Apple Watch, which led us to Apple HealthKit. HealthKit provides access to, and the ability to share, health and fitness data collected with an iPhone and/or Apple Watch. We explored the documentation that Apple had to offer and began learning how to use HealthKit. To use HealthKit to access a user's health data, we had to begin learning the Swift language and the Xcode IDE. The problem we quickly encountered was that this can only be fully explored on Apple devices. In addition to exploring HealthKit and Xcode on our own, we also searched for resources to assist in developing the data acquisition script. Much of what we found was able to read and write the data but not export it. Our current plan is to use a code template that walks through the authentication and data collection process and then send the data directly to the AWS database.

A few other options exist for collecting data from the Apple Watch. React Native can be used to develop an iOS application to collect and store the data from the watch; however, Apple devices are still needed to build and deploy the application, so this provides little advantage over a Swift-based application. Another option would be to use Google Fit and its pre-existing Google API, but this would require the user to download the Google Fit application on their phone and sign in, introducing an intermediary that would not be necessary in the other implementations. In the end, these two alternatives were set aside in favor of the Swift implementation.

6.4 Thank you & Acknowledgements

We would like to thank our Merck Corporate Partners, TA, and all the Data Mine staff that have helped us throughout this semester.