National Water Data: HYDAT

HYDAT data is made available through the following Government of Canada website: http://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/.

The daily flow data is extracted from the SQLite database provided on that site (the Hydat_sqlite3 archive downloaded below). After transformation, the data is stored in a SQL Server database and in a separate SQLite database.

[1]:
import pandas as pd
import numpy as np
import warnings
import PythonTools as PT
import os
import sys
%matplotlib inline
%load_ext autoreload
%autoreload 2
[2]:
import urllib.request

sourcefile = urllib.request.urlretrieve("http://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/Hydat_sqlite3_20180117.zip",
                                        "Data/Hydat_sqlite3_20180117.zip")
[3]:
PT.unzip_file(path_to_zip_file='Data/Hydat_sqlite3_20180117.zip', destination='Data')
[4]:
hydat_path = "Data/Hydat.sqlite3"
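PT.unzip_file comes from the author's PythonTools module, whose source is not shown here. As a rough idea of what such a helper might do, here is a minimal sketch using only the standard library. The function body is an assumption, not the actual PythonTools implementation.

```python
import zipfile
from pathlib import Path


def unzip_file(path_to_zip_file, destination):
    """Extract every member of a zip archive and return the extracted paths.

    Hypothetical stand-in for PT.unzip_file; the real helper may differ.
    """
    destination = Path(destination)
    # Create the target folder if it does not exist yet
    destination.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(path_to_zip_file) as archive:
        archive.extractall(destination)
        return [destination / name for name in archive.namelist()]
```

Calling unzip_file('Data/Hydat_sqlite3_20180117.zip', 'Data') would then return the list of extracted files, which makes it easy to check that the expected database file is present before connecting.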

Hydat data

[5]:
hydat_connection = PT.Sqlite3Connect(hydat_path)
Sqliet version: 2.6.0
Successful Connection

Input Query (data Extraction)

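The query below extracts the daily flow records (one row per station, year and month, with 31 flow/symbol column pairs) so they can be transformed and then analysed. The inner join keeps only records whose station also appears in the STATIONS table.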

[6]:
query='''
SELECT
    F.STATION_NUMBER
    ,YEAR
    ,MONTH
    ,FLOW1
    ,FLOW_SYMBOL1
    ,FLOW2
    ,FLOW_SYMBOL2
    ,FLOW3
    ,FLOW_SYMBOL3
    ,FLOW4
    ,FLOW_SYMBOL4
    ,FLOW5
    ,FLOW_SYMBOL5
    ,FLOW6
    ,FLOW_SYMBOL6
    ,FLOW7
    ,FLOW_SYMBOL7
    ,FLOW8
    ,FLOW_SYMBOL8
    ,FLOW9
    ,FLOW_SYMBOL9
    ,FLOW10
    ,FLOW_SYMBOL10
    ,FLOW11
    ,FLOW_SYMBOL11
    ,FLOW12
    ,FLOW_SYMBOL12
    ,FLOW13
    ,FLOW_SYMBOL13
    ,FLOW14
    ,FLOW_SYMBOL14
    ,FLOW15
    ,FLOW_SYMBOL15
    ,FLOW16
    ,FLOW_SYMBOL16
    ,FLOW17
    ,FLOW_SYMBOL17
    ,FLOW18
    ,FLOW_SYMBOL18
    ,FLOW19
    ,FLOW_SYMBOL19
    ,FLOW20
    ,FLOW_SYMBOL20
    ,FLOW21
    ,FLOW_SYMBOL21
    ,FLOW22
    ,FLOW_SYMBOL22
    ,FLOW23
    ,FLOW_SYMBOL23
    ,FLOW24
    ,FLOW_SYMBOL24
    ,FLOW25
    ,FLOW_SYMBOL25
    ,FLOW26
    ,FLOW_SYMBOL26
    ,FLOW27
    ,FLOW_SYMBOL27
    ,FLOW28
    ,FLOW_SYMBOL28
    ,FLOW29
    ,FLOW_SYMBOL29
    ,FLOW30
    ,FLOW_SYMBOL30
    ,FLOW31
    ,FLOW_SYMBOL31
    ,MIN
    ,MAX
FROM DLY_FLOWS F
INNER JOIN STATIONS S
    ON F.STATION_NUMBER = S.STATION_NUMBER
'''
df_hydat = hydat_connection.execute_query(query)
df_hydat.head()
[6]:
STATION_NUMBER YEAR MONTH FLOW1 FLOW_SYMBOL1 FLOW2 FLOW_SYMBOL2 FLOW3 FLOW_SYMBOL3 FLOW4 ... FLOW28 FLOW_SYMBOL28 FLOW29 FLOW_SYMBOL29 FLOW30 FLOW_SYMBOL30 FLOW31 FLOW_SYMBOL31 MIN MAX
0 01AD001 1928 10 16.600000 E 16.600000 E 16.600000 E 16.600000 ... 29.700001 E 29.700001 E 29.700001 E 29.700001 E 15.300000 29.700001
1 01AD001 1928 11 29.700001 E 29.700001 E 29.700001 E 29.700001 ... 31.400000 None 31.400000 None 29.200001 None NaN None 29.200001 34.000000
2 01AD001 1928 12 29.200001 None 29.200001 None 29.200001 None 29.200001 ... 17.600000 None 17.600000 None 17.600000 None 17.600000 None 17.600000 29.200001
3 01AD001 1929 1 16.100000 B 16.100000 B 16.100000 B 16.100000 ... 17.400000 B 17.400000 B 17.400000 B 17.400000 B 16.100000 17.400000
4 01AD001 1929 2 16.299999 B 16.299999 B 16.299999 B 16.299999 ... 13.500000 B NaN None NaN None NaN None 13.500000 16.299999

5 rows × 67 columns
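The 62 FLOW/FLOW_SYMBOL columns in the query can also be generated rather than typed out. The sketch below builds the same SELECT list in a loop and runs it with the standard sqlite3 module and pandas.read_sql_query instead of the PythonTools wrapper. The in-memory database and its single row are made-up stand-ins for Hydat.sqlite3, used only so the example runs on its own.

```python
import sqlite3
import pandas as pd

# Build the 31 FLOWn / FLOW_SYMBOLn column pairs instead of typing them out
day_columns = []
for day in range(1, 32):
    day_columns += [f"FLOW{day}", f"FLOW_SYMBOL{day}"]

select_list = ",\n    ".join(
    ["F.STATION_NUMBER", "YEAR", "MONTH"] + day_columns + ["MIN", "MAX"]
)
query = f"""
SELECT
    {select_list}
FROM DLY_FLOWS F
INNER JOIN STATIONS S
    ON F.STATION_NUMBER = S.STATION_NUMBER
"""

# Toy in-memory database standing in for Hydat.sqlite3 (assumed, reduced schema)
conn = sqlite3.connect(":memory:")
column_defs = ", ".join(
    f'"FLOW{d}" REAL, "FLOW_SYMBOL{d}" TEXT' for d in range(1, 32)
)
conn.execute(
    'CREATE TABLE DLY_FLOWS ("STATION_NUMBER" TEXT, "YEAR" INT, "MONTH" INT, '
    f'{column_defs}, "MIN" REAL, "MAX" REAL)'
)
conn.execute('CREATE TABLE STATIONS ("STATION_NUMBER" TEXT)')
conn.execute("INSERT INTO STATIONS VALUES ('01AD001')")
conn.execute(
    'INSERT INTO DLY_FLOWS ("STATION_NUMBER", "YEAR", "MONTH", '
    '"FLOW1", "FLOW_SYMBOL1", "MIN", "MAX") '
    "VALUES ('01AD001', 1928, 10, 16.6, 'E', 15.3, 29.7)"
)

df = pd.read_sql_query(query, conn)
conn.close()
print(df.shape)
```

Generating the column list keeps the query in step with the column names built later for the un-pivot step and avoids typos in a long hand-written list.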

Choose a subset of data

[7]:
df_hydat = df_hydat.head(1000)

Break table (Days, Symbols)

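Each source row holds one month of data: every day has a flow value (FLOW1 to FLOW31) along with a symbol (FLOW_SYMBOL1 to FLOW_SYMBOL31). The transformation un-pivots the source data so that each day becomes its own row, once for the flow values and once for the symbols. This is implemented below, and a preview of the results is provided.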

[8]:
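# Columns that identify a monthly record and are repeated for every day
basecolumn = ['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX']
# FLOW1..FLOW31 and FLOW_SYMBOL1..FLOW_SYMBOL31
column_flow_day = basecolumn + ['FLOW%i' % i for i in range(1, 32)]
column_symbol_day = basecolumn + ['FLOW_SYMBOL%i' % i for i in range(1, 32)]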

df_hydat_flow = df_hydat[column_flow_day]
df_hydat_symbol = df_hydat[column_symbol_day]

Unpivot Days (melt)

[9]:
df_hydat_flow_melt = pd.melt(df_hydat_flow,
                             id_vars=['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX'],
                             var_name = 'Day',
                             value_name="FlowValue")
df_hydat_flow_melt.head()
[9]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue
0 01AD001 1928 10 15.300000 29.700001 FLOW1 16.600000
1 01AD001 1928 11 29.200001 34.000000 FLOW1 29.700001
2 01AD001 1928 12 17.600000 29.200001 FLOW1 29.200001
3 01AD001 1929 1 16.100000 17.400000 FLOW1 16.100000
4 01AD001 1929 2 13.500000 16.299999 FLOW1 16.299999

Unpivot Symbols

[10]:
df_hydat_symbol_melt = pd.melt(df_hydat_symbol,
                               id_vars=['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX'],
                               var_name = 'FlowSymbol',
                               value_name="Symbol")
df_hydat_symbol_melt['Day'] = df_hydat_symbol_melt['FlowSymbol'].apply(lambda s: 'FLOW'+s.split('FLOW_SYMBOL')[1])
df_hydat_symbol_melt.head()
[10]:
STATION_NUMBER YEAR MONTH MIN MAX FlowSymbol Symbol Day
0 01AD001 1928 10 15.300000 29.700001 FLOW_SYMBOL1 E FLOW1
1 01AD001 1928 11 29.200001 34.000000 FLOW_SYMBOL1 E FLOW1
2 01AD001 1928 12 17.600000 29.200001 FLOW_SYMBOL1 None FLOW1
3 01AD001 1929 1 16.100000 17.400000 FLOW_SYMBOL1 B FLOW1
4 01AD001 1929 2 13.500000 16.299999 FLOW_SYMBOL1 B FLOW1
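To see the un-pivot step in isolation, here is a small sketch on a two-day toy table. The values are illustrative only, and the day number is pulled out with the vectorised str.extract accessor rather than a Python-level apply; this is an alternative formulation, not the notebook's own code.

```python
import pandas as pd

# Toy wide table: one row per station/year/month, two days only (made-up values)
wide = pd.DataFrame({
    "STATION_NUMBER": ["01AD001", "01AD001"],
    "YEAR": [1928, 1928],
    "MONTH": [10, 11],
    "FLOW1": [16.6, 29.7],
    "FLOW_SYMBOL1": ["E", "E"],
    "FLOW2": [16.6, 29.7],
    "FLOW_SYMBOL2": ["E", None],
})
keys = ["STATION_NUMBER", "YEAR", "MONTH"]

# Un-pivot the flow columns: one output row per (monthly record, day)
flow_long = wide.melt(id_vars=keys,
                      value_vars=["FLOW1", "FLOW2"],
                      var_name="Day",
                      value_name="FlowValue")

# Take the trailing digits of labels such as 'FLOW2' and store them as integers
flow_long["Day"] = flow_long["Day"].str.extract(r"(\d+)$", expand=False).astype(int)
print(flow_long)
```

melt stacks the value columns in order, so all FLOW1 rows come first, followed by all FLOW2 rows, which matches the previews above where every row shown belongs to day 1.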

Join tables

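The un-pivoted tables for daily flow and flow symbol are joined on station number, year, month and day. Because MIN and MAX exist in both tables but are not join keys, the join produces duplicate MIN_ and MAX_ columns; these, along with the FlowSymbol label, are dropped in the next step.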

[11]:
# Join on the shared key columns; non-key columns present in both tables
# (MIN, MAX) get the '_' suffix on the right-hand side
hydat_initial_join = df_hydat_flow_melt.merge(df_hydat_symbol_melt,
                                 on=['STATION_NUMBER', 'YEAR', 'MONTH', 'Day'],
                                 how='inner', suffixes=('', '_'))
hydat_initial_join.head()
[11]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue MIN_ MAX_ FlowSymbol Symbol
0 01AD001 1928 10 15.300000 29.700001 FLOW1 16.600000 15.300000 29.700001 FLOW_SYMBOL1 E
1 01AD001 1928 11 29.200001 34.000000 FLOW1 29.700001 29.200001 34.000000 FLOW_SYMBOL1 E
2 01AD001 1928 12 17.600000 29.200001 FLOW1 29.200001 17.600000 29.200001 FLOW_SYMBOL1 None
3 01AD001 1929 1 16.100000 17.400000 FLOW1 16.100000 16.100000 17.400000 FLOW_SYMBOL1 B
4 01AD001 1929 2 13.500000 16.299999 FLOW1 16.299999 13.500000 16.299999 FLOW_SYMBOL1 B
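The duplicated MIN_ and MAX_ columns appear because MIN and MAX are present in both tables but are not part of the join key. One possible variation, sketched here on toy data, is to select only the key columns and Symbol from the symbol table before merging, and to ask pandas to verify that the join is one-to-one. The frames below are made-up miniatures of the melted tables.

```python
import pandas as pd

keys = ["STATION_NUMBER", "YEAR", "MONTH", "Day"]

# Miniature versions of the melted flow and symbol tables (illustrative values)
flows = pd.DataFrame({
    "STATION_NUMBER": ["01AD001"] * 2,
    "YEAR": [1928] * 2,
    "MONTH": [10] * 2,
    "MIN": [15.3] * 2,
    "MAX": [29.7] * 2,
    "Day": ["FLOW1", "FLOW2"],
    "FlowValue": [16.6, 16.6],
})
symbols = pd.DataFrame({
    "STATION_NUMBER": ["01AD001"] * 2,
    "YEAR": [1928] * 2,
    "MONTH": [10] * 2,
    "MIN": [15.3] * 2,
    "MAX": [29.7] * 2,
    "FlowSymbol": ["FLOW_SYMBOL1", "FLOW_SYMBOL2"],
    "Symbol": ["E", "E"],
    "Day": ["FLOW1", "FLOW2"],
})

# Keep only the keys and the payload column on the right-hand side, so no
# suffixed duplicates are created; validate raises if a key repeats on either side
joined = flows.merge(symbols[keys + ["Symbol"]],
                     on=keys,
                     how="inner",
                     validate="one_to_one")
print(joined.columns.tolist())
```

The validate argument is a cheap safeguard: if the same station, year, month and day ever occurred twice, the merge would raise an error instead of silently multiplying rows.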

Drop/rename columns

[12]:
hydat_initial_join.drop(['MIN_', 'MAX_','FlowSymbol'], axis=1, inplace=True)
hydat_initial_join.head()
[12]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue Symbol
0 01AD001 1928 10 15.300000 29.700001 FLOW1 16.600000 E
1 01AD001 1928 11 29.200001 34.000000 FLOW1 29.700001 E
2 01AD001 1928 12 17.600000 29.200001 FLOW1 29.200001 None
3 01AD001 1929 1 16.100000 17.400000 FLOW1 16.100000 B
4 01AD001 1929 2 13.500000 16.299999 FLOW1 16.299999 B
[13]:
# Convert labels such as 'FLOW1' to the integer day of the month
hydat_initial_join['Day'] = hydat_initial_join['Day'].apply(lambda d: int(d.split('FLOW')[1]))
hydat_initial_join.head()
[13]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue Symbol
0 01AD001 1928 10 15.300000 29.700001 1 16.600000 E
1 01AD001 1928 11 29.200001 34.000000 1 29.700001 E
2 01AD001 1928 12 17.600000 29.200001 1 29.200001 None
3 01AD001 1929 1 16.100000 17.400000 1 16.100000 B
4 01AD001 1929 2 13.500000 16.299999 1 16.299999 B

Adding date

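The year, month and day columns are combined into a DateKey column in YYYYMMDD form that can be used for data analysis. Because every monthly record is un-pivoted into 31 days, keys for days that do not exist (for example 19281131) are also generated; their flow values are NaN.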

[14]:
# Build a zero-padded YYYYMMDD key from the three date parts
hydat_initial_join['DateKey'] = hydat_initial_join[['YEAR', 'MONTH', 'Day']].apply(
    lambda x: "{}{}{}".format(x['YEAR'],
                              str(x['MONTH']).zfill(2),
                              str(x['Day']).zfill(2)),
    axis=1)
hydat_initial_join.head()
[14]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue Symbol DateKey
0 01AD001 1928 10 15.300000 29.700001 1 16.600000 E 19281001
1 01AD001 1928 11 29.200001 34.000000 1 29.700001 E 19281101
2 01AD001 1928 12 17.600000 29.200001 1 29.200001 None 19281201
3 01AD001 1929 1 16.100000 17.400000 1 16.100000 B 19290101
4 01AD001 1929 2 13.500000 16.299999 1 16.299999 B 19290201
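Since each month is expanded to 31 days, some year/month/day combinations are not real calendar dates. A possible way to build the key without a row-wise apply and to flag such rows is sketched below on toy data. The Date column and the filtering step are additions for illustration and are not part of the original pipeline.

```python
import pandas as pd

# Toy rows: one valid date and two impossible ones (30 Feb, 31 Nov)
df = pd.DataFrame({
    "YEAR": [1929, 1929, 1928],
    "MONTH": [2, 2, 11],
    "Day": [28, 30, 31],
})

# Vectorised YYYYMMDD key, kept as a string like the DateKey above
df["DateKey"] = (df["YEAR"] * 10000 + df["MONTH"] * 100 + df["Day"]).astype(str)

# pandas can assemble datetimes from columns named year, month and day;
# errors="coerce" turns impossible dates into NaT instead of raising
parts = df[["YEAR", "MONTH", "Day"]].rename(columns=str.lower)
df["Date"] = pd.to_datetime(parts, errors="coerce")

# Keep only rows that correspond to real calendar dates
valid = df[df["Date"].notna()]
print(valid)
```

Whether to drop these rows or keep them as empty placeholders depends on the downstream analysis; the sketch only shows how they could be identified.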

Export to CSV

The final result of the transformations for the HYDAT data is written to a CSV file.

[15]:
PT.write_csv(hydat_initial_join,flname='Hydat.csv', chunksize=10000)
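PT.write_csv is part of the author's PythonTools module and its internals are not shown. Assuming it wraps pandas, a comparable export could look like the sketch below, which uses DataFrame.to_csv with its chunksize argument (the number of rows written at a time) and reads the file back to check it. The data and the temporary file path are illustrative.

```python
import os
import tempfile
import pandas as pd

# Small stand-in for the transformed table (made-up subset of columns)
df = pd.DataFrame({
    "STATION_NUMBER": ["01AD001", "01AD001", "01AD001"],
    "DateKey": ["19281001", "19281101", "19281201"],
    "FlowValue": [16.6, 29.7, 29.2],
    "Symbol": ["E", "E", None],
})

out_path = os.path.join(tempfile.mkdtemp(), "Hydat_example.csv")

# chunksize limits how many rows are formatted and written per batch
df.to_csv(out_path, index=False, chunksize=2)

# Read the key columns back as text so they are not re-parsed as numbers
round_trip = pd.read_csv(out_path, dtype={"STATION_NUMBER": str, "DateKey": str})
print(round_trip)
```

Missing symbols are written as empty fields and come back as NaN, so the distinction between "no symbol" and an empty string is not preserved by a plain CSV round trip.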

Write to SQL Server

Initialize a connection

[16]:
# Raw string so the backslash in the instance name is not treated as an escape
connection = PT.ServerConnect(server=r'DESKTOP-AN4AQCT\SQLEXPRESS', database='Dev')
Successful Connection

Get server version

[17]:
print(connection.get_server_version())
Microsoft SQL Server 2016 (SP1-GDR) (KB4019089) - 13.0.4206.0 (X64)
        Jul  6 2017 07:55:03
        Copyright (c) Microsoft Corporation
        Express Edition (64-bit) on Windows 10 Home 6.3 <X64> (Build 16299: )

Create a new table

[18]:
cmd = '''
DROP TABLE IF EXISTS [{schema}].[{TableName}]
CREATE TABLE [{schema}].[{TableName}]
(
    [RowId] INT IDENTITY (1, 1) NOT NULL,
    [STATION_NUMBER] NVARCHAR(10) Null,
    [YEAR] INT Null,
    [MONTH] INT Null,
    [MIN] DECIMAL(19,5) Null,
    [MAX] DECIMAL(19,5) Null,
    [Day] INT Null,
    [FlowValue] DECIMAL(19,5) Null,
    [Symbol] NVARCHAR(5) Null,
    [DateKey] INT Null,
)

CREATE CLUSTERED COLUMNSTORE INDEX IX_Hydat_DailyFlow ON [{schema}].[{TableName}]


'''.format(schema = 'dbo', TableName='Hydat_FlowData')
connection.deploy(cmd)

Write to SQL Server

[19]:
PT.write_to_sql(hydat_initial_join,table_name='Hydat_FlowData',if_exists='append', schema='dbo',
                connection_string=connection.connection_string)

Read Back From Server

[20]:
query ='''
SELECT *
FROM [{schema}].[{TableName}]
'''.format(schema = 'dbo', TableName='Hydat_FlowData')
DF_Hydat = connection.execute_query(query)
DF_Hydat.head()
[20]:
RowId STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue Symbol DateKey
0 1 01AD001 1928 10 15.3 29.7 1 16.6 E 19281001
1 2 01AD001 1928 11 29.2 34.0 1 29.7 E 19281101
2 3 01AD001 1928 12 17.6 29.2 1 29.2 None 19281201
3 4 01AD001 1929 1 16.1 17.4 1 16.1 B 19290101
4 5 01AD001 1929 2 13.5 16.3 1 16.3 B 19290201
[21]:
connection.close()

Write to SQLite database

[22]:
sqlite_connection = PT.Sqlite3Connect('Data/Hydat_Transformed.db')
Sqliet version: 2.6.0
Successful Connection
[23]:
cmd1 = '''DROP TABLE IF EXISTS {schema}.{TableName};'''.format(schema='main', TableName='Hydat_FlowData')
cmd2 = '''
CREATE TABLE
IF NOT EXISTS {schema}.{TableName}(
    [STATION_NUMBER] NVARCHAR(10) Null,
    [YEAR] INT Null,
    [MONTH] INT Null,
    [MIN] DECIMAL(19,5) Null,
    [MAX] DECIMAL(19,5) Null,
    [Day] INT Null,
    [FlowValue] DECIMAL(19,5) Null,
    [Symbol] NVARCHAR(5) Null,
    [DateKey] INT Null
);
'''.format(schema='main', TableName='Hydat_FlowData')
[24]:
sqlite_connection.deploy(cmd1)
sqlite_connection.deploy(cmd2)
[25]:
PT.write_to_sql(hydat_initial_join,
                table_name='Hydat_FlowData',
                if_exists='append',
                schema='main',
                driver='sqlite',
                index=False,
                connection_string=sqlite_connection.database)
[26]:
sqlite_connection.execute_query('SELECT * FROM main.Hydat_FlowData').head()
[26]:
STATION_NUMBER YEAR MONTH MIN MAX Day FlowValue Symbol DateKey
0 01AD001 1928 10 15.300000 29.700001 1 16.600000 E 19281001
1 01AD001 1928 11 29.200001 34.000000 1 29.700001 E 19281101
2 01AD001 1928 12 17.600000 29.200001 1 29.200001 None 19281201
3 01AD001 1929 1 16.100000 17.400000 1 16.100000 B 19290101
4 01AD001 1929 2 13.500000 16.299999 1 16.299999 B 19290201
[27]:
sqlite_connection.close()
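The create/append/read-back cycle above relies on the PythonTools helpers. For readers without that module, the same pattern can be sketched with the standard sqlite3 module and pandas alone. The in-memory database, the reduced table definition and the two rows are assumptions made so the example is self-contained.

```python
import sqlite3
import pandas as pd

# Two illustrative rows in the shape of the transformed table
df = pd.DataFrame({
    "STATION_NUMBER": ["01AD001", "01AD001"],
    "YEAR": [1928, 1928],
    "MONTH": [10, 11],
    "Day": [1, 1],
    "FlowValue": [16.6, 29.7],
    "Symbol": ["E", None],
    "DateKey": ["19281001", "19281101"],
})

conn = sqlite3.connect(":memory:")  # use a file path to persist the data

# Recreate the target table so repeated runs do not append duplicates
conn.execute("DROP TABLE IF EXISTS Hydat_FlowData")
conn.execute("""
CREATE TABLE IF NOT EXISTS Hydat_FlowData (
    "STATION_NUMBER" NVARCHAR(10) NULL,
    "YEAR" INT NULL,
    "MONTH" INT NULL,
    "Day" INT NULL,
    "FlowValue" DECIMAL(19,5) NULL,
    "Symbol" NVARCHAR(5) NULL,
    "DateKey" INT NULL
)
""")

# Append the rows to the existing table without writing the DataFrame index
df.to_sql("Hydat_FlowData", conn, if_exists="append", index=False)

read_back = pd.read_sql_query("SELECT * FROM Hydat_FlowData", conn)
conn.close()
print(read_back)
```

Dropping and recreating the table before appending makes the load repeatable. Note that SQLite converts the text DateKey values to integers on insert because the column is declared INT.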