Source code for intake_postgres.intake_postgres

from __future__ import absolute_import
from intake.source import base
import pandas as pd
from postgresadapter import PostgresAdapter
# Resolve the package version string from the sibling ``_version`` module.
from ._version import get_versions
__version__ = get_versions()['version']
# Drop the helper from the module namespace once the version has been read.
del get_versions


class PostgresSource(base.DataSource):
    """Read data from PostgreSQL to dataframes

    The query result is fetched once, cached on the instance as a single
    pandas DataFrame, and released again by ``_close``.

    Parameters
    ----------
    uri: str
        Connection to PostgreSQL server
    sql_expr: str
        The full text of the SQL query to execute
    pg_kwargs: dict, optional
        Further args passed to postgresadapter.PostgresAdapter, see
        https://github.com/ContinuumIO/PostgresAdapter/blob/master/postgresadapter/core/PostgresAdapter.pyx#L281
        The mapping is copied, so later changes to the caller's dict do not
        affect this source.
    metadata: dict, optional
        Forwarded unchanged to the base ``DataSource`` constructor.
    """
    name = 'postgres'
    container = 'dataframe'
    version = __version__
    partition_access = False

    def __init__(self, uri, sql_expr, pg_kwargs=None, metadata=None):
        self._uri = uri
        self._sql_expr = sql_expr
        # Take a private copy: a shared ``{}`` default (or the caller's own
        # dict) must never be aliased between instances.
        self._pg_kwargs = dict(pg_kwargs) if pg_kwargs else {}
        # Cache for the fully materialised query result; None until read.
        self._dataframe = None

        super(PostgresSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        """Build the ``base.Schema`` describing this source.

        Before the data has been read, the dtypes come from a 10-row sample
        of the query and the row count is reported as ``None``. Once the
        data is cached, dtypes and shape come from the cached dataframe.

        Returns
        -------
        base.Schema
            Schema with ``npartitions=1`` and a ``dtype`` dict mapping column
            name to the string form of its dtype.
        """
        if self._dataframe is None:
            # This approach is not optimal; LIMIT is known to confuse the
            # query planner sometimes. If there is a faster approach to
            # gleaning dtypes from arbitrary SQL queries, we should use it
            # instead.
            first_rows = PostgresAdapter(
                self._uri,
                dataframe=True,
                query=('({}) limit 10').format(self._sql_expr),
                **self._pg_kwargs
            )._to_dataframe()
            # Zero-row slice: keeps the column dtypes, drops the sample data.
            dtype = first_rows[:0]
            # Row count is unknown until the full query has been run.
            shape = (None, len(first_rows.dtypes.index))
        else:
            dtype = self._dataframe[:0]
            shape = self._dataframe.shape

        dtype = {k: str(v) for k, v in dtype.dtypes.to_dict().items()}
        return base.Schema(
            datashape=None,
            dtype=dtype,
            shape=shape,
            npartitions=1,
            extra_metadata={},
        )

    def _get_partition(self, _):
        """Return the whole query result as one dataframe.

        The partition index argument is ignored. The first call runs the
        query and caches the result; later calls return the cached frame.

        Returns
        -------
        pandas.DataFrame
            One column per adapter field. A field whose array has more than
            one column is expanded into ``<name>0``, ``<name>1``, ...
        """
        if self._dataframe is None:
            part = PostgresAdapter(
                self._uri,
                query=self._sql_expr,
                **self._pg_kwargs
            )
            # NOTE(review): presumably a NumPy structured array indexable by
            # field name -- confirm against PostgresAdapter.
            _arr = part._to_array()

            # Assemble into a local frame and publish it only when complete,
            # so a failure part-way through never leaves a partially filled
            # frame cached on the instance.
            frame = pd.DataFrame()
            for colname in part.field_names:
                col = _arr[colname]
                ncols = col.shape[1] if len(col.shape) > 1 else 1
                if ncols > 1:
                    for colct in range(ncols):
                        frame[colname + str(colct)] = col[:, colct]
                else:
                    frame[colname] = col
            self._dataframe = frame

            # The schema should be corrected once the data is read.
            self._schema = None

        return self._dataframe

    def _close(self):
        """Drop the cached dataframe so the next read runs the query again."""
        self._dataframe = None