@@ -156,7 +156,7 @@ async def iterate(
156156 yield Record (row , result_columns , self ._dialect , column_maps )
157157
158158 def transaction (self ) -> "TransactionBackend" :
159- raise NotImplementedError () # pragma: no cover
159+ return PsycopgTransaction ( connection = self )
160160
161161 @property
162162 def raw_connection (self ) -> typing .Any :
@@ -166,13 +166,36 @@ def raw_connection(self) -> typing.Any:
166166
167167
168168class PsycopgTransaction (TransactionBackend ):
169+ _connecttion : PsycopgConnection
170+ _transaction : typing .Optional [psycopg .AsyncTransaction ]
171+
172+ def __init__ (self , connection : PsycopgConnection ):
173+ self ._connection = connection
174+ self ._transaction : typing .Optional [psycopg .AsyncTransaction ] = None
175+
169176 async def start (
170177 self , is_root : bool , extra_options : typing .Dict [typing .Any , typing .Any ]
171178 ) -> None :
172- raise NotImplementedError () # pragma: no cover
179+ if self ._connection ._connection is None :
180+ raise RuntimeError ("Connection is not acquired" )
181+
182+ transaction = psycopg .AsyncTransaction (
183+ self ._connection ._connection , ** extra_options
184+ )
185+ async with transaction ._conn .lock :
186+ await transaction ._conn .wait (transaction ._enter_gen ())
187+ self ._transaction = transaction
173188
174189 async def commit (self ) -> None :
175- raise NotImplementedError () # pragma: no cover
190+ if self ._transaction is None :
191+ raise RuntimeError ("Transaction was not started" )
192+
193+ async with self ._transaction ._conn .lock :
194+ await self ._transaction ._conn .wait (self ._transaction ._commit_gen ())
176195
177196 async def rollback (self ) -> None :
178- raise NotImplementedError () # pragma: no cover
197+ if self ._transaction is None :
198+ raise RuntimeError ("Transaction was not started" )
199+
200+ async with self ._transaction ._conn .lock :
201+ await self ._transaction ._conn .wait (self ._transaction ._rollback_gen (None ))
0 commit comments