diff options
author | CoprDistGit <infra@openeuler.org> | 2023-06-20 06:42:23 +0000 |
---|---|---|
committer | CoprDistGit <infra@openeuler.org> | 2023-06-20 06:42:23 +0000 |
commit | 21490d41ca8795118240f0e43c52cd2dbe8757ac (patch) | |
tree | 1c6d2c3261a4db4e73d38bb893336f5a5b5a2a5a | |
parent | eda0b4f8c00d84997f551da583cc4895f9b277ee (diff) |
automatic import of python-taospyopeneuler20.03
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | python-taospy.spec | 3389 | ||||
-rw-r--r-- | sources | 1 |
3 files changed, 3391 insertions, 0 deletions
@@ -0,0 +1 @@ +/taospy-2.7.8.tar.gz diff --git a/python-taospy.spec b/python-taospy.spec new file mode 100644 index 0000000..bc02bed --- /dev/null +++ b/python-taospy.spec @@ -0,0 +1,3389 @@ +%global _empty_manifest_terminate_build 0 +Name: python-taospy +Version: 2.7.8 +Release: 1 +Summary: TDengine connector for python +License: MIT +URL: https://pypi.org/project/taospy/ +Source0: https://mirrors.aliyun.com/pypi/web/packages/35/ff/cda1530c1d9dee5fc8e1a15c6388ca0a50e52810ba4a17d31395b69b4f5d/taospy-2.7.8.tar.gz +BuildArch: noarch + +Requires: python3-pytz +Requires: python3-iso8601 +Requires: python3-requests +Requires: python3-pytest-cov +Requires: python3-taos-ws-py + +%description +# TDengine Connector for Python + +| Github Workflow | PyPI Version | PyPI Downloads | CodeCov | +| --------------- | ------------ | -------------- | ------- | +|  |  |  | [](https://codecov.io/gh/taosdata/taos-connector-python) | + + + + +[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine, using +an API which is compliant with the Python DB API 2.0 (PEP-249). It contains two modules: + +1. The `taos` module. It uses TDengine C client library for client server communications. +2. The `taosrest` module. It wraps TDengine RESTful API to Python DB API 2.0 (PEP-249). With this module, you do not need to install the TDengine C client library. + +## Install taospy + +You can use `pip` to install the connector from PyPI: + +```bash +pip install taospy +``` + +Or with git url: + +```bash +pip install git+https://github.com/taosdata/taos-connector-python.git +``` + +Note: taospy v2.7.2 requirs Python 3.6+. The early versions of taospy from v2.5.0 to v2.7.1 require Python 3.7+. + +## Source Code + +[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted +on [GitHub](https://github.com/taosdata/taos-connector-python). + +## Install taos-ws-py + +### Install with taospy + +```bash +pip install taospy[ws] +``` + +### Install taos-ws-py only + +```bash +pip install taos-ws-py +``` + +Note: The taosws module is provided by taos-ws-py package separately from v2.7.2. It is part of early version of taospy. +taos-ws-py requires Python 3.7+. + +### Query with PEP-249 API using `taosws` + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method using `taosws` + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas using `taosws` + +#### Method one + +```python +import pandas +import taosws + +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", conn) +df +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", engine) +df +``` + +## Examples for `taosrest` Module + +### Query with PEP-249 API + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method two + +```python +import pandas +import taosrest + +conn = taosrest.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method three + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosrest://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", engine) +``` + +## Examples for `taos` Module + +### Connect options + +Supported config options: + +- **config**: TDengine client configuration directory, by default use "/etc/taos/". +- **host**: TDengine server host, by default use "localhost". +- **user**: TDengine user name, default is "root". +- **password**: TDengine user password, default is "taosdata". +- **database**: Default connection database name, empty if not set. +- **timezone**: Timezone for timestamp type (which is `datetime` object with tzinfo in python) no matter what the host's + timezone is. + +```python +import taos + +# 1. with empty options, connect TDengine by default options +# that means: +# - use /etc/taos/taos.cfg as default configuration file +# - use "localhost" if not set in config file +# - use "root" as default username +# - use "taosdata" as default password +# - use 6030 as default port if not set in config file +# - use local timezone datetime as timestamp +conn = taos.connect() +# 2. with full set options, default db: log, use UTC datetime. +conn = taos.connect(host='localhost', + user='root', + password='taosdata', + database='log', + config='/etc/taos', + timezone='UTC') +``` + +Note that, the datetime formatted string will contain timezone information when timezone set. For example: + +```python +# without timezone(local timezone depends on your machine) +'1969-12-31 16:00:00' +# with timezone UTC +'1969-12-31 16:00:00+00:00' +``` + +### Query with PEP-249 API + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases") +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +#### Execute many + +Method execute_many is supported: + +There are two ways to use execute_many. + +The first way is to pass a data_set as a list of dictionaries, where each dictionary will be expanded into a complete +statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") +cursor.execute(f"CREATE DATABASE {db_name}") +cursor.execute(f"USE {db_name}") + +cursor.execute("create stable stb (ts timestamp, v1 int) tags(t1 int)") + +create_table_data = [ + { + "name": "tb1", + "t1": 1, + }, + { + "name": "tb2", + "t1": 2, + }, + { + "name": "tb3", + "t1": 3, + } +] + +cursor.execute_many( + "create table {name} using stb tags({t1})", + create_table_data, +) + +``` + +The second way is to pass a data_set as a list of tuples, where each tuple represents a different row of the same +table, and each value within the tuple will become a data row in the SQL statement. All the data together will form a +complete SQL statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"USE {db_name}") + +data_set = [ + ('2018-10-03 14:38:05.100', 219), + ('2018-10-03 14:38:15.300', 218), + ('2018-10-03 14:38:16.800', 221), +] + +table_name = "tb1" + +cursor.execute_many(f"insert into {table_name} values", data_set) + +``` + +[Example: Insert many lines in one execute](./examples/cursor_execute_many.py) + +### Query with objective API + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest") + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest", req_id=1) + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +### Query with async API + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter)) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +You can pass an optional req_id in the parameters. + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter), req_id=1) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +### Statement API - Bind row after row + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_bind_params(16) +params[0].timestamp(1626861392589) +params[1].bool(True) +params[2].tinyint(None) +params[3].tinyint(2) +params[4].smallint(3) +params[5].int(4) +params[6].bigint(5) +params[7].tinyint_unsigned(6) +params[8].smallint_unsigned(7) +params[9].int_unsigned(8) +params[10].bigint_unsigned(9) +params[11].float(10.1) +params[12].double(10.11) +params[13].binary("hello") +params[14].nchar("stmt") +params[15].timestamp(1626861392589) +stmt.bind_param(params) + +params[0].timestamp(1626861392590) +params[15].timestamp(None) +stmt.bind_param(params) +stmt.execute() + +assert stmt.affected_rows == 2 + +result = conn.query("select * from log") + +for row in result: + print(row) +``` + +### Statement API - Bind multi rows + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) +stmt.bind_param_batch(params) +stmt.execute() + +assert stmt.affected_rows == 3 + +result = conn.query("select * from log") +for row in result: + print(row) +``` + +### Subscription + +```python +import taos +import random + +conn = taos.connect() +dbname = "pytest_taos_subscribe" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) +conn.execute("create table if not exists log(ts timestamp, n int)") +for i in range(10): + conn.execute("insert into log values(now, %d)" % i) + +sub = conn.subscribe(False, "test", "select * from log", 1000) +print("# consume from begin") +for ts, n in sub.consume(): + print(ts, n) + +print("# consume new data") +for i in range(5): + conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i)) + result = sub.consume() + for ts, n in result: + print(ts, n) + +sub.close(True) +print("# keep progress consume") +sub = conn.subscribe(False, "test", "select * from log", 1000) +result = sub.consume() +rows = result.fetch_all() +# consume from latest subscription needs root privilege(for /var/lib/taos). +assert result.row_count == 0 +print("## consumed ", len(rows), "rows") + +print("# consume with a stop condition") +for i in range(10): + conn.execute("insert into log values(now, %d)" % random.randint(0, 10)) + result = sub.consume() + try: + ts, n = next(result) + print(ts, n) + if n > 5: + result.stop_query() + print("## stopped") + break + except StopIteration: + continue + +sub.close() +# sub.close() + +conn.execute("drop database if exists %s" % dbname) +# conn.close() +``` + +### Subscription asynchronously with callback + +```python +from taos import * +from ctypes import * + +import time + + +def subscribe_callback(p_sub, p_result, p_param, errno): + # type: (c_void_p, c_void_p, c_void_p, c_int) -> None + print("# fetch in callback") + result = TaosResult(c_void_p(p_result)) + result.check_error(errno) + for row in result.rows_iter(): + ts, n = row() + print(ts, n) + + +def test_subscribe_callback(conn): + # type: (TaosConnection) -> None + dbname = "pytest_taos_subscribe_callback" + try: + print("drop if exists") + conn.execute("drop database if exists %s" % dbname) + print("create database") + conn.execute("create database if not exists %s" % dbname) + print("create table") + # conn.execute("use %s" % dbname) + conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname) + + print("# subscribe with callback") + sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback) + + for i in range(10): + conn.execute("insert into %s.log values(now, %d)" % (dbname, i)) + time.sleep(0.7) + + sub.close() + + conn.execute("drop database if exists %s" % dbname) + # conn.close() + except Exception as err: + conn.execute("drop database if exists %s" % dbname) + # conn.close() + raise err + + +if __name__ == "__main__": + test_subscribe_callback(connect()) +``` + +### Insert with line protocol + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +You can pass an optional req_id in the parameters. + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=1) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=2) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +Insert with schemaless raw data. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + res = conn.schemaless_insert_raw(lines, 1, 0) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except SchemalessError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +Pass optional ttl in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err +``` + +Pass optional req_id in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taos://root:taosdata@localhost:6030/log") +df: pandas.DataFrame = pandas.read_sql("select * from logs", engine) +``` + +## Limitation + +- `taosrest` is designed to use with taosAdapter. If your TDengine version is older than v2.4.0.0, taosAdapter may not + be available. + +## License + +We use MIT license for Python connector. + + +%package -n python3-taospy +Summary: TDengine connector for python +Provides: python-taospy +BuildRequires: python3-devel +BuildRequires: python3-setuptools +BuildRequires: python3-pip +%description -n python3-taospy +# TDengine Connector for Python + +| Github Workflow | PyPI Version | PyPI Downloads | CodeCov | +| --------------- | ------------ | -------------- | ------- | +|  |  |  | [](https://codecov.io/gh/taosdata/taos-connector-python) | + + + + +[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine, using +an API which is compliant with the Python DB API 2.0 (PEP-249). It contains two modules: + +1. The `taos` module. It uses TDengine C client library for client server communications. +2. The `taosrest` module. It wraps TDengine RESTful API to Python DB API 2.0 (PEP-249). With this module, you do not need to install the TDengine C client library. + +## Install taospy + +You can use `pip` to install the connector from PyPI: + +```bash +pip install taospy +``` + +Or with git url: + +```bash +pip install git+https://github.com/taosdata/taos-connector-python.git +``` + +Note: taospy v2.7.2 requirs Python 3.6+. The early versions of taospy from v2.5.0 to v2.7.1 require Python 3.7+. + +## Source Code + +[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted +on [GitHub](https://github.com/taosdata/taos-connector-python). + +## Install taos-ws-py + +### Install with taospy + +```bash +pip install taospy[ws] +``` + +### Install taos-ws-py only + +```bash +pip install taos-ws-py +``` + +Note: The taosws module is provided by taos-ws-py package separately from v2.7.2. It is part of early version of taospy. +taos-ws-py requires Python 3.7+. + +### Query with PEP-249 API using `taosws` + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method using `taosws` + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas using `taosws` + +#### Method one + +```python +import pandas +import taosws + +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", conn) +df +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", engine) +df +``` + +## Examples for `taosrest` Module + +### Query with PEP-249 API + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method two + +```python +import pandas +import taosrest + +conn = taosrest.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method three + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosrest://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", engine) +``` + +## Examples for `taos` Module + +### Connect options + +Supported config options: + +- **config**: TDengine client configuration directory, by default use "/etc/taos/". +- **host**: TDengine server host, by default use "localhost". +- **user**: TDengine user name, default is "root". +- **password**: TDengine user password, default is "taosdata". +- **database**: Default connection database name, empty if not set. +- **timezone**: Timezone for timestamp type (which is `datetime` object with tzinfo in python) no matter what the host's + timezone is. + +```python +import taos + +# 1. with empty options, connect TDengine by default options +# that means: +# - use /etc/taos/taos.cfg as default configuration file +# - use "localhost" if not set in config file +# - use "root" as default username +# - use "taosdata" as default password +# - use 6030 as default port if not set in config file +# - use local timezone datetime as timestamp +conn = taos.connect() +# 2. with full set options, default db: log, use UTC datetime. +conn = taos.connect(host='localhost', + user='root', + password='taosdata', + database='log', + config='/etc/taos', + timezone='UTC') +``` + +Note that, the datetime formatted string will contain timezone information when timezone set. For example: + +```python +# without timezone(local timezone depends on your machine) +'1969-12-31 16:00:00' +# with timezone UTC +'1969-12-31 16:00:00+00:00' +``` + +### Query with PEP-249 API + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases") +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +#### Execute many + +Method execute_many is supported: + +There are two ways to use execute_many. + +The first way is to pass a data_set as a list of dictionaries, where each dictionary will be expanded into a complete +statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") +cursor.execute(f"CREATE DATABASE {db_name}") +cursor.execute(f"USE {db_name}") + +cursor.execute("create stable stb (ts timestamp, v1 int) tags(t1 int)") + +create_table_data = [ + { + "name": "tb1", + "t1": 1, + }, + { + "name": "tb2", + "t1": 2, + }, + { + "name": "tb3", + "t1": 3, + } +] + +cursor.execute_many( + "create table {name} using stb tags({t1})", + create_table_data, +) + +``` + +The second way is to pass a data_set as a list of tuples, where each tuple represents a different row of the same +table, and each value within the tuple will become a data row in the SQL statement. All the data together will form a +complete SQL statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"USE {db_name}") + +data_set = [ + ('2018-10-03 14:38:05.100', 219), + ('2018-10-03 14:38:15.300', 218), + ('2018-10-03 14:38:16.800', 221), +] + +table_name = "tb1" + +cursor.execute_many(f"insert into {table_name} values", data_set) + +``` + +[Example: Insert many lines in one execute](./examples/cursor_execute_many.py) + +### Query with objective API + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest") + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest", req_id=1) + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +### Query with async API + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter)) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +You can pass an optional req_id in the parameters. + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter), req_id=1) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +### Statement API - Bind row after row + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_bind_params(16) +params[0].timestamp(1626861392589) +params[1].bool(True) +params[2].tinyint(None) +params[3].tinyint(2) +params[4].smallint(3) +params[5].int(4) +params[6].bigint(5) +params[7].tinyint_unsigned(6) +params[8].smallint_unsigned(7) +params[9].int_unsigned(8) +params[10].bigint_unsigned(9) +params[11].float(10.1) +params[12].double(10.11) +params[13].binary("hello") +params[14].nchar("stmt") +params[15].timestamp(1626861392589) +stmt.bind_param(params) + +params[0].timestamp(1626861392590) +params[15].timestamp(None) +stmt.bind_param(params) +stmt.execute() + +assert stmt.affected_rows == 2 + +result = conn.query("select * from log") + +for row in result: + print(row) +``` + +### Statement API - Bind multi rows + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) +stmt.bind_param_batch(params) +stmt.execute() + +assert stmt.affected_rows == 3 + +result = conn.query("select * from log") +for row in result: + print(row) +``` + +### Subscription + +```python +import taos +import random + +conn = taos.connect() +dbname = "pytest_taos_subscribe" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) +conn.execute("create table if not exists log(ts timestamp, n int)") +for i in range(10): + conn.execute("insert into log values(now, %d)" % i) + +sub = conn.subscribe(False, "test", "select * from log", 1000) +print("# consume from begin") +for ts, n in sub.consume(): + print(ts, n) + +print("# consume new data") +for i in range(5): + conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i)) + result = sub.consume() + for ts, n in result: + print(ts, n) + +sub.close(True) +print("# keep progress consume") +sub = conn.subscribe(False, "test", "select * from log", 1000) +result = sub.consume() +rows = result.fetch_all() +# consume from latest subscription needs root privilege(for /var/lib/taos). +assert result.row_count == 0 +print("## consumed ", len(rows), "rows") + +print("# consume with a stop condition") +for i in range(10): + conn.execute("insert into log values(now, %d)" % random.randint(0, 10)) + result = sub.consume() + try: + ts, n = next(result) + print(ts, n) + if n > 5: + result.stop_query() + print("## stopped") + break + except StopIteration: + continue + +sub.close() +# sub.close() + +conn.execute("drop database if exists %s" % dbname) +# conn.close() +``` + +### Subscription asynchronously with callback + +```python +from taos import * +from ctypes import * + +import time + + +def subscribe_callback(p_sub, p_result, p_param, errno): + # type: (c_void_p, c_void_p, c_void_p, c_int) -> None + print("# fetch in callback") + result = TaosResult(c_void_p(p_result)) + result.check_error(errno) + for row in result.rows_iter(): + ts, n = row() + print(ts, n) + + +def test_subscribe_callback(conn): + # type: (TaosConnection) -> None + dbname = "pytest_taos_subscribe_callback" + try: + print("drop if exists") + conn.execute("drop database if exists %s" % dbname) + print("create database") + conn.execute("create database if not exists %s" % dbname) + print("create table") + # conn.execute("use %s" % dbname) + conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname) + + print("# subscribe with callback") + sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback) + + for i in range(10): + conn.execute("insert into %s.log values(now, %d)" % (dbname, i)) + time.sleep(0.7) + + sub.close() + + conn.execute("drop database if exists %s" % dbname) + # conn.close() + except Exception as err: + conn.execute("drop database if exists %s" % dbname) + # conn.close() + raise err + + +if __name__ == "__main__": + test_subscribe_callback(connect()) +``` + +### Insert with line protocol + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +You can pass an optional req_id in the parameters. + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=1) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=2) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +Insert with schemaless raw data. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + res = conn.schemaless_insert_raw(lines, 1, 0) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except SchemalessError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +Pass optional ttl in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err +``` + +Pass optional req_id in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taos://root:taosdata@localhost:6030/log") +df: pandas.DataFrame = pandas.read_sql("select * from logs", engine) +``` + +## Limitation + +- `taosrest` is designed to use with taosAdapter. If your TDengine version is older than v2.4.0.0, taosAdapter may not + be available. + +## License + +We use MIT license for Python connector. + + +%package help +Summary: Development documents and examples for taospy +Provides: python3-taospy-doc +%description help +# TDengine Connector for Python + +| Github Workflow | PyPI Version | PyPI Downloads | CodeCov | +| --------------- | ------------ | -------------- | ------- | +|  |  |  | [](https://codecov.io/gh/taosdata/taos-connector-python) | + + + + +[TDengine](https://github.com/taosdata/TDengine) connector for Python enables python programs to access TDengine, using +an API which is compliant with the Python DB API 2.0 (PEP-249). It contains two modules: + +1. The `taos` module. It uses TDengine C client library for client server communications. +2. The `taosrest` module. It wraps TDengine RESTful API to Python DB API 2.0 (PEP-249). With this module, you do not need to install the TDengine C client library. + +## Install taospy + +You can use `pip` to install the connector from PyPI: + +```bash +pip install taospy +``` + +Or with git url: + +```bash +pip install git+https://github.com/taosdata/taos-connector-python.git +``` + +Note: taospy v2.7.2 requirs Python 3.6+. The early versions of taospy from v2.5.0 to v2.7.1 require Python 3.7+. + +## Source Code + +[TDengine](https://github.com/taosdata/TDengine) connector for Python source code is hosted +on [GitHub](https://github.com/taosdata/taos-connector-python). + +## Install taos-ws-py + +### Install with taospy + +```bash +pip install taospy[ws] +``` + +### Install taos-ws-py only + +```bash +pip install taos-ws-py +``` + +Note: The taosws module is provided by taos-ws-py package separately from v2.7.2. It is part of early version of taospy. +taos-ws-py requires Python 3.7+. + +### Query with PEP-249 API using `taosws` + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosws + +# all parameters are optional +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method using `taosws` + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosws import * + +conn = connect("taosws://root:taosdata@localhost:6041") +result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas using `taosws` + +#### Method one + +```python +import pandas +import taosws + +conn = taosws.connect("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", conn) +df +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosws://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("show databases", engine) +df +``` + +## Examples for `taosrest` Module + +### Query with PEP-249 API + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases") +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +import taosrest + +# all parameters are optional +conn = taosrest.connect(url="http://localhost:6041", + user="root", + password="taosdata") +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results: list[tuple] = cursor.fetchall() +for row in results: + print(row) +``` + +### Query with query method + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases") + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +You can pass an optional req_id in the parameters. + +```python +from taosrest import connect, TaosRestConnection, Result + +conn: TaosRestConnection = connect() +result: Result = conn.query("show databases", req_id=1) + +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method two + +```python +import pandas +import taosrest + +conn = taosrest.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method three + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taosrest://root:taosdata@localhost:6041") +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", engine) +``` + +## Examples for `taos` Module + +### Connect options + +Supported config options: + +- **config**: TDengine client configuration directory, by default use "/etc/taos/". +- **host**: TDengine server host, by default use "localhost". +- **user**: TDengine user name, default is "root". +- **password**: TDengine user password, default is "taosdata". +- **database**: Default connection database name, empty if not set. +- **timezone**: Timezone for timestamp type (which is `datetime` object with tzinfo in python) no matter what the host's + timezone is. + +```python +import taos + +# 1. with empty options, connect TDengine by default options +# that means: +# - use /etc/taos/taos.cfg as default configuration file +# - use "localhost" if not set in config file +# - use "root" as default username +# - use "taosdata" as default password +# - use 6030 as default port if not set in config file +# - use local timezone datetime as timestamp +conn = taos.connect() +# 2. with full set options, default db: log, use UTC datetime. +conn = taos.connect(host='localhost', + user='root', + password='taosdata', + database='log', + config='/etc/taos', + timezone='UTC') +``` + +Note that, the datetime formatted string will contain timezone information when timezone set. For example: + +```python +# without timezone(local timezone depends on your machine) +'1969-12-31 16:00:00' +# with timezone UTC +'1969-12-31 16:00:00+00:00' +``` + +### Query with PEP-249 API + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases") +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +cursor.execute("show databases", req_id=1) +results = cursor.fetchall() +for row in results: + print(row) + +cursor.close() +conn.close() +``` + +#### Execute many + +Method execute_many is supported: + +There are two ways to use execute_many. + +The first way is to pass a data_set as a list of dictionaries, where each dictionary will be expanded into a complete +statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"DROP DATABASE IF EXISTS {db_name}") +cursor.execute(f"CREATE DATABASE {db_name}") +cursor.execute(f"USE {db_name}") + +cursor.execute("create stable stb (ts timestamp, v1 int) tags(t1 int)") + +create_table_data = [ + { + "name": "tb1", + "t1": 1, + }, + { + "name": "tb2", + "t1": 2, + }, + { + "name": "tb3", + "t1": 3, + } +] + +cursor.execute_many( + "create table {name} using stb tags({t1})", + create_table_data, +) + +``` + +The second way is to pass a data_set as a list of tuples, where each tuple represents a different row of the same +table, and each value within the tuple will become a data row in the SQL statement. All the data together will form a +complete SQL statement. + +```python +import taos + +conn = taos.connect() +cursor = conn.cursor() + +db_name = "test_db" + +cursor.execute(f"USE {db_name}") + +data_set = [ + ('2018-10-03 14:38:05.100', 219), + ('2018-10-03 14:38:15.300', 218), + ('2018-10-03 14:38:16.800', 221), +] + +table_name = "tb1" + +cursor.execute_many(f"insert into {table_name} values", data_set) + +``` + +[Example: Insert many lines in one execute](./examples/cursor_execute_many.py) + +### Query with objective API + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest") + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +You can pass an optional req_id in the parameters. + +```python +import taos + +conn = taos.connect() +conn.execute("create database if not exists pytest", req_id=1) + +result = conn.query("show databases") +num_of_fields = result.field_count +for field in result.fields: + print(field) + +for row in result: + print(row) + +result.close() +conn.execute("drop database pytest") +conn.close() +``` + +### Query with async API + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter)) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +You can pass an optional req_id in the parameters. + +```python +from taos import * +from ctypes import * +import time + + +def fetch_callback(p_param, p_result, num_of_rows): + print("fetched ", num_of_rows, "rows") + p = cast(p_param, POINTER(Counter)) + result = TaosResult(p_result) + + if num_of_rows == 0: + print("fetching completed") + p.contents.done = True + result.close() + return + + if num_of_rows < 0: + p.contents.done = True + result.check_error(num_of_rows) + result.close() + return None + + for row in result.rows_iter(num_of_rows): + # print(row) + pass + + p.contents.count += result.row_count + result.fetch_rows_a(fetch_callback, p_param) + + +def query_callback(p_param, p_result, code): + # type: (c_void_p, c_void_p, c_int) -> None + if p_result is None: + return + + result = TaosResult(p_result) + if code == 0: + result.fetch_rows_a(fetch_callback, p_param) + + result.check_error(code) + + +class Counter(Structure): + _fields_ = [("count", c_int), ("done", c_bool)] + + def __str__(self): + return "{ count: %d, done: %s }" % (self.count, self.done) + + +def test_query(conn): + # type: (TaosConnection) -> None + counter = Counter(count=0) + conn.query_a("select * from log.log", query_callback, byref(counter), req_id=1) + + while not counter.done: + print("wait query callback") + time.sleep(1) + + print(counter) + conn.close() + + +if __name__ == "__main__": + test_query(connect()) +``` + +### Statement API - Bind row after row + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_bind_params(16) +params[0].timestamp(1626861392589) +params[1].bool(True) +params[2].tinyint(None) +params[3].tinyint(2) +params[4].smallint(3) +params[5].int(4) +params[6].bigint(5) +params[7].tinyint_unsigned(6) +params[8].smallint_unsigned(7) +params[9].int_unsigned(8) +params[10].bigint_unsigned(9) +params[11].float(10.1) +params[12].double(10.11) +params[13].binary("hello") +params[14].nchar("stmt") +params[15].timestamp(1626861392589) +stmt.bind_param(params) + +params[0].timestamp(1626861392590) +params[15].timestamp(None) +stmt.bind_param(params) +stmt.execute() + +assert stmt.affected_rows == 2 + +result = conn.query("select * from log") + +for row in result: + print(row) +``` + +### Statement API - Bind multi rows + +```python +from taos import * + +conn = connect() + +dbname = "pytest_taos_stmt" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) + +conn.execute( + "create table if not exists log(ts timestamp, bo bool, nil tinyint, \ + ti tinyint, si smallint, ii int, bi bigint, tu tinyint unsigned, \ + su smallint unsigned, iu int unsigned, bu bigint unsigned, \ + ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", +) + +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") + +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) +stmt.bind_param_batch(params) +stmt.execute() + +assert stmt.affected_rows == 3 + +result = conn.query("select * from log") +for row in result: + print(row) +``` + +### Subscription + +```python +import taos +import random + +conn = taos.connect() +dbname = "pytest_taos_subscribe" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s" % dbname) +conn.select_db(dbname) +conn.execute("create table if not exists log(ts timestamp, n int)") +for i in range(10): + conn.execute("insert into log values(now, %d)" % i) + +sub = conn.subscribe(False, "test", "select * from log", 1000) +print("# consume from begin") +for ts, n in sub.consume(): + print(ts, n) + +print("# consume new data") +for i in range(5): + conn.execute("insert into log values(now, %d)(now+1s, %d)" % (i, i)) + result = sub.consume() + for ts, n in result: + print(ts, n) + +sub.close(True) +print("# keep progress consume") +sub = conn.subscribe(False, "test", "select * from log", 1000) +result = sub.consume() +rows = result.fetch_all() +# consume from latest subscription needs root privilege(for /var/lib/taos). +assert result.row_count == 0 +print("## consumed ", len(rows), "rows") + +print("# consume with a stop condition") +for i in range(10): + conn.execute("insert into log values(now, %d)" % random.randint(0, 10)) + result = sub.consume() + try: + ts, n = next(result) + print(ts, n) + if n > 5: + result.stop_query() + print("## stopped") + break + except StopIteration: + continue + +sub.close() +# sub.close() + +conn.execute("drop database if exists %s" % dbname) +# conn.close() +``` + +### Subscription asynchronously with callback + +```python +from taos import * +from ctypes import * + +import time + + +def subscribe_callback(p_sub, p_result, p_param, errno): + # type: (c_void_p, c_void_p, c_void_p, c_int) -> None + print("# fetch in callback") + result = TaosResult(c_void_p(p_result)) + result.check_error(errno) + for row in result.rows_iter(): + ts, n = row() + print(ts, n) + + +def test_subscribe_callback(conn): + # type: (TaosConnection) -> None + dbname = "pytest_taos_subscribe_callback" + try: + print("drop if exists") + conn.execute("drop database if exists %s" % dbname) + print("create database") + conn.execute("create database if not exists %s" % dbname) + print("create table") + # conn.execute("use %s" % dbname) + conn.execute("create table if not exists %s.log(ts timestamp, n int)" % dbname) + + print("# subscribe with callback") + sub = conn.subscribe(False, "test", "select * from %s.log" % dbname, 1000, subscribe_callback) + + for i in range(10): + conn.execute("insert into %s.log values(now, %d)" % (dbname, i)) + time.sleep(0.7) + + sub.close() + + conn.execute("drop database if exists %s" % dbname) + # conn.close() + except Exception as err: + conn.execute("drop database if exists %s" % dbname) + # conn.close() + raise err + + +if __name__ == "__main__": + test_subscribe_callback(connect()) +``` + +### Insert with line protocol + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +You can pass an optional req_id in the parameters. + +```python +import taos +from taos import SmlProtocol, SmlPrecision + +conn = taos.connect() +dbname = "pytest_line" +conn.execute("drop database if exists %s" % dbname) +conn.execute("create database if not exists %s precision 'us'" % dbname) +conn.select_db(dbname) + +lines = [ + 'st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"pass",c2=false,c4=4f64 1626006833639000000', +] +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=1) +print("inserted") + +conn.schemaless_insert(lines, taos.SmlProtocol.LINE_PROTOCOL, taos.SmlPrecision.NOT_CONFIGURED, req_id=2) + +result = conn.query("show tables") +for row in result: + print(row) + +conn.execute("drop database if exists %s" % dbname) +``` + +Insert with schemaless raw data. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + res = conn.schemaless_insert_raw(lines, 1, 0) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + res = conn.schemaless_insert_raw(lines, 1, 0) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except SchemalessError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +Pass optional ttl in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err +``` + +Pass optional req_id in the parameters. + +```python +import taos +from taos import utils +from taos import TaosConnection +from taos.cinterface import * +from taos.error import OperationalError, SchemalessError + +conn = taos.connect() +dbname = "taos_schemaless_insert" +try: + conn.execute("drop database if exists %s" % dbname) + + if taos.IS_V3: + conn.execute("create database if not exists %s schemaless 1 precision 'ns'" % dbname) + else: + conn.execute("create database if not exists %s update 2 precision 'ns'" % dbname) + + conn.select_db(dbname) + + lines = '''st,t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000 + st,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin, abc",c2=true,c4=5f64,c5=5f64,c6=7u64 1626006933640000000 + stf,t1=4i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 3) + + lines = '''stf,t1=5i64,t3="t4",t2=5f64,t4=5f64 c1=3i64,c3=L"passitagin_stf",c2=false,c5=5f64,c6=7u64 1626006933641000000''' + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print("affected rows: ", res) + assert (res == 1) + + result = conn.query("select * from st") + dict2 = result.fetch_all_into_dict() + print(dict2) + print(result.row_count) + + all = result.rows_iter() + for row in all: + print(row) + result.close() + assert (result.row_count == 2) + + # error test + lines = ''',t1=3i64,t2=4f64,t3="t3" c1=3i64,c3=L"passit",c2=false,c4=4f64 1626006833639000000''' + try: + ttl = 1000 + req_id = utils.gen_req_id() + res = conn.schemaless_insert_raw(lines, 1, 0, ttl=ttl, req_id=req_id) + print(res) + # assert(False) + except SchemalessError as err: + print('**** error: ', err) + # assert (err.msg == 'Invalid data format') + + result = conn.query("select * from st") + print(result.row_count) + all = result.rows_iter() + for row in all: + print(row) + result.close() + + conn.execute("drop database if exists %s" % dbname) + conn.close() +except InterfaceError as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) +except Exception as err: + conn.execute("drop database if exists %s" % dbname) + conn.close() + print(err) + raise err + +``` + +### Read with Pandas + +#### Method one + +```python +import pandas +import taos + +conn = taos.connect() +df: pandas.DataFrame = pandas.read_sql("select * from log.logs", conn) +``` + +#### Method Two + +```python +import pandas +from sqlalchemy import create_engine + +engine = create_engine("taos://root:taosdata@localhost:6030/log") +df: pandas.DataFrame = pandas.read_sql("select * from logs", engine) +``` + +## Limitation + +- `taosrest` is designed to use with taosAdapter. If your TDengine version is older than v2.4.0.0, taosAdapter may not + be available. + +## License + +We use MIT license for Python connector. + + +%prep +%autosetup -n taospy-2.7.8 + +%build +%py3_build + +%install +%py3_install +install -d -m755 %{buildroot}/%{_pkgdocdir} +if [ -d doc ]; then cp -arf doc %{buildroot}/%{_pkgdocdir}; fi +if [ -d docs ]; then cp -arf docs %{buildroot}/%{_pkgdocdir}; fi +if [ -d example ]; then cp -arf example %{buildroot}/%{_pkgdocdir}; fi +if [ -d examples ]; then cp -arf examples %{buildroot}/%{_pkgdocdir}; fi +pushd %{buildroot} +if [ -d usr/lib ]; then + find usr/lib -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/lib64 ]; then + find usr/lib64 -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/bin ]; then + find usr/bin -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +if [ -d usr/sbin ]; then + find usr/sbin -type f -printf "\"/%h/%f\"\n" >> filelist.lst +fi +touch doclist.lst +if [ -d usr/share/man ]; then + find usr/share/man -type f -printf "\"/%h/%f.gz\"\n" >> doclist.lst +fi +popd +mv %{buildroot}/filelist.lst . +mv %{buildroot}/doclist.lst . + +%files -n python3-taospy -f filelist.lst +%dir %{python3_sitelib}/* + +%files help -f doclist.lst +%{_docdir}/* + +%changelog +* Tue Jun 20 2023 Python_Bot <Python_Bot@openeuler.org> - 2.7.8-1 +- Package Spec generated @@ -0,0 +1 @@ +edbc4dbfb2d61d1016a13747faad481c taospy-2.7.8.tar.gz |