National Water Data: HYDAT¶
The HYDAT data is made available through the following Government of Canada website: http://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/.
The daily flow data is extracted from the SQLite version of the database provided there and, after transformation, is stored in a SQL Server database and in a second SQLite database.
[1]:
import pandas as pd
import numpy as np
import warnings
import PythonTools as PT
import os
import sys
%matplotlib inline
%load_ext autoreload
%autoreload 2
[2]:
import urllib.request
os.makedirs("Data", exist_ok=True)  # urlretrieve does not create a missing target folder
sourcefile = urllib.request.urlretrieve("http://collaboration.cmc.ec.gc.ca/cmc/hydrometrics/www/Hydat_sqlite3_20180117.zip",
                                        "Data/Hydat_sqlite3_20180117.zip")
[3]:
PT.unzip_file(path_to_zip_file='Data/Hydat_sqlite3_20180117.zip', destination='Data')
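`PT.unzip_file` comes from the author's `PythonTools` module, whose source is not shown here. Assuming it does nothing more than extract the archive into `destination`, a standard-library equivalent could look like the sketch below (the function name `unzip_archive` is hypothetical).

```python
import zipfile
from pathlib import Path


def unzip_archive(path_to_zip_file, destination):
    """Extract every member of a zip archive into `destination`.

    Hypothetical stand-in for PT.unzip_file; returns the extracted names.
    """
    destination = Path(destination)
    destination.mkdir(parents=True, exist_ok=True)  # create the folder if missing
    with zipfile.ZipFile(path_to_zip_file) as archive:
        archive.extractall(destination)
        return archive.namelist()
```

Called as `unzip_archive('Data/Hydat_sqlite3_20180117.zip', 'Data')`, it would mirror the cell above.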
[4]:
hydat_path = "Data/Hydat.sqlite3"
Hydat data¶
[5]:
hydat_connection = PT.Sqlite3Connect(hydat_path)
Sqliet version: 2.6.0
Successful Connection
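`PT.Sqlite3Connect` is likewise a `PythonTools` helper that is not shown. A minimal sketch of a comparable wrapper follows, assuming it only opens the file and returns query results as a DataFrame; the class name `SqliteReader` and its methods are hypothetical. The `2.6.0` printed above looks like the version of Python's `sqlite3` adapter module rather than of the SQLite engine itself, which is available as `sqlite3.sqlite_version`.

```python
import sqlite3

import pandas as pd


class SqliteReader:
    """Hypothetical stand-in for PT.Sqlite3Connect."""

    def __init__(self, database):
        self.database = database
        self.connection = sqlite3.connect(database)
        # Version of the SQLite engine linked into Python.
        print("SQLite version:", sqlite3.sqlite_version)

    def execute_query(self, query, params=None):
        """Run a SELECT statement and return the result as a DataFrame."""
        return pd.read_sql_query(query, self.connection, params=params)

    def close(self):
        self.connection.close()
```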
Input Query (data Extraction)¶
The query below extracts the daily flow records, one row per station, year and month with a flow value and a flow symbol for each of up to 31 days, so that they can be transformed and then analysed.
[6]:
query='''
SELECT
F.STATION_NUMBER
,YEAR
,MONTH
,FLOW1
,FLOW_SYMBOL1
,FLOW2
,FLOW_SYMBOL2
,FLOW3
,FLOW_SYMBOL3
,FLOW4
,FLOW_SYMBOL4
,FLOW5
,FLOW_SYMBOL5
,FLOW6
,FLOW_SYMBOL6
,FLOW7
,FLOW_SYMBOL7
,FLOW8
,FLOW_SYMBOL8
,FLOW9
,FLOW_SYMBOL9
,FLOW10
,FLOW_SYMBOL10
,FLOW11
,FLOW_SYMBOL11
,FLOW12
,FLOW_SYMBOL12
,FLOW13
,FLOW_SYMBOL13
,FLOW14
,FLOW_SYMBOL14
,FLOW15
,FLOW_SYMBOL15
,FLOW16
,FLOW_SYMBOL16
,FLOW17
,FLOW_SYMBOL17
,FLOW18
,FLOW_SYMBOL18
,FLOW19
,FLOW_SYMBOL19
,FLOW20
,FLOW_SYMBOL20
,FLOW21
,FLOW_SYMBOL21
,FLOW22
,FLOW_SYMBOL22
,FLOW23
,FLOW_SYMBOL23
,FLOW24
,FLOW_SYMBOL24
,FLOW25
,FLOW_SYMBOL25
,FLOW26
,FLOW_SYMBOL26
,FLOW27
,FLOW_SYMBOL27
,FLOW28
,FLOW_SYMBOL28
,FLOW29
,FLOW_SYMBOL29
,FLOW30
,FLOW_SYMBOL30
,FLOW31
,FLOW_SYMBOL31
,MIN
,MAX
FROM DLY_FLOWS F
INNER JOIN STATIONS S
ON F.STATION_NUMBER = S.STATION_NUMBER
'''
df_hydat = hydat_connection.execute_query(query)
df_hydat.head()
[6]:
| | STATION_NUMBER | YEAR | MONTH | FLOW1 | FLOW_SYMBOL1 | FLOW2 | FLOW_SYMBOL2 | FLOW3 | FLOW_SYMBOL3 | FLOW4 | ... | FLOW28 | FLOW_SYMBOL28 | FLOW29 | FLOW_SYMBOL29 | FLOW30 | FLOW_SYMBOL30 | FLOW31 | FLOW_SYMBOL31 | MIN | MAX |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 16.600000 | E | 16.600000 | E | 16.600000 | E | 16.600000 | ... | 29.700001 | E | 29.700001 | E | 29.700001 | E | 29.700001 | E | 15.300000 | 29.700001 |
1 | 01AD001 | 1928 | 11 | 29.700001 | E | 29.700001 | E | 29.700001 | E | 29.700001 | ... | 31.400000 | None | 31.400000 | None | 29.200001 | None | NaN | None | 29.200001 | 34.000000 |
2 | 01AD001 | 1928 | 12 | 29.200001 | None | 29.200001 | None | 29.200001 | None | 29.200001 | ... | 17.600000 | None | 17.600000 | None | 17.600000 | None | 17.600000 | None | 17.600000 | 29.200001 |
3 | 01AD001 | 1929 | 1 | 16.100000 | B | 16.100000 | B | 16.100000 | B | 16.100000 | ... | 17.400000 | B | 17.400000 | B | 17.400000 | B | 17.400000 | B | 16.100000 | 17.400000 |
4 | 01AD001 | 1929 | 2 | 16.299999 | B | 16.299999 | B | 16.299999 | B | 16.299999 | ... | 13.500000 | B | NaN | None | NaN | None | NaN | None | 13.500000 | 16.299999 |
5 rows × 67 columns
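The 62 `FLOW`/`FLOW_SYMBOL` columns in the query above follow a regular pattern, so the statement can also be generated instead of typed out. The sketch below builds an equivalent column list; the helper name `build_daily_flow_query` is hypothetical, and the join to `STATIONS` is kept only because the original query has it.

```python
def build_daily_flow_query(days=31):
    """Return a SELECT over DLY_FLOWS with FLOWn/FLOW_SYMBOLn for n = 1..days."""
    columns = ["F.STATION_NUMBER", "YEAR", "MONTH"]
    for day in range(1, days + 1):
        columns.append(f"FLOW{day}")
        columns.append(f"FLOW_SYMBOL{day}")
    columns += ["MIN", "MAX"]
    column_sql = "\n    ,".join(columns)
    return (
        f"SELECT\n     {column_sql}\n"
        "FROM DLY_FLOWS F\n"
        "INNER JOIN STATIONS S\n"
        "    ON F.STATION_NUMBER = S.STATION_NUMBER"
    )


query = build_daily_flow_query()
```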
Choose a subset of data¶
[7]:
df_hydat = df_hydat.head(1000)
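`head(1000)` trims the frame only after the whole `DLY_FLOWS` table has been loaded. If memory is a concern, the subset can be taken in SQL instead. The sketch below demonstrates the idea on a small in-memory table built for illustration (the rows and flow values are made up); it assumes plain `sqlite3` and `pandas.read_sql_query` rather than the `PT` wrapper.

```python
import sqlite3

import pandas as pd

# Illustrative in-memory table; the flow values are invented for the demo.
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE DLY_FLOWS (STATION_NUMBER TEXT, YEAR INT, MONTH INT, FLOW1 REAL)"
)
demo.executemany(
    "INSERT INTO DLY_FLOWS VALUES (?, ?, ?, ?)",
    [("01AD001", 1928, m, 10.0 + m) for m in range(1, 13)]
    + [("01AD002", 1928, m, 20.0 + m) for m in range(1, 13)],
)

# Filter by station and cap the row count inside the database.
subset = pd.read_sql_query(
    "SELECT * FROM DLY_FLOWS WHERE STATION_NUMBER = ? ORDER BY YEAR, MONTH LIMIT ?",
    demo,
    params=("01AD001", 5),
)
demo.close()
```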
Break table (Days, Symbols)¶
Each day has a flow value along with a symbol. The transformation un-pivots the source data so that every daily flow and its symbol become a separate row. This is implemented below, and a preview of the results is provided.
[8]:
basecolumn = ['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX']
column_flow_day = basecolumn+['FLOW%i'%(i) for i in range(1,32)]
column_symbol_day= basecolumn+['FLOW_SYMBOL%i'%(i) for i in range(1,32)]
df_hydat_flow = df_hydat[column_flow_day]
df_hydat_symbol = df_hydat[column_symbol_day]
Unpivot Days (melt)¶
[9]:
df_hydat_flow_melt = pd.melt(df_hydat_flow,
id_vars=['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX'],
var_name = 'Day',
value_name="FlowValue")
df_hydat_flow_melt.head()
[9]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue |
---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | FLOW1 | 16.600000 |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | FLOW1 | 29.700001 |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | FLOW1 | 29.200001 |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | FLOW1 | 16.100000 |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | FLOW1 | 16.299999 |
Unpivot Symbols¶
[10]:
df_hydat_symbol_melt = pd.melt(df_hydat_symbol,
id_vars=['STATION_NUMBER', 'YEAR', 'MONTH', 'MIN', 'MAX'],
var_name = 'FlowSymbol',
value_name="Symbol")
df_hydat_symbol_melt['Day'] = df_hydat_symbol_melt['FlowSymbol'].apply(lambda s: 'FLOW'+s.split('FLOW_SYMBOL')[1])
df_hydat_symbol_melt.head()
[10]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | FlowSymbol | Symbol | Day |
---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | FLOW_SYMBOL1 | E | FLOW1 |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | FLOW_SYMBOL1 | E | FLOW1 |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | FLOW_SYMBOL1 | None | FLOW1 |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | FLOW_SYMBOL1 | B | FLOW1 |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | FLOW_SYMBOL1 | B | FLOW1 |
Join tables¶
The un-pivoted tables for daily flow and flow symbol are merged/joined together and the extra columns are excluded.
[11]:
hydat_initial_join = df_hydat_flow_melt.merge(df_hydat_symbol_melt,
left_on=['STATION_NUMBER','YEAR', 'MONTH', 'Day'],
right_on=['STATION_NUMBER','YEAR', 'MONTH', 'Day'],
how='inner',suffixes=('', '_'))
hydat_initial_join.head()
[11]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | MIN_ | MAX_ | FlowSymbol | Symbol |
---|---|---|---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | FLOW1 | 16.600000 | 15.300000 | 29.700001 | FLOW_SYMBOL1 | E |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | FLOW1 | 29.700001 | 29.200001 | 34.000000 | FLOW_SYMBOL1 | E |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | FLOW1 | 29.200001 | 17.600000 | 29.200001 | FLOW_SYMBOL1 | None |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | FLOW1 | 16.100000 | 16.100000 | 17.400000 | FLOW_SYMBOL1 | B |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | FLOW1 | 16.299999 | 13.500000 | 16.299999 | FLOW_SYMBOL1 | B |
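The `MIN_` and `MAX_` columns appear because both melted frames carry `MIN` and `MAX`. One way to avoid them, sketched below on a two-day toy frame with invented values, is to keep only the join keys and `Symbol` on the right-hand side before merging. This is an alternative for illustration, not the notebook's own method.

```python
import pandas as pd

# Toy wide frame: one station-month with two days (values are illustrative).
wide = pd.DataFrame({
    "STATION_NUMBER": ["01AD001"], "YEAR": [1928], "MONTH": [10],
    "MIN": [15.3], "MAX": [29.7],
    "FLOW1": [16.6], "FLOW_SYMBOL1": ["E"],
    "FLOW2": [17.1], "FLOW_SYMBOL2": [None],
})
keys = ["STATION_NUMBER", "YEAR", "MONTH"]

flow = wide.melt(id_vars=keys + ["MIN", "MAX"],
                 value_vars=["FLOW1", "FLOW2"],
                 var_name="Day", value_name="FlowValue")
symbol = wide.melt(id_vars=keys,
                   value_vars=["FLOW_SYMBOL1", "FLOW_SYMBOL2"],
                   var_name="Day", value_name="Symbol")

# Reduce both Day columns to the integer day number so they can be joined on.
flow["Day"] = flow["Day"].str.replace("FLOW", "", regex=False).astype(int)
symbol["Day"] = symbol["Day"].str.replace("FLOW_SYMBOL", "", regex=False).astype(int)

tidy = flow.merge(symbol, on=keys + ["Day"], how="inner")
```

Because `MIN` and `MAX` exist only on the left-hand frame, no suffixed duplicates are created and no columns need to be dropped afterwards.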
Drop/rename columns¶
[12]:
hydat_initial_join.drop(['MIN_', 'MAX_','FlowSymbol'], axis=1, inplace=True)
hydat_initial_join.head()
[12]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | Symbol |
---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | FLOW1 | 16.600000 | E |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | FLOW1 | 29.700001 | E |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | FLOW1 | 29.200001 | None |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | FLOW1 | 16.100000 | B |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | FLOW1 | 16.299999 | B |
[13]:
hydat_initial_join['Day'] = hydat_initial_join['Day'].apply( lambda d: int( d.split('FLOW')[1] ) )
hydat_initial_join.head()
[13]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | Symbol |
---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | 1 | 16.600000 | E |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | 1 | 29.700001 | E |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | 1 | 29.200001 | None |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | 1 | 16.100000 | B |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | 1 | 16.299999 | B |
Adding date¶
The YEAR, MONTH and Day columns are combined into a DateKey column in yyyymmdd form (for example 19281001) that can be used as a date key for data analysis.
[14]:
hydat_initial_join['DateKey'] = hydat_initial_join[['YEAR', 'MONTH', 'Day']].apply(lambda x : "{}{}{}".format(x['YEAR'], str(x['MONTH']).zfill(2), str(x['Day']).zfill(2)), axis=1)
hydat_initial_join.head()
[14]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | Symbol | DateKey |
---|---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | 1 | 16.600000 | E | 19281001 |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | 1 | 29.700001 | E | 19281101 |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | 1 | 29.200001 | None | 19281201 |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | 1 | 16.100000 | B | 19290101 |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | 1 | 16.299999 | B | 19290201 |
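Because every month is expanded to 31 day columns, `DateKey` values such as `19290230` are produced for days that do not exist (their `FlowValue` is NaN). A possible follow-up, sketched here on a toy frame, is to build the key with integer arithmetic and use `pd.to_datetime(..., errors='coerce')` to flag the impossible dates. The `Date` column is an addition that is not part of the original notebook.

```python
import pandas as pd

toy = pd.DataFrame({"YEAR": [1929, 1929], "MONTH": [2, 2], "Day": [1, 30]})

# yyyymmdd as an integer, computed column-wise instead of row by row.
toy["DateKey"] = toy["YEAR"] * 10000 + toy["MONTH"] * 100 + toy["Day"]

# Invalid calendar dates (here 30 February) become NaT.
toy["Date"] = pd.to_datetime(
    toy["DateKey"].astype(str), format="%Y%m%d", errors="coerce"
)

# Keep only rows that correspond to a real calendar day.
valid = toy.dropna(subset=["Date"])
```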
Export to CSV¶
The final result of the HYDAT transformations is written to a CSV file.
[15]:
PT.write_csv(hydat_initial_join,flname='Hydat.csv', chunksize=10000)
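`PT.write_csv` is another `PythonTools` helper. Assuming its `chunksize` argument is passed through to pandas, the plain-pandas equivalent would be `DataFrame.to_csv`, whose `chunksize` parameter sets the number of rows written at a time. The sketch writes a small toy frame to a temporary file and reads it back.

```python
import os
import tempfile

import pandas as pd

toy = pd.DataFrame({"STATION_NUMBER": ["01AD001"] * 3,
                    "Day": [1, 2, 3],
                    "FlowValue": [16.6, 16.6, None]})

out_path = os.path.join(tempfile.mkdtemp(), "Hydat.csv")
# index=False leaves out the DataFrame index; rows are written two at a time.
toy.to_csv(out_path, index=False, chunksize=2)

round_trip = pd.read_csv(out_path)
```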
Write to SQL Server¶
Initialize a connection¶
[16]:
# Raw string so that the backslash in the instance name is not read as an escape sequence.
connection = PT.ServerConnect(server=r'DESKTOP-AN4AQCT\SQLEXPRESS', database='Dev')
Successful Connection
Get server version¶
[17]:
print(connection.get_server_version())
Microsoft SQL Server 2016 (SP1-GDR) (KB4019089) - 13.0.4206.0 (X64)
Jul 6 2017 07:55:03
Copyright (c) Microsoft Corporation
Express Edition (64-bit) on Windows 10 Home 6.3 <X64> (Build 16299: )
Create a new table¶
[18]:
cmd = '''
DROP TABLE IF EXISTS [{schema}].[{TableName}]
CREATE TABLE [{schema}].[{TableName}]
(
[RowId] INT IDENTITY (1, 1) NOT NULL,
[STATION_NUMBER] NVARCHAR(10) Null,
[YEAR] INT Null,
[MONTH] INT Null,
[MIN] DECIMAL(19,5) Null,
[MAX] DECIMAL(19,5) Null,
[Day] INT Null,
[FlowValue] DECIMAL(19,5) Null,
[Symbol] NVARCHAR(5) Null,
[DateKey] INT Null,
)
CREATE CLUSTERED COLUMNSTORE INDEX IX_Hydat_DailyFlow ON [{schema}].[{TableName}]
'''.format(schema = 'dbo', TableName='Hydat_FlowData')
connection.deploy(cmd)
Write to SQL Server¶
[19]:
PT.write_to_sql(hydat_initial_join,table_name='Hydat_FlowData',if_exists='append', schema='dbo',
connection_string=connection.connection_string)
Read Back From Server¶
[20]:
query ='''
SELECT *
FROM [{schema}].[{TableName}]
'''.format(schema = 'dbo', TableName='Hydat_FlowData')
DF_Hydat = connection.execute_query(query)
DF_Hydat.head()
[20]:
| | RowId | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | Symbol | DateKey |
---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 01AD001 | 1928 | 10 | 15.3 | 29.7 | 1 | 16.6 | E | 19281001 |
1 | 2 | 01AD001 | 1928 | 11 | 29.2 | 34.0 | 1 | 29.7 | E | 19281101 |
2 | 3 | 01AD001 | 1928 | 12 | 17.6 | 29.2 | 1 | 29.2 | None | 19281201 |
3 | 4 | 01AD001 | 1929 | 1 | 16.1 | 17.4 | 1 | 16.1 | B | 19290101 |
4 | 5 | 01AD001 | 1929 | 2 | 13.5 | 16.3 | 1 | 16.3 | B | 19290201 |
[21]:
connection.close()
Write to SQLite database¶
[22]:
sqlite_connection = PT.Sqlite3Connect('Data/Hydat_Transformed.db')
Sqliet version: 2.6.0
Successful Connection
[23]:
cmd1 = '''DROP TABLE IF EXISTS {schema}.{TableName};'''.format(schema='main', TableName='Hydat_FlowData')
cmd2 = '''
CREATE TABLE
IF NOT EXISTS {schema}.{TableName}(
[STATION_NUMBER] NVARCHAR(10) Null,
[YEAR] INT Null,
[MONTH] INT Null,
[MIN] DECIMAL(19,5) Null,
[MAX] DECIMAL(19,5) Null,
[Day] INT Null,
[FlowValue] DECIMAL(19,5) Null,
[Symbol] NVARCHAR(5) Null,
[DateKey] INT Null
);
'''.format(schema='main', TableName='Hydat_FlowData')
[24]:
sqlite_connection.deploy(cmd1)
sqlite_connection.deploy(cmd2)
[25]:
PT.write_to_sql(hydat_initial_join,
table_name='Hydat_FlowData',
if_exists='append',
schema='main',
driver='sqlite',
index=False,
connection_string=sqlite_connection.database)
[26]:
sqlite_connection.execute_query('SELECT * FROM main.Hydat_FlowData').head()
[26]:
| | STATION_NUMBER | YEAR | MONTH | MIN | MAX | Day | FlowValue | Symbol | DateKey |
---|---|---|---|---|---|---|---|---|---|
0 | 01AD001 | 1928 | 10 | 15.300000 | 29.700001 | 1 | 16.600000 | E | 19281001 |
1 | 01AD001 | 1928 | 11 | 29.200001 | 34.000000 | 1 | 29.700001 | E | 19281101 |
2 | 01AD001 | 1928 | 12 | 17.600000 | 29.200001 | 1 | 29.200001 | None | 19281201 |
3 | 01AD001 | 1929 | 1 | 16.100000 | 17.400000 | 1 | 16.100000 | B | 19290101 |
4 | 01AD001 | 1929 | 2 | 13.500000 | 16.299999 | 1 | 16.299999 | B | 19290201 |
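`PT.write_to_sql` is not shown either. Assuming it wraps `DataFrame.to_sql`, the SQLite round trip can be sketched with the standard `sqlite3` module as below. The in-memory database and the two toy rows are for illustration only.

```python
import sqlite3

import pandas as pd

toy = pd.DataFrame({
    "STATION_NUMBER": ["01AD001", "01AD001"],
    "YEAR": [1928, 1928], "MONTH": [10, 11], "Day": [1, 1],
    "FlowValue": [16.6, 29.7], "Symbol": ["E", "E"],
    "DateKey": [19281001, 19281101],
})

con = sqlite3.connect(":memory:")
# if_exists="append" adds rows to an existing table, or creates it if absent.
toy.to_sql("Hydat_FlowData", con, if_exists="append", index=False)
toy.to_sql("Hydat_FlowData", con, if_exists="append", index=False)

read_back = pd.read_sql_query("SELECT * FROM Hydat_FlowData", con)
con.close()
```

Since `append` never removes rows, re-running the write cell duplicates the data unless the table is dropped first, which is what the `DROP TABLE IF EXISTS` statement in the notebook takes care of.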
[27]:
sqlite_connection.close()