diff --git a/civis/io/_tables.py b/civis/io/_tables.py index 9ac2437f..9f8cbdca 100644 --- a/civis/io/_tables.py +++ b/civis/io/_tables.py @@ -36,7 +36,8 @@ CHUNK_SIZE = 32 * 1024 log = logging.getLogger(__name__) __all__ = ['read_civis', 'read_civis_sql', 'civis_to_csv', - 'civis_to_multifile_csv', 'dataframe_to_civis', 'csv_to_civis', + 'civis_to_multifile_csv', 'dataframe_to_civis', + 'dask_dataframe_to_civis', 'csv_to_civis', 'civis_file_to_table', 'split_schema_tablename', 'export_to_civis_file'] @@ -766,6 +767,182 @@ def dataframe_to_civis(df, database, table, api_key=None, client=None, return fut +@deprecate_param('v2.0.0', 'api_key', 'headers') +def dask_dataframe_to_civis(df, database, table, api_key=None, client=None, + max_errors=None, existing_table_rows="fail", + diststyle=None, distkey=None, + sortkey1=None, sortkey2=None, + table_columns=None, + headers=None, credential_id=None, + primary_keys=None, last_modified_keys=None, + execution="immediate", + delimiter=None, polling_interval=None, + archive=False, hidden=True, **kwargs): + """Upload a `dask` `DataFrame` into a Civis table. + + The `DataFrame`'s index will not be included. To store the index + along with the other values, use `df.reset_index()` instead + of `df` as the first argument to this function. + + Parameters + ---------- + df : :class:`dask.dataframe:dask.dataframe.DataFrame` + The `DataFrame` to upload to Civis. + database : str or int + Upload data into this database. Can be the database name or ID. + table : str + The schema and table you want to upload to. E.g., + ``'scratch.table'``. Schemas or tablenames with periods must + be double quoted, e.g. ``'scratch."my.table"'``. + api_key : DEPRECATED str, optional + Your Civis API key. If not given, the :envvar:`CIVIS_API_KEY` + environment variable will be used. + client : :class:`civis.APIClient`, optional + If not provided, an :class:`civis.APIClient` object will be + created from the :envvar:`CIVIS_API_KEY`. + max_errors : int, optional + The maximum number of rows with errors to remove from the import + before failing. + existing_table_rows : str, optional + The behaviour if a table with the requested name already exists. + One of ``'fail'``, ``'truncate'``, ``'append'``, ``'drop'``, or + ``'upsert'``. Defaults to ``'fail'``. + diststyle : str, optional + The distribution style for the table. + One of ``'even'``, ``'all'`` or ``'key'``. + distkey : str, optional + The column to use as the distkey for the table. + sortkey1 : str, optional + The column to use as the sortkey for the table. + sortkey2 : str, optional + The second column in a compound sortkey for the table. + table_columns : list[Dict[str, str]], optional + An array of hashes corresponding to the columns in the order + they appear in the source file. Each hash should have keys for + database column "name" and "sql_type". This parameter is + required if the table does not exist, the table is being dropped, + or the columns in the source file do not appear in the same order + as in the destination table. The "sql_type" key is not required + when appending to an existing table. + headers : bool, optional [DEPRECATED] + Whether or not the first row of the file should be treated as + headers. The default, ``None``, attempts to autodetect whether + or not the first row contains headers. + + This parameter has no effect in versions >= 1.11 and will be + removed in v2.0. Tables will always be written with column + names read from the DataFrame. Use the `header` parameter + (which will be passed directly to :func:`~pandas.DataFrame.to_csv`) + to modify the column names in the Civis Table. + credential_id : str or int, optional + The ID of the database credential. If ``None``, the default + credential will be used. + primary_keys: list[str], optional + A list of the primary key column(s) of the destination table that + uniquely identify a record. These columns must not contain null values. + If existing_table_rows is "upsert", this + field is required. Note that this is true regardless of whether the + destination database itself requires a primary key. + last_modified_keys: list[str], optional + A list of the columns indicating a record has been updated. If + existing_table_rows is "upsert", this field is required. + escaped: bool, optional + A boolean value indicating whether or not the source file has quotes + escaped with a backslash. Defaults to false. + execution: string, optional, default "immediate" + One of "delayed" or "immediate". If "immediate", refresh column + statistics as part of the run. If "delayed", flag the table for a + deferred statistics update; column statistics may not be available + for up to 24 hours. In addition, if existing_table_rows is "upsert", + delayed executions move data from staging table to final table after a + brief delay, in order to accommodate multiple concurrent imports to the + same destination table. + polling_interval : int or float, optional + Number of seconds to wait between checks for job completion. + archive : bool, optional (deprecated) + If ``True``, archive the import job as soon as it completes. + hidden : bool, optional + If ``True`` (the default), this job will not appear in the Civis UI. + **kwargs : kwargs + Extra keyword arguments will be passed to + :meth:`dask.dataframe:dask.dataframe.DataFrame.to_csv`. + + Returns + ------- + futs : :class:`~civis.futures.CivisFuture` + A list of `CivisFuture` objects. + + Examples + -------- + >>> import dask.dataframe as dd + >>> import pandas as pd + >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}) + >>> ddf = dd.from_pandas(df, npartitions=20) + >>> futs = civis.io.dask_dataframe_to_civis(ddf, 'my-database', + ... 'scratch.df_table') + >>> [fut.result() for fut in futs] + + See Also + -------- + :func:`~dask.dataframe.DataFrame.to_csv` + """ + if client is None: + client = APIClient(api_key=api_key) + if archive: + warnings.warn("`archive` is deprecated and will be removed in v2.0.0. " + "Use `hidden` instead.", FutureWarning) + + headers = False if kwargs.get('header') is False else True + with TemporaryDirectory() as tmp_dir: + tmp_path = os.path.join(tmp_dir, 'dataframe_to_civis*.csv') + to_csv_kwargs = {'encoding': 'utf-8', 'index': False} + to_csv_kwargs.update(kwargs) + df.to_csv(tmp_path, **to_csv_kwargs) + _, name = split_schema_tablename(table) + file_paths = os.listdir(tmp_dir) + file_ids = [ + file_to_civis( + os.path.join(tmp_dir, file_path), + name, + client=client) for file_path in file_paths] + + delimiter = ',' + futs = [] + if existing_table_rows in ['truncate', 'drop', 'fail']: + first_file_id = file_ids.pop(0) + futs.append(civis_file_to_table( + first_file_id, database, table, + client=client, max_errors=max_errors, + existing_table_rows=existing_table_rows, + diststyle=diststyle, distkey=distkey, + sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, + delimiter=delimiter, headers=headers, + credential_id=credential_id, + primary_keys=primary_keys, + last_modified_keys=last_modified_keys, + escaped=False, execution=execution, + polling_interval=polling_interval, + hidden=hidden)) + existing_table_rows = 'append' + + futs.extend([civis_file_to_table( + file_id, database, table, + client=client, max_errors=max_errors, + existing_table_rows=existing_table_rows, + diststyle=diststyle, distkey=distkey, + sortkey1=sortkey1, sortkey2=sortkey2, + table_columns=table_columns, + delimiter=delimiter, headers=headers, + credential_id=credential_id, + primary_keys=primary_keys, + last_modified_keys=last_modified_keys, + escaped=False, execution=execution, + polling_interval=polling_interval, + hidden=hidden) for file_id in file_ids]) + return futs + + @deprecate_param('v2.0.0', 'api_key') def csv_to_civis(filename, database, table, api_key=None, client=None, max_errors=None, existing_table_rows="fail",