forked from jinef-john/google-maps-scraper
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
353 lines (315 loc) · 13 KB
/
Copy pathdb.py
File metadata and controls
353 lines (315 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
import json
import logging
import os
import sqlite3
import threading
from pathlib import Path
logger = logging.getLogger(__name__)
# Schema plus per-connection settings. Database runs this whole script with
# ``executescript`` once at construction and again every time a thread opens
# its own connection, so every statement must be safe to re-run (hence the
# IF NOT EXISTS guards on every table and index).
#
# NOTE(review): the ``REFERENCES ... ON DELETE CASCADE`` clauses below are only
# enforced by SQLite when ``PRAGMA foreign_keys = ON`` is set on the connection.
# Nothing in this script sets it, so deleting a job does NOT cascade to
# job_places (and reviews are not checked against places) unless callers
# handle that themselves.
#
# NOTE(review): ``CREATE TABLE IF NOT EXISTS`` never alters an existing table.
# A database file created before a column was added here (e.g. hotel_class,
# business_status) will not gain that column automatically. Verify whether a
# migration step is needed for older files.
_INIT_SQL = """
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA temp_store = MEMORY;
PRAGMA cache_size = -64000;
CREATE TABLE IF NOT EXISTS places (
place_id TEXT PRIMARY KEY,
name TEXT,
address TEXT,
address_components TEXT,
plus_code TEXT,
lat REAL,
lng REAL,
rating REAL,
review_count INTEGER,
website TEXT,
phone TEXT,
email TEXT,
fax TEXT,
price_level TEXT,
description TEXT,
categories TEXT,
primary_type TEXT,
hours TEXT,
photos TEXT,
about TEXT,
menu TEXT,
booking_links TEXT,
social_links TEXT,
reviews_fetched INTEGER DEFAULT 0,
reviews_cursor TEXT DEFAULT '',
reviews_total_saved INTEGER DEFAULT 0,
scraped_at TEXT DEFAULT (datetime('now')),
-- Hotel / lodging
hotel_class TEXT,
-- Status
business_status TEXT
);
CREATE TABLE IF NOT EXISTS reviews (
review_id TEXT PRIMARY KEY,
place_id TEXT NOT NULL REFERENCES places(place_id) ON DELETE CASCADE,
reviewer_name TEXT,
reviewer_profile_url TEXT,
reviewer_avatar_url TEXT,
reviewer_user_id TEXT,
reviewer_review_count TEXT,
reviewer_is_local_guide INTEGER,
rating INTEGER,
text TEXT,
date TEXT,
language TEXT,
photos TEXT,
owner_reply TEXT,
owner_reply_date TEXT,
scraped_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
query TEXT,
status TEXT DEFAULT 'running',
places_total INTEGER DEFAULT 0,
places_done INTEGER DEFAULT 0,
reviews_total INTEGER DEFAULT 0,
reviews_done INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now')),
error TEXT
);
CREATE TABLE IF NOT EXISTS job_places (
job_id TEXT NOT NULL REFERENCES jobs(job_id) ON DELETE CASCADE,
place_id TEXT NOT NULL,
status TEXT DEFAULT 'pending',
reviews_cursor TEXT DEFAULT '',
reviews_count INTEGER DEFAULT 0,
UNIQUE(job_id, place_id)
);
CREATE INDEX IF NOT EXISTS idx_reviews_place ON reviews(place_id);
CREATE INDEX IF NOT EXISTS idx_places_reviews_fetched ON places(reviews_fetched);
CREATE INDEX IF NOT EXISTS idx_places_name ON places(name);
CREATE INDEX IF NOT EXISTS idx_job_places_job ON job_places(job_id);
CREATE INDEX IF NOT EXISTS idx_job_places_status ON job_places(status);
"""
def _to_json(obj):
    """Serialize *obj* to a JSON string for a TEXT column.

    ``None`` is passed through unchanged so the column stores SQL NULL.
    Any other value, including falsy ones such as ``[]`` or ``0``, is
    serialized with ``json.dumps``.
    """
    if obj is None:
        return None
    return json.dumps(obj)
def _bool_int(val):
    """Convert *val* to an SQLite-friendly integer flag.

    Returns ``None`` for ``None`` (unknown), otherwise 1 for truthy
    and 0 for falsy values.
    """
    return None if val is None else int(bool(val))
class Database:
    """Thread-safe SQLite database with WAL mode and resume support.

    Each thread that uses an instance lazily opens its own
    ``sqlite3.Connection``, kept in a ``threading.local``. One instance can
    therefore be shared by worker threads without sharing a connection.
    ``_INIT_SQL`` is applied once on construction and again on every new
    per-thread connection.

    Resume state is tracked at two levels:
    * per place, via ``places.reviews_fetched`` / ``reviews_cursor`` /
      ``reviews_total_saved``;
    * per job, via the ``jobs`` and ``job_places`` tables.
    """

    def __init__(self, path):
        """Open (creating if needed) the database file at *path*.

        Args:
            path: Filesystem path of the SQLite file. Its parent directory is
                created if it does not exist.
        """
        self.path = path
        Path(path).parent.mkdir(parents=True, exist_ok=True)
        self._local = threading.local()
        self._init_db()

    def _conn(self):
        """Return the calling thread's connection, opening it on first use.

        A new connection returns ``sqlite3.Row`` rows and has ``_INIT_SQL``
        executed on it. That also re-applies the script's per-connection
        PRAGMAs.
        """
        if not hasattr(self._local, "conn") or self._local.conn is None:
            self._local.conn = sqlite3.connect(self.path, check_same_thread=False)
            self._local.conn.row_factory = sqlite3.Row
            self._local.conn.executescript(_INIT_SQL)
            self._local.conn.commit()
        return self._local.conn

    def _init_db(self):
        """Create the schema using a short-lived connection that is always closed."""
        conn = sqlite3.connect(self.path)
        try:
            conn.executescript(_INIT_SQL)
            conn.commit()
        finally:
            # Close even when the script fails, so the file handle is not leaked.
            conn.close()

    def upsert_place(self, place):
        """Insert *place*, or overwrite the scraped columns of an existing row.

        On conflict with an existing ``place_id``, every scraped column is
        replaced and ``scraped_at`` is refreshed. The review-progress columns
        (``reviews_fetched``, ``reviews_cursor``, ``reviews_total_saved``) are
        left untouched. Structured attributes are stored via ``_to_json``
        (``None`` stays NULL). ``opening_hours``, when present, is flattened
        into one JSON object. Commits immediately.

        Args:
            place: The scraped place object. Presumably the project's place
                model (TODO confirm); it must expose every attribute read below.
        """
        hours = None
        if place.opening_hours:
            hours = json.dumps({
                "periods": place.opening_hours.periods,
                "weekday_text": place.opening_hours.weekday_text,
                "open_now": place.opening_hours.open_now,
                "next_opening": place.opening_hours.next_opening,
            })
        self._conn().execute(
            """INSERT INTO places
            (place_id, name, address, address_components, plus_code, lat, lng, rating, review_count,
            website, phone, email, fax, price_level, description, categories, primary_type, hours, photos,
            about, menu, booking_links, social_links,
            hotel_class, business_status)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            ON CONFLICT(place_id) DO UPDATE SET
            name=excluded.name,
            address=excluded.address,
            address_components=excluded.address_components,
            plus_code=excluded.plus_code,
            lat=excluded.lat,
            lng=excluded.lng,
            rating=excluded.rating,
            review_count=excluded.review_count,
            website=excluded.website,
            phone=excluded.phone,
            email=excluded.email,
            fax=excluded.fax,
            price_level=excluded.price_level,
            description=excluded.description,
            categories=excluded.categories,
            primary_type=excluded.primary_type,
            hours=excluded.hours,
            photos=excluded.photos,
            about=excluded.about,
            menu=excluded.menu,
            booking_links=excluded.booking_links,
            social_links=excluded.social_links,
            hotel_class=excluded.hotel_class,
            business_status=excluded.business_status,
            scraped_at=datetime('now')""",
            (
                place.place_id, place.name, place.address,
                _to_json(place.address_components),
                place.plus_code,
                place.lat, place.lng, place.rating, place.review_count,
                place.website, place.phone, place.email, place.fax,
                place.price_level, place.description,
                _to_json(place.categories), place.primary_type,
                hours,
                _to_json(place.photos),
                _to_json(place.about),
                _to_json(place.menu),
                _to_json(place.booking_links),
                _to_json(place.social_links),
                place.hotel_class,
                place.business_status,
            ),
        )
        self._conn().commit()

    def insert_review(self, place_id, review):
        """Store *review* under *place_id*; a review_id already stored is ignored.

        Reviews with a falsy ``review_id`` are skipped entirely.
        ``reviewer.is_local_guide`` is stored as 1/0, with a falsy or None
        value becoming 0. ``photos`` is stored as JSON. Commits after every
        call.

        Args:
            place_id: Id of the place the review belongs to.
            review: The scraped review object, with a nested ``reviewer``.
                Presumably the project's review model (TODO confirm).
        """
        if not review.review_id:
            return
        self._conn().execute(
            """INSERT OR IGNORE INTO reviews
            (review_id, place_id, reviewer_name, reviewer_profile_url, reviewer_avatar_url,
            reviewer_user_id, reviewer_review_count, reviewer_is_local_guide,
            rating, text, date, language, photos, owner_reply, owner_reply_date)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                review.review_id, place_id,
                review.reviewer.name, review.reviewer.profile_url, review.reviewer.avatar_url,
                review.reviewer.user_id, review.reviewer.review_count,
                1 if review.reviewer.is_local_guide else 0,
                review.rating, review.text, review.date, review.language,
                _to_json(review.photos),
                review.owner_reply, review.owner_reply_date,
            ),
        )
        self._conn().commit()

    def mark_reviews_fetched(self, place_id, cursor="", total_saved=0):
        """Flag *place_id* as review-complete and record its final cursor and count."""
        self._conn().execute(
            "UPDATE places SET reviews_fetched = 1, reviews_cursor = ?, reviews_total_saved = ? WHERE place_id = ?",
            (cursor, total_saved, place_id),
        )
        self._conn().commit()

    def get_place_cursor(self, place_id):
        """Return ``{"cursor", "total_saved"}`` review-resume state for *place_id*.

        Returns ``""`` and 0 when the place is unknown or the stored values
        are NULL. This matches how ``get_pending_places`` normalizes the same
        columns.
        """
        row = self._conn().execute(
            "SELECT reviews_cursor, reviews_total_saved FROM places WHERE place_id = ?", (place_id,)
        ).fetchone()
        if row is None:
            return {"cursor": "", "total_saved": 0}
        return {"cursor": row[0] or "", "total_saved": row[1] or 0}

    def get_pending_places(self):
        """Return every place whose reviews are not fetched yet, with its resume state."""
        rows = self._conn().execute(
            "SELECT place_id, name, reviews_cursor, reviews_total_saved FROM places WHERE reviews_fetched = 0"
        ).fetchall()
        return [{"place_id": r[0], "name": r[1], "cursor": r[2] or "", "total_saved": r[3] or 0} for r in rows]

    def get_stats(self):
        """Return counts of places, reviews, and places still awaiting review fetching."""
        c = self._conn().cursor()
        places = c.execute("SELECT COUNT(*) FROM places").fetchone()[0]
        reviews = c.execute("SELECT COUNT(*) FROM reviews").fetchone()[0]
        pending = c.execute("SELECT COUNT(*) FROM places WHERE reviews_fetched = 0").fetchone()[0]
        return {"places": places, "reviews": reviews, "pending_reviews": pending}

    def get_place(self, place_id):
        """Return the full ``places`` row for *place_id* as a dict, or ``None``."""
        row = self._conn().execute("SELECT * FROM places WHERE place_id = ?", (place_id,)).fetchone()
        return dict(row) if row else None

    # --- Job tracking ---

    def create_job(self, job_id, query):
        """Register *job_id* as ``running``; an existing job with that id is left as is."""
        self._conn().execute(
            "INSERT OR IGNORE INTO jobs (job_id, query, status) VALUES (?, ?, 'running')",
            (job_id, query),
        )
        self._conn().commit()

    def add_job_places(self, job_id, place_ids):
        """Attach *place_ids* to *job_id* as pending and refresh ``jobs.places_total``.

        Ids already attached to the job are ignored, so calling this again
        with overlapping ids is safe.
        """
        conn = self._conn()
        conn.executemany(
            "INSERT OR IGNORE INTO job_places (job_id, place_id, status) VALUES (?, ?, 'pending')",
            [(job_id, pid) for pid in place_ids],
        )
        conn.execute(
            "UPDATE jobs SET places_total = (SELECT COUNT(*) FROM job_places WHERE job_id = ?), updated_at = datetime('now') WHERE job_id = ?",
            (job_id, job_id),
        )
        conn.commit()

    def get_pending_job_places(self, job_id):
        """Return the job's pending places in insertion order.

        Each entry carries the job-scoped cursor plus the place name and
        saved-review count from ``places``. Missing values become ``""`` or 0.
        """
        rows = self._conn().execute(
            """SELECT jp.place_id, jp.reviews_cursor, p.name, p.reviews_total_saved
            FROM job_places jp
            LEFT JOIN places p ON jp.place_id = p.place_id
            WHERE jp.job_id = ? AND jp.status = 'pending'
            ORDER BY jp.rowid""",
            (job_id,),
        ).fetchall()
        return [{"place_id": r[0], "cursor": r[1] or "", "name": r[2] or "", "total_saved": r[3] or 0} for r in rows]

    def reopen_job_places_for_reviews(self, job_id, max_reviews):
        """Reset 'done' places to 'pending' if they have fewer reviews than max_reviews.

        Returns:
            The number of ``job_places`` rows reopened. Returns 0 when
            *max_reviews* is ``None`` or not positive, in which case nothing
            is changed.
        """
        if max_reviews is None or max_reviews <= 0:
            return 0
        conn = self._conn()
        c = conn.execute(
            """UPDATE job_places SET status = 'pending'
            WHERE job_id = ? AND status = 'done' AND reviews_count < ?""",
            (job_id, max_reviews),
        )
        conn.commit()
        return c.rowcount

    def get_job_place_cursor(self, job_id, place_id):
        """Return the job-scoped review cursor for *place_id*.

        Returns ``""`` when the pair is unknown or the cursor is NULL.
        """
        row = self._conn().execute(
            "SELECT reviews_cursor FROM job_places WHERE job_id = ? AND place_id = ?",
            (job_id, place_id),
        ).fetchone()
        return (row[0] or "") if row else ""

    def mark_job_place_done(self, job_id, place_id, reviews_count=0, cursor=""):
        """Mark *place_id* done within *job_id* and recompute the job's progress totals.

        ``jobs.places_done`` and ``jobs.reviews_done`` are recomputed from
        ``job_places`` rather than incremented. Marking the same place twice
        therefore does not double-count it.
        """
        conn = self._conn()
        conn.execute(
            """UPDATE job_places SET status = 'done', reviews_count = ?, reviews_cursor = ?
            WHERE job_id = ? AND place_id = ?""",
            (reviews_count, cursor, job_id, place_id),
        )
        conn.execute(
            """UPDATE jobs SET
            places_done = (SELECT COUNT(*) FROM job_places WHERE job_id = ? AND status = 'done'),
            reviews_done = (SELECT COALESCE(SUM(reviews_count), 0) FROM job_places WHERE job_id = ?),
            updated_at = datetime('now')
            WHERE job_id = ?""",
            (job_id, job_id, job_id),
        )
        conn.commit()

    def update_job_status(self, job_id, status, error=None):
        """Set the job's status and error text (``None`` clears it) and bump ``updated_at``."""
        self._conn().execute(
            "UPDATE jobs SET status = ?, error = ?, updated_at = datetime('now') WHERE job_id = ?",
            (status, error, job_id),
        )
        self._conn().commit()

    def get_job(self, job_id):
        """Return the ``jobs`` row for *job_id* as a dict, or ``None``."""
        row = self._conn().execute(
            "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
        ).fetchone()
        return dict(row) if row else None

    def list_jobs(self, limit=20):
        """Return up to *limit* jobs as dicts, most recently updated first."""
        rows = self._conn().execute(
            "SELECT * FROM jobs ORDER BY updated_at DESC LIMIT ?", (limit,)
        ).fetchall()
        return [dict(r) for r in rows]

    def cleanup_old_jobs(self, days=7):
        """Delete jobs created more than *days* days ago, together with their job_places.

        The schema declares ``ON DELETE CASCADE`` on ``job_places.job_id``.
        SQLite only enforces that when ``PRAGMA foreign_keys = ON``, which is
        never set here, so the child rows are removed explicitly. Both deletes
        run in one transaction.

        Args:
            days: Age threshold in days. It is bound as a parameter, never
                spliced into the SQL. A value SQLite cannot parse as a date
                modifier yields a NULL cutoff and deletes nothing.

        Returns:
            The number of ``jobs`` rows deleted.
        """
        conn = self._conn()
        # Compute the cutoff once so both statements agree on the same instant.
        cutoff = conn.execute(
            "SELECT datetime('now', ?)", ("-{} days".format(days),)
        ).fetchone()[0]
        conn.execute(
            """DELETE FROM job_places WHERE job_id IN
            (SELECT job_id FROM jobs WHERE created_at < ?)""",
            (cutoff,),
        )
        c = conn.execute("DELETE FROM jobs WHERE created_at < ?", (cutoff,))
        conn.commit()
        return c.rowcount

    def vacuum(self):
        """Run SQLite's ``VACUUM`` on the calling thread's connection."""
        self._conn().execute("VACUUM")
        self._conn().commit()

    def close(self):
        """Close the calling thread's connection, if it has one.

        Connections opened by other threads are not touched. A later call to
        ``_conn`` from this thread opens a fresh connection.
        """
        if hasattr(self._local, "conn") and self._local.conn:
            self._local.conn.close()
            self._local.conn = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()