
Commit 0303b3f

Merge pull request #112 from InfluxCommunity/fix/103-batch-write-example

fix: 103 batch write example

@jstirnaman Concerning the failing test: it fails when run through the IDE because the relative path to the source CSV file cannot be resolved, but it passes when running `pytest` from the project root, which is how it runs in the build.

2 parents 43b8811 + 232b4d3 · commit 0303b3f
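A common way to make such a fixture path work in both contexts is to resolve the CSV relative to the test module rather than the process working directory. A minimal sketch of that pattern (the file and test names here are hypothetical, not the repository's actual test layout):

# test_csv_write.py -- hypothetical test module name
from pathlib import Path

# Resolve the fixture relative to this file, not the current working
# directory, so it is found whether the test is launched from an IDE
# or from `pytest` at the project root.
CSV_PATH = Path(__file__).parent / "out.csv"


def test_csv_fixture_resolves():
    assert CSV_PATH.exists()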

File tree

10 files changed: +670 -312 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -7,6 +7,7 @@
 ### Features

 1. [#108](https://github.com/InfluxCommunity/influxdb3-python/pull/108): Better expose access to response headers in `InfluxDBError`. Example `handle_http_error` added.
+2. [#112](https://github.com/InfluxCommunity/influxdb3-python/pull/112): Update batching examples, add integration tests of batching.

 ### Bug Fixes

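The #108 entry above refers to surfacing HTTP response headers on write failures. A minimal sketch of that pattern, hedged because the exact accessor on `InfluxDBError` is an assumption here rather than something shown in this diff:

import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import InfluxDBError


def handle_http_error(client, record):
    # Sketch only: catch a failed write and inspect whatever response
    # headers the exception carries (e.g. trace IDs, Retry-After hints).
    # `getheaders` is an assumed accessor name, so it is guarded with getattr.
    try:
        client.write(record=record)
    except InfluxDBError as err:
        print(f"write failed: {err}")
        headers_fn = getattr(err, "getheaders", None)
        if callable(headers_fn):
            print(f"response headers: {headers_fn()}")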
Examples/batching_example.py

Lines changed: 122 additions & 95 deletions

@@ -1,114 +1,141 @@
 import datetime
 import random
+import time

 from bson import ObjectId

 import influxdb_client_3 as InfluxDBClient3
 from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError

+from config import Config
+

 class BatchingCallback(object):

+    def __init__(self):
+        self.write_status_msg = None
+        self.write_count = 0
+        self.retry_count = 0
+        self.start = time.time_ns()
+
     def success(self, conf, data: str):
         print(f"Written batch: {conf}, data: {data}")
+        self.write_count += 1
+        self.write_status_msg = f"SUCCESS: {self.write_count} writes"

     def error(self, conf, data: str, exception: InfluxDBError):
         print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
+        self.write_status_msg = f"FAILURE - cause: {exception}"

     def retry(self, conf, data: str, exception: InfluxDBError):
         print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
-
-
-# Creating 5.000 gatewayId values as MongoDB ObjectIDs
-gatewayIds = [ObjectId() for x in range(0, 100)]
-
-# Setting decimal precision to 2
-precision = 2
-
-# Setting timestamp for first sensor reading
-now = datetime.datetime.now()
-now = now - datetime.timedelta(days=30)
-teststart = datetime.datetime.now()
-
-# InfluxDB connection details
-token = ""
-org = ""
-database = ""
-url = "eu-central-1-1.aws.cloud2.influxdata.com"
-
-callback = BatchingCallback()
-
-write_options = WriteOptions(batch_size=5_000,
-                             flush_interval=10_000,
-                             jitter_interval=2_000,
-                             retry_interval=5_000,
-                             max_retries=5,
-                             max_retry_delay=30_000,
-                             exponential_base=2)
-
-wco = write_client_options(success_callback=callback.success,
-                           error_callback=callback.error,
-                           retry_callback=callback.retry,
-                           write_options=write_options)
-
-# Opening InfluxDB client with a batch size of 5k points or flush interval
-# of 10k ms and gzip compression
-with InfluxDBClient3.InfluxDBClient3(token=token,
-                                     host=url,
-                                     org=org,
-                                     database=database, enable_gzip=True, write_client_options=wco) as _client:
-    # Creating iterator for one hour worth of data (6 sensor readings per
-    # minute)
-    for i in range(0, 525600):
-        # Adding 10 seconds to timestamp of previous sensor reading
-        now = now + datetime.timedelta(seconds=10)
-        # Iterating over gateways
-        for gatewayId in gatewayIds:
-            # Creating random test data for 12 fields to be stored in
-            # timeseries database
-            bcW = random.randrange(1501)
-            bcWh = round(random.uniform(0, 4.17), precision)
-            bdW = random.randrange(71)
-            bdWh = round(random.uniform(0, 0.12), precision)
-            cPvWh = round(random.uniform(0.51, 27.78), precision)
-            cW = random.randrange(172, 10001)
-            cWh = round(random.uniform(0.51, 27.78), precision)
-            eWh = round(random.uniform(0, 41.67), precision)
-            iWh = round(random.uniform(0, 16.67), precision)
-            pW = random.randrange(209, 20001)
-            pWh = round(random.uniform(0.58, 55.56), precision)
-            scWh = round(random.uniform(0.58, 55.56), precision)
-            # Creating point to be ingested into InfluxDB
-            p = (InfluxDBClient3.Point("stream")
-                 .tag("gatewayId", str(gatewayId))
-                 .field("bcW", bcW).field("bcWh", bcWh)
-                 .field("bdW", bdW).field("bdWh", bdWh)
-                 .field("cPvWh", cPvWh)
-                 .field("cW", cW).field("cWh", cWh)
-                 .field("eWh", eWh).field("iWh", iWh)
-                 .field("pW", pW).field("pWh", pWh)
-                 .field("scWh", scWh)
-                 .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))
-
-            # Writing point (InfluxDB automatically batches writes into sets of
-            # 5k points)
-            _client.write(record=p)
+        self.retry_count += 1
+
+    def elapsed(self) -> int:
+        return time.time_ns() - self.start
+
+
+def main() -> None:
+    conf = Config()
+
+    # Creating 100 gatewayId values as MongoDB ObjectIds
+    gatewayIds = [ObjectId() for x in range(0, 100)]
+
+    # Setting decimal precision to 2
+    precision = 2
+
+    # Setting timestamp for first sensor reading
+    sample_window_days = 7
+    now = datetime.datetime.now()
+    now = now - datetime.timedelta(days=sample_window_days)
+    target_sample_count = sample_window_days * 24 * 60 * 6
+
+    callback = BatchingCallback()
+
+    write_options = WriteOptions(batch_size=5_000,
+                                 flush_interval=10_000,
+                                 jitter_interval=2_000,
+                                 retry_interval=5_000,
+                                 max_retries=5,
+                                 max_retry_delay=30_000,
+                                 max_close_wait=600_000,
+                                 exponential_base=2)
+
+    wco = write_client_options(success_callback=callback.success,
+                               error_callback=callback.error,
+                               retry_callback=callback.retry,
+                               write_options=write_options)
+
+    # Opening InfluxDB client with a batch size of 5k points or flush interval
+    # of 10k ms and gzip compression
+    with InfluxDBClient3.InfluxDBClient3(token=conf.token,
+                                         host=conf.host,
+                                         org=conf.org,
+                                         database=conf.database,
+                                         enable_gzip=True,
+                                         write_client_options=wco) as _client:
+        # Iterating over one week's worth of timestamps (6 sensor readings
+        # per minute)
+        print(f"Writing {target_sample_count} data points.")
+        for i in range(0, target_sample_count):
+            # Adding 10 seconds to timestamp of previous sensor reading
+            now = now + datetime.timedelta(seconds=10)
+            # Iterating over gateways
+            for gatewayId in gatewayIds:
+                # Creating random test data for 12 fields to be stored in
+                # timeseries database
+                bcW = random.randrange(1501)
+                bcWh = round(random.uniform(0, 4.17), precision)
+                bdW = random.randrange(71)
+                bdWh = round(random.uniform(0, 0.12), precision)
+                cPvWh = round(random.uniform(0.51, 27.78), precision)
+                cW = random.randrange(172, 10001)
+                cWh = round(random.uniform(0.51, 27.78), precision)
+                eWh = round(random.uniform(0, 41.67), precision)
+                iWh = round(random.uniform(0, 16.67), precision)
+                pW = random.randrange(209, 20001)
+                pWh = round(random.uniform(0.58, 55.56), precision)
+                scWh = round(random.uniform(0.58, 55.56), precision)
+                # Creating point to be ingested into InfluxDB
+                p = (InfluxDBClient3.Point("stream")
+                     .tag("gatewayId", str(gatewayId))
+                     .field("bcW", bcW).field("bcWh", bcWh)
+                     .field("bdW", bdW).field("bdWh", bdWh)
+                     .field("cPvWh", cPvWh)
+                     .field("cW", cW).field("cWh", cWh)
+                     .field("eWh", eWh).field("iWh", iWh)
+                     .field("pW", pW).field("pWh", pWh)
+                     .field("scWh", scWh)
+                     .time(now.strftime('%Y-%m-%dT%H:%M:%SZ'), WritePrecision.S))
+
+                # Writing point (InfluxDB automatically batches writes into
+                # sets of 5k points)
+                _client.write(record=p)
+
+    print(callback.write_status_msg)
+    print(f"Write retries: {callback.retry_count}")
+    print(f"Wrote {target_sample_count} data points.")
+    print(f"Elapsed time ms: {int(callback.elapsed() / 1_000_000)}")
+
+
+if __name__ == "__main__":
+    main()
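The new example imports `Config` from a local `config` module that this diff does not show. A minimal sketch of what such a module might look like, assuming it simply reads the connection settings from environment variables (the variable names here are assumptions):

# config.py -- hypothetical stand-in for the module the example imports
import os


class Config:
    """Connection settings for the examples, read from the environment."""

    def __init__(self):
        self.host = os.getenv("INFLUX_HOST", "")
        self.token = os.getenv("INFLUX_TOKEN", "")
        self.org = os.getenv("INFLUX_ORG", "")
        self.database = os.getenv("INFLUX_DATABASE", "")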

Examples/file-import/csv_write.py

Lines changed: 50 additions & 24 deletions

@@ -1,10 +1,15 @@
+import logging
 import influxdb_client_3 as InfluxDBClient3
 from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError


 class BatchingCallback(object):

+    def __init__(self):
+        self.write_count = 0
+
     def success(self, conf, data: str):
+        self.write_count += 1
         print(f"Written batch: {conf}, data: {data}")

     def error(self, conf, data: str, exception: InfluxDBError):
@@ -14,27 +19,48 @@ def retry(self, conf, data: str, exception: InfluxDBError):
         print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


-callback = BatchingCallback()
-
-write_options = WriteOptions(batch_size=500,
-                             flush_interval=10_000,
-                             jitter_interval=2_000,
-                             retry_interval=5_000,
-                             max_retries=5,
-                             max_retry_delay=30_000,
-                             exponential_base=2)
-
-wco = write_client_options(success_callback=callback.success,
-                           error_callback=callback.error,
-                           retry_callback=callback.retry,
-                           write_options=write_options)
-
-with InfluxDBClient3.InfluxDBClient3(
-        token="INSERT_TOKEN",
-        host="eu-central-1-1.aws.cloud2.influxdata.com",
-        org="6a841c0c08328fb1",
-        database="python", write_client_options=wco) as client:
-    client.write_file(
-        file='./out.csv',
-        timestamp_column='time', tag_columns=["provider", "machineID"])
+def main() -> None:
+
+    # allow detailed inspection
+    logging.basicConfig(level=logging.DEBUG)
+
+    callback = BatchingCallback()
+
+    write_options = WriteOptions(batch_size=100,
+                                 flush_interval=10_000,
+                                 jitter_interval=2_000,
+                                 retry_interval=5_000,
+                                 max_retries=5,
+                                 max_retry_delay=30_000,
+                                 exponential_base=2)
+
+    wco = write_client_options(success_callback=callback.success,
+                               error_callback=callback.error,
+                               retry_callback=callback.retry,
+                               write_options=write_options)
+
+    # token: access token generated in cloud
+    # host: ATTN could be another AWS region or even another cloud provider
+    # org: organization associated with account and database
+    # database: should have retention policy 'forever' to handle older sample data timestamps
+    # write_client_options: see above
+    # debug: allows low-level inspection of communications and context-manager termination
+    with InfluxDBClient3.InfluxDBClient3(
+            token="INSERT_TOKEN",
+            host="https://us-east-1-1.aws.cloud2.influxdata.com/",
+            org="INSERT_ORG",
+            database="example_data_forever",
+            write_client_options=wco,
+            debug=True) as client:
+        client.write_file(
+            file='./out.csv',
+            timestamp_column='time', tag_columns=["provider", "machineID"])
+
+    print(f'DONE writing from csv in {callback.write_count} batch(es)')
+
+
+if __name__ == "__main__":
+    main()
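`write_file` above reads `./out.csv` with a `time` timestamp column and `provider`/`machineID` tag columns. A minimal sketch that generates a compatible file; the `measurement` and `load` columns are assumptions, since the repository's actual sample CSV is not shown in this diff:

# make_out_csv.py -- hypothetical helper for generating sample data
import csv
import datetime

start = datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc)
rows = [
    {
        "measurement": "machine_load",  # assumed measurement column
        "time": (start + datetime.timedelta(seconds=10 * i)).isoformat(),
        "provider": "aws",  # tag column named in the example
        "machineID": f"machine-{i % 3}",  # tag column named in the example
        "load": round(0.1 * (i % 10), 2),  # assumed field column
    }
    for i in range(100)
]

with open("out.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)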
